nordabiz/zopk_content_scraper.py
Maciej Pienczyn 172f2085db
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: add local image caching for ZOPK news thumbnails
Source servers return 503 (Cloudflare) for cross-origin image requests
from browsers. Solution: download and cache images server-side during
scraping, serve from /static/uploads/zopk/.

- Scraper now downloads og:image and stores locally during article
  scraping (max 2MB, supports jpg/png/webp)
- Backfill script downloads images for all existing articles server-side
- Template fallback shows domain initial letter when image unavailable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 09:08:03 +01:00

1083 lines
37 KiB
Python

"""
ZOPK Content Scraper - Pobieranie pełnej treści artykułów dla bazy wiedzy.
Scraper respektuje robots.txt i stosuje rate limiting.
Obsługuje główne polskie portale newsowe.
Usage:
from zopk_content_scraper import ZOPKContentScraper
scraper = ZOPKContentScraper(db_session)
result = scraper.scrape_article(news_id=123)
# lub batch:
result = scraper.batch_scrape(limit=50)
"""
import re
import time
import logging
import hashlib
import base64
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Callable, Any
from urllib.parse import urlparse, parse_qs, unquote
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup, Comment, NavigableString
from database import ZOPKNews
# Configure logging
logger = logging.getLogger(__name__)

# ============================================================
# CONFIGURATION
# ============================================================

# User-Agent identifying the bot (includes a contact URL/e-mail for site owners)
USER_AGENT = 'NordaBizBot/1.0 (+https://nordabiznes.pl/bot; kontakt@nordabiznes.pl)'

# Request timeout in seconds
REQUEST_TIMEOUT = 15

# Maximum content length (chars) to avoid memory issues
MAX_CONTENT_LENGTH = 100000  # ~100KB of text

# Rate limiting: minimum seconds between requests, per domain
RATE_LIMITS = {
    'trojmiasto.pl': 2.0,
    'dziennikbaltycki.pl': 2.0,
    'nordafm.pl': 1.5,
    'ttm24.pl': 1.5,
    'radiogdansk.pl': 1.5,
    'portalmorski.pl': 1.5,
    'biznes.pap.pl': 2.0,
    'default': 3.0
}

# Maximum retry attempts (failed articles past this are no longer retried in batch mode)
MAX_RETRY_ATTEMPTS = 3

# ============================================================
# CONTENT SELECTORS PER DOMAIN
# ============================================================
# CSS selectors for article content extraction.
# Order matters - first match with enough text wins.
CONTENT_SELECTORS = {
    'trojmiasto.pl': [
        'article.article-content',
        'div.article-body',
        'div.article__content',
        'div[itemprop="articleBody"]',
    ],
    'nordafm.pl': [
        'div.entry-content',
        'article.post-content',
        'div.post-body',
    ],
    'ttm24.pl': [
        'div.post-content',
        'article.entry-content',
        'div.article-content',
    ],
    'radiogdansk.pl': [
        'div.elementor-widget-theme-post-content',
        'div.entry-content',
        'article.post',
    ],
    'portalmorski.pl': [
        'div.article-content',
        'div.entry-content',
        'article.post-content',
    ],
    'nadmorski24.pl': [
        'div#articleMainText',
        'div.articleMainText',
        'div.staticArticle',
    ],
    'portalkomunalny.pl': [
        'div.article-post-content',
        'div.article-content',
    ],
    'weekendfm.pl': [
        'div.article_content',
        'div.article',
    ],
    'globenergia.pl': [
        'div.single-content',
        'article',
    ],
    'polskieradio24.pl': [
        'section.span-9',
        'main',
    ],
    'gov.pl': [
        'div.article-content',
        'main.main-content',
        'div.content',
    ],
    'default': [
        'div[itemprop="articleBody"]',
        'div.article-content',
        'div.article-body',
        'div.entry-content',
        'div.post-content',
        'div.single-content',
        'article',
        'main.content',
        'main',
    ]
}

# Minimum text length for a selector match to be accepted.
# Elements with less text are skipped, trying the next selector.
MIN_SELECTOR_TEXT = 200

# Elements to remove from content (tag names and CSS selectors mixed)
ELEMENTS_TO_REMOVE = [
    'script', 'style', 'nav', 'header', 'footer', 'aside',
    'form', 'iframe', 'noscript', 'svg', 'canvas',
    '.advertisement', '.ad', '.ads', '.advert', '.banner',
    '.social-share', '.share-buttons', '.sharing',
    '.related-articles', '.related-posts', '.recommendations',
    '.comments', '.comment-section', '#comments',
    '.newsletter', '.subscription', '.subscribe',
    '.cookie-notice', '.cookie-banner', '.gdpr',
    '.popup', '.modal', '.overlay',
    '.sidebar', '.widget', '.navigation',
    '.breadcrumb', '.breadcrumbs',
    '.author-bio', '.author-box',
    '.tags', '.tag-list', '.categories',
    '.pagination', '.pager',
    '[data-ad]', '[data-advertisement]',
]

# Domains that are not scrapeable (paywalls, dynamic content, etc.).
# Matching is by substring against the article's domain.
SKIP_DOMAINS = [
    # Social media
    'facebook.com',
    'twitter.com',
    'x.com',
    'linkedin.com',
    'youtube.com',
    'instagram.com',
    # Paywalled news sites (require login, return cookie dialogs)
    'wyborcza.pl',  # Gazeta Wyborcza paywall
    'rp.pl',  # Rzeczpospolita paywall
    'wnp.pl',  # WNP paywall (content behind subscription)
    # JS-rendered SPA (no content in HTML)
    'tvp.pl',  # TVP — all content rendered via JS
    'tvp.info',  # TVP Info — same as above
    # Cloudflare-protected (block bots)
    'gp24.pl',
    'strefaobrony.pl',
    'dziennikbaltycki.pl',
    # Blocked/no content for bots
    'pap.pl',  # PAP — blocks bots (212B response)
    'obserwatorfinansowy.pl',  # Blocks bots
    'cire.pl',  # No content in the HTML
    # Aggregators (no original content)
    'wykop.pl',  # Social news aggregator
    'reddit.com',
    # Google News aggregator (URLs need decoding first)
    'news.google.com',
]

# ============================================================
# GOOGLE NEWS URL DECODING
# ============================================================

# Headers for Google News requests (browser-like UA + pre-set consent cookie
# to avoid the consent.google.com interstitial)
GOOGLE_NEWS_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'pl,en;q=0.5',
    'Cookie': 'CONSENT=YES+cb.20210720-07-p0.en+FX+410'
}
def decode_google_news_url(google_url: str, max_depth: int = 3) -> Optional[str]:
    """
    Decode a Google News URL to the original source URL.

    Google News uses Protocol Buffer encoding (not simple Base64); the
    googlenewsdecoder library handles this correctly, so it is tried first.
    Base64 decoding and redirect-following remain as fallbacks for older
    URL formats.

    Args:
        google_url: URL to decode
        max_depth: Maximum recursion depth (protection against infinite loops)

    Returns:
        Original URL or None if decoding failed
    """
    if max_depth <= 0:
        return None

    # Method 1: googlenewsdecoder library (PREFERRED - handles Protocol Buffer
    # encoding). Most reliable method for modern Google News URLs.
    decoded = decode_google_news_url_with_library(google_url)
    if decoded:
        logger.debug(f"googlenewsdecoder succeeded: {decoded[:80]}...")
        return decoded

    # Method 2: Base64 decode (fallback for older URL formats)
    try:
        # Find encoded part (supports both /articles/ and /rss/articles/)
        match = re.search(r'/(?:rss/)?articles/([A-Za-z0-9_-]+)', google_url)
        if match:
            encoded = match.group(1)
            # urlsafe_b64decode requires input length to be a multiple of 4
            encoded += '=' * (-len(encoded) % 4)
            try:
                decoded_bytes = base64.urlsafe_b64decode(encoded)
                # Scan the decoded bytes for embedded URLs
                urls = re.findall(rb'https?://[^\x00-\x1f\s"\'<>]+', decoded_bytes)
                for url in urls:
                    try:
                        url_str = url.decode('utf-8', errors='ignore').rstrip('/')
                        # Skip Google-owned URLs; require a plausible length
                        if 'google.' not in url_str and len(url_str) > 20:
                            # Trim trailing control/binary garbage
                            url_str = url_str.split('\x00')[0]
                            url_str = url_str.split('\r')[0]
                            url_str = url_str.split('\n')[0]
                            if url_str.startswith('http'):
                                logger.debug(f"Base64 decode succeeded: {url_str[:80]}...")
                                return url_str
                    # FIX: was a bare `except:` which also swallowed
                    # SystemExit/KeyboardInterrupt
                    except Exception:
                        continue
            except Exception:
                pass
    except Exception:
        pass

    # Method 3: follow redirects (last resort - often fails due to consent.google.com)
    # Only try this if we haven't exhausted max_depth significantly
    if max_depth >= 2:
        try:
            response = requests.get(
                google_url,
                headers=GOOGLE_NEWS_HEADERS,
                timeout=10,
                allow_redirects=True
            )
            final_url = response.url
            response.close()
            # If we did not land back on Google, we have the original URL
            if 'google.com' not in final_url:
                logger.debug(f"Redirect follow succeeded: {final_url[:80]}...")
                return final_url
            # If we landed on consent.google.com, don't recurse - the consent
            # page never redirects to the actual article.
        except Exception as e:
            logger.debug(f"Redirect follow failed: {e}")

    logger.warning(f"All Google News URL decoding methods failed for: {google_url[:80]}...")
    return None
def is_google_news_url(url: str) -> bool:
    """Return True when *url* points at Google News and needs decoding first."""
    return bool(url) and 'news.google.com' in url.lower()
def decode_google_news_url_with_library(google_url: str) -> Optional[str]:
    """
    Decode a Google News URL via the googlenewsdecoder library.

    The library understands the Protocol Buffer encoding used by modern
    Google News links (plain Base64 decoding does not).

    Args:
        google_url: Google News URL to decode

    Returns:
        Original URL, or None when the library is missing or decoding fails.
    """
    try:
        from googlenewsdecoder import gnewsdecoder
        outcome = gnewsdecoder(google_url, interval=0.5)
    except ImportError:
        logger.warning("googlenewsdecoder library not installed")
        return None
    except Exception as e:
        logger.debug(f"googlenewsdecoder failed: {e}")
        return None
    if outcome and outcome.get('status') and outcome.get('decoded_url'):
        return outcome['decoded_url']
    return None
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class ScrapeResult:
    """Result of scraping a single article."""
    success: bool                  # True when content was extracted (or already cached)
    content: Optional[str] = None  # extracted article text; None on failure
    word_count: int = 0            # number of words in `content`
    error: Optional[str] = None    # human-readable error message (user-facing, Polish)
    status: str = 'pending'        # scraped, failed, skipped
@dataclass
class ProgressUpdate:
    """Progress update emitted during batch operations."""
    current: int    # 1-based index of the article being processed
    total: int      # total number of articles in the batch
    percent: float  # completion percentage, 0.0-100.0
    stage: str      # 'scraping', 'extracting', 'embedding'
    status: str     # 'processing', 'success', 'failed', 'skipped', 'complete'
    message: str    # human-readable status line (user-facing, Polish)
    details: Dict[str, Any] = field(default_factory=dict)  # extra counters/metadata
    article_id: Optional[int] = None     # ID of the article currently processed
    article_title: Optional[str] = None  # truncated title for display
# Type alias for an optional callback invoked with each ProgressUpdate
ProgressCallback = Optional[Callable[[ProgressUpdate], None]]
# ============================================================
# SCRAPER CLASS
# ============================================================
class ZOPKContentScraper:
    """
    Scraper for ZOPK news article content.

    Features:
    - Domain-specific content selectors
    - Rate limiting per domain
    - HTML cleaning (removes ads, navigation, etc.)
    - Retry attempt tracking (batch mode stops retrying after MAX_RETRY_ATTEMPTS)
    - robots.txt respect (via identifying User-Agent)
    """
    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize scraper.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for audit logging
        """
        self.db = db_session
        self.user_id = user_id
        # Per-domain epoch timestamp of the last request, for rate limiting
        self._last_request_time: Dict[str, float] = {}
        # Shared HTTP session carrying the bot's standard headers
        self._session = self._create_session()
def _create_session(self) -> requests.Session:
    """Build a requests session pre-loaded with the bot's standard headers."""
    standard_headers = {
        'User-Agent': USER_AGENT,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
    }
    http = requests.Session()
    http.headers.update(standard_headers)
    return http
def _get_domain(self, url: str) -> str:
"""Extract domain from URL."""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove www. prefix
if domain.startswith('www.'):
domain = domain[4:]
return domain
except Exception:
return 'unknown'
def _get_rate_limit(self, domain: str) -> float:
    """Return the crawl delay (seconds) configured for *domain*."""
    # Exact match wins
    exact = RATE_LIMITS.get(domain)
    if exact is not None:
        return exact
    # Fall back to suffix matching (covers subdomains like biznes.trojmiasto.pl)
    for suffix, delay in RATE_LIMITS.items():
        if domain.endswith(suffix):
            return delay
    return RATE_LIMITS['default']
def _wait_for_rate_limit(self, domain: str) -> None:
    """Sleep just long enough to honour the per-domain minimum request gap."""
    min_gap = self._get_rate_limit(domain)
    elapsed = time.time() - self._last_request_time.get(domain, 0)
    if elapsed < min_gap:
        pause = min_gap - elapsed
        logger.debug(f"Rate limiting: waiting {pause:.2f}s for {domain}")
        time.sleep(pause)
    # Record when we last touched this domain
    self._last_request_time[domain] = time.time()
def _should_skip_domain(self, domain: str) -> bool:
    """True when *domain* matches any SKIP_DOMAINS entry (substring match)."""
    return any(blocked in domain for blocked in SKIP_DOMAINS)
def _get_content_selectors(self, domain: str) -> List[str]:
    """Return the ordered CSS selector list to try for *domain* (generic fallback last)."""
    if domain in CONTENT_SELECTORS:
        return CONTENT_SELECTORS[domain]
    # Suffix match handles subdomains (e.g. 'biznes.trojmiasto.pl')
    for known, selectors in CONTENT_SELECTORS.items():
        if known != 'default' and domain.endswith(known):
            return selectors
    return CONTENT_SELECTORS['default']
def _fetch_html(self, url: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Fetch HTML from *url*, honouring the skip-list and per-domain rate limits.

    Returns:
        Tuple of (html_content, error_message) — exactly one side is None.
    """
    domain = self._get_domain(url)
    # Refuse known-unscrapeable domains up front
    if self._should_skip_domain(domain):
        return None, f"Domena {domain} nie do scrapowania (social media/paywall)"
    # Apply rate limiting
    self._wait_for_rate_limit(domain)
    try:
        response = self._session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
        response.raise_for_status()
        # Accept only HTML-ish payloads
        content_type = response.headers.get('Content-Type', '')
        if 'text/html' not in content_type and 'application/xhtml' not in content_type:
            return None, f"Nie jest HTML: {content_type}"
        # Prefer detected encoding over the declared one
        response.encoding = response.apparent_encoding or 'utf-8'
        return response.text, None
    except requests.exceptions.Timeout:
        return None, "Przekroczono czas połączenia"
    except requests.exceptions.TooManyRedirects:
        return None, "Zbyt wiele przekierowań"
    except requests.exceptions.HTTPError as e:
        code = e.response.status_code
        known_codes = {
            403: "Strona blokuje pobieranie (403)",
            404: "Strona nie istnieje (404)",
            429: "Za dużo zapytań, spróbuj później (429)",
        }
        return None, known_codes.get(code, f"Strona niedostępna ({code})")
    except requests.exceptions.ConnectionError:
        return None, "Nie udało się połączyć z serwerem"
    except requests.exceptions.RequestException as e:
        return None, f"Błąd pobierania: {str(e)}"
def _clean_html(self, soup: BeautifulSoup) -> BeautifulSoup:
    """Strip HTML comments plus boilerplate (ads, nav, widgets, ...) in place."""
    # Drop HTML comments
    for node in soup.find_all(string=lambda s: isinstance(s, Comment)):
        node.extract()
    # Remove unwanted elements; entries starting with . # [ are CSS selectors,
    # everything else is a plain tag name
    for entry in ELEMENTS_TO_REMOVE:
        is_css = entry[:1] in ('.', '#', '[')
        victims = soup.select(entry) if is_css else soup.find_all(entry)
        for node in victims:
            node.decompose()
    return soup
def _extract_content(self, html: str, domain: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Pull the article body text out of *html*.

    Returns:
        Tuple of (content_text, error_message) — exactly one side is None.
    """
    try:
        soup = self._clean_html(BeautifulSoup(html, 'html.parser'))
        # Walk the domain-specific selectors; first sufficiently long match wins
        content_element = None
        for selector in self._get_content_selectors(domain):
            candidate = soup.select_one(selector)
            if candidate is None:
                continue
            length = len(candidate.get_text(strip=True))
            if length >= MIN_SELECTOR_TEXT:
                content_element = candidate
                logger.debug(f"Found content with selector: {selector} ({length} chars)")
                break
            logger.debug(f"Skipping selector {selector}: only {length} chars")
        if content_element is None:
            # Heuristic fallback: biggest text block on the page
            content_element = self._find_largest_text_block(soup)
        if content_element is None:
            return None, "Nie znaleziono treści artykułu"
        text = self._extract_text(content_element)
        # Reject stubs — anything under 500 chars is not a real article
        if not text or len(text) < 500:
            return None, f"Treść artykułu za krótka ({len(text) if text else 0} znaków, min. 500)"
        # Truncate oversized articles to bound memory/storage
        if len(text) > MAX_CONTENT_LENGTH:
            text = text[:MAX_CONTENT_LENGTH] + "..."
            logger.warning(f"Content truncated to {MAX_CONTENT_LENGTH} chars")
        return text, None
    except Exception as e:
        logger.error(f"Error extracting content: {e}")
        return None, f"Błąd przetwarzania: {str(e)}"
def _find_largest_text_block(self, soup: BeautifulSoup) -> Optional[BeautifulSoup]:
    """Fallback heuristic: pick the element with the most text, favouring <p>-rich ones."""
    def score(element) -> int:
        # Text length plus a 100-point bonus per paragraph;
        # 0 disqualifies elements with under 200 chars of text
        text_len = len(element.get_text(strip=True))
        if text_len < 200:
            return 0
        return text_len + 100 * len(element.find_all('p'))

    best_element, best_score = None, 0
    for element in soup.find_all(['article', 'main', 'div', 'section']):
        current = score(element)
        if current > best_score:
            best_element, best_score = element, current
    return best_element
def _extract_text(self, element: BeautifulSoup) -> str:
    """Flatten *element* into whitespace-normalised plain text."""
    block_tags = {'br', 'p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li'}
    pieces = []
    for node in element.descendants:
        if isinstance(node, NavigableString):
            chunk = str(node).strip()
            if chunk:
                pieces.append(chunk)
        elif node.name in block_tags:
            # Mark block boundaries so words from adjacent blocks don't fuse
            pieces.append('\n')
    # Join and collapse whitespace runs
    text = re.sub(r'\s+', ' ', ' '.join(pieces))
    text = re.sub(r'\n\s*\n', '\n\n', text)
    return text.strip()
def _extract_og_image(self, html: str) -> Optional[str]:
    """Return the page's og:image URL (twitter:image as fallback), or None."""
    def sane(candidate: str) -> bool:
        # Accept only absolute http(s) URLs of reasonable length
        return candidate.startswith('http') and len(candidate) < 1000

    try:
        soup = BeautifulSoup(html, 'html.parser')
        # og:image takes priority over twitter:image
        for meta in (
            soup.find('meta', property='og:image'),
            soup.find('meta', attrs={'name': 'twitter:image'}),
        ):
            if meta and meta.get('content'):
                url = meta['content'].strip()
                if sane(url):
                    return url
    except Exception as e:
        logger.debug(f"og:image extraction failed: {e}")
    return None
def _download_and_cache_image(self, image_url: str, news_id: int) -> Optional[str]:
    """
    Download an article thumbnail and cache it under static/uploads/zopk/.

    Args:
        image_url: Absolute URL of the image to fetch
        news_id: ZOPKNews id, used as the cached file's base name

    Returns:
        Local static path (e.g. '/static/uploads/zopk/123.jpg'), or None on
        any failure (non-image payload, too large/small, network error).
    """
    import os
    cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'uploads', 'zopk')
    os.makedirs(cache_dir, exist_ok=True)
    try:
        resp = self._session.get(image_url, timeout=10, stream=True)
        if resp.status_code != 200:
            logger.debug(f"Image download failed ({resp.status_code}): {image_url[:80]}")
            return None
        # Reject payloads that are neither declared as images nor have an image extension
        content_type = resp.headers.get('Content-Type', '')
        if 'image' not in content_type and not image_url.lower().endswith(('.jpg', '.jpeg', '.png', '.webp', '.gif')):
            return None
        # Pick extension from URL/Content-Type; default to .jpg
        ext = '.jpg'
        if '.png' in image_url.lower() or 'png' in content_type:
            ext = '.png'
        elif '.webp' in image_url.lower() or 'webp' in content_type:
            ext = '.webp'
        filename = f'{news_id}{ext}'
        filepath = os.path.join(cache_dir, filename)
        # Stream to disk, aborting past 2MB to bound disk/memory usage
        max_size = 2 * 1024 * 1024
        size = 0
        too_large = False
        with open(filepath, 'wb') as f:
            for chunk in resp.iter_content(chunk_size=8192):
                size += len(chunk)
                if size > max_size:
                    too_large = True
                    break
                f.write(chunk)
        # File is now closed by the `with` block — safe to remove on all platforms
        # (the original called f.close() manually inside the `with` instead)
        if too_large:
            os.remove(filepath)
            logger.debug(f"Image too large (>{max_size}B): {image_url[:80]}")
            return None
        if size < 500:  # Too small, probably an error page
            os.remove(filepath)
            return None
        # BUG FIX: the original logged and returned the literal string
        # '(unknown)' instead of the cached file name, so callers stored a
        # dead '/static/uploads/zopk/(unknown)' path.
        logger.info(f"Cached image for news {news_id}: {filename} ({size} bytes)")
        return f'/static/uploads/zopk/{filename}'
    except Exception as e:
        logger.debug(f"Image cache failed for news {news_id}: {e}")
        return None
def _count_words(self, text: str) -> int:
"""Count words in text."""
if not text:
return 0
words = re.findall(r'\b\w+\b', text)
return len(words)
def scrape_article(self, news_id: int) -> ScrapeResult:
    """
    Scrape content for a single article.

    Pipeline: load the ZOPKNews row, decode Google News redirect URLs,
    skip blocked domains, fetch and extract the article body, cache the
    thumbnail locally, and persist every status transition to the DB.

    Args:
        news_id: ID of ZOPKNews record

    Returns:
        ScrapeResult with content or error
    """
    # Get news record
    news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
    if not news:
        return ScrapeResult(
            success=False,
            error=f"News record {news_id} not found",
            status='failed'
        )
    # Check if already scraped — reuse cached content without refetching
    if news.scrape_status == 'scraped' and news.full_content:
        return ScrapeResult(
            success=True,
            content=news.full_content,
            word_count=news.content_word_count or 0,
            status='scraped'
        )
    url = news.url
    # NOTE(review): original_google_url is assigned but never read afterwards
    original_google_url = None
    # Handle Google News URLs - decode to original source
    if is_google_news_url(url):
        logger.info(f"Decoding Google News URL for article {news_id}")
        original_google_url = url
        decoded_url = decode_google_news_url(url)
        if decoded_url:
            url = decoded_url
            logger.info(f"Decoded to: {url}")
            # Update news record with the original URL and its real domain
            parsed = urlparse(url)
            real_domain = parsed.netloc.lower()
            if real_domain.startswith('www.'):
                real_domain = real_domain[4:]
            news.url = url
            news.source_domain = real_domain
            # Commit the URL update immediately so it survives later failures
            self.db.commit()
        else:
            # Could not decode - mark as failed
            news.scrape_status = 'failed'
            news.scrape_error = 'Could not decode Google News URL'
            news.scrape_attempts = (news.scrape_attempts or 0) + 1
            self.db.commit()
            return ScrapeResult(
                success=False,
                error='Could not decode Google News URL',
                status='failed'
            )
    domain = self._get_domain(url)
    logger.info(f"Scraping article {news_id}: {url}")
    # Check if domain should be skipped (social media, paywalls, bot-blocked)
    if self._should_skip_domain(domain):
        news.scrape_status = 'skipped'
        news.scrape_error = f"Domena {domain} — pominięta"
        self.db.commit()
        return ScrapeResult(
            success=False,
            error=f"Domena {domain} — pominięta",
            status='skipped'
        )
    # Fetch HTML (rate-limited per domain)
    html, fetch_error = self._fetch_html(url)
    if fetch_error:
        news.scrape_status = 'failed'
        news.scrape_error = fetch_error
        news.scrape_attempts = (news.scrape_attempts or 0) + 1
        self.db.commit()
        return ScrapeResult(
            success=False,
            error=fetch_error,
            status='failed'
        )
    # Extract article body text
    content, extract_error = self._extract_content(html, domain)
    if extract_error:
        news.scrape_status = 'failed'
        news.scrape_error = extract_error
        news.scrape_attempts = (news.scrape_attempts or 0) + 1
        self.db.commit()
        return ScrapeResult(
            success=False,
            error=extract_error,
            status='failed'
        )
    # Extract og:image and cache it locally for reliable display
    # (avoids cross-origin fetches from browsers being rejected by sources)
    og_image = self._extract_og_image(html)
    image_to_cache = og_image or news.image_url
    # Skip if an image is already cached locally (path starts with /static/)
    if image_to_cache and not (news.image_url or '').startswith('/static/'):
        local_path = self._download_and_cache_image(image_to_cache, news_id)
        if local_path:
            news.image_url = local_path
        elif og_image:
            # Caching failed — fall back to hotlinking the og:image URL
            news.image_url = og_image
    # Success - update database
    word_count = self._count_words(content)
    news.full_content = content
    news.content_word_count = word_count
    news.content_scraped_at = datetime.now()
    news.scrape_status = 'scraped'
    news.scrape_error = None
    news.scrape_attempts = (news.scrape_attempts or 0) + 1
    self.db.commit()
    logger.info(f"Successfully scraped article {news_id}: {word_count} words")
    return ScrapeResult(
        success=True,
        content=content,
        word_count=word_count,
        status='scraped'
    )
def batch_scrape(
    self,
    limit: int = 50,
    status_filter: Optional[str] = None,
    force: bool = False,
    progress_callback: ProgressCallback = None
) -> Dict:
    """
    Batch scrape articles.

    Args:
        limit: Maximum number of articles to scrape
        status_filter: Filter by approval status (approved, auto_approved)
        force: If True, re-scrape even already scraped articles
        progress_callback: Optional callback for progress updates

    Returns:
        Dict with statistics: total, scraped, failed, skipped, errors,
        scraped_articles, processing_time
    """
    logger.info(f"Starting batch scrape: limit={limit}, force={force}")
    # Build query
    query = self.db.query(ZOPKNews)
    # Filter by approval status
    if status_filter:
        query = query.filter(ZOPKNews.status == status_filter)
    else:
        # Default: only approved/auto_approved articles
        query = query.filter(ZOPKNews.status.in_(['approved', 'auto_approved']))
    # Filter by scrape status (unless force re-scraping everything)
    if not force:
        query = query.filter(ZOPKNews.scrape_status.in_(['pending', 'failed']))
        # Cap retry attempts for previously failed articles
        query = query.filter(
            (ZOPKNews.scrape_status == 'pending') |
            ((ZOPKNews.scrape_status == 'failed') & (ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS))
        )
    # Order by creation date (newest first)
    query = query.order_by(ZOPKNews.created_at.desc())
    # Limit
    articles = query.limit(limit).all()
    total = len(articles)
    # Statistics accumulated across the run
    stats = {
        'total': total,
        'scraped': 0,
        'failed': 0,
        'skipped': 0,
        'errors': [],
        'scraped_articles': [],
        'processing_time': 0
    }
    # Send initial progress
    if progress_callback and total > 0:
        progress_callback(ProgressUpdate(
            current=0,
            total=total,
            percent=0.0,
            stage='scraping',
            status='processing',
            message=f'Rozpoczynam scraping {total} artykułów...',
            details={'scraped': 0, 'failed': 0, 'skipped': 0}
        ))
    start_time = time.time()
    for idx, article in enumerate(articles, 1):
        # Send progress update before processing each article
        if progress_callback:
            progress_callback(ProgressUpdate(
                current=idx,
                total=total,
                percent=round((idx - 1) / total * 100, 1),
                stage='scraping',
                status='processing',
                message=f'Pobieram treść: {article.title[:50]}...',
                article_id=article.id,
                article_title=article.title[:80],
                details={
                    'scraped': stats['scraped'],
                    'failed': stats['failed'],
                    'skipped': stats['skipped'],
                    'source': article.source_name or 'nieznane'
                }
            ))
        # Delegate the actual work (fetch + extract + DB update)
        result = self.scrape_article(article.id)
        if result.status == 'scraped':
            stats['scraped'] += 1
            stats['scraped_articles'].append({
                'id': article.id,
                'title': article.title[:100],
                'word_count': result.word_count,
                'source': article.source_name
            })
            # Send success progress
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round(idx / total * 100, 1),
                    stage='scraping',
                    status='success',
                    message=f'✓ Pobrano {result.word_count} słów: {article.title[:40]}...',
                    article_id=article.id,
                    article_title=article.title[:80],
                    details={
                        'scraped': stats['scraped'],
                        'failed': stats['failed'],
                        'skipped': stats['skipped'],
                        'word_count': result.word_count
                    }
                ))
        elif result.status == 'skipped':
            stats['skipped'] += 1
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round(idx / total * 100, 1),
                    stage='scraping',
                    status='skipped',
                    message=f'⊘ Pominięto: {article.title[:40]}...',
                    article_id=article.id,
                    details={'scraped': stats['scraped'], 'failed': stats['failed'], 'skipped': stats['skipped']}
                ))
        else:
            # Any other status counts as a failure
            stats['failed'] += 1
            stats['errors'].append({
                'id': article.id,
                'url': article.url,
                'error': result.error
            })
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round(idx / total * 100, 1),
                    stage='scraping',
                    status='failed',
                    message=f'✗ Błąd: {result.error[:50]}...' if result.error else '✗ Błąd',
                    article_id=article.id,
                    article_title=article.title[:80],
                    details={
                        'scraped': stats['scraped'],
                        'failed': stats['failed'],
                        'skipped': stats['skipped'],
                        'error': result.error
                    }
                ))
    stats['processing_time'] = round(time.time() - start_time, 2)
    # Send completion progress
    if progress_callback:
        progress_callback(ProgressUpdate(
            current=total,
            total=total,
            percent=100.0,
            stage='scraping',
            status='complete',
            message=f'Zakończono: {stats["scraped"]} pobrano, {stats["failed"]} błędów, {stats["skipped"]} pominięto',
            details={
                'scraped': stats['scraped'],
                'failed': stats['failed'],
                'skipped': stats['skipped'],
                'processing_time': stats['processing_time']
            }
        ))
    logger.info(
        f"Batch scrape complete: {stats['scraped']} scraped, "
        f"{stats['failed']} failed, {stats['skipped']} skipped "
        f"in {stats['processing_time']}s"
    )
    return stats
def get_scrape_statistics(self) -> Dict:
    """Aggregate scraping statistics (per-status counts, averages) for approved articles."""
    from sqlalchemy import func
    # Count approved articles grouped by scrape_status
    status_counts = self.db.query(
        ZOPKNews.scrape_status,
        func.count(ZOPKNews.id)
    ).filter(
        ZOPKNews.status.in_(['approved', 'auto_approved'])
    ).group_by(ZOPKNews.scrape_status).all()
    status_dict = {status: count for status, count in status_counts}
    # Total approved articles
    total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
        ZOPKNews.status.in_(['approved', 'auto_approved'])
    ).scalar()
    # Articles scraped but not yet fed into knowledge extraction
    ready_for_extraction = self.db.query(func.count(ZOPKNews.id)).filter(
        ZOPKNews.scrape_status == 'scraped',
        ZOPKNews.knowledge_extracted == False
    ).scalar()
    # Average word count over successfully scraped articles
    avg_word_count = self.db.query(func.avg(ZOPKNews.content_word_count)).filter(
        ZOPKNews.scrape_status == 'scraped'
    ).scalar()
    return {
        'total_approved': total_approved or 0,
        'scraped': status_dict.get('scraped', 0),
        # Rows with a NULL scrape_status are counted as pending
        'pending': status_dict.get('pending', 0) + status_dict.get(None, 0),
        'failed': status_dict.get('failed', 0),
        'skipped': status_dict.get('skipped', 0),
        'ready_for_extraction': ready_for_extraction or 0,
        'avg_word_count': round(avg_word_count or 0, 0)
    }
# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================
def scrape_pending_articles(db_session, limit: int = 50) -> Dict:
    """
    Convenience wrapper for cron jobs.

    Usage:
        from zopk_content_scraper import scrape_pending_articles
        result = scrape_pending_articles(db_session, limit=50)
    """
    return ZOPKContentScraper(db_session).batch_scrape(limit=limit)
def get_scrape_stats(db_session) -> Dict:
    """Return scraping statistics for monitoring dashboards."""
    return ZOPKContentScraper(db_session).get_scrape_statistics()