#!/usr/bin/env python3
"""
Website Content Updater - Cykliczna aktualizacja danych ze stron www firm
=========================================================================
Pobiera treść stron www firm członkowskich i ekstrahuje:
- services_extracted: lista usług oferowanych przez firmę
- main_keywords: główne słowa kluczowe opisujące działalność
Używa Gemini 3 Flash (darmowy plan) do inteligentnej ekstrakcji.
Uruchamianie:
python scripts/website_content_updater.py # Wszystkie firmy
python scripts/website_content_updater.py --company-id 26 # Konkretna firma
python scripts/website_content_updater.py --batch 1-10 # Batch firm
python scripts/website_content_updater.py --stale-days 30 # Tylko starsze niż 30 dni
python scripts/website_content_updater.py --dry-run # Podgląd bez zmian
Cron (raz w miesiącu, 1-ego o 3:00):
0 3 1 * * cd /var/www/nordabiznes && /var/www/nordabiznes/venv/bin/python3 scripts/website_content_updater.py --stale-days 30 >> /var/log/nordabiznes/website_updater.log 2>&1
Exit codes:
0 - Sukces
1 - Błąd argumentów
2 - Częściowe błędy (niektóre firmy nie zaktualizowane)
3 - Wszystkie aktualizacje nieudane
4 - Błąd bazy danych
5 - Błąd API Gemini
Author: Maciej Pienczyn, InPi sp. z o.o.
Date: 2026-02-01
"""
import os
import sys
import json
import argparse
import logging
import time
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple, Any
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Load .env from project root
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))
from database import Company, CompanyWebsiteAnalysis, CompanyContact, SessionLocal
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Exit codes
EXIT_SUCCESS = 0
EXIT_ARGUMENT_ERROR = 1
EXIT_PARTIAL_FAILURES = 2
EXIT_ALL_FAILED = 3
EXIT_DATABASE_ERROR = 4
EXIT_API_ERROR = 5
# Configuration
REQUEST_TIMEOUT = 30
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 NordaBiznes-Crawler/1.0'
MAX_CONTENT_LENGTH = 50000  # Max chars of page text kept after fetching
RATE_LIMIT_DELAY = 2 # Seconds between API calls (respect free tier limits)
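# Note: fetch_website_content keeps up to MAX_CONTENT_LENGTH chars of page
# text, but extract_with_gemini sends only the first 20,000 of them to the
# model (see the prompt slice below).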
class WebsiteContentUpdater:
"""
Aktualizuje dane o usługach i słowach kluczowych ze stron www firm.
"""
def __init__(self, db_session, dry_run: bool = False):
"""
Args:
db_session: SQLAlchemy session
dry_run: Jeśli True, nie zapisuje zmian do bazy
"""
self.db = db_session
self.dry_run = dry_run
self.gemini_service = None
self._init_gemini()
# Statistics
self.stats = {
'processed': 0,
'updated': 0,
'skipped': 0,
'errors': 0,
'no_website': 0,
}
def _init_gemini(self):
"""Initialize Gemini service for AI extraction."""
try:
from gemini_service import GeminiService
# Use Gemini 3 Flash (free tier)
self.gemini_service = GeminiService(
model='3-flash',
            thinking_level='low',  # fast extraction; deep reasoning is not needed here
include_thoughts=False
)
logger.info("Gemini 3 Flash initialized (free tier)")
except Exception as e:
logger.error(f"Failed to initialize Gemini: {e}")
self.gemini_service = None
def fetch_website_content(self, url: str) -> Tuple[Optional[str], Optional[str]]:
"""
Pobiera treść strony www.
Returns:
Tuple (raw_text, error_message)
"""
if not url:
return None, "Brak URL"
# Normalize URL
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
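        # Bare domains are normalized to https:// only; an http-only site
        # will surface as an SSL/connection error rather than fall back to
        # plain http.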
try:
headers = {
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'pl,en;q=0.5',
}
response = requests.get(
url,
headers=headers,
timeout=REQUEST_TIMEOUT,
allow_redirects=True,
verify=True
)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script, style, nav, footer elements
for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'noscript']):
element.decompose()
# Extract text
text = soup.get_text(separator=' ', strip=True)
# Clean up whitespace
text = re.sub(r'\s+', ' ', text)
# Limit length
if len(text) > MAX_CONTENT_LENGTH:
text = text[:MAX_CONTENT_LENGTH]
return text, None
except requests.exceptions.Timeout:
return None, "Timeout"
except requests.exceptions.SSLError:
return None, "Błąd SSL"
except requests.exceptions.ConnectionError:
return None, "Błąd połączenia"
except requests.exceptions.HTTPError as e:
return None, f"HTTP {e.response.status_code}"
except Exception as e:
return None, str(e)[:100]
def extract_with_gemini(self, company_name: str, website_text: str) -> Dict[str, Any]:
"""
Używa Gemini 3 Flash do ekstrakcji usług i słów kluczowych.
Returns:
Dict z kluczami: services, keywords, summary
"""
if not self.gemini_service:
return {'services': [], 'keywords': [], 'summary': None, 'error': 'Gemini not available'}
prompt = f"""Przeanalizuj DOKŁADNIE treść strony internetowej firmy "{company_name}" i wyodrębnij WSZYSTKIE informacje.
TREŚĆ STRONY:
{website_text[:20000]}
ZADANIE:
Zwróć odpowiedź w formacie JSON (tylko JSON, bez markdown):
{{
"services": ["usługa 1", "usługa 2", ...],
"products": ["produkt 1", "produkt 2", ...],
"keywords": ["słowo kluczowe 1", "słowo kluczowe 2", ...],
"brands": ["marka/partner 1", "marka/partner 2", ...],
"specializations": ["specjalizacja 1", "specjalizacja 2", ...],
"target_customers": ["klient docelowy 1", "klient docelowy 2", ...],
"regions": ["region 1", "region 2", ...],
"summary": "Szczegółowe podsumowanie działalności firmy (2-3 zdania)",
"contact_phone": "numer telefonu firmy jeśli widoczny na stronie, w formacie +48XXXXXXXXX lub oryginalnym",
"contact_email": "adres email kontaktowy firmy jeśli widoczny na stronie"
}}
ZASADY - WYODRĘBNIJ WSZYSTKO, BEZ LIMITÓW:
1. services: WSZYSTKIE usługi oferowane przez firmę (bez limitu ilości)
2. products: WSZYSTKIE produkty/rozwiązania (fizyczne lub cyfrowe)
3. keywords: WSZYSTKIE słowa kluczowe opisujące branżę, technologie, specjalizację
4. brands: Partnerzy, certyfikaty, marki z którymi firma współpracuje (np. VMware, Microsoft, Veeam)
5. specializations: Konkretne specjalizacje i kompetencje (np. "backup danych", "monitoring 24/7")
6. target_customers: Typy klientów (np. "MŚP", "korporacje", "sektor publiczny")
7. regions: Obszar działania geograficzny (miasta, regiony)
8. summary: Pełne podsumowanie czym zajmuje się firma
9. contact_phone: Numer telefonu firmy (najlepiej główny/biurowy)
10. contact_email: Adres email firmy (najlepiej ogólny/biurowy, nie osobisty)
WAŻNE:
- Wyodrębnij WSZYSTKIE informacje bez ograniczeń ilościowych
- Używaj języka polskiego
- Bazuj TYLKO na treści strony - nie wymyślaj
- Każda kategoria może mieć 0-50 elementów
- Im więcej szczegółów, tym lepiej
ODPOWIEDŹ (tylko JSON):"""
try:
response = self.gemini_service.generate_text(
prompt=prompt,
                temperature=0.3,  # low temperature for precise, deterministic extraction
feature='website_extraction',
)
if not response:
return {'services': [], 'keywords': [], 'summary': None, 'error': 'Empty response'}
# Parse JSON from response
# Handle potential markdown code blocks
json_text = response.strip()
if json_text.startswith('```'):
json_text = re.sub(r'^```(?:json)?\n?', '', json_text)
json_text = re.sub(r'\n?```$', '', json_text)
data = json.loads(json_text)
# Combine all extracted data into comprehensive lists (no limits)
all_services = data.get('services', [])
all_products = data.get('products', [])
all_keywords = data.get('keywords', [])
all_brands = data.get('brands', [])
all_specializations = data.get('specializations', [])
all_target_customers = data.get('target_customers', [])
all_regions = data.get('regions', [])
# Merge services + products + specializations into services_extracted
merged_services = list(dict.fromkeys(all_services + all_products + all_specializations))
# Merge keywords + brands + target_customers + regions into main_keywords
merged_keywords = list(dict.fromkeys(all_keywords + all_brands + all_target_customers + all_regions))
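            # dict.fromkeys() deduplicates while keeping first-seen order
            # (dicts preserve insertion order on Python 3.7+).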
contact_phone = data.get('contact_phone', '')
contact_email = data.get('contact_email', '')
return {
'services': merged_services, # No limit
'keywords': merged_keywords, # No limit
'summary': data.get('summary', '')[:1000] if data.get('summary') else None,
'contact_phone': contact_phone,
'contact_email': contact_email,
'raw_data': {
'services': all_services,
'products': all_products,
'keywords': all_keywords,
'brands': all_brands,
'specializations': all_specializations,
'target_customers': all_target_customers,
'regions': all_regions,
},
'error': None
}
except json.JSONDecodeError as e:
logger.warning(f"JSON parse error: {e}")
return {'services': [], 'keywords': [], 'summary': None, 'error': f'JSON parse error: {str(e)[:50]}'}
except Exception as e:
logger.error(f"Gemini extraction error: {e}")
return {'services': [], 'keywords': [], 'summary': None, 'error': str(e)[:100]}
def extract_contacts_regex(self, html_text: str) -> Dict[str, List[str]]:
"""Extract phone numbers and emails from raw website text using regex."""
contacts = {'phones': [], 'emails': []}
# Email extraction
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
emails = re.findall(email_pattern, html_text)
# Filter out common non-contact emails
skip_domains = {'example.com', 'sentry.io', 'wixpress.com', 'wordpress.org', 'w3.org', 'schema.org', 'googleapis.com'}
contacts['emails'] = list(dict.fromkeys(
e.lower() for e in emails
if not any(d in e.lower() for d in skip_domains)
))[:5] # Max 5 emails
# Phone extraction - Polish patterns
phone_patterns = [
r'(?:\+48\s?)?\d{2}[\s-]?\d{3}[\s-]?\d{2}[\s-]?\d{2}', # +48 XX XXX XX XX
r'(?:\+48\s?)?\d{3}[\s-]?\d{3}[\s-]?\d{3}', # +48 XXX XXX XXX
r'\(\d{2}\)\s?\d{3}[\s-]?\d{2}[\s-]?\d{2}', # (XX) XXX XX XX
r'(?:tel|phone|telefon)[.:]\s*[\+]?\d[\d\s\-]{7,14}', # tel: +48...
]
for pattern in phone_patterns:
matches = re.findall(pattern, html_text, re.IGNORECASE)
for m in matches:
# Clean up
clean = re.sub(r'(?:tel|phone|telefon)[.:]?\s*', '', m, flags=re.IGNORECASE).strip()
digits = re.sub(r'\D', '', clean)
if 9 <= len(digits) <= 12:
contacts['phones'].append(clean)
contacts['phones'] = list(dict.fromkeys(contacts['phones']))[:5]
return contacts
def update_company(self, company: Company) -> bool:
"""
Aktualizuje dane jednej firmy.
Returns:
True jeśli sukces, False jeśli błąd
"""
self.stats['processed'] += 1
if not company.website:
logger.info(f"[{company.id}] {company.name}: Brak strony www - pomijam")
self.stats['no_website'] += 1
            return True  # a missing website is not treated as an error
logger.info(f"[{company.id}] {company.name}: Pobieram {company.website}")
# Fetch website content
text, error = self.fetch_website_content(company.website)
if error:
logger.warning(f"[{company.id}] {company.name}: Błąd pobierania - {error}")
self.stats['errors'] += 1
return False
if not text or len(text) < 100:
logger.warning(f"[{company.id}] {company.name}: Za mało treści ({len(text) if text else 0} znaków)")
self.stats['skipped'] += 1
return True
logger.info(f"[{company.id}] {company.name}: Pobrano {len(text)} znaków, ekstrakcja AI...")
# Extract with Gemini
extracted = self.extract_with_gemini(company.name, text)
if extracted.get('error'):
logger.warning(f"[{company.id}] {company.name}: Błąd AI - {extracted['error']}")
self.stats['errors'] += 1
return False
services = extracted.get('services', [])
keywords = extracted.get('keywords', [])
summary = extracted.get('summary')
raw_data = extracted.get('raw_data', {})
logger.info(f"[{company.id}] {company.name}: Wyodrębniono {len(services)} usług/produktów, {len(keywords)} słów kluczowych")
if raw_data:
logger.debug(f" - Usługi: {len(raw_data.get('services', []))}")
logger.debug(f" - Produkty: {len(raw_data.get('products', []))}")
logger.debug(f" - Specjalizacje: {len(raw_data.get('specializations', []))}")
logger.debug(f" - Marki/Partnerzy: {len(raw_data.get('brands', []))}")
logger.debug(f" - Klienci docelowi: {len(raw_data.get('target_customers', []))}")
logger.debug(f" - Regiony: {len(raw_data.get('regions', []))}")
if self.dry_run:
logger.info(f"[DRY-RUN] Usługi/Produkty ({len(services)}): {services}")
logger.info(f"[DRY-RUN] Słowa kluczowe ({len(keywords)}): {keywords}")
if summary:
logger.info(f"[DRY-RUN] Podsumowanie: {summary}")
self.stats['updated'] += 1
return True
# Update or create CompanyWebsiteAnalysis
try:
analysis = self.db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.company_id == company.id
).first()
if not analysis:
analysis = CompanyWebsiteAnalysis(company_id=company.id)
self.db.add(analysis)
# Update fields
analysis.services_extracted = services if services else None
analysis.main_keywords = keywords if keywords else None
analysis.content_summary = summary
analysis.analyzed_at = datetime.utcnow()
analysis.website_url = company.website
self.db.commit()
self.stats['updated'] += 1
# --- Contact extraction ---
all_phones = []
all_emails = []
# From Gemini
if extracted.get('contact_phone'):
all_phones.append(extracted['contact_phone'])
if extracted.get('contact_email'):
all_emails.append(extracted['contact_email'])
# From regex fallback
regex_contacts = self.extract_contacts_regex(text)
all_phones.extend(regex_contacts.get('phones', []))
all_emails.extend(regex_contacts.get('emails', []))
# Deduplicate
all_phones = list(dict.fromkeys(all_phones))
all_emails = list(dict.fromkeys(all_emails))
# Save to CompanyContact (source='website')
contacts_added = 0
for phone in all_phones[:3]: # Max 3 phones
existing = self.db.query(CompanyContact).filter_by(
company_id=company.id, contact_type='phone', value=phone
).first()
if not existing:
self.db.add(CompanyContact(
company_id=company.id,
contact_type='phone',
value=phone,
source='website',
source_url=company.website,
source_date=datetime.now().date(),
is_verified=False,
))
contacts_added += 1
logger.info(f" [{company.id}] Found phone: {phone}")
for email in all_emails[:3]: # Max 3 emails
existing = self.db.query(CompanyContact).filter_by(
company_id=company.id, contact_type='email', value=email
).first()
if not existing:
self.db.add(CompanyContact(
company_id=company.id,
contact_type='email',
value=email,
source='website',
source_url=company.website,
source_date=datetime.now().date(),
is_verified=False,
))
contacts_added += 1
logger.info(f" [{company.id}] Found email: {email}")
if contacts_added > 0:
self.db.commit()
logger.info(f" [{company.id}] Saved {contacts_added} new contacts")
logger.info(f"[{company.id}] {company.name}: ✓ Zaktualizowano")
return True
except Exception as e:
self.db.rollback()
logger.error(f"[{company.id}] {company.name}: Błąd zapisu do bazy - {e}")
self.stats['errors'] += 1
return False
def update_all(self, stale_days: Optional[int] = None, batch_range: Optional[str] = None) -> Dict:
"""
Aktualizuje wszystkie firmy (lub batch).
Args:
stale_days: Aktualizuj tylko firmy nieaktualizowane od X dni
batch_range: String "start-end" dla batch processing
Returns:
Dict ze statystykami
"""
query = self.db.query(Company).filter(Company.status == 'active')
if batch_range:
start, end = map(int, batch_range.split('-'))
query = query.filter(Company.id >= start, Company.id <= end)
companies = query.order_by(Company.id).all()
# Filter by stale_days if specified
if stale_days:
cutoff = datetime.utcnow() - timedelta(days=stale_days)
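            # One analysis lookup per company (an N+1 query pattern);
            # acceptable for a few hundred rows, though a single outer join
            # would scale better.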
filtered = []
for company in companies:
analysis = self.db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.company_id == company.id
).first()
if not analysis or not analysis.analyzed_at or analysis.analyzed_at < cutoff:
filtered.append(company)
companies = filtered
logger.info(f"Filtrowanie: {len(companies)} firm starszych niż {stale_days} dni")
total = len(companies)
logger.info(f"Rozpoczynam aktualizację {total} firm...")
for i, company in enumerate(companies, 1):
logger.info(f"--- [{i}/{total}] ---")
self.update_company(company)
# Rate limiting for Gemini free tier
if i < total and self.gemini_service:
time.sleep(RATE_LIMIT_DELAY)
return self.stats
def update_single(self, company_id: int) -> bool:
"""Aktualizuje pojedynczą firmę po ID."""
company = self.db.query(Company).filter(Company.id == company_id).first()
if not company:
logger.error(f"Firma o ID {company_id} nie istnieje")
return False
return self.update_company(company)
def main():
parser = argparse.ArgumentParser(
description='Aktualizacja danych ze stron www firm (usługi, słowa kluczowe)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Przykłady:
%(prog)s # Wszystkie firmy
%(prog)s --company-id 26 # Konkretna firma
%(prog)s --batch 1-50 # Firmy o ID 1-50
%(prog)s --stale-days 30 # Tylko firmy nieaktualizowane 30+ dni
%(prog)s --dry-run # Podgląd bez zapisywania
"""
)
parser.add_argument(
'--company-id',
type=int,
help='ID konkretnej firmy do aktualizacji'
)
parser.add_argument(
'--batch',
type=str,
help='Zakres ID firm (np. "1-50")'
)
parser.add_argument(
'--stale-days',
type=int,
default=None,
help='Aktualizuj tylko firmy nieaktualizowane od X dni'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Podgląd bez zapisywania do bazy'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Szczegółowe logi'
)
    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Validate --batch early so a malformed range maps to the documented
    # argument-error exit code instead of failing deep inside update_all().
    if args.batch and not re.match(r'^\d+-\d+$', args.batch):
        logger.error(f"Nieprawidłowy format --batch: {args.batch!r} (oczekiwano np. '1-50')")
        return EXIT_ARGUMENT_ERROR
# Database session
try:
db = SessionLocal()
except Exception as e:
logger.error(f"Błąd połączenia z bazą danych: {e}")
return EXIT_DATABASE_ERROR
try:
updater = WebsiteContentUpdater(db, dry_run=args.dry_run)
if not updater.gemini_service:
logger.error("Gemini service not available - cannot proceed")
return EXIT_API_ERROR
if args.company_id:
# Single company
success = updater.update_single(args.company_id)
return EXIT_SUCCESS if success else EXIT_ALL_FAILED
else:
# All companies (with optional filters)
stats = updater.update_all(
stale_days=args.stale_days,
batch_range=args.batch
)
# Print summary
print("\n" + "="*50)
print("PODSUMOWANIE")
print("="*50)
print(f"Przetworzono: {stats['processed']}")
print(f"Zaktualizowano: {stats['updated']}")
print(f"Pominięto: {stats['skipped']}")
print(f"Bez strony www: {stats['no_website']}")
print(f"Błędy: {stats['errors']}")
print("="*50)
            # A run that processed nothing is not a failure.
            if stats['processed'] > 0 and stats['errors'] == stats['processed']:
                return EXIT_ALL_FAILED
            elif stats['errors'] > 0:
                return EXIT_PARTIAL_FAILURES
            else:
                return EXIT_SUCCESS
    except KeyboardInterrupt:
        logger.info("Przerwano przez użytkownika")
        # An interrupted run leaves some companies un-updated; report it as a
        # partial failure rather than the unrelated argument-error code.
        return EXIT_PARTIAL_FAILURES
except Exception as e:
logger.error(f"Nieoczekiwany błąd: {e}")
return EXIT_ALL_FAILED
finally:
db.close()
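
# Programmatic usage (a sketch; assumes a configured database and Gemini key):
#     db = SessionLocal()
#     updater = WebsiteContentUpdater(db, dry_run=True)
#     updater.update_single(26)          # one company
#     updater.update_all(stale_days=30)  # or everything stale
#     db.close()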
if __name__ == '__main__':
sys.exit(main())