nordabiz/scripts/search_ceidg_by_name.py
2026-02-01 07:22:32 +01:00

494 lines
16 KiB
Python

#!/usr/bin/env python3
"""
CEIDG Search by Name - look up companies in CEIDG by their name.

For companies stored without a NIP: searches the CEIDG portal by company name
and verifies the hit against the stored data (name, city, street, owner).

CEIDG portal: https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx

Usage:
    python scripts/search_ceidg_by_name.py            # Search all companies
    python scripts/search_ceidg_by_name.py --id 119   # Search one specific company
    python scripts/search_ceidg_by_name.py --apply    # Save the NIPs that were found
"""
import os
import sys
import re
import argparse
import time
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import Optional, List
from difflib import SequenceMatcher
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
sys.exit(1)
from database import SessionLocal, Company
# Output directory for debug screenshots and JSON result dumps.
# It is created at import time if it does not exist yet.
RESULTS_DIR = Path(__file__).parent.parent / "data" / "ceidg_search_results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
# Domains to skip (public e-mail providers).
# NOTE(review): not referenced by any code visible in this file - confirm
# whether it is still needed.
SKIP_DOMAINS = {
    'gmail.com', 'wp.pl', 'onet.pl', 'op.pl', 'interia.pl',
    'o2.pl', 'poczta.fm', 'yahoo.com', 'hotmail.com', 'outlook.com'
}
@dataclass
class CEIDGSearchResult:
    """Outcome of one CEIDG lookup performed for a single company."""

    # Which company was looked up and the query text that was used.
    company_id: int
    company_name: str
    search_query: str

    # Data read from CEIDG (None when a value was not found).
    found_nip: Optional[str] = None
    found_regon: Optional[str] = None
    found_name: Optional[str] = None
    found_owner: Optional[str] = None
    found_address: Optional[str] = None
    found_status: Optional[str] = None

    # Verification outcome.
    matches: List[str] = field(default_factory=list)  # labels of what agreed
    confidence: str = "low"  # one of: low, medium, high
    verified: bool = False
    error: Optional[str] = None
    searched_at: str = ""

    def __post_init__(self):
        # Keep a caller-supplied timestamp; otherwise stamp the creation time.
        self.searched_at = self.searched_at or datetime.now().isoformat()

    def to_dict(self):
        """Return the record as a plain dict (used for the JSON dump)."""
        return asdict(self)
def normalize_phone(phone: str) -> str:
    """Reduce a phone number to its ASCII digits; falsy input yields ""."""
    return re.sub(r'[^0-9]', '', phone) if phone else ""
def normalize_address(address: str) -> str:
    """Return a comparison-friendly form of an address.

    The text is lower-cased, every character that is neither a word character
    nor whitespace becomes a space, and runs of whitespace collapse to a
    single space. Falsy input yields "".
    """
    if address:
        without_punctuation = re.sub(r'[^\w\s]', ' ', address.lower())
        return re.sub(r'\s+', ' ', without_punctuation).strip()
    return ""
def similarity(a: str, b: str) -> float:
    """Return the case-insensitive SequenceMatcher ratio of two strings (0-1).

    If either argument is empty or None the result is 0.0.
    """
    if a and b:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()
    return 0.0
def validate_nip(nip: str) -> bool:
    """Check whether *nip* is a well-formed Polish NIP.

    Separators (dashes, spaces, anything that is not an ASCII digit) are
    ignored. The first nine digits are weighted, summed and reduced modulo 11;
    the result must equal the tenth digit.

    Args:
        nip: The candidate number, with or without separators.

    Returns:
        True when the number has exactly ten digits and a matching checksum,
        False otherwise (including for empty or None input).
    """
    if not nip:
        # None / "" used to reach re.sub and raise TypeError for None.
        return False
    digits = re.sub(r'[^0-9]', '', nip)
    if len(digits) != 10:
        return False
    if digits == "0000000000":
        # The all-zero string satisfies the checksum (0 == 0) but is only a
        # placeholder, never a real taxpayer number.
        return False
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(d) * w for d, w in zip(digits, weights)) % 11
    # A remainder of 10 can never equal a single digit, so such numbers fail.
    return checksum == int(digits[9])
def extract_nip_from_text(text: str) -> Optional[str]:
    """Return the first checksum-valid NIP found in *text*, or None.

    Patterns are tried from most to least specific: a "NIP"-labelled number
    with optional separators, a "NIP"-labelled run of ten digits, and finally
    any stand-alone run of ten digits. Every candidate is stripped to digits
    and must pass validate_nip().
    """
    nip_patterns = (
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})',
        r'NIP[:\s]*(\d{10})',
        r'\b(\d{10})\b',  # Standalone 10 digits
    )
    for nip_pattern in nip_patterns:
        for hit in re.finditer(nip_pattern, text, re.IGNORECASE):
            candidate = re.sub(r'[^0-9]', '', hit.group(1))
            if validate_nip(candidate):
                return candidate
    return None
def extract_regon_from_text(text: str) -> Optional[str]:
    """Return the first "REGON"-labelled number of 9 or 14 digits, or None.

    The label is matched case-insensitively; labelled numbers of any other
    length are skipped.
    """
    for hit in re.finditer(r'REGON[:\s]*(\d{9,14})', text, re.IGNORECASE):
        candidate = re.sub(r'[^0-9]', '', hit.group(1))
        if len(candidate) in (9, 14):
            return candidate
    return None
# Public search form of the CEIDG portal.
_CEIDG_SEARCH_URL = "https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx"

# Legal-form suffixes removed from the query (CEIDG lists sole
# proprietorships, not companies).
_LEGAL_FORM_SUFFIXES = (' sp. z o.o.', ' sp.z o.o.', ' s.c.', ' s.j.')


def _strip_legal_form(name):
    """Return *name* without legal-form suffixes, matched case-insensitively."""
    cleaned = name or ""
    for suffix in _LEGAL_FORM_SUFFIXES:
        # IGNORECASE also covers mixed-case spellings such as "Sp. z o.o.",
        # which a plain lower/upper replace would leave in place.
        cleaned = re.sub(re.escape(suffix), '', cleaned, flags=re.IGNORECASE)
    return cleaned.strip()


def _first_usable(page, selectors, timeout_ms, action=None):
    """Return (selector, locator) for the first visible selector, else (None, None).

    When *action* is given it is called with the locator; if it raises, the
    next selector is tried.
    """
    for selector in selectors:
        try:
            candidate = page.locator(selector).first
            # NOTE(review): Playwright documents this `timeout` option as
            # ignored (the call returns immediately) - confirm for the
            # installed version.
            if candidate.is_visible(timeout=timeout_ms):
                if action is not None:
                    action(candidate)
                return selector, candidate
        except Exception:
            # Best-effort probing: a failing selector must not abort the
            # search. Catching Exception (not a bare except) lets Ctrl-C
            # through.
            continue
    return None, None


def _fill_from_details(result, page_text):
    """Populate *result* from the text of a CEIDG details page."""
    result.found_nip = extract_nip_from_text(page_text)
    result.found_regon = extract_regon_from_text(page_text)

    # Owner: label followed by two capitalised words.
    owner_match = re.search(r'Imię i nazwisko[:\s]*([A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+\s+[A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+)', page_text)
    if owner_match:
        result.found_owner = owner_match.group(1).strip()

    # Business name as registered in CEIDG (capped at 200 characters).
    firma_match = re.search(r'Firma przedsiębiorcy[:\s]*(.+?)(?:\n|Adres|Status)', page_text, re.DOTALL)
    if firma_match:
        result.found_name = firma_match.group(1).strip()[:200]

    # Address (capped at 200 characters).
    addr_match = re.search(r'Adres[:\s]*(.+?)(?:\n\n|Status|Data)', page_text, re.DOTALL)
    if addr_match:
        result.found_address = addr_match.group(1).strip()[:200]

    # Status: the first keyword, in this priority order, present on the page.
    upper_text = page_text.upper()
    for status in ('AKTYWNY', 'ZAWIESZONY', 'WYKREŚLONY'):
        if status in upper_text:
            result.found_status = status
            break


def search_ceidg(company: Company) -> CEIDGSearchResult:
    """Search the CEIDG portal for *company* by name.

    Opens a headless Chromium, fills the search form with the company name
    (legal-form suffixes removed) and, when available, its city, then opens
    the first result's details page and scrapes NIP, REGON, owner, registered
    name, address and status. The scraped data is passed to verify_result().

    Args:
        company: The company to look up; `id`, `name` and `address_city` are
            read here.

    Returns:
        A CEIDGSearchResult. Failures do not raise: they are reported via
        `result.error` (timeouts as "Timeout", other exceptions as their
        message truncated to 200 characters).
    """
    search_name = _strip_legal_form(company.name)
    result = CEIDGSearchResult(
        company_id=company.id,
        company_name=company.name,
        search_query=search_name
    )
    if not search_name:
        # An empty query would match arbitrary entries on the portal.
        result.error = "Brak nazwy firmy do wyszukania"
        return result

    print(f" Szukam w CEIDG: '{search_name}'")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        page = context.new_page()
        page.set_default_timeout(60000)  # 60 seconds default timeout
        try:
            # Load the search form.
            print(" → Ładuję stronę CEIDG...")
            page.goto(_CEIDG_SEARCH_URL, timeout=60000)
            time.sleep(3)
            page.wait_for_load_state("domcontentloaded", timeout=30000)
            print(" → Strona załadowana")

            # Locate the company-name field; several selectors are tried.
            selector, firma_input = _first_usable(page, [
                "input[id*='txtFirma']",
                "input[id*='Firma']",
                "input[name*='Firma']",
                "#ctl00_MainContent_txtFirma",
                "input[placeholder*='Nazwa firmy']",
            ], 2000)
            if firma_input is None:
                # Keep a screenshot so the page layout can be inspected.
                screenshot_path = RESULTS_DIR / f"ceidg_debug_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                result.error = f"Nie znaleziono pola wyszukiwania. Screenshot: {screenshot_path}"
                return result
            print(f" → Znaleziono pole wyszukiwania: {selector}")

            firma_input.fill(search_name)
            print(f" → Wpisano: '{search_name}'")
            time.sleep(1)

            # Narrow the search by city when we know it.
            if company.address_city:
                _, city_input = _first_usable(page, [
                    "input[id*='txtMiasto']",
                    "input[id*='Miasto']",
                    "#ctl00_MainContent_txtMiasto",
                ], 2000, action=lambda field_: field_.fill(company.address_city))
                if city_input is not None:
                    print(f" → Dodano miasto: '{company.address_city}'")

            # Submit: click the search button, or press Enter as a fallback.
            _, search_button = _first_usable(page, [
                "input[id*='btnSearch']",
                "input[value='Szukaj']",
                "button:has-text('Szukaj')",
                "#ctl00_MainContent_btnSearch",
            ], 2000, action=lambda button: button.click())
            if search_button is not None:
                print(" → Kliknięto Szukaj")
            else:
                page.keyboard.press("Enter")
                print(" → Wysłano Enter")

            # Wait for the results.
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=30000)
            print(" → Wyniki załadowane")

            # Stop early on an explicit "no results" message.
            page_text_check = page.inner_text("body")
            if "Brak wyników" in page_text_check or "nie znaleziono" in page_text_check.lower():
                result.error = "Nie znaleziono w CEIDG"
                return result

            # Find the link to the details page of the first hit.
            _, details_link = _first_usable(page, [
                "a:has-text('Szczegóły')",
                "a[href*='SearchDetails']",
                "a[id*='Details']",
                "a.details-link",
            ], 3000)
            if details_link is None:
                # The portal may have shown the entry directly.
                page_text = page.inner_text("body")
                nip = extract_nip_from_text(page_text)
                if nip:
                    result.found_nip = nip
                    result.found_regon = extract_regon_from_text(page_text)
                    result = verify_result(result, company)
                    return result
                result.error = "Brak linku do szczegółów"
                screenshot_path = RESULTS_DIR / f"ceidg_results_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                return result

            details_link.click()
            print(" → Kliknięto Szczegóły")
            time.sleep(4)
            page.wait_for_load_state("networkidle", timeout=30000)

            # Scrape the details page and verify against our record.
            _fill_from_details(result, page.inner_text("body"))
            result = verify_result(result, company)
        except PlaywrightTimeout:
            result.error = "Timeout"
        except Exception as e:
            result.error = str(e)[:200]
        finally:
            browser.close()
    return result
def verify_result(result: CEIDGSearchResult, company: Company) -> CEIDGSearchResult:
    """Decide whether the scraped CEIDG entry belongs to *company*.

    Collects evidence labels (name similarity, city, street, owner surname
    appearing in the company name), stores them in `result.matches` and sets
    `result.confidence` / `result.verified`:

    * two or more labels             -> "high",   verified
    * exactly one label, name-based  -> "medium", verified
    * anything else                  -> "low",    not verified

    Without a NIP nothing is checked; only `result.error` is set. The same
    *result* object is mutated and returned.
    """
    if not result.found_nip:
        result.error = "NIP nie znaleziony na stronie szczegółów"
        return result

    evidence = []

    # 1. Name similarity (full or partial).
    if result.found_name:
        score = similarity(company.name, result.found_name)
        if score > 0.7:
            evidence.append(f"nazwa ({score:.0%})")
        elif score > 0.5:
            evidence.append(f"nazwa częściowa ({score:.0%})")

    # 2. City and street: stored values must occur inside the found address.
    if result.found_address:
        found_address_lower = result.found_address.lower()
        if company.address_city and company.address_city.lower() in found_address_lower:
            evidence.append("miasto")
        # Only the first 10 characters of the street are compared.
        if company.address_street and company.address_street.lower()[:10] in found_address_lower:
            evidence.append("ulica")

    # 3. Owner: any owner name part longer than 3 characters in our name.
    if result.found_owner:
        owner_tokens = result.found_owner.lower().split()
        own_name_lower = company.name.lower()
        if any(len(token) > 3 and token in own_name_lower for token in owner_tokens):
            evidence.append("właściciel w nazwie")

    # Translate the evidence into a confidence level.
    result.matches = evidence
    if len(evidence) >= 2:
        result.confidence, result.verified = "high", True
    elif len(evidence) == 1 and "nazwa" in evidence[0]:
        result.confidence, result.verified = "medium", True
    else:
        result.confidence, result.verified = "low", False
    return result
def get_companies_without_nip(db, company_id: int = None) -> List[Company]:
    """Return companies whose NIP is NULL or empty, ordered by name.

    Args:
        db: Session-like object providing `query()`.
        company_id: When given, restrict the result to that company id.
            `None` (the default) means no restriction; `0` is treated as a
            real id rather than as "no filter".

    Returns:
        A list of matching Company rows.
    """
    # `== None` is kept on purpose: the original relies on the column
    # comparison being built into the filter expression, which `is None`
    # would not do.
    missing_nip = (Company.nip == None) | (Company.nip == '')  # noqa: E711
    query = db.query(Company).filter(missing_nip)
    if company_id is not None:
        query = query.filter(Company.id == company_id)
    return query.order_by(Company.name).all()
def _write_results(results, output_file):
    """Dump *results* as JSON to *output_file*, creating its directory if needed."""
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2)


def main():
    """Command-line entry point.

    Looks up every company without a NIP (or the one selected with `--id`,
    otherwise at most `--limit` companies) in CEIDG, prints progress, writes
    all results to a JSON file and, with `--apply`, stores verified NIP/REGON
    values in the database.

    A failed database commit is rolled back and reported without aborting the
    run. On Ctrl-C the results collected so far are still written to the JSON
    file.
    """
    parser = argparse.ArgumentParser(description="Search CEIDG by company name")
    parser.add_argument('--id', type=int, help="Search specific company ID")
    parser.add_argument('--apply', action='store_true', help="Apply found NIPs to database")
    parser.add_argument('--limit', type=int, default=50, help="Limit number of companies to search")
    parser.add_argument('--output', type=str, help="Output JSON file path")
    args = parser.parse_args()

    db = SessionLocal()
    try:
        companies = get_companies_without_nip(db, args.id)
        if not args.id:
            companies = companies[:args.limit]
        total = len(companies)
        print(f"\n=== Wyszukiwanie {total} firm w CEIDG ===\n")

        results = []
        found_count = 0
        verified_count = 0
        try:
            for i, company in enumerate(companies, 1):
                print(f"[{i}/{total}] {company.name}")
                result = search_ceidg(company)
                results.append(result)

                if result.found_nip:
                    found_count += 1
                    status = "" if result.verified else "?"
                    print(f" {status} NIP: {result.found_nip} (confidence: {result.confidence})")
                    print(f" Matches: {', '.join(result.matches) if result.matches else 'brak'}")
                    if result.verified:
                        verified_count += 1
                        if args.apply:
                            company.nip = result.found_nip
                            if result.found_regon and not company.regon:
                                company.regon = result.found_regon
                            try:
                                db.commit()
                            except Exception as exc:
                                # Without a rollback the session stays in a
                                # failed state and later commits fail too.
                                db.rollback()
                                print(f" → Błąd zapisu do bazy: {str(exc)[:200]}")
                            else:
                                print(" → Zapisano do bazy")
                elif result.error:
                    print(f"{result.error}")

                # Rate limiting - CEIDG may block rapid requests. There is
                # nothing to wait for after the last company.
                if i < total:
                    time.sleep(3)
        except KeyboardInterrupt:
            # Fall through so the results gathered so far are still saved.
            print("\nPrzerwano - zapisuję dotychczasowe wyniki")

        # Save results to JSON.
        output_file = args.output or (RESULTS_DIR / f"ceidg_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        _write_results(results, output_file)

        print("\n=== Podsumowanie ===")
        print(f"Przeszukano: {len(results)} firm")
        print(f"Znaleziono NIP: {found_count}")
        print(f"Zweryfikowano: {verified_count}")
        print(f"Wyniki zapisane: {output_file}")
        if verified_count > 0 and not args.apply:
            print("\nUżyj --apply aby zapisać zweryfikowane NIP do bazy")
    finally:
        db.close()


if __name__ == "__main__":
    main()