#!/usr/bin/env python3
"""
Website NIP Scanner - scans company websites looking for NIP/REGON numbers.

For companies without a NIP in the database, this fetches the company
website (derived from the email domain) and searches the page content
for NIP/REGON identifiers.

Usage:
    python scripts/scan_websites_for_nip.py            # Scan all companies
    python scripts/scan_websites_for_nip.py --id 119   # Scan a specific company
    python scripts/scan_websites_for_nip.py --apply    # Persist found NIPs to DB
"""
import os
import sys
import re
import argparse
import time
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import Optional, List, Tuple

import requests
from urllib.parse import urlparse

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from database import SessionLocal, Company

# Output directory for scan results
RESULTS_DIR = Path(__file__).parent.parent / "data" / "nip_scan_results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# Domains to skip (public email providers — they never identify a company site)
SKIP_DOMAINS = {
    'gmail.com', 'wp.pl', 'onet.pl', 'op.pl', 'interia.pl',
    'o2.pl', 'poczta.fm', 'yahoo.com', 'hotmail.com', 'outlook.com'
}

# HTTP request timeout, seconds
REQUEST_TIMEOUT = 15

# Browser-like user agent (some sites reject default requests UA)
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"


@dataclass
class ScanResult:
    """Result of scanning one company website for NIP/REGON."""
    company_id: int
    company_name: str
    domain: str
    url_scanned: str
    nip_found: Optional[str] = None
    regon_found: Optional[str] = None
    # All NIPs found on the page (a page may list more than one)
    nips_all: List[str] = field(default_factory=list)
    # All REGONs found on the page
    regons_all: List[str] = field(default_factory=list)
    phone_found: Optional[str] = None
    address_found: Optional[str] = None
    confidence: str = "low"  # low, medium, high
    error: Optional[str] = None
    scanned_at: str = ""

    def __post_init__(self):
        # Guard against callers explicitly passing None for the list fields
        if self.nips_all is None:
            self.nips_all = []
        if self.regons_all is None:
            self.regons_all = []
        if not self.scanned_at:
            self.scanned_at = datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Serialize to a plain dict (for JSON output)."""
        return asdict(self)


def extract_domain_from_email(email: str) -> Optional[str]:
    """Extract the domain from an email address.

    Returns None for empty/invalid emails and for public providers
    listed in SKIP_DOMAINS.
    """
    if not email or '@' not in email:
        return None
    # rsplit: the domain is everything after the LAST '@'
    domain = email.rsplit('@', 1)[1].lower()
    if domain in SKIP_DOMAINS:
        return None
    return domain


def normalize_nip(nip: str) -> str:
    """Strip everything except digits from a NIP string."""
    return re.sub(r'[^0-9]', '', nip)


def validate_nip(nip: str) -> bool:
    """Validate a Polish NIP via its checksum digit.

    A valid NIP has 10 digits; the 10th digit equals the weighted sum of
    the first 9 modulo 11 (a remainder of 10 never matches a digit, so
    such NIPs are correctly rejected).
    """
    nip = normalize_nip(nip)
    if len(nip) != 10:
        return False
    weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
    try:
        checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
        return checksum == int(nip[9])
    except (ValueError, IndexError):
        return False


def validate_regon(regon: str) -> bool:
    """Validate a Polish REGON (9 or 14 digits) via its checksum digit(s)."""
    regon = re.sub(r'[^0-9]', '', regon)
    if len(regon) == 9:
        weights = [8, 9, 2, 3, 4, 5, 6, 7]
        checksum = sum(int(regon[i]) * weights[i] for i in range(8)) % 11
        if checksum == 10:
            checksum = 0
        return checksum == int(regon[8])
    elif len(regon) == 14:
        # A 14-digit REGON embeds a valid 9-digit REGON as its prefix
        weights9 = [8, 9, 2, 3, 4, 5, 6, 7]
        checksum9 = sum(int(regon[i]) * weights9[i] for i in range(8)) % 11
        if checksum9 == 10:
            checksum9 = 0
        if checksum9 != int(regon[8]):
            return False
        # Validate the full 14 digits
        weights14 = [2, 4, 8, 5, 0, 9, 7, 3, 6, 1, 2, 4, 8]
        checksum14 = sum(int(regon[i]) * weights14[i] for i in range(13)) % 11
        if checksum14 == 10:
            checksum14 = 0
        return checksum14 == int(regon[13])
    return False


def find_nips_in_text(text: str) -> List[str]:
    """Find all checksum-valid NIPs in free text.

    Matches labeled forms like "NIP: 123-456-78-90" and "NIP 1234567890";
    deduplicates while preserving first-seen order.
    """
    patterns = [
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})',  # NIP: 123-456-78-90
        r'NIP[:\s]*(\d{10})',  # NIP: 1234567890
        r'numer\s+identyfikacji\s+podatkowej[:\s]*(\d{10})',
    ]
    nips = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            nip = normalize_nip(match)
            if validate_nip(nip) and nip not in nips:
                nips.append(nip)
    return nips


def find_regons_in_text(text: str) -> List[str]:
    """Find all checksum-valid REGONs in free text.

    The regex accepts 9-14 digit candidates; validate_regon then rejects
    anything that is not exactly 9 or 14 valid digits.
    """
    patterns = [
        r'REGON[:\s]*(\d{9,14})',
        r'rejestr\s+gospodarczy[:\s]*(\d{9,14})',
    ]
    regons = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            regon = re.sub(r'[^0-9]', '', match)
            if validate_regon(regon) and regon not in regons:
                regons.append(regon)
    return regons


def fetch_website(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch a web page.

    Returns:
        (content, error) — the page text on success, or an error string.
        Exactly one of the two is None.
    """
    headers = {
        'User-Agent': USER_AGENT,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'pl-PL,pl;q=0.9,en;q=0.8',
    }
    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT,
                                allow_redirects=True)
        response.raise_for_status()
        return response.text, None
    except requests.exceptions.Timeout:
        return None, "Timeout"
    except requests.exceptions.ConnectionError:
        return None, "Connection error"
    except requests.exceptions.HTTPError as e:
        return None, f"HTTP {e.response.status_code}"
    except Exception as e:
        return None, str(e)


def scan_company_website(company: Company) -> ScanResult:
    """Scan a company's website for NIP/REGON numbers.

    Tries the homepage and a few common contact/about paths, stopping at
    the first page that yields any identifier.
    """
    # Get domain from website field or, failing that, from the email address
    domain = None
    if company.website:
        parsed = urlparse(company.website if company.website.startswith('http')
                          else f'https://{company.website}')
        # Bare "example.com/page" parses with empty netloc; fall back to path
        domain = parsed.netloc or parsed.path.split('/')[0]
    elif company.email:
        domain = extract_domain_from_email(company.email)

    if not domain:
        return ScanResult(
            company_id=company.id,
            company_name=company.name,
            domain="",
            url_scanned="",
            error="No domain available"
        )

    # Normalize: lowercase and strip a leading "www." ONLY as a prefix
    # (str.replace would also mangle domains containing "www." elsewhere)
    domain = domain.lower()
    if domain.startswith('www.'):
        domain = domain[len('www.'):]

    # Try different URL variants
    urls_to_try = [
        f"https://{domain}",
        f"https://www.{domain}",
        f"https://{domain}/kontakt",
        f"https://{domain}/o-nas",
        f"https://{domain}/contact",
        f"https://{domain}/about",
    ]

    result = ScanResult(
        company_id=company.id,
        company_name=company.name,
        domain=domain,
        url_scanned=""
    )

    all_nips = []
    all_regons = []

    for url in urls_to_try:
        print(f"  Scanning: {url}")
        content, error = fetch_website(url)
        if error:
            continue

        result.url_scanned = url

        # Find NIPs and REGONs on this page; keep first-seen order, dedupe
        nips = find_nips_in_text(content)
        regons = find_regons_in_text(content)
        all_nips.extend([n for n in nips if n not in all_nips])
        all_regons.extend([r for r in regons if r not in all_regons])

        # Stop at the first page that yields anything
        if nips or regons:
            print(f"  Found NIP: {nips}, REGON: {regons}")
            break

        time.sleep(0.5)  # Rate limiting between URL attempts

    # Set results
    if all_nips:
        result.nips_all = all_nips
        result.nip_found = all_nips[0]  # Primary NIP
        # A single unambiguous NIP is high confidence; multiple NIPs could
        # include partners/contractors, so downgrade to medium
        result.confidence = "high" if len(all_nips) == 1 else "medium"
    if all_regons:
        result.regons_all = all_regons
        result.regon_found = all_regons[0]

    if not all_nips and not all_regons and not result.error:
        result.error = "NIP/REGON not found on website"
        result.confidence = "low"

    return result


def get_companies_without_nip(db, company_id: Optional[int] = None) -> List[Company]:
    """Return companies without a NIP that have a usable company domain.

    Companies whose only email is at a public provider (and that have no
    website) are excluded — there is nothing to scan.
    """
    query = db.query(Company).filter(
        (Company.nip == None) | (Company.nip == '')
    )
    if company_id:
        query = query.filter(Company.id == company_id)
    companies = query.order_by(Company.name).all()

    # Filter out companies with public email domains
    result = []
    for c in companies:
        domain = extract_domain_from_email(c.email) if c.email else None
        if domain or c.website:
            result.append(c)
    return result


def main():
    parser = argparse.ArgumentParser(description="Scan websites for NIP/REGON")
    parser.add_argument('--id', type=int, help="Scan specific company ID")
    parser.add_argument('--apply', action='store_true',
                        help="Apply found NIPs to database")
    parser.add_argument('--output', type=str, help="Output JSON file path")
    args = parser.parse_args()

    db = SessionLocal()
    try:
        companies = get_companies_without_nip(db, args.id)
        print(f"\n=== Skanowanie {len(companies)} firm bez NIP ===\n")

        results = []
        found_count = 0

        for i, company in enumerate(companies, 1):
            print(f"[{i}/{len(companies)}] {company.name}")
            result = scan_company_website(company)
            results.append(result)

            if result.nip_found:
                found_count += 1
                print(f"  ✓ NIP: {result.nip_found} (confidence: {result.confidence})")

                # Only persist confident matches
                if args.apply and result.confidence in ('high', 'medium'):
                    company.nip = result.nip_found
                    if result.regon_found and not company.regon:
                        company.regon = result.regon_found
                    db.commit()
                    print(f"  → Zapisano do bazy")
            elif result.error:
                print(f"  ✗ {result.error}")

            time.sleep(1)  # Rate limiting between companies

        # Save results to JSON
        output_file = args.output or (
            RESULTS_DIR / f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([r.to_dict() for r in results], f,
                      ensure_ascii=False, indent=2)

        print(f"\n=== Podsumowanie ===")
        print(f"Przeskanowano: {len(companies)} firm")
        print(f"Znaleziono NIP: {found_count}")
        print(f"Wyniki zapisane: {output_file}")

        if found_count > 0 and not args.apply:
            print(f"\nUżyj --apply aby zapisać znalezione NIP do bazy")
    finally:
        db.close()


if __name__ == "__main__":
    main()