nordabiz/scripts/fetch_news_images.py

358 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Skrypt do pobierania obrazków dla newsów ZOPK.
Strategia:
1. Rozwiń URL Google News do oryginalnego źródła
2. Pobierz og:image z oryginalnego artykułu
3. Jeśli brak og:image, użyj favicon domeny jako fallback
Użycie:
python scripts/fetch_news_images.py --dry-run # Test bez zapisu
python scripts/fetch_news_images.py # Produkcja
python scripts/fetch_news_images.py --limit 10 # Ogranicz do 10 newsów
"""
import os
import sys
import re
import argparse
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import time
# Dodaj ścieżkę projektu
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, PROJECT_ROOT)
# Załaduj .env
from dotenv import load_dotenv
load_dotenv(os.path.join(PROJECT_ROOT, '.env'))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Konfiguracja bazy danych - hasło z .env
DATABASE_URL = os.getenv('DATABASE_URL')
if not DATABASE_URL:
print("❌ Błąd: Brak zmiennej DATABASE_URL w .env")
sys.exit(1)
# User-Agent do requestów
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'pl,en;q=0.5',
}
# Timeout dla requestów
REQUEST_TIMEOUT = 10
def is_google_news_url(url: str) -> bool:
"""Sprawdź czy URL to Google News (RSS lub web)."""
return 'news.google.com' in url or 'google.com/rss' in url
def decode_google_news_url(google_url: str) -> str | None:
"""
Próba dekodowania URL Google News do oryginalnego źródła.
Google News koduje URL-e w Base64, ale format się zmienia.
Zwraca None jeśli nie udało się zdekodować.
"""
import base64
import re
try:
# Format: https://news.google.com/rss/articles/CBMi...
# Próbujemy wyciągnąć zakodowaną część
match = re.search(r'/articles/([A-Za-z0-9_-]+)', google_url)
if not match:
return None
encoded = match.group(1)
# Dodaj padding jeśli potrzebny
padding = 4 - len(encoded) % 4
if padding != 4:
encoded += '=' * padding
# Dekoduj Base64 (URL-safe)
try:
decoded = base64.urlsafe_b64decode(encoded)
# Szukaj URL-a w zdekodowanych danych
urls = re.findall(rb'https?://[^\s<>"\']+', decoded)
for url in urls:
url_str = url.decode('utf-8', errors='ignore')
# Pomijamy URL-e Google
if 'google.com' not in url_str and len(url_str) > 20:
return url_str
except:
pass
return None
except Exception as e:
print(f" ⚠ Nie można zdekodować URL Google News: {e}")
return None
def extract_og_image(url: str) -> str | None:
"""
Pobierz og:image z podanej strony.
"""
try:
response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Szukaj og:image
og_image = soup.find('meta', property='og:image')
if og_image and og_image.get('content'):
image_url = og_image['content']
# Upewnij się że URL jest absolutny
if not image_url.startswith('http'):
image_url = urljoin(url, image_url)
return image_url
# Fallback: twitter:image
twitter_image = soup.find('meta', attrs={'name': 'twitter:image'})
if twitter_image and twitter_image.get('content'):
image_url = twitter_image['content']
if not image_url.startswith('http'):
image_url = urljoin(url, image_url)
return image_url
return None
except Exception as e:
print(f" ⚠ Nie można pobrać og:image: {e}")
return None
def get_favicon_url(url: str) -> str:
"""
Pobierz URL favicona dla domeny używając Google Favicon API.
"""
try:
parsed = urlparse(url)
domain = parsed.netloc
# Google Favicon API - zwraca wysokiej jakości favicon
return f"https://www.google.com/s2/favicons?domain={domain}&sz=128"
except:
return None
def get_domain_logo(url: str) -> str | None:
"""
Spróbuj pobrać logo domeny z Clearbit lub podobnego serwisu.
"""
try:
parsed = urlparse(url)
domain = parsed.netloc.replace('www.', '')
# Clearbit Logo API
logo_url = f"https://logo.clearbit.com/{domain}"
# Sprawdź czy logo istnieje
response = requests.head(logo_url, timeout=5)
if response.status_code == 200:
return logo_url
except:
pass
return None
def fetch_image_for_news(news_url: str, source_domain: str = None) -> dict:
"""
Pobierz obrazek dla newsa. Zwraca dict z image_url i image_source.
Args:
news_url: URL newsa (może być Google News RSS)
source_domain: Domena źródłowa (np. 'gazeta.pl') - używana dla Google News
"""
result = {
'image_url': None,
'image_source': None,
'resolved_url': news_url
}
# 1. Sprawdź czy to Google News
if is_google_news_url(news_url):
print(f" → URL Google News - próba dekodowania...")
# Spróbuj zdekodować oryginalny URL
decoded_url = decode_google_news_url(news_url)
if decoded_url:
print(f" → Zdekodowano: {decoded_url[:60]}...")
result['resolved_url'] = decoded_url
# Pobierz og:image z oryginalnego artykułu
print(f" → Pobieranie og:image z oryginalnego artykułu...")
og_image = extract_og_image(decoded_url)
if og_image and 'google.com' not in og_image:
result['image_url'] = og_image
result['image_source'] = 'og:image'
print(f" ✓ Znaleziono og:image")
return result
# Spróbuj logo domeny artykułu
domain_logo = get_domain_logo(decoded_url)
if domain_logo:
result['image_url'] = domain_logo
result['image_source'] = 'domain_logo'
print(f" ✓ Znaleziono logo domeny")
return result
# Fallback dla Google News: użyj source_domain z bazy
if source_domain:
print(f" → Używanie source_domain: {source_domain}")
# Logo domeny źródłowej
domain_logo = get_domain_logo(f"https://{source_domain}")
if domain_logo:
result['image_url'] = domain_logo
result['image_source'] = 'domain_logo'
print(f" ✓ Użyto logo źródła: {source_domain}")
return result
# Favicon źródła
favicon = get_favicon_url(f"https://{source_domain}")
if favicon:
result['image_url'] = favicon
result['image_source'] = 'favicon'
print(f" ✓ Użyto favicon źródła: {source_domain}")
return result
print(f" ✗ Nie udało się pobrać obrazka dla Google News")
return result
# 2. Bezpośredni URL (nie Google News) - pobierz og:image
print(f" → Pobieranie og:image...")
og_image = extract_og_image(news_url)
if og_image:
result['image_url'] = og_image
result['image_source'] = 'og:image'
print(f" ✓ Znaleziono og:image")
return result
# 3. Spróbuj logo domeny (Clearbit)
print(f" → Szukanie logo domeny...")
domain_logo = get_domain_logo(news_url)
if domain_logo:
result['image_url'] = domain_logo
result['image_source'] = 'domain_logo'
print(f" ✓ Znaleziono logo domeny")
return result
# 4. Fallback: favicon
print(f" → Używanie favicon jako fallback...")
favicon = get_favicon_url(news_url)
if favicon:
result['image_url'] = favicon
result['image_source'] = 'favicon'
print(f" ✓ Użyto favicon")
return result
print(f" ✗ Nie znaleziono żadnego obrazka")
return result
def main():
parser = argparse.ArgumentParser(description='Pobierz obrazki dla newsów ZOPK')
parser.add_argument('--dry-run', action='store_true', help='Tryb testowy - nie zapisuj do bazy')
parser.add_argument('--limit', type=int, default=None, help='Ogranicz liczbę newsów do przetworzenia')
parser.add_argument('--force', action='store_true', help='Nadpisz istniejące obrazki')
args = parser.parse_args()
print("=" * 60)
print("ZOPK News Image Fetcher")
print("=" * 60)
if args.dry_run:
print("🔍 TRYB TESTOWY - zmiany NIE będą zapisane\n")
# Połączenie z bazą
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
try:
# Import modelu
from database import ZOPKNews
# Pobierz newsy bez obrazków
query = session.query(ZOPKNews).filter(
ZOPKNews.status.in_(['approved', 'auto_approved'])
)
if not args.force:
query = query.filter(
(ZOPKNews.image_url.is_(None)) | (ZOPKNews.image_url == '')
)
query = query.order_by(ZOPKNews.published_at.desc())
if args.limit:
query = query.limit(args.limit)
news_items = query.all()
print(f"📰 Znaleziono {len(news_items)} newsów do przetworzenia\n")
stats = {
'processed': 0,
'og_image': 0,
'domain_logo': 0,
'favicon': 0,
'failed': 0
}
for i, news in enumerate(news_items, 1):
print(f"[{i}/{len(news_items)}] {news.title[:60]}...")
print(f" Źródło: {news.source_domain or 'nieznane'}")
result = fetch_image_for_news(news.url, news.source_domain)
if result['image_url']:
stats['processed'] += 1
stats[result['image_source']] = stats.get(result['image_source'], 0) + 1
if not args.dry_run:
news.image_url = result['image_url']
# Zapisz też resolved_url jeśli się zmienił
if result['resolved_url'] != news.url and 'google.com' not in result['resolved_url']:
# Można by zapisać oryginalny URL, ale zostawiamy jak jest
pass
session.commit()
print(f" 💾 Zapisano do bazy\n")
else:
print(f" [DRY-RUN] Obrazek: {result['image_url'][:60]}...\n")
else:
stats['failed'] += 1
print()
# Pauza między requestami żeby nie przeciążyć serwerów
time.sleep(0.5)
print("=" * 60)
print("PODSUMOWANIE")
print("=" * 60)
print(f"Przetworzono: {stats['processed']}")
print(f" - og:image: {stats['og_image']}")
print(f" - logo domeny: {stats['domain_logo']}")
print(f" - favicon: {stats['favicon']}")
print(f"Nieudane: {stats['failed']}")
if args.dry_run:
print("\n⚠️ To był tryb testowy. Uruchom bez --dry-run aby zapisać zmiany.")
except Exception as e:
print(f"❌ Błąd: {e}")
import traceback
traceback.print_exc()
session.rollback()
finally:
session.close()
if __name__ == '__main__':
main()