#!/usr/bin/env python3
"""
CEIDG Search by Name - look up companies in CEIDG by their name.

For companies stored without a NIP (tax ID), search the public CEIDG portal
by company name and verify the hit by comparing the name, city, street and
owner found on the portal with the data already stored for the company.

CEIDG portal: https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx

Usage:
    python scripts/search_ceidg_by_name.py              # search all companies
    python scripts/search_ceidg_by_name.py --id 119     # search one company
    python scripts/search_ceidg_by_name.py --apply      # save found NIPs
"""

import os
import sys
import re
import argparse
import time
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import Optional, List
from difflib import SequenceMatcher

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
    print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
    sys.exit(1)

from database import SessionLocal, Company

# Output directory
RESULTS_DIR = Path(__file__).parent.parent / "data" / "ceidg_search_results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# Domains to skip (public email providers)
SKIP_DOMAINS = {
    'gmail.com', 'wp.pl', 'onet.pl', 'op.pl', 'interia.pl',
    'o2.pl', 'poczta.fm', 'yahoo.com', 'hotmail.com', 'outlook.com'
}

# Public CEIDG search form
CEIDG_SEARCH_URL = "https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx"

# Legal-form suffixes removed from the name before searching
# (CEIDG is for sole proprietorships, not companies)
_LEGAL_SUFFIXES = (' sp. z o.o.', ' sp.z o.o.', ' s.c.', ' s.j.')


@dataclass
class CEIDGSearchResult:
    """Outcome of a single CEIDG lookup for one company."""
    company_id: int
    company_name: str
    search_query: str

    # Data found on the portal
    found_nip: Optional[str] = None
    found_regon: Optional[str] = None
    found_name: Optional[str] = None
    found_owner: Optional[str] = None
    found_address: Optional[str] = None
    found_status: Optional[str] = None

    # Verification
    matches: List[str] = field(default_factory=list)  # which attributes matched
    confidence: str = "low"  # low, medium, high
    verified: bool = False

    error: Optional[str] = None
    searched_at: str = ""

    def __post_init__(self):
        # Stamp the result with the creation time unless the caller set one.
        if not self.searched_at:
            self.searched_at = datetime.now().isoformat()

    def to_dict(self):
        """Return the result as a plain dict (used for the JSON report)."""
        return asdict(self)


def normalize_phone(phone: str) -> str:
    """Reduce a phone number to its digits; empty input gives ""."""
    if not phone:
        return ""
    return re.sub(r'[^0-9]', '', phone)


def normalize_address(address: str) -> str:
    """Normalize an address for comparison.

    Lowercases, replaces punctuation with spaces and collapses whitespace.
    Empty input gives "".
    """
    if not address:
        return ""
    addr = address.lower()
    addr = re.sub(r'[^\w\s]', ' ', addr)
    addr = re.sub(r'\s+', ' ', addr).strip()
    return addr


def similarity(a: str, b: str) -> float:
    """Return the case-insensitive similarity ratio (0-1) of two strings.

    Returns 0.0 when either string is empty or None.
    """
    if not a or not b:
        return 0.0
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def validate_nip(nip: str) -> bool:
    """Check a NIP against its checksum.

    Non-digit characters (dashes, spaces) are ignored. Returns False for
    empty/None input, for anything that is not exactly 10 digits, and for a
    checksum mismatch.
    """
    if not nip:
        return False
    digits = re.sub(r'[^0-9]', '', nip)
    if len(digits) != 10:
        return False

    weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
    # zip() stops after the nine weighted digits; the tenth is the check digit.
    # A remainder of 10 can never equal a single digit, so it is rejected.
    checksum = sum(int(d) * w for d, w in zip(digits, weights)) % 11
    return checksum == int(digits[9])


def extract_nip_from_text(text: str) -> Optional[str]:
    """Return the first checksum-valid NIP found in ``text``, or None.

    Labelled forms ("NIP: 123-456-78-90", "NIP 1234567890") are tried before
    a bare 10-digit number. Empty/None text gives None.
    """
    if not text:
        return None

    patterns = [
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})',
        r'NIP[:\s]*(\d{10})',
        r'\b(\d{10})\b',  # Standalone 10 digits
    ]

    for pattern in patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            nip = re.sub(r'[^0-9]', '', match)
            if validate_nip(nip):
                return nip
    return None


def extract_regon_from_text(text: str) -> Optional[str]:
    """Return the first REGON (9 or 14 digits) labelled "REGON" in ``text``.

    Returns None when nothing matches or the text is empty/None.
    """
    if not text:
        return None

    patterns = [
        r'REGON[:\s]*(\d{9,14})',
    ]

    for pattern in patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            regon = re.sub(r'[^0-9]', '', match)
            if len(regon) in (9, 14):
                return regon
    return None


def _strip_legal_suffixes(name: Optional[str]) -> str:
    """Remove legal-form suffixes from a company name, ignoring letter case."""
    cleaned = name or ""
    for suffix in _LEGAL_SUFFIXES:
        # Case-insensitive so that the common "Sp. z o.o." spelling is
        # stripped too, not only the all-lower / all-upper variants.
        cleaned = re.sub(re.escape(suffix), '', cleaned, flags=re.IGNORECASE)
    return cleaned.strip()


def _first_visible(page, selectors, timeout_ms, action=None):
    """Find the first selector whose element is visible.

    When ``action`` is given it is called with the locator; a selector counts
    as a hit only if the action succeeds as well. Any Playwright error for a
    selector just moves on to the next one.

    Returns ``(locator, selector)`` for the hit or ``(None, None)``.
    """
    for selector in selectors:
        try:
            candidate = page.locator(selector).first
            # NOTE(review): recent Playwright docs describe the ``timeout``
            # argument of is_visible() as ignored - confirm whether an
            # explicit wait is needed here.
            if not candidate.is_visible(timeout=timeout_ms):
                continue
            if action is not None:
                action(candidate)
            return candidate, selector
        except Exception:
            # Best effort: a broken selector must not abort the search, but
            # KeyboardInterrupt/SystemExit are allowed to propagate.
            continue
    return None, None


def _fill_from_details_text(result: CEIDGSearchResult, page_text: str) -> None:
    """Parse the text of a CEIDG details page into ``result`` (in place)."""
    result.found_nip = extract_nip_from_text(page_text)
    result.found_regon = extract_regon_from_text(page_text)

    # Owner: two capitalised words after the "Imię i nazwisko" label
    owner_match = re.search(r'Imię i nazwisko[:\s]*([A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+\s+[A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+)', page_text)
    if owner_match:
        result.found_owner = owner_match.group(1).strip()

    # Business name as registered in CEIDG
    firma_match = re.search(r'Firma przedsiębiorcy[:\s]*(.+?)(?:\n|Adres|Status)', page_text, re.DOTALL)
    if firma_match:
        result.found_name = firma_match.group(1).strip()[:200]

    # Address
    addr_match = re.search(r'Adres[:\s]*(.+?)(?:\n\n|Status|Data)', page_text, re.DOTALL)
    if addr_match:
        result.found_address = addr_match.group(1).strip()[:200]

    # Status: first keyword found anywhere on the page wins, in this order
    upper_text = page_text.upper()
    for status in ('AKTYWNY', 'ZAWIESZONY', 'WYKREŚLONY'):
        if status in upper_text:
            result.found_status = status
            break


def search_ceidg(company: Company) -> CEIDGSearchResult:
    """
    Search CEIDG for a company by name.

    Opens the public search form in headless Chromium, fills in the company
    name (and the city when known), opens the first "details" link and
    extracts NIP, REGON, owner, registered name, address and status from the
    page text. The hit is then scored by ``verify_result``.

    Never raises for portal problems: timeouts and other errors are reported
    in ``result.error``.

    CEIDG portal: https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx
    """
    # Prepare search query
    search_name = _strip_legal_suffixes(company.name)

    result = CEIDGSearchResult(
        company_id=company.id,
        company_name=company.name,
        search_query=search_name
    )

    if not search_name:
        # Nothing to type into the form; an empty query would match anything.
        result.error = "Brak nazwy firmy do wyszukania"
        return result

    print(f" Szukam w CEIDG: '{search_name}'")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        page = context.new_page()
        page.set_default_timeout(60000)  # 60 seconds default timeout

        try:
            # Go to CEIDG search page
            print(" → Ładuję stronę CEIDG...")
            page.goto(CEIDG_SEARCH_URL, timeout=60000)
            time.sleep(3)

            # Wait for page to be ready
            page.wait_for_load_state("domcontentloaded", timeout=30000)
            print(" → Strona załadowana")

            # Try multiple selectors for company name field
            firma_input, used_selector = _first_visible(
                page,
                [
                    "input[id*='txtFirma']",
                    "input[id*='Firma']",
                    "input[name*='Firma']",
                    "#ctl00_MainContent_txtFirma",
                    "input[placeholder*='Nazwa firmy']",
                ],
                2000,
            )

            if firma_input is None:
                # Take screenshot for debugging
                screenshot_path = RESULTS_DIR / f"ceidg_debug_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                result.error = f"Nie znaleziono pola wyszukiwania. Screenshot: {screenshot_path}"
                return result

            print(f" → Znaleziono pole wyszukiwania: {used_selector}")

            # Fill in company name
            firma_input.fill(search_name)
            print(f" → Wpisano: '{search_name}'")
            time.sleep(1)

            # Add city if available
            if company.address_city:
                city_input, _ = _first_visible(
                    page,
                    [
                        "input[id*='txtMiasto']",
                        "input[id*='Miasto']",
                        "#ctl00_MainContent_txtMiasto",
                    ],
                    2000,
                    action=lambda loc: loc.fill(company.address_city),
                )
                if city_input is not None:
                    print(f" → Dodano miasto: '{company.address_city}'")

            # Find and click search button
            search_button, _ = _first_visible(
                page,
                [
                    "input[id*='btnSearch']",
                    "input[value='Szukaj']",
                    "button:has-text('Szukaj')",
                    "#ctl00_MainContent_btnSearch",
                ],
                2000,
                action=lambda loc: loc.click(),
            )

            if search_button is not None:
                print(" → Kliknięto Szukaj")
            else:
                # No usable button: submit the form from the keyboard
                page.keyboard.press("Enter")
                print(" → Wysłano Enter")

            # Wait for results
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=30000)
            print(" → Wyniki załadowane")

            # Check for "no results" message
            page_text_check = page.inner_text("body")
            if "Brak wyników" in page_text_check or "nie znaleziono" in page_text_check.lower():
                result.error = "Nie znaleziono w CEIDG"
                return result

            # Find details link
            details_link, _ = _first_visible(
                page,
                [
                    "a:has-text('Szczegóły')",
                    "a[href*='SearchDetails']",
                    "a[id*='Details']",
                    "a.details-link",
                ],
                3000,
            )

            if details_link is None:
                # Maybe direct results page?
                page_text = page.inner_text("body")
                nip = extract_nip_from_text(page_text)
                if nip:
                    result.found_nip = nip
                    result.found_regon = extract_regon_from_text(page_text)
                    result = verify_result(result, company)
                    return result

                result.error = "Brak linku do szczegółów"
                screenshot_path = RESULTS_DIR / f"ceidg_results_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                return result

            # Click details link
            details_link.click()
            print(" → Kliknięto Szczegóły")
            time.sleep(4)
            page.wait_for_load_state("networkidle", timeout=30000)

            # Extract data from details page
            _fill_from_details_text(result, page.inner_text("body"))

            # Verify the result
            result = verify_result(result, company)

        except PlaywrightTimeout:
            result.error = "Timeout"
        except Exception as e:
            result.error = str(e)[:200]
        finally:
            browser.close()

    return result


def verify_result(result: CEIDGSearchResult, company: Company) -> CEIDGSearchResult:
    """
    Check whether the CEIDG hit matches our company and score it.

    Collects matching attributes (name similarity, city, street prefix, owner
    surname/first name inside the company name) into ``result.matches`` and
    sets ``result.confidence`` / ``result.verified``:

    - two or more matches          -> "high", verified
    - exactly one match on the name -> "medium", verified
    - anything else                -> "low", not verified

    When no NIP was found, only ``result.error`` is set.
    """
    if not result.found_nip:
        result.error = "NIP nie znaleziony na stronie szczegółów"
        return result

    matches = []

    # 1. Name similarity
    if result.found_name:
        name_sim = similarity(company.name, result.found_name)
        if name_sim > 0.7:
            matches.append(f"nazwa ({name_sim:.0%})")
        elif name_sim > 0.5:
            matches.append(f"nazwa częściowa ({name_sim:.0%})")

    # 2. Address: city and the first 10 characters of the street
    if result.found_address and company.address_city:
        if company.address_city.lower() in result.found_address.lower():
            matches.append("miasto")

    if result.found_address and company.address_street:
        if company.address_street.lower()[:10] in result.found_address.lower():
            matches.append("ulica")

    # 3. Owner: any part of the owner's name longer than 3 characters that
    #    appears inside our company name
    if result.found_owner:
        company_name_lower = company.name.lower()
        for part in result.found_owner.lower().split():
            if len(part) > 3 and part in company_name_lower:
                matches.append("właściciel w nazwie")
                break

    # Determine confidence
    result.matches = matches

    if len(matches) >= 2:
        result.confidence = "high"
        result.verified = True
    elif len(matches) == 1 and "nazwa" in matches[0]:
        # "nazwa" matches the two name labels only; the owner label contains
        # "nazwie", so a lone owner match stays unverified.
        result.confidence = "medium"
        result.verified = True
    else:
        result.confidence = "low"
        result.verified = False

    return result


def get_companies_without_nip(db, company_id: Optional[int] = None) -> List[Company]:
    """Return companies whose NIP is NULL or empty, ordered by name.

    When ``company_id`` is given (including 0), the result is restricted to
    that single company.
    """
    query = db.query(Company).filter(
        # "== None" is intentional: the ORM turns it into "IS NULL".
        (Company.nip == None) | (Company.nip == '')  # noqa: E711
    )

    if company_id is not None:
        query = query.filter(Company.id == company_id)

    return query.order_by(Company.name).all()


def main():
    """Command-line entry point: search CEIDG and write a JSON report.

    With ``--apply`` the NIP (and REGON, when the company has none) of every
    verified hit is committed to the database immediately.
    """
    parser = argparse.ArgumentParser(description="Search CEIDG by company name")
    parser.add_argument('--id', type=int, help="Search specific company ID")
    parser.add_argument('--apply', action='store_true', help="Apply found NIPs to database")
    parser.add_argument('--limit', type=int, default=50, help="Limit number of companies to search")
    parser.add_argument('--output', type=str, help="Output JSON file path")
    args = parser.parse_args()

    db = SessionLocal()
    try:
        companies = get_companies_without_nip(db, args.id)

        # --limit only applies to bulk runs, not to an explicit --id
        if args.id is None:
            companies = companies[:args.limit]

        print(f"\n=== Wyszukiwanie {len(companies)} firm w CEIDG ===\n")

        results = []
        found_count = 0
        verified_count = 0

        for i, company in enumerate(companies, 1):
            print(f"[{i}/{len(companies)}] {company.name}")

            result = search_ceidg(company)
            results.append(result)

            if result.found_nip:
                found_count += 1
                status = "✓" if result.verified else "?"
                print(f" {status} NIP: {result.found_nip} (confidence: {result.confidence})")
                print(f" Matches: {', '.join(result.matches) if result.matches else 'brak'}")

                if result.verified:
                    verified_count += 1

                    if args.apply:
                        company.nip = result.found_nip
                        if result.found_regon and not company.regon:
                            company.regon = result.found_regon
                        db.commit()
                        print(" → Zapisano do bazy")
            elif result.error:
                print(f" ✗ {result.error}")

            # Rate limiting - CEIDG may block rapid requests; there is
            # nothing left to throttle after the last company.
            if i < len(companies):
                time.sleep(3)

        # Save results to JSON
        output_file = args.output or (RESULTS_DIR / f"ceidg_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2)

        print("\n=== Podsumowanie ===")
        print(f"Przeszukano: {len(companies)} firm")
        print(f"Znaleziono NIP: {found_count}")
        print(f"Zweryfikowano: {verified_count}")
        print(f"Wyniki zapisane: {output_file}")

        if verified_count > 0 and not args.apply:
            print("\nUżyj --apply aby zapisać zweryfikowane NIP do bazy")

    finally:
        db.close()


if __name__ == "__main__":
    main()