#!/usr/bin/env python3
"""
CEIDG Search by Name - look up companies in CEIDG by their name.

For companies that have no NIP in the database, search the CEIDG portal by
company name and verify the hit by comparing the name, address and owner.

Portal CEIDG: https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx

Usage:
    python scripts/search_ceidg_by_name.py            # Search all companies
    python scripts/search_ceidg_by_name.py --id 119   # Search one specific company
    python scripts/search_ceidg_by_name.py --apply    # Save verified NIPs to the database
"""
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import argparse
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict, field
|
|
from typing import Optional, List
|
|
from difflib import SequenceMatcher
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
try:
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
|
|
except ImportError:
|
|
print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
|
|
sys.exit(1)
|
|
|
|
from database import SessionLocal, Company
|
|
|
|
# Output directory for the JSON result dumps and the debug screenshots.
# It is created as a side effect of importing this module.
RESULTS_DIR = Path(__file__).parent.parent / "data" / "ceidg_search_results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# Domains to skip (public email providers).
# NOTE(review): nothing in the visible file references this set - confirm
# whether it is still needed.
SKIP_DOMAINS = {
    'gmail.com', 'wp.pl', 'onet.pl', 'op.pl', 'interia.pl',
    'o2.pl', 'poczta.fm', 'yahoo.com', 'hotmail.com', 'outlook.com'
}
|
|
|
|
|
|
@dataclass
class CEIDGSearchResult:
    """Outcome of one CEIDG name lookup for a single company.

    Carries the query that was sent, whatever registry data was scraped,
    and the verdict of the plausibility check against our own record.
    """

    # Which company was searched for, and the query string actually used.
    company_id: int
    company_name: str
    search_query: str

    # Data scraped from CEIDG; each stays None when it was not found.
    found_nip: Optional[str] = None
    found_regon: Optional[str] = None
    found_name: Optional[str] = None
    found_owner: Optional[str] = None
    found_address: Optional[str] = None
    found_status: Optional[str] = None

    # Verification outcome.
    matches: List[str] = field(default_factory=list)  # labels of the attributes that agreed
    confidence: str = "low"  # one of: low, medium, high
    verified: bool = False

    error: Optional[str] = None
    searched_at: str = ""

    def __post_init__(self):
        # Stamp the record with the current local time unless one was supplied.
        self.searched_at = self.searched_at or datetime.now().isoformat()

    def to_dict(self):
        """Return the record as a plain dict (what main() feeds to json.dump)."""
        return asdict(self)
|
|
|
|
|
|
def normalize_phone(phone: str) -> str:
    """Reduce a phone number to its ASCII digits.

    Empty or None input yields an empty string.
    """
    return re.sub(r'[^0-9]', '', phone) if phone else ""
|
|
|
|
|
|
def normalize_address(address: str) -> str:
    """Canonicalise an address for comparison.

    Lower-cases the text, turns punctuation into spaces and collapses runs
    of whitespace. Empty or None input yields an empty string.
    """
    if not address:
        return ""
    # Punctuation becomes a separator instead of vanishing, so "Długa,5"
    # and "Długa 5" normalise to the same string.
    spaced = re.sub(r'[^\w\s]', ' ', address.lower())
    # split()/join collapses whitespace runs and trims both ends in one go.
    return " ".join(spaced.split())
|
|
|
|
|
|
def similarity(a: str, b: str) -> float:
    """Return the case-insensitive SequenceMatcher ratio of two strings (0-1).

    If either string is empty or None the result is 0.0.
    """
    if a and b:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()
    return 0.0
|
|
|
|
|
|
def validate_nip(nip: str) -> bool:
    """Check whether *nip* is a well-formed Polish NIP.

    Separators (dashes, spaces, any non-digit) are ignored. The number must
    have exactly 10 digits and its last digit must equal the weighted sum of
    the first nine, modulo 11.

    Args:
        nip: Candidate number, with or without separators. None or an empty
            string is treated as invalid.

    Returns:
        True when the number has 10 digits and a matching check digit,
        False otherwise.
    """
    digits = re.sub(r'[^0-9]', '', nip or '')
    if len(digits) != 10:
        return False

    # Ten zeros pass the checksum arithmetic (0 == 0) but are a placeholder,
    # not an identifier; accepting them lets any zero-filled field on a
    # scraped page be taken for a NIP.
    if digits == '0' * 10:
        return False

    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(d) * w for d, w in zip(digits, weights)) % 11
    # A remainder of 10 can never equal a single digit, so such numbers
    # are rejected by this comparison.
    return checksum == int(digits[9])
|
|
|
|
|
|
def extract_nip_from_text(text: str) -> Optional[str]:
    """Pull the first checksum-valid NIP out of free text.

    Numbers introduced by a "NIP" label are tried first; bare 10-digit
    numbers are the fallback. Every candidate must pass validate_nip().

    Args:
        text: Text to scan (e.g. the body text of a CEIDG page). None or an
            empty string yields None.

    Returns:
        The NIP as 10 digits without separators, or None when no candidate
        passes validation.
    """
    if not text:
        return None

    patterns = (
        # Labelled NIP with optional separators. Because the separators are
        # optional this also covers a plain 10-digit run after the label.
        # The trailing (?!\d) keeps it from slicing the first 10 digits out
        # of a longer number.
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})(?!\d)',
        r'\b(\d{10})\b',  # Standalone 10 digits
    )

    for pattern in patterns:
        for candidate in re.findall(pattern, text, re.IGNORECASE):
            nip = re.sub(r'[^0-9]', '', candidate)
            if validate_nip(nip):
                return nip
    return None
|
|
|
|
|
|
def extract_regon_from_text(text: str) -> Optional[str]:
    """Pull the first REGON-labelled number of 9 or 14 digits out of *text*.

    Returns the digits, or None when no labelled number has an accepted
    length.
    """
    for hit in re.finditer(r'REGON[:\s]*(\d{9,14})', text, re.IGNORECASE):
        digits = re.sub(r'[^0-9]', '', hit.group(1))
        # The pattern admits 9-14 digits; only lengths 9 and 14 are accepted.
        if len(digits) in (9, 14):
            return digits
    return None
|
|
|
|
|
|
def _strip_legal_form_suffixes(name: str) -> str:
    """Drop company legal-form markers (sp. z o.o., s.c., s.j.) from *name*, in any letter case."""
    # CEIDG is for sole proprietorships, not companies, so these markers
    # only get in the way of the search. Matching ignores case so that
    # spellings such as "Sp. z o.o." are removed as well.
    for suffix in (' sp. z o.o.', ' sp.z o.o.', ' s.c.', ' s.j.'):
        name = re.sub(re.escape(suffix), '', name, flags=re.IGNORECASE)
    return name.strip()


def _first_visible(page, selectors, timeout_ms):
    """Return ``(locator, selector)`` for the first selector with a visible match, else ``(None, None)``."""
    for selector in selectors:
        try:
            candidate = page.locator(selector).first
            # NOTE(review): Playwright documents the timeout argument of
            # is_visible() as ignored (the call returns immediately) -
            # confirm, and switch to wait_for(state="visible") if real
            # waiting is wanted here.
            if candidate.is_visible(timeout=timeout_ms):
                return candidate, selector
        except Exception:
            # A selector that cannot be evaluated only means "try the next
            # one"; KeyboardInterrupt/SystemExit are deliberately not caught.
            continue
    return None, None


def _read_details(result, page_text):
    """Fill the ``found_*`` fields of *result* from the text of a details page."""
    result.found_nip = extract_nip_from_text(page_text)
    result.found_regon = extract_regon_from_text(page_text)

    # Owner: two capitalised words following the "Imię i nazwisko" label.
    owner_match = re.search(r'Imię i nazwisko[:\s]*([A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+\s+[A-ZĄĆĘŁŃÓŚŹŻ][a-ząćęłńóśźż]+)', page_text)
    if owner_match:
        result.found_owner = owner_match.group(1).strip()

    # Registered business name as shown by CEIDG (capped at 200 characters).
    firma_match = re.search(r'Firma przedsiębiorcy[:\s]*(.+?)(?:\n|Adres|Status)', page_text, re.DOTALL)
    if firma_match:
        result.found_name = firma_match.group(1).strip()[:200]

    # Address (capped at 200 characters).
    addr_match = re.search(r'Adres[:\s]*(.+?)(?:\n\n|Status|Data)', page_text, re.DOTALL)
    if addr_match:
        result.found_address = addr_match.group(1).strip()[:200]

    # Status: the first keyword, in this order, that occurs anywhere on the page.
    upper_text = page_text.upper()
    for status in ('AKTYWNY', 'ZAWIESZONY', 'WYKREŚLONY'):
        if status in upper_text:
            result.found_status = status
            break


def search_ceidg(company: Company) -> CEIDGSearchResult:
    """
    Look a company up in the public CEIDG portal by its name.

    Portal CEIDG: https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx

    Drives a headless Chromium session: fills the search form (name, plus
    the city when known), opens the first details link and scrapes NIP,
    REGON, owner, registered name, address and status from the page text.
    The scraped record is then scored by verify_result().

    Args:
        company: Record to search for. ``id``, ``name`` and ``address_city``
            are read here; verify_result() reads further address fields.

    Returns:
        A CEIDGSearchResult. Problems inside the browsing session (timeouts,
        missing form fields, no hits, any other exception) are reported
        through ``result.error`` instead of being raised.
    """
    search_name = _strip_legal_form_suffixes(company.name or '')

    result = CEIDGSearchResult(
        company_id=company.id,
        company_name=company.name,
        search_query=search_name
    )

    if not search_name:
        # Nothing to search for; do not send an empty query to the portal.
        result.error = "Brak nazwy firmy do wyszukania"
        return result

    print(f" Szukam w CEIDG: '{search_name}'")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        page = context.new_page()
        page.set_default_timeout(60000)  # 60 seconds default timeout

        try:
            # Go to CEIDG search page
            print(" → Ładuję stronę CEIDG...")
            page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx", timeout=60000)
            time.sleep(3)

            # Wait for page to be ready
            page.wait_for_load_state("domcontentloaded", timeout=30000)
            print(" → Strona załadowana")

            # Try multiple selectors for the company name field
            firma_input, used_selector = _first_visible(
                page,
                [
                    "input[id*='txtFirma']",
                    "input[id*='Firma']",
                    "input[name*='Firma']",
                    "#ctl00_MainContent_txtFirma",
                    "input[placeholder*='Nazwa firmy']",
                ],
                2000,
            )

            if firma_input is None:
                # Take screenshot for debugging
                screenshot_path = RESULTS_DIR / f"ceidg_debug_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                result.error = f"Nie znaleziono pola wyszukiwania. Screenshot: {screenshot_path}"
                return result

            print(f" → Znaleziono pole wyszukiwania: {used_selector}")

            # Fill in company name
            firma_input.fill(search_name)
            print(f" → Wpisano: '{search_name}'")
            time.sleep(1)

            # Add city if available. The fill happens inside the try so a
            # failing field falls through to the next selector.
            if company.address_city:
                city_selectors = [
                    "input[id*='txtMiasto']",
                    "input[id*='Miasto']",
                    "#ctl00_MainContent_txtMiasto",
                ]
                for selector in city_selectors:
                    try:
                        city_input = page.locator(selector).first
                        if city_input.is_visible(timeout=2000):
                            city_input.fill(company.address_city)
                            print(f" → Dodano miasto: '{company.address_city}'")
                            break
                    except Exception:
                        continue

            # Find and click search button (same fall-through rule as above)
            search_selectors = [
                "input[id*='btnSearch']",
                "input[value='Szukaj']",
                "button:has-text('Szukaj')",
                "#ctl00_MainContent_btnSearch",
            ]

            search_clicked = False
            for selector in search_selectors:
                try:
                    btn = page.locator(selector).first
                    if btn.is_visible(timeout=2000):
                        btn.click()
                        search_clicked = True
                        print(" → Kliknięto Szukaj")
                        break
                except Exception:
                    continue

            if not search_clicked:
                # No usable button: submit the form from the keyboard instead.
                page.keyboard.press("Enter")
                print(" → Wysłano Enter")

            # Wait for results
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=30000)
            print(" → Wyniki załadowane")

            # Check for "no results" message
            page_text_check = page.inner_text("body")
            if "Brak wyników" in page_text_check or "nie znaleziono" in page_text_check.lower():
                result.error = "Nie znaleziono w CEIDG"
                return result

            # Find details link
            details_link, _ = _first_visible(
                page,
                [
                    "a:has-text('Szczegóły')",
                    "a[href*='SearchDetails']",
                    "a[id*='Details']",
                    "a.details-link",
                ],
                3000,
            )

            if details_link is None:
                # Maybe the portal went straight to a page that shows the NIP?
                page_text = page.inner_text("body")
                nip = extract_nip_from_text(page_text)
                if nip:
                    result.found_nip = nip
                    result.found_regon = extract_regon_from_text(page_text)
                    result = verify_result(result, company)
                    return result

                result.error = "Brak linku do szczegółów"
                screenshot_path = RESULTS_DIR / f"ceidg_results_{company.id}.png"
                page.screenshot(path=str(screenshot_path))
                return result

            # Click details link
            details_link.click()
            print(" → Kliknięto Szczegóły")
            time.sleep(4)
            page.wait_for_load_state("networkidle", timeout=30000)

            # Extract data from details page
            _read_details(result, page.inner_text("body"))

            # Verify the result
            result = verify_result(result, company)

        except PlaywrightTimeout:
            result.error = "Timeout"
        except Exception as e:
            result.error = str(e)[:200]
        finally:
            # Runs for the early returns above as well.
            browser.close()

    return result
|
|
|
|
|
|
def verify_result(result: CEIDGSearchResult, company: Company) -> CEIDGSearchResult:
    """
    Score how well a scraped CEIDG record matches our own company record.

    Collects match labels for: name similarity (above 0.7 is a full match,
    above 0.5 a partial one), our city appearing in the found address, the
    first 10 characters of our street appearing in the found address, and
    any part of the owner's name longer than 3 characters appearing in our
    company name.

    Confidence rules:
        * 2 or more matches              -> "high",   verified
        * exactly 1 match, on the name   -> "medium", verified
        * anything else                  -> "low",    not verified

    Args:
        result: Scraped record; updated in place (``matches``,
            ``confidence``, ``verified``, or ``error`` when it has no NIP).
        company: Our record; ``name``, ``address_city`` and
            ``address_street`` are read.

    Returns:
        The same *result* object.
    """
    if not result.found_nip:
        result.error = "NIP nie znaleziony na stronie szczegółów"
        return result

    matches = []
    # A missing name must not crash the owner check below; similarity()
    # already treats an empty name as "no match".
    company_name = company.name or ''

    # 1. Name similarity
    if result.found_name:
        name_sim = similarity(company_name, result.found_name)
        if name_sim > 0.7:
            matches.append(f"nazwa ({name_sim:.0%})")
        elif name_sim > 0.5:
            matches.append(f"nazwa częściowa ({name_sim:.0%})")

    # 2. City / street against the found address
    if result.found_address:
        found_address = result.found_address.lower()
        if company.address_city and company.address_city.lower() in found_address:
            matches.append("miasto")
        # Only the first 10 characters of the street are compared.
        if company.address_street and company.address_street.lower()[:10] in found_address:
            matches.append("ulica")

    # 3. Owner's name appearing inside our company name
    if result.found_owner:
        company_name_lower = company_name.lower()
        owner_parts = result.found_owner.lower().split()
        if any(len(part) > 3 and part in company_name_lower for part in owner_parts):
            matches.append("właściciel w nazwie")

    # Determine confidence
    result.matches = matches

    if len(matches) >= 2:
        result.confidence = "high"
        result.verified = True
    elif len(matches) == 1 and "nazwa" in matches[0]:
        # The substring test does not hit "właściciel w nazwie" ("nazwie"
        # is not "nazwa"), so this branch is reached for name labels only.
        result.confidence = "medium"
        result.verified = True
    else:
        result.confidence = "low"
        result.verified = False

    return result
|
|
|
|
|
|
def get_companies_without_nip(db, company_id: Optional[int] = None) -> List[Company]:
    """Fetch the companies whose NIP is NULL or an empty string, ordered by name.

    Args:
        db: Database session providing ``query()``.
        company_id: When given, restrict the result to that company id.
            The company still has to lack a NIP to be returned.

    Returns:
        List of matching Company rows.
    """
    query = db.query(Company).filter(
        # `== None` is intentional: presumably a SQLAlchemy column, where the
        # comparison builds an IS NULL expression - verify against the model.
        (Company.nip == None) | (Company.nip == '')  # noqa: E711
    )

    # Compare against None so that an explicit id of 0 is applied as a
    # filter rather than silently returning every company.
    if company_id is not None:
        query = query.filter(Company.id == company_id)

    return query.order_by(Company.name).all()
|
|
|
|
|
|
def main():
    """Command-line entry point: search CEIDG for the companies lacking a NIP.

    Options:
        --id      search a single company id (the limit is not applied)
        --apply   write the NIP (and a missing REGON) of verified matches
                  to the database
        --limit   maximum number of companies to search (default 50)
        --output  path of the JSON report (default: a timestamped file in
                  RESULTS_DIR)

    All results, verified or not, are written to the JSON report. If the
    run is interrupted with Ctrl-C, the results gathered so far are still
    written.
    """
    parser = argparse.ArgumentParser(description="Search CEIDG by company name")
    parser.add_argument('--id', type=int, help="Search specific company ID")
    parser.add_argument('--apply', action='store_true', help="Apply found NIPs to database")
    parser.add_argument('--limit', type=int, default=50, help="Limit number of companies to search")
    parser.add_argument('--output', type=str, help="Output JSON file path")
    args = parser.parse_args()

    db = SessionLocal()

    try:
        companies = get_companies_without_nip(db, args.id)

        if not args.id:
            companies = companies[:args.limit]

        total = len(companies)
        print(f"\n=== Wyszukiwanie {total} firm w CEIDG ===\n")

        results = []
        found_count = 0
        verified_count = 0

        try:
            for i, company in enumerate(companies, 1):
                print(f"[{i}/{total}] {company.name}")

                result = search_ceidg(company)
                results.append(result)

                if result.found_nip:
                    found_count += 1
                    status = "✓" if result.verified else "?"
                    print(f" {status} NIP: {result.found_nip} (confidence: {result.confidence})")
                    print(f" Matches: {', '.join(result.matches) if result.matches else 'brak'}")

                    if result.verified:
                        verified_count += 1

                        # Only verified matches are written to the database.
                        if args.apply:
                            company.nip = result.found_nip
                            if result.found_regon and not company.regon:
                                company.regon = result.found_regon
                            db.commit()
                            print(" → Zapisano do bazy")
                elif result.error:
                    print(f" ✗ {result.error}")

                # Rate limiting - CEIDG may block rapid requests. There is
                # nothing to wait for after the last company.
                if i < total:
                    time.sleep(3)
        except KeyboardInterrupt:
            # Each lookup is slow, so keep what was gathered instead of
            # losing the whole run.
            print("\nPrzerwano - zapisuję dotychczasowe wyniki")

        # Save results to JSON
        output_file = args.output or (RESULTS_DIR / f"ceidg_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        # A custom --output may point into a directory that does not exist yet.
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2)

        print("\n=== Podsumowanie ===")
        # len(results) equals the number of companies on a complete run and
        # the number actually searched on an interrupted one.
        print(f"Przeszukano: {len(results)} firm")
        print(f"Znaleziono NIP: {found_count}")
        print(f"Zweryfikowano: {verified_count}")
        print(f"Wyniki zapisane: {output_file}")

        if verified_count > 0 and not args.apply:
            print("\nUżyj --apply aby zapisać zweryfikowane NIP do bazy")

    finally:
        db.close()
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|