#!/usr/bin/env python3 """ Skrypt do pobierania obrazków dla newsów ZOPK. Strategia: 1. Rozwiń URL Google News do oryginalnego źródła 2. Pobierz og:image z oryginalnego artykułu 3. Jeśli brak og:image, użyj favicon domeny jako fallback Użycie: python scripts/fetch_news_images.py --dry-run # Test bez zapisu python scripts/fetch_news_images.py # Produkcja python scripts/fetch_news_images.py --limit 10 # Ogranicz do 10 newsów """ import os import sys import re import argparse import requests from urllib.parse import urlparse, urljoin from bs4 import BeautifulSoup import time # Dodaj ścieżkę projektu PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, PROJECT_ROOT) # Załaduj .env from dotenv import load_dotenv load_dotenv(os.path.join(PROJECT_ROOT, '.env')) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # Konfiguracja bazy danych - hasło z .env DATABASE_URL = os.getenv('DATABASE_URL') if not DATABASE_URL: print("❌ Błąd: Brak zmiennej DATABASE_URL w .env") sys.exit(1) # User-Agent do requestów HEADERS = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'pl,en;q=0.5', } # Timeout dla requestów REQUEST_TIMEOUT = 10 def is_google_news_url(url: str) -> bool: """Sprawdź czy URL to Google News (RSS lub web).""" return 'news.google.com' in url or 'google.com/rss' in url def decode_google_news_url(google_url: str) -> str | None: """ Próba dekodowania URL Google News do oryginalnego źródła. Google News koduje URL-e w Base64, ale format się zmienia. Zwraca None jeśli nie udało się zdekodować. """ import base64 import re try: # Format: https://news.google.com/rss/articles/CBMi... # Próbujemy wyciągnąć zakodowaną część match = re.search(r'/articles/([A-Za-z0-9_-]+)', google_url) if not match: return None encoded = match.group(1) # Dodaj padding jeśli potrzebny padding = 4 - len(encoded) % 4 if padding != 4: encoded += '=' * padding # Dekoduj Base64 (URL-safe) try: decoded = base64.urlsafe_b64decode(encoded) # Szukaj URL-a w zdekodowanych danych urls = re.findall(rb'https?://[^\s<>"\']+', decoded) for url in urls: url_str = url.decode('utf-8', errors='ignore') # Pomijamy URL-e Google if 'google.com' not in url_str and len(url_str) > 20: return url_str except: pass return None except Exception as e: print(f" ⚠ Nie można zdekodować URL Google News: {e}") return None def extract_og_image(url: str) -> str | None: """ Pobierz og:image z podanej strony. """ try: response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Szukaj og:image og_image = soup.find('meta', property='og:image') if og_image and og_image.get('content'): image_url = og_image['content'] # Upewnij się że URL jest absolutny if not image_url.startswith('http'): image_url = urljoin(url, image_url) return image_url # Fallback: twitter:image twitter_image = soup.find('meta', attrs={'name': 'twitter:image'}) if twitter_image and twitter_image.get('content'): image_url = twitter_image['content'] if not image_url.startswith('http'): image_url = urljoin(url, image_url) return image_url return None except Exception as e: print(f" ⚠ Nie można pobrać og:image: {e}") return None def get_favicon_url(url: str) -> str: """ Pobierz URL favicona dla domeny używając Google Favicon API. """ try: parsed = urlparse(url) domain = parsed.netloc # Google Favicon API - zwraca wysokiej jakości favicon return f"https://www.google.com/s2/favicons?domain={domain}&sz=128" except: return None def get_domain_logo(url: str) -> str | None: """ Spróbuj pobrać logo domeny z Clearbit lub podobnego serwisu. """ try: parsed = urlparse(url) domain = parsed.netloc.replace('www.', '') # Clearbit Logo API logo_url = f"https://logo.clearbit.com/{domain}" # Sprawdź czy logo istnieje response = requests.head(logo_url, timeout=5) if response.status_code == 200: return logo_url except: pass return None def fetch_image_for_news(news_url: str, source_domain: str = None) -> dict: """ Pobierz obrazek dla newsa. Zwraca dict z image_url i image_source. Args: news_url: URL newsa (może być Google News RSS) source_domain: Domena źródłowa (np. 'gazeta.pl') - używana dla Google News """ result = { 'image_url': None, 'image_source': None, 'resolved_url': news_url } # 1. Sprawdź czy to Google News if is_google_news_url(news_url): print(f" → URL Google News - próba dekodowania...") # Spróbuj zdekodować oryginalny URL decoded_url = decode_google_news_url(news_url) if decoded_url: print(f" → Zdekodowano: {decoded_url[:60]}...") result['resolved_url'] = decoded_url # Pobierz og:image z oryginalnego artykułu print(f" → Pobieranie og:image z oryginalnego artykułu...") og_image = extract_og_image(decoded_url) if og_image and 'google.com' not in og_image: result['image_url'] = og_image result['image_source'] = 'og:image' print(f" ✓ Znaleziono og:image") return result # Spróbuj logo domeny artykułu domain_logo = get_domain_logo(decoded_url) if domain_logo: result['image_url'] = domain_logo result['image_source'] = 'domain_logo' print(f" ✓ Znaleziono logo domeny") return result # Fallback dla Google News: użyj source_domain z bazy if source_domain: print(f" → Używanie source_domain: {source_domain}") # Logo domeny źródłowej domain_logo = get_domain_logo(f"https://{source_domain}") if domain_logo: result['image_url'] = domain_logo result['image_source'] = 'domain_logo' print(f" ✓ Użyto logo źródła: {source_domain}") return result # Favicon źródła favicon = get_favicon_url(f"https://{source_domain}") if favicon: result['image_url'] = favicon result['image_source'] = 'favicon' print(f" ✓ Użyto favicon źródła: {source_domain}") return result print(f" ✗ Nie udało się pobrać obrazka dla Google News") return result # 2. Bezpośredni URL (nie Google News) - pobierz og:image print(f" → Pobieranie og:image...") og_image = extract_og_image(news_url) if og_image: result['image_url'] = og_image result['image_source'] = 'og:image' print(f" ✓ Znaleziono og:image") return result # 3. Spróbuj logo domeny (Clearbit) print(f" → Szukanie logo domeny...") domain_logo = get_domain_logo(news_url) if domain_logo: result['image_url'] = domain_logo result['image_source'] = 'domain_logo' print(f" ✓ Znaleziono logo domeny") return result # 4. Fallback: favicon print(f" → Używanie favicon jako fallback...") favicon = get_favicon_url(news_url) if favicon: result['image_url'] = favicon result['image_source'] = 'favicon' print(f" ✓ Użyto favicon") return result print(f" ✗ Nie znaleziono żadnego obrazka") return result def main(): parser = argparse.ArgumentParser(description='Pobierz obrazki dla newsów ZOPK') parser.add_argument('--dry-run', action='store_true', help='Tryb testowy - nie zapisuj do bazy') parser.add_argument('--limit', type=int, default=None, help='Ogranicz liczbę newsów do przetworzenia') parser.add_argument('--force', action='store_true', help='Nadpisz istniejące obrazki') args = parser.parse_args() print("=" * 60) print("ZOPK News Image Fetcher") print("=" * 60) if args.dry_run: print("🔍 TRYB TESTOWY - zmiany NIE będą zapisane\n") # Połączenie z bazą engine = create_engine(DATABASE_URL) Session = sessionmaker(bind=engine) session = Session() try: # Import modelu from database import ZOPKNews # Pobierz newsy bez obrazków query = session.query(ZOPKNews).filter( ZOPKNews.status.in_(['approved', 'auto_approved']) ) if not args.force: query = query.filter( (ZOPKNews.image_url.is_(None)) | (ZOPKNews.image_url == '') ) query = query.order_by(ZOPKNews.published_at.desc()) if args.limit: query = query.limit(args.limit) news_items = query.all() print(f"📰 Znaleziono {len(news_items)} newsów do przetworzenia\n") stats = { 'processed': 0, 'og_image': 0, 'domain_logo': 0, 'favicon': 0, 'failed': 0 } for i, news in enumerate(news_items, 1): print(f"[{i}/{len(news_items)}] {news.title[:60]}...") print(f" Źródło: {news.source_domain or 'nieznane'}") result = fetch_image_for_news(news.url, news.source_domain) if result['image_url']: stats['processed'] += 1 stats[result['image_source']] = stats.get(result['image_source'], 0) + 1 if not args.dry_run: news.image_url = result['image_url'] # Zapisz też resolved_url jeśli się zmienił if result['resolved_url'] != news.url and 'google.com' not in result['resolved_url']: # Można by zapisać oryginalny URL, ale zostawiamy jak jest pass session.commit() print(f" 💾 Zapisano do bazy\n") else: print(f" [DRY-RUN] Obrazek: {result['image_url'][:60]}...\n") else: stats['failed'] += 1 print() # Pauza między requestami żeby nie przeciążyć serwerów time.sleep(0.5) print("=" * 60) print("PODSUMOWANIE") print("=" * 60) print(f"Przetworzono: {stats['processed']}") print(f" - og:image: {stats['og_image']}") print(f" - logo domeny: {stats['domain_logo']}") print(f" - favicon: {stats['favicon']}") print(f"Nieudane: {stats['failed']}") if args.dry_run: print("\n⚠️ To był tryb testowy. Uruchom bez --dry-run aby zapisać zmiany.") except Exception as e: print(f"❌ Błąd: {e}") import traceback traceback.print_exc() session.rollback() finally: session.close() if __name__ == '__main__': main()