#!/usr/bin/env python3
"""
KRS Audit Service - Full data extraction from KRS PDF files.

Parses KRS register PDF documents (source: EKRS) and extracts complete
company data:
- Basic info: KRS, NIP, REGON, company name, legal form
- Address: full address with email, website
- Capital: share capital amount, shares count, nominal value
- People: management board, shareholders, procurators
- PKD codes: main and secondary business activities
- Financial reports: filing dates
- Representation rules

Author: Norda Biznes Development Team
Created: 2026-01-13
"""

import re
import os
import json
import logging
from pathlib import Path
from datetime import datetime, date
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any, Tuple
from decimal import Decimal, InvalidOperation

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

_PDFPLUMBER_HINT = "Required library pdfplumber. Install: pip install pdfplumber"

try:
    import pdfplumber
except ImportError:
    # The problem is still reported at import time, but the ImportError itself
    # is deferred to extract_text_from_pdf() so that the pure text-parsing
    # helpers in this module stay importable without the PDF backend.
    pdfplumber = None
    logger.error(_PDFPLUMBER_HINT)


# ============================================================
# DATA CLASSES
# ============================================================

@dataclass
class KRSPerson:
    """Person from KRS (board member, shareholder, procurator)."""
    nazwisko: str
    imiona: str
    pesel: Optional[str] = None
    rola: str = ""  # PREZES ZARZĄDU, CZŁONEK ZARZĄDU, WSPÓLNIK, PROKURENT
    rola_kategoria: str = ""  # zarzad, wspolnik, prokurent
    # For shareholders
    udzialy_liczba: Optional[int] = None
    udzialy_wartosc: Optional[Decimal] = None
    udzialy_procent: Optional[Decimal] = None

    def full_name(self) -> str:
        """Return the given names followed by the surname."""
        return f"{self.imiona} {self.nazwisko}"


@dataclass
class KRSPKD:
    """PKD (business activity) code from KRS."""
    kod: str  # e.g., "62.03.Z"
    opis: str
    jest_przewazajacy: bool = False


@dataclass
class KRSFinancialReport:
    """Financial report filing info from KRS."""
    okres_od: Optional[date] = None
    okres_do: Optional[date] = None
    data_zlozenia: Optional[date] = None


@dataclass
class KRSAddress:
    """Company address from KRS."""
    ulica: Optional[str] = None
    numer_domu: Optional[str] = None
    numer_lokalu: Optional[str] = None
    miejscowosc: Optional[str] = None
    kod_pocztowy: Optional[str] = None
    poczta: Optional[str] = None
    wojewodztwo: Optional[str] = None
    powiat: Optional[str] = None
    gmina: Optional[str] = None
    email: Optional[str] = None
    www: Optional[str] = None


@dataclass
class KRSFullData:
    """Complete data extracted from KRS PDF."""
    # Identifiers
    krs: str
    nip: Optional[str] = None
    regon: Optional[str] = None

    # Basic info
    nazwa: str = ""
    nazwa_skrocona: Optional[str] = None
    forma_prawna: Optional[str] = None

    # Dates
    data_rejestracji: Optional[date] = None
    data_umowy_spolki: Optional[date] = None
    czas_trwania: Optional[str] = None  # "NIEOZNACZONY" or date

    # Address
    siedziba: Optional[KRSAddress] = None

    # Capital
    kapital_zakladowy: Optional[Decimal] = None
    waluta: str = "PLN"
    liczba_udzialow: Optional[int] = None
    wartosc_nominalna_udzialu: Optional[Decimal] = None

    # Representation
    sposob_reprezentacji: Optional[str] = None

    # PKD codes
    pkd_przewazajacy: Optional[KRSPKD] = None
    pkd_pozostale: List[KRSPKD] = field(default_factory=list)

    # People
    zarzad: List[KRSPerson] = field(default_factory=list)
    wspolnicy: List[KRSPerson] = field(default_factory=list)
    prokurenci: List[KRSPerson] = field(default_factory=list)
    rada_nadzorcza: List[KRSPerson] = field(default_factory=list)

    # Status
    czy_opp: bool = False
    zaleglosci: Optional[str] = None
    wierzytelnosci: Optional[str] = None

    # Financial reports
    sprawozdania_finansowe: List[KRSFinancialReport] = field(default_factory=list)

    # Metadata
    zrodlo: str = "ekrs.ms.gov.pl"
    data_pobrania: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary.

        Dates and datetimes become ISO-8601 strings and Decimals become
        floats, at every nesting level.

        Returns:
            Plain dict/list/scalar structure mirroring the dataclass fields.
        """
        def convert_value(v):
            if isinstance(v, (date, datetime)):
                return v.isoformat()
            if isinstance(v, Decimal):
                return float(v)
            # asdict() has already turned nested dataclasses into plain dicts,
            # so dicts must be walked too; otherwise dates/Decimals inside
            # people and financial-report entries would leak through and
            # break json.dump().
            if isinstance(v, dict):
                return {k: convert_value(val) for k, val in v.items()}
            if isinstance(v, list):
                return [convert_value(item) for item in v]
            return v

        return convert_value(asdict(self))


# ============================================================
# PDF PARSING FUNCTIONS
# ============================================================

def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract full text from PDF.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        Text of all pages that yielded any text, each followed by a newline.

    Raises:
        ImportError: If pdfplumber is not installed.
    """
    if pdfplumber is None:
        raise ImportError(_PDFPLUMBER_HINT)

    with pdfplumber.open(pdf_path) as pdf:
        page_texts = (page.extract_text() for page in pdf.pages)
        # Joined inside the ``with`` block so pages are read while the
        # document is still open.
        return "".join(f"{page_text}\n" for page_text in page_texts if page_text)


def parse_date(date_str: str) -> Optional[date]:
    """Parse the first date found in a string.

    Tries DD.MM.YYYY first, then YYYY-MM-DD. A match that is not a valid
    calendar date (e.g. 31.02.2022) is skipped and the next format is tried.

    Args:
        date_str: Text containing a date; may be empty or None.

    Returns:
        The parsed date, or None when nothing valid was found.
    """
    if not date_str:
        return None

    # (pattern, (year group, month group, day group))
    formats = (
        (r'(\d{2})\.(\d{2})\.(\d{4})', (3, 2, 1)),
        (r'(\d{4})-(\d{2})-(\d{2})', (1, 2, 3)),
    )
    for pattern, (year_g, month_g, day_g) in formats:
        match = re.search(pattern, date_str)
        if not match:
            continue
        try:
            return date(
                int(match.group(year_g)),
                int(match.group(month_g)),
                int(match.group(day_g)),
            )
        except ValueError:
            continue

    return None


def parse_money(money_str: str) -> Optional[Decimal]:
    """Parse money value from Polish format (e.g., '4.000,00' or '5 000,00').

    When a comma is present, dots are treated as thousands separators and the
    comma as the decimal separator. Without a comma, dots are kept as decimal
    separators.

    Args:
        money_str: Text containing the amount; may be empty or None.

    Returns:
        The amount as Decimal, or None when no valid number was found.
    """
    if not money_str:
        return None

    # Strip every kind of whitespace, not just ASCII spaces: amounts may use
    # non-breaking spaces as thousands separators or be wrapped across lines.
    cleaned = re.sub(r'\s+', '', money_str)

    if ',' in cleaned:
        # Polish format: "4.000,00" -> "4000.00"
        cleaned = cleaned.replace('.', '').replace(',', '.')

    match = re.search(r'([\d\.]+)', cleaned)
    if not match:
        return None

    try:
        return Decimal(match.group(1))
    except InvalidOperation:
        # e.g. "1.2.3" or a lone "." - not a number
        return None


def parse_shares_info(shares_str: str) -> Tuple[Optional[int], Optional[Decimal]]:
    """Parse shares count and value from a share description.

    Example input: '80 UDZIAŁÓW O ŁĄCZNEJ WARTOŚCI 4.000,00 ZŁ'.

    Args:
        shares_str: Share description text; may be empty or None.

    Returns:
        Tuple ``(count, value)``; each element is None when not found.
    """
    if not shares_str:
        return None, None

    count = None
    value = None

    # Count pattern
    count_match = re.search(r'(\d+)\s+UDZIAŁ', shares_str, re.IGNORECASE)
    if count_match:
        count = int(count_match.group(1))

    # Value pattern
    value_match = re.search(r'WARTOŚCI\s+([\d\s,\.]+)\s*ZŁ', shares_str, re.IGNORECASE)
    if value_match:
        value = parse_money(value_match.group(1))

    return count, value


def extract_person_block(lines: List[str], start_idx: int, role_category: str) -> Optional[KRSPerson]:
    """Extract person data from PDF text block.

    Scans at most 15 lines starting at ``start_idx`` and stops early when the
    next person block begins. Field values are read from the text after
    ' - ' on lines containing the field label.

    Args:
        lines: All text lines of the document.
        start_idx: Index of the line where the person block starts.
        role_category: Value stored in ``rola_kategoria`` of the result.

    Returns:
        KRSPerson when both surname and given names were found, else None.
    """
    person = KRSPerson(nazwisko="", imiona="", rola_kategoria=role_category)

    found_nazwisko = False
    found_imiona = False

    for i in range(start_idx, min(start_idx + 15, len(lines))):
        line = lines[i].strip()

        # Stop at next person block
        if i > start_idx and ('1.Nazwisko' in line or 'Nazwisko / Nazwa' in line):
            break

        has_value = ' - ' in line

        # Surname (only the first match counts)
        if not found_nazwisko and 'Nazwisko' in line and has_value:
            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ\-]+)$', line)
            if match:
                person.nazwisko = match.group(1)
                found_nazwisko = True

        # Given names (only the first match counts)
        if not found_imiona and 'Imiona' in line and has_value:
            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
            if match:
                person.imiona = match.group(1).strip()
                found_imiona = True

        # PESEL
        if 'PESEL' in line and has_value:
            match = re.search(r' - (\d{11})', line)
            if match:
                person.pesel = match.group(1)

        # Function (for board members)
        if 'Funkcja' in line and has_value:
            match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
            if match:
                person.rola = match.group(1).strip()

        # Shares (for shareholders)
        if 'udziały' in line.lower() and has_value:
            match = re.search(r' - (.+)$', line)
            if match:
                count, value = parse_shares_info(match.group(1))
                person.udzialy_liczba = count
                person.udzialy_wartosc = value

    if person.nazwisko and person.imiona:
        return person
    return None


def _parse_address(text: str) -> KRSAddress:
    """Extract the registered address, region info, email and website."""
    address = KRSAddress()

    # Full address line
    adres_match = re.search(
        r'2\.Adres\s+\d+\s+-\s+ul\.\s*([^,]+),\s*nr\s*(\d+\w*)'
        r'(?:,\s*lok\.\s*([^,]+))?'
        r',\s*miejsc\.\s*([^,]+)'
        r',\s*kod\s*(\d{2}-\d{3})'
        r',\s*poczta\s*([^,]+)',
        text, re.IGNORECASE
    )
    if adres_match:
        address.ulica = adres_match.group(1).strip()
        address.numer_domu = adres_match.group(2).strip()
        if adres_match.group(3):
            address.numer_lokalu = adres_match.group(3).strip()
        address.miejscowosc = adres_match.group(4).strip()
        address.kod_pocztowy = adres_match.group(5).strip()
        address.poczta = adres_match.group(6).strip()

    # Seat (province / county / commune)
    siedziba_match = re.search(
        r'1\.Siedziba\s+\d+\s+-\s+kraj\s+POLSKA,\s*woj\.\s*([^,]+),\s*powiat\s*([^,]+),\s*gmina\s*([^,]+)',
        text, re.IGNORECASE
    )
    if siedziba_match:
        address.wojewodztwo = siedziba_match.group(1).strip()
        address.powiat = siedziba_match.group(2).strip()
        address.gmina = siedziba_match.group(3).strip()

    # Email
    email_match = re.search(r'3\.Adres poczty elektronicznej\s+\d+\s+-\s+([^\n]+)', text)
    if email_match:
        address.email = email_match.group(1).strip()

    # Website
    www_match = re.search(r'4\.Adres strony internetowej\s+\d+\s+-\s+([^\n]+)', text)
    if www_match:
        address.www = www_match.group(1).strip()

    return address


def _detect_people_section(line: str) -> Optional[str]:
    """Return the role category opened by a section-header line, if any."""
    if 'ZARZĄD' in line.upper() and 'Nazwa organu' in line:
        return 'zarzad'
    if 'Dane wspólników' in line or ('Rubryka 7' in line and 'wspólników' in line.lower()):
        return 'wspolnik'
    if 'Prokurenci' in line:
        return 'prokurent'
    if 'Organ nadzoru' in line:
        return 'rada_nadzorcza'
    return None


def _parse_people(lines: List[str], data: KRSFullData) -> None:
    """Fill the board, shareholder, procurator and supervisory lists of ``data``."""
    # role category -> (target list, role that overrides whatever was parsed)
    targets = {
        'zarzad': (data.zarzad, None),
        'wspolnik': (data.wspolnicy, "WSPÓLNIK"),
        'prokurent': (data.prokurenci, "PROKURENT"),
        'rada_nadzorcza': (data.rada_nadzorcza, "CZŁONEK RADY NADZORCZEJ"),
    }

    section = None  # role category of the section currently being read
    for idx, raw_line in enumerate(lines):
        line = raw_line.strip()

        opened = _detect_people_section(line)
        if opened is not None:
            section = opened
            continue

        # A new "Dział N" heading other than Dział 1/2 ends any people section
        if re.match(r'Dział \d+', line):
            if 'Dział 2' not in line and 'Dział 1' not in line:
                section = None

        if section is None:
            continue
        if '1.Nazwisko' not in line and 'Nazwisko / Nazwa' not in line:
            continue

        person = extract_person_block(lines, idx, section)
        if person is None:
            continue

        members, forced_role = targets[section]
        if forced_role is not None:
            person.rola = forced_role
        elif not person.rola:
            # Board members keep the parsed function; this is only a fallback
            person.rola = "CZŁONEK ZARZĄDU"
        members.append(person)


def _compute_share_totals(data: KRSFullData) -> None:
    """Derive share percentages, total share count and nominal share value."""
    # Share percentage per shareholder
    if data.kapital_zakladowy and data.wspolnicy:
        for wspolnik in data.wspolnicy:
            if wspolnik.udzialy_wartosc:
                wspolnik.udzialy_procent = Decimal(str(
                    round(float(wspolnik.udzialy_wartosc) / float(data.kapital_zakladowy) * 100, 2)
                ))

    # Total shares and nominal value of a single share
    total_shares = sum(w.udzialy_liczba or 0 for w in data.wspolnicy)
    if total_shares > 0:
        data.liczba_udzialow = total_shares
        if data.kapital_zakladowy:
            data.wartosc_nominalna_udzialu = Decimal(str(
                round(float(data.kapital_zakladowy) / total_shares, 2)
            ))


def _parse_financial_reports(text: str) -> List[KRSFinancialReport]:
    """Extract financial report filings, one per period, sorted by period end."""
    # Format 1: "1.Wzmianka o złożeniu rocznego 1 4 - 22.06.2022 OD 05.02.2021 DO 31.12.2021"
    # Format 2: "2 6 - 20.06.2023 OD 01.01.2022 DO 31.12.2022" (continuation lines)
    report_matches = re.findall(
        r'(\d{2}\.\d{2}\.\d{4})\s+OD\s+(\d{2}\.\d{2}\.\d{4})\s+DO\s+(\d{2}\.\d{2}\.\d{4})',
        text
    )

    reports = []
    seen_periods = set()
    for filed, period_from, period_to in report_matches:
        # Deduplicate by period; the first filing found for a period wins
        key = (period_from, period_to)
        if key in seen_periods:
            continue
        seen_periods.add(key)
        reports.append(KRSFinancialReport(
            data_zlozenia=parse_date(filed),
            okres_od=parse_date(period_from),
            okres_do=parse_date(period_to)
        ))

    # Sort by period end date (unparseable end dates first)
    reports.sort(key=lambda r: r.okres_do or date.min)
    return reports


def _parse_krs_text(text: str) -> KRSFullData:
    """Build KRSFullData from the already extracted document text."""
    lines = text.split('\n')

    data = KRSFullData(krs="", data_pobrania=datetime.now())

    # === Basic identifiers ===
    krs_match = re.search(r'Numer KRS:\s*(\d{10})', text)
    if krs_match:
        data.krs = krs_match.group(1)

    nip_match = re.search(r'NIP:\s*(\d{10})', text)
    if nip_match:
        data.nip = nip_match.group(1)

    regon_match = re.search(r'REGON:\s*(\d{9,14})', text)
    if regon_match:
        data.regon = regon_match.group(1)

    # === Company name and form ===
    nazwa_match = re.search(r'3\.Firma,?\s+pod którą spółka działa\s+\d+\s+-\s+([^\n]+)', text)
    if nazwa_match:
        data.nazwa = nazwa_match.group(1).strip()

    forma_match = re.search(r'1\.Oznaczenie formy prawnej\s+\d+\s+-\s+([^\n]+)', text)
    if forma_match:
        data.forma_prawna = forma_match.group(1).strip()

    # OPP (public benefit organization) status
    opp_match = re.search(
        r'status organizacji\s*pożytku publicznego\?\s+\d+\s+-\s+(TAK|NIE)',
        text, re.IGNORECASE
    )
    if opp_match:
        data.czy_opp = opp_match.group(1).upper() == 'TAK'

    # === Dates ===
    # Date of the first register entry
    reg_match = re.search(r'Nr wpisu 1 Data dokonania wpisu (\d{2}\.\d{2}\.\d{4})', text)
    if reg_match:
        data.data_rejestracji = parse_date(reg_match.group(1))

    # Company agreement date: first date following the agreement heading
    umowa_match = re.search(r'Informacja o zawarciu.+?(\d{2}\.\d{2}\.\d{4})', text, re.DOTALL)
    if umowa_match:
        data.data_umowy_spolki = parse_date(umowa_match.group(1))

    # Duration
    czas_match = re.search(r'Czas,?\s+na jaki została utworzona spółka\s+\d+\s+-\s+([^\n]+)', text)
    if czas_match:
        data.czas_trwania = czas_match.group(1).strip()

    # === Address ===
    data.siedziba = _parse_address(text)

    # === Capital ===
    kapital_match = re.search(
        r'1\.Wysokość kapitału zakładowego\s+\d+\s+-\s+([\d\s,\.]+)\s*ZŁ',
        text, re.IGNORECASE
    )
    if kapital_match:
        data.kapital_zakladowy = parse_money(kapital_match.group(1))

    # === Representation rules ===
    repr_match = re.search(
        r'2\.Sposób reprezentacji podmiotu\s+\d+\s+-\s+([^\n]+(?:\n[^\n]*)?)', text
    )
    if repr_match:
        # The rule may span two lines; collapse line breaks and extra spaces
        data.sposob_reprezentacji = ' '.join(repr_match.group(1).split())

    # === PKD codes ===
    pkd_glowny_match = re.search(
        r'1\.Przedmiot przeważającej\s+\d+\s+\d+\s+-\s+(\d+),\s*(\d+),\s*([A-Z]),\s*([^\n]+)',
        text
    )
    if pkd_glowny_match:
        kod = f"{pkd_glowny_match.group(1)}.{pkd_glowny_match.group(2)}.{pkd_glowny_match.group(3)}"
        opis = pkd_glowny_match.group(4).strip()
        data.pkd_przewazajacy = KRSPKD(kod=kod, opis=opis, jest_przewazajacy=True)

    pkd_pozostale = re.findall(
        r'2\.Przedmiot pozostałej działalności\s+\d+\s+\d+\s+-\s+(\d+),\s*(\d+),\s*([A-Z]),\s*([^\n]+)',
        text
    )
    for match in pkd_pozostale:
        kod = f"{match[0]}.{match[1]}.{match[2]}"
        opis = match[3].strip()
        data.pkd_pozostale.append(KRSPKD(kod=kod, opis=opis, jest_przewazajacy=False))

    # === People ===
    _parse_people(lines, data)
    _compute_share_totals(data)

    # === Financial reports ===
    data.sprawozdania_finansowe.extend(_parse_financial_reports(text))

    return data


def parse_krs_pdf_full(pdf_path: str) -> KRSFullData:
    """
    Parse KRS PDF and extract all available data.

    Args:
        pdf_path: Path to KRS PDF file

    Returns:
        KRSFullData object with all extracted data

    Raises:
        ImportError: If pdfplumber is not installed.
    """
    return _parse_krs_text(extract_text_from_pdf(pdf_path))


# ============================================================
# MAIN ENTRY POINT
# ============================================================

def parse_krs_pdf(pdf_path: str, verbose: bool = False) -> Dict[str, Any]:
    """
    Parse KRS PDF and return dictionary with all data.

    This is the main entry point for external use.

    Args:
        pdf_path: Path to KRS PDF file
        verbose: Log a short summary of the extracted data

    Returns:
        Dictionary with all extracted data (see KRSFullData.to_dict)
    """
    if verbose:
        logger.info(f"Parsing: {pdf_path}")

    data = parse_krs_pdf_full(pdf_path)

    if verbose:
        logger.info(f"  KRS: {data.krs}")
        logger.info(f"  Nazwa: {data.nazwa}")
        logger.info(f"  NIP: {data.nip}, REGON: {data.regon}")
        logger.info(f"  Kapitał: {data.kapital_zakladowy} {data.waluta}")
        logger.info(f"  Zarząd: {len(data.zarzad)} osób")
        logger.info(f"  Wspólnicy: {len(data.wspolnicy)} osób")
        if data.pkd_przewazajacy:
            logger.info(f"  PKD główny: {data.pkd_przewazajacy.kod}")

    return data.to_dict()


def main():
    """CLI entry point for testing.

    ``--file`` parses one PDF and prints a summary; ``--dir`` parses every
    ``*.pdf`` in a directory; ``--output`` writes all results as JSON.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Parse KRS PDF files (full data extraction)")
    parser.add_argument("--file", type=str, help="Single PDF file to parse")
    parser.add_argument("--dir", type=str, help="Directory with PDF files")
    parser.add_argument("--output", type=str, help="Output JSON file")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    results = []

    if args.file:
        data = parse_krs_pdf(args.file, verbose=args.verbose)
        results.append(data)

        # Print summary
        print(f"\n{'='*60}")
        print(f"{data['nazwa']} (KRS: {data['krs']})")
        print(f"{'='*60}")
        print(f"NIP: {data['nip']}, REGON: {data['regon']}")
        print(f"Forma prawna: {data['forma_prawna']}")
        print(f"Data rejestracji: {data['data_rejestracji']}")

        if data['siedziba']:
            addr = data['siedziba']
            # Keys are always present (possibly None), so a .get() default
            # would never apply; fall back explicitly to avoid printing "None".
            ulica = addr.get('ulica') or ''
            numer_domu = addr.get('numer_domu') or ''
            kod_pocztowy = addr.get('kod_pocztowy') or ''
            miejscowosc = addr.get('miejscowosc') or ''
            print(f"Adres: {ulica} {numer_domu}, {kod_pocztowy} {miejscowosc}")
            if addr.get('email'):
                print(f"Email: {addr['email']}")
            if addr.get('www'):
                print(f"WWW: {addr['www']}")

        print(f"\nKapitał zakładowy: {data['kapital_zakladowy']} {data['waluta']}")
        print(f"Liczba udziałów: {data['liczba_udzialow']}")
        print(f"Wartość nominalna: {data['wartosc_nominalna_udzialu']} {data['waluta']}")

        print(f"\nSposób reprezentacji: {data['sposob_reprezentacji']}")

        if data['pkd_przewazajacy']:
            print(f"\nPKD przeważający: {data['pkd_przewazajacy']['kod']} - {data['pkd_przewazajacy']['opis']}")

        if data['pkd_pozostale']:
            print("PKD pozostałe:")
            for pkd in data['pkd_pozostale']:
                print(f"  - {pkd['kod']}: {pkd['opis']}")

        print(f"\nZarząd ({len(data['zarzad'])} osób):")
        for p in data['zarzad']:
            print(f"  - {p['imiona']} {p['nazwisko']} - {p['rola']}")

        print(f"\nWspólnicy ({len(data['wspolnicy'])} osób):")
        for p in data['wspolnicy']:
            shares_info = ""
            if p.get('udzialy_liczba'):
                procent = p.get('udzialy_procent')
                if procent is None:
                    procent = '?'
                shares_info = f" ({p['udzialy_liczba']} udziałów, {procent}%)"
            print(f"  - {p['imiona']} {p['nazwisko']}{shares_info}")

        if data['prokurenci']:
            print(f"\nProkurenci ({len(data['prokurenci'])} osób):")
            for p in data['prokurenci']:
                print(f"  - {p['imiona']} {p['nazwisko']}")

        if data['sprawozdania_finansowe']:
            print(f"\nSprawozdania finansowe ({len(data['sprawozdania_finansowe'])}):")
            for sf in data['sprawozdania_finansowe']:
                print(f"  - {sf['okres_od']} do {sf['okres_do']} (złożone: {sf['data_zlozenia']})")

    elif args.dir:
        pdf_dir = Path(args.dir)
        pdf_files = list(pdf_dir.glob("*.pdf"))
        print(f"Found {len(pdf_files)} PDF files")

        for pdf_file in pdf_files:
            # Best effort: one broken PDF must not abort the whole batch
            try:
                data = parse_krs_pdf(str(pdf_file), verbose=args.verbose)
                results.append(data)
                print(f"  ✓ {data['nazwa']} (KRS: {data['krs']})")
            except Exception as e:
                print(f"  ✗ {pdf_file.name}: {e}")

    # Save results
    if args.output and results:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"\nResults saved to: {args.output}")


if __name__ == "__main__":
    main()