feat: Add company logos to search results, hide events section

- Add company logo display in search results cards
- Make logo clickable (links to company profile)
- Temporarily hide "Aktualności i wydarzenia" section on company profiles
- Add scripts for KRS PDF download/parsing and CEIDG API

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-01-11 15:32:53 +01:00
parent 3e8700f98f
commit 3f9273cff6
8 changed files with 1981 additions and 1 deletions

View File

@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
CEIDG Data Downloader - pobiera dane JDG z portalu CEIDG
Używa Playwright do pobierania danych o jednoosobowych działalnościach
gospodarczych z oficjalnego portalu CEIDG (aplikacja.ceidg.gov.pl).
Dla JDG właściciel = firma, więc wyciągamy:
- Imię i nazwisko właściciela
- Status działalności
- Adres prowadzenia działalności
Usage:
python scripts/download_ceidg_data.py --nip 5881943861
python scripts/download_ceidg_data.py --all # wszystkie JDG z bazy
"""
import os
import sys
import argparse
import time
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
sys.exit(1)
@dataclass
class CEIDGData:
    """A single CEIDG registry record for a sole proprietorship (JDG).

    Field names follow the Polish registry vocabulary; all values are plain
    strings so the record serializes cleanly to JSON.
    """
    nip: str                      # tax identifier the record was fetched for
    imiona: str = ""              # owner's given name(s)
    nazwisko: str = ""            # owner's surname
    nazwa_firmy: str = ""         # registered business name
    status: str = ""              # AKTYWNY, ZAWIESZONY, WYKREŚLONY
    adres: str = ""               # business address
    data_rozpoczecia: str = ""    # activity start date
    zrodlo: str = "ceidg.gov.pl"  # provenance marker
    pobrano: str = ""             # ISO timestamp of the fetch

    def to_dict(self) -> dict:
        """Return the record as a plain, JSON-serializable dict."""
        return asdict(self)
def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]:
    """Scrape CEIDG (aplikacja.ceidg.gov.pl) for the JDG registered under *nip*.

    Drives a headless Chromium via Playwright: fills the search form, opens
    the details page and extracts the owner's name, the business name and
    the activity status.

    Returns:
        CEIDGData when at least one field could be extracted, otherwise None
        (also on timeout or any other scraping error).
    """
    import re  # hoisted from the fallback branch below; stdlib, local import keeps file layout

    print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )
        page = context.new_page()
        try:
            # Search form of the public CEIDG application.
            page.goto("https://aplikacja.ceidg.gov.pl/ceidg/ceidg.public.ui/search.aspx", timeout=30000)
            time.sleep(3)
            page.wait_for_load_state("networkidle", timeout=15000)
            # Find the NIP input; fall back to the first text input when the
            # id/name-based selector does not match.
            nip_input = page.locator("input[id*='NIP'], input[name*='nip']").first
            if not nip_input.is_visible(timeout=5000):
                nip_input = page.locator("input[type='text']").first
            nip_input.fill(nip)
            time.sleep(1)
            search_btn = page.locator("input[type='submit'][value*='Szukaj'], button:has-text('Szukaj')").first
            search_btn.click()
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=20000)
            # A "Szczegóły" link means the search produced a hit.
            details_link = page.locator("a:has-text('Szczegóły'), a[href*='SearchDetails']").first
            if details_link.is_visible(timeout=5000):
                details_link.click()
                time.sleep(3)
                page.wait_for_load_state("networkidle", timeout=15000)
                data = CEIDGData(nip=nip, pobrano=datetime.now().isoformat())
                # (removed: an unused `content = page.content()` snapshot)
                # Owner name: locate the label, then read its table row.
                name_label = page.locator("span:has-text('Imię i nazwisko')").first
                if name_label.is_visible(timeout=2000):
                    name_row = name_label.locator("xpath=ancestor::tr").first
                    if name_row.is_visible():
                        name_text = name_row.inner_text()
                        if "Imię i nazwisko" in name_text:
                            parts = name_text.split("Imię i nazwisko")
                            if len(parts) > 1:
                                full_name = parts[1].strip()
                                # Last token = surname, the rest = given names.
                                name_parts = full_name.split()
                                if len(name_parts) >= 2:
                                    data.nazwisko = name_parts[-1]
                                    data.imiona = " ".join(name_parts[:-1])
                # Business name sits in the cell following its label.
                firma_element = page.locator("td:has-text('Firma przedsiębiorcy')").first
                if firma_element.is_visible(timeout=2000):
                    firma_row = firma_element.locator("xpath=following-sibling::td").first
                    if firma_row.is_visible():
                        data.nazwa_firmy = firma_row.inner_text().strip()
                # Activity status, same label/value layout.
                status_element = page.locator("td:has-text('Status')").first
                if status_element.is_visible(timeout=2000):
                    status_value = status_element.locator("xpath=following-sibling::td").first
                    if status_value.is_visible():
                        data.status = status_value.inner_text().strip()
                # Fallback: regex over the whole page text when the structured
                # extraction found no name.
                if not data.imiona and not data.nazwisko:
                    page_text = page.inner_text("body")
                    # Pattern: "Imię i nazwisko: JAN KOWALSKI"
                    name_match = re.search(r'Imię i nazwisko[:\s]+([A-ZĄĆĘŁŃÓŚŹŻ]+\s+[A-ZĄĆĘŁŃÓŚŹŻ]+)', page_text, re.IGNORECASE)
                    if name_match:
                        full_name = name_match.group(1).strip()
                        parts = full_name.split()
                        if len(parts) >= 2:
                            data.imiona = " ".join(parts[:-1])
                            data.nazwisko = parts[-1]
                if data.imiona or data.nazwisko or data.nazwa_firmy:
                    print(f" [OK] Znaleziono: {data.imiona} {data.nazwisko}")
                    return data
                else:
                    print(f" [WARN] Nie udało się wyciągnąć danych ze strony")
                    # Screenshot kept for post-mortem debugging.
                    page.screenshot(path=f"/tmp/ceidg_debug_{nip}.png")
                    return None
            else:
                print(f" [ERROR] Nie znaleziono wpisu dla NIP {nip}")
                return None
        except PlaywrightTimeout as e:
            print(f" [ERROR] Timeout dla NIP {nip}: {e}")
            return None
        except Exception as e:
            print(f" [ERROR] Błąd dla NIP {nip}: {e}")
            return None
        finally:
            browser.close()
def main():
    """CLI entry point: fetch CEIDG data for one NIP or for every JDG in the DB."""
    parser = argparse.ArgumentParser(description="Download CEIDG data for JDG companies")
    parser.add_argument("--nip", type=str, help="Single NIP to fetch")
    parser.add_argument("--all", action="store_true", help="Fetch all JDG from database")
    parser.add_argument("--output", type=str, help="Output JSON file")
    args = parser.parse_args()

    # No mode selected: show usage and bail out before touching anything.
    if not args.nip and not args.all:
        parser.print_help()
        return

    results = []
    if args.nip:
        record = fetch_ceidg_data(args.nip)
        if record:
            results.append(record.to_dict())
            print(f"\n=== {record.imiona} {record.nazwisko} ===")
            print(f" Firma: {record.nazwa_firmy}")
            print(f" Status: {record.status}")
            print(f" NIP: {record.nip}")
    else:
        # Load environment and import database lazily (only this mode needs it).
        from dotenv import load_dotenv
        load_dotenv(Path(__file__).parent.parent / '.env')
        from database import SessionLocal, Company
        db = SessionLocal()
        try:
            # JDG = sole proprietorships: a NIP but no KRS entry.
            jdg_companies = db.query(Company).filter(
                (Company.krs.is_(None)) | (Company.krs == ''),
                Company.nip.isnot(None),
                Company.nip != ''
            ).all()
            print(f"Znaleziono {len(jdg_companies)} firm JDG\n")
            for position, company in enumerate(jdg_companies, 1):
                print(f"[{position}/{len(jdg_companies)}] {company.name}")
                record = fetch_ceidg_data(company.nip)
                if record:
                    results.append(record.to_dict())
                time.sleep(3)  # Rate limiting
        finally:
            db.close()

    # Persist or print whatever was collected.
    if results:
        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            print(f"\nWyniki zapisane do: {args.output}")
        else:
            print("\n=== JSON OUTPUT ===")
            print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

201
scripts/download_krs_pdf.py Normal file
View File

@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
KRS PDF Downloader - pobiera odpisy pełne z portalu PRS
Używa Playwright do automatycznego pobierania PDF z oficjalnego
portalu Ministerstwa Sprawiedliwości (prs.ms.gov.pl).
Pliki PDF zawierają PEŁNE dane (niezanonimizowane), w przeciwieństwie
do API które zwraca dane zanonimizowane.
Usage:
python scripts/download_krs_pdf.py --krs 0000725183
python scripts/download_krs_pdf.py --all # wszystkie firmy z bazy
"""
import os
import sys
import argparse
import time
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
print("Playwright nie jest zainstalowany. Uruchom: pip install playwright && playwright install chromium")
sys.exit(1)
# Output directory for PDFs
PDF_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "krs_pdfs"


def download_krs_pdf(krs_number: str, output_dir: Path = PDF_OUTPUT_DIR) -> "str | None":
    """Download the full KRS extract PDF from wyszukiwarka-krs.ms.gov.pl.

    Args:
        krs_number: KRS number (with or without leading zeros).
        output_dir: Directory to save the PDF into (created if missing).

    Returns:
        Path to the downloaded (or previously cached) PDF file, or None on
        failure.  The previous annotation claimed a bare ``str``; it is
        corrected here because every error branch returns None.  The quoted
        form keeps the annotation lazy, so no typing import is needed.
    """
    # KRS numbers are canonically 10 digits, zero-padded.
    krs = krs_number.zfill(10)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"odpis_pelny_{krs}.pdf"
    # Cached copy: skip the slow browser round-trip entirely.
    if output_file.exists():
        print(f" [SKIP] PDF już istnieje: {output_file}")
        return str(output_file)
    print(f" [INFO] Pobieranie odpisu pełnego dla KRS {krs}...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            accept_downloads=True,
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )
        page = context.new_page()
        try:
            # Landing page of the Ministry of Justice KRS search.
            page.goto("https://wyszukiwarka-krs.ms.gov.pl/", timeout=30000)
            time.sleep(3)
            page.wait_for_load_state("networkidle", timeout=15000)
            # The KRS field is typically the first *visible* text input;
            # the :visible filter skips hidden checkbox inputs.
            search_inputs = page.locator("input[type='text']:visible")
            search_input = search_inputs.first
            search_input.fill(krs)
            time.sleep(1)
            search_btn = page.locator("button:has-text('Szukaj')").first
            search_btn.click()
            time.sleep(5)
            page.wait_for_load_state("networkidle", timeout=20000)
            # Open the details view when the results page offers it.
            details_btn = page.locator("button:has-text('Wyświetl szczegóły'), a:has-text('Wyświetl szczegóły')").first
            if details_btn.is_visible(timeout=5000):
                details_btn.click()
                time.sleep(3)
                page.wait_for_load_state("networkidle", timeout=15000)
            # There are usually two "Pobierz PDF" buttons:
            # [0] "Informacja skrócona" (short), [1] "Informacja pełna" (full).
            pdf_buttons = page.locator("button:has-text('Pobierz PDF')")
            if pdf_buttons.count() >= 2:
                # Second button = the full extract we want.
                with page.expect_download(timeout=30000) as download_info:
                    pdf_buttons.nth(1).click()
                download = download_info.value
                download.save_as(str(output_file))
                print(f" [OK] Zapisano: {output_file}")
                return str(output_file)
            elif pdf_buttons.count() == 1:
                # Only one button offered: take it.
                with page.expect_download(timeout=30000) as download_info:
                    pdf_buttons.first.click()
                download = download_info.value
                download.save_as(str(output_file))
                print(f" [OK] Zapisano: {output_file}")
                return str(output_file)
            else:
                print(f" [ERROR] Nie znaleziono przycisku PDF dla KRS {krs}")
                page.screenshot(path=str(output_dir / f"debug_{krs}.png"))
                return None
        except PlaywrightTimeout as e:
            print(f" [ERROR] Timeout dla KRS {krs}: {e}")
            page.screenshot(path=str(output_dir / f"timeout_{krs}.png"))
            return None
        except Exception as e:
            print(f" [ERROR] Błąd dla KRS {krs}: {e}")
            page.screenshot(path=str(output_dir / f"error_{krs}.png"))
            return None
        finally:
            browser.close()
def get_all_krs_numbers():
    """Return (krs, name) pairs for every active company that has a KRS number."""
    from database import SessionLocal, Company
    db = SessionLocal()
    try:
        active_with_krs = (
            db.query(Company)
            .filter(
                Company.status == 'active',
                Company.krs.isnot(None),
                Company.krs != ''
            )
            .all()
        )
        return [(company.krs, company.name) for company in active_with_krs]
    finally:
        db.close()
def main():
    """CLI entry point: download one KRS extract or every one known to the DB."""
    parser = argparse.ArgumentParser(description="Download KRS PDF extracts")
    parser.add_argument("--krs", type=str, help="Single KRS number to download")
    parser.add_argument("--all", action="store_true", help="Download all KRS from database")
    parser.add_argument("--output", type=str, default=str(PDF_OUTPUT_DIR), help="Output directory")
    args = parser.parse_args()
    output_dir = Path(args.output)

    if args.krs:
        # Single-company mode.
        pdf_path = download_krs_pdf(args.krs, output_dir)
        if not pdf_path:
            print("\nBłąd pobierania")
            sys.exit(1)
        print(f"\nPobrano: {pdf_path}")
    elif args.all:
        # Batch mode over every company in the database.
        print("Pobieranie wszystkich firm z KRS z bazy danych...")
        companies = get_all_krs_numbers()
        print(f"Znaleziono {len(companies)} firm z numerem KRS\n")
        success = 0
        failed = 0
        for position, (krs, name) in enumerate(companies, 1):
            print(f"[{position}/{len(companies)}] {name}")
            if download_krs_pdf(krs, output_dir):
                success += 1
            else:
                failed += 1
            # Rate limiting - be nice to the server
            time.sleep(2)
        print(f"\n=== PODSUMOWANIE ===")
        print(f"Pobrano: {success}")
        print(f"Błędy: {failed}")
        print(f"Pliki zapisane w: {output_dir}")
    else:
        parser.print_help()


if __name__ == "__main__":
    main()

342
scripts/fetch_ceidg_api.py Normal file
View File

@ -0,0 +1,342 @@
#!/usr/bin/env python3
"""
CEIDG API v3 Client - pobiera dane właścicieli JDG
Używa oficjalnego API CEIDG v3 (dane.biznes.gov.pl) do pobierania
danych o jednoosobowych działalnościach gospodarczych.
Usage:
python scripts/fetch_ceidg_api.py --nip 5881571773
python scripts/fetch_ceidg_api.py --all # wszystkie JDG z bazy
python scripts/fetch_ceidg_api.py --all --import # pobierz i importuj do bazy
"""
import os
import sys
import argparse
import json
import time
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, List
import requests
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
# Load environment
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / '.env')
# API Configuration
# Official CEIDG v3 "firma" endpoint; queried with a ?nip=... parameter per lookup.
CEIDG_API_URL = "https://dane.biznes.gov.pl/api/ceidg/v3/firma"
# Bearer token loaded from .env; fetch_ceidg_data() refuses to run when it is missing.
CEIDG_API_KEY = os.getenv("CEIDG_API_KEY")
# Output directory for JSON cache
JSON_OUTPUT_DIR = Path(__file__).parent.parent / "data" / "ceidg_json"
@dataclass
class CEIDGOwner:
    """Owner of a sole proprietorship (JDG) as reported by the CEIDG v3 API."""
    imie: str       # given name
    nazwisko: str   # surname
    nip: str        # owner's NIP (caller falls back to the queried NIP)
    regon: str = ""

    def to_dict(self) -> dict:
        """Plain-dict view for JSON serialization."""
        return asdict(self)
@dataclass
class CEIDGData:
    """Company record returned by the CEIDG v3 API (dane.biznes.gov.pl)."""
    id: str
    nazwa: str
    nip: str
    regon: str = ""
    wlasciciel: Optional["CEIDGOwner"] = None  # owner block, when the API returns one
    adres_miasto: str = ""
    adres_ulica: str = ""
    adres_kod: str = ""
    pkd_glowny: str = ""       # main PKD activity code
    pkd_opis: str = ""         # human-readable PKD description
    data_rozpoczecia: str = ""
    status: str = ""
    zrodlo: str = "dane.biznes.gov.pl"  # provenance marker
    pobrano: str = ""          # ISO timestamp of the fetch

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict.

        dataclasses.asdict() already recurses into nested dataclasses, so the
        ``wlasciciel`` field comes back as a plain dict (or None) without the
        manual re-conversion the previous version performed.
        """
        return asdict(self)
def fetch_ceidg_data(nip: str) -> Optional[CEIDGData]:
    """Fetch a company record from the CEIDG v3 API for the given NIP.

    Sends one authenticated GET to CEIDG_API_URL and maps the first entry of
    the response's "firma" list onto a CEIDGData instance.

    Returns:
        CEIDGData on success, or None when the NIP is unknown (HTTP 204),
        authentication fails (HTTP 401), any other non-200 status arrives,
        the payload is malformed, or a network error occurs.
    """
    # Hard requirement: without the bearer token every request would 401.
    if not CEIDG_API_KEY:
        print(" [ERROR] Brak CEIDG_API_KEY w .env")
        return None
    print(f" [INFO] Pobieranie danych CEIDG dla NIP {nip}...")
    headers = {
        "Authorization": f"Bearer {CEIDG_API_KEY}",
        "Accept": "application/json"
    }
    try:
        response = requests.get(
            CEIDG_API_URL,
            params={"nip": nip},
            headers=headers,
            timeout=30
        )
        # 204 No Content = the registry has no entry for this NIP.
        if response.status_code == 204:
            print(f" [WARN] Brak danych w CEIDG dla NIP {nip}")
            return None
        if response.status_code == 401:
            print(f" [ERROR] Błąd autoryzacji - sprawdź CEIDG_API_KEY")
            return None
        if response.status_code != 200:
            # Truncate the body so a giant HTML error page doesn't flood the log.
            print(f" [ERROR] HTTP {response.status_code}: {response.text[:100]}")
            return None
        data = response.json()
        if "firma" not in data or not data["firma"]:
            print(f" [WARN] Brak danych firmy w odpowiedzi")
            return None
        # The API wraps results in a list; take the first (NIP identifies one firm).
        firma = data["firma"][0]
        # Parse owner data — the "wlasciciel" block is optional in the payload.
        owner = None
        if "wlasciciel" in firma:
            w = firma["wlasciciel"]
            owner = CEIDGOwner(
                imie=w.get("imie", ""),
                nazwisko=w.get("nazwisko", ""),
                nip=w.get("nip", nip),
                regon=w.get("regon", "")
            )
        # Parse address: assemble "street building/unit" from optional parts.
        # NOTE(review): building/unit are appended even when no street is
        # present, which can yield a leading-space value — confirm whether
        # these appends were meant to nest under the "ulica" check.
        adres = firma.get("adresDzialalnosci", {})
        adres_ulica = ""
        if adres.get("ulica"):
            adres_ulica = adres.get("ulica", "")
        if adres.get("budynek"):
            adres_ulica += f" {adres.get('budynek')}"
        if adres.get("lokal"):
            adres_ulica += f"/{adres.get('lokal')}"
        # Parse PKD (main classified activity).
        pkd_glowny = firma.get("pkdGlowny", {})
        ceidg_data = CEIDGData(
            id=firma.get("id", ""),
            nazwa=firma.get("nazwa", ""),
            nip=nip,
            regon=owner.regon if owner else "",
            wlasciciel=owner,
            adres_miasto=adres.get("miasto", ""),
            adres_ulica=adres_ulica,
            adres_kod=adres.get("kod", ""),
            pkd_glowny=pkd_glowny.get("kod", ""),
            pkd_opis=pkd_glowny.get("nazwa", ""),
            data_rozpoczecia=firma.get("dataRozpoczecia", ""),
            status=firma.get("status", ""),
            pobrano=datetime.now().isoformat()
        )
        if owner:
            print(f" [OK] {owner.imie} {owner.nazwisko} ({ceidg_data.status})")
        else:
            print(f" [OK] {ceidg_data.nazwa} ({ceidg_data.status})")
        return ceidg_data
    except requests.RequestException as e:
        print(f" [ERROR] Błąd połączenia: {e}")
        return None
    except json.JSONDecodeError as e:
        # requests' JSONDecodeError subclasses json.JSONDecodeError, so
        # malformed bodies from response.json() land here.
        print(f" [ERROR] Błąd parsowania JSON: {e}")
        return None
def import_to_database(results: List[CEIDGData]) -> dict:
    """Import JDG owner data into the database.

    For each record that carries an owner: find the company by NIP, find or
    create the Person row (matched by name — the API returns no PESEL), then
    create or refresh the owner↔company link with role_category
    "wlasciciel_jdg".  Commits once at the end; any error rolls the whole
    batch back.

    Returns:
        dict with counters: imported, updated, skipped, errors.
    """
    from database import SessionLocal, Company, Person, CompanyPerson
    db = SessionLocal()
    stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
    try:
        for data in results:
            # Records without an owner cannot be linked to a person.
            if not data.wlasciciel:
                stats["skipped"] += 1
                continue
            owner = data.wlasciciel
            # Find company by NIP
            company = db.query(Company).filter(Company.nip == data.nip).first()
            if not company:
                print(f" [SKIP] Firma z NIP {data.nip} nie istnieje w bazie")
                stats["skipped"] += 1
                continue
            # Find or create person (by name since JDG owners don't have PESEL in API)
            person = db.query(Person).filter(
                Person.nazwisko == owner.nazwisko,
                Person.imiona == owner.imie
            ).first()
            if not person:
                person = Person(
                    imiona=owner.imie,
                    nazwisko=owner.nazwisko,
                    pesel=None  # CEIDG API doesn't return PESEL
                )
                db.add(person)
                db.flush()  # flush so person.id exists before linking
                print(f" [NEW] Utworzono osobę: {owner.imie} {owner.nazwisko}")
            # Check if relationship already exists
            existing = db.query(CompanyPerson).filter(
                CompanyPerson.company_id == company.id,
                CompanyPerson.person_id == person.id,
                CompanyPerson.role_category == "wlasciciel_jdg"
            ).first()
            if existing:
                # Refresh provenance when the link came from another source;
                # otherwise the record is an exact duplicate.
                if existing.source != "dane.biznes.gov.pl":
                    existing.source = "dane.biznes.gov.pl"
                    existing.fetched_at = datetime.now()
                    stats["updated"] += 1
                else:
                    stats["skipped"] += 1
            else:
                # Create new relationship
                company_person = CompanyPerson(
                    company_id=company.id,
                    person_id=person.id,
                    role="WŁAŚCICIEL",
                    role_category="wlasciciel_jdg",
                    source="dane.biznes.gov.pl",
                    fetched_at=datetime.now()
                )
                db.add(company_person)
                stats["imported"] += 1
                # NOTE(review): there is no separator between the owner's name
                # and the company name below (renders as "Jan KowalskiACME") —
                # looks like a lost "→"; confirm the intended message format.
                print(f" [ADD] {owner.imie} {owner.nazwisko}{company.name}")
        db.commit()
    except Exception as e:
        # Any failure voids the whole batch to keep the DB consistent.
        db.rollback()
        print(f" [ERROR] Błąd importu: {e}")
        stats["errors"] += 1
    finally:
        db.close()
    return stats
def main():
    """CLI entry point for fetching (and optionally importing) CEIDG data."""
    parser = argparse.ArgumentParser(description="Fetch JDG owner data from CEIDG API v3")
    parser.add_argument("--nip", type=str, help="Single NIP to fetch")
    parser.add_argument("--all", action="store_true", help="Fetch all JDG from database")
    parser.add_argument("--import", dest="do_import", action="store_true",
                        help="Import fetched data to database")
    parser.add_argument("--output", type=str, help="Output JSON file")
    args = parser.parse_args()

    # No mode selected: print usage and stop.
    if not args.nip and not args.all:
        parser.print_help()
        return

    results = []
    if args.nip:
        record = fetch_ceidg_data(args.nip)
        if record:
            results.append(record)
            print(f"\n=== {record.nazwa} ===")
            if record.wlasciciel:
                print(f" Właściciel: {record.wlasciciel.imie} {record.wlasciciel.nazwisko}")
            print(f" Status: {record.status}")
            print(f" PKD: {record.pkd_glowny} - {record.pkd_opis}")
            print(f" Adres: {record.adres_ulica}, {record.adres_kod} {record.adres_miasto}")
    else:
        from database import SessionLocal, Company
        db = SessionLocal()
        try:
            # JDG = sole proprietorships: a NIP but no KRS entry.
            jdg_companies = db.query(Company).filter(
                (Company.krs.is_(None)) | (Company.krs == ''),
                Company.nip.isnot(None),
                Company.nip != ''
            ).all()
            print(f"Znaleziono {len(jdg_companies)} firm JDG\n")
            success = 0
            failed = 0
            for position, company in enumerate(jdg_companies, 1):
                print(f"[{position}/{len(jdg_companies)}] {company.name}")
                record = fetch_ceidg_data(company.nip)
                if record:
                    results.append(record)
                    success += 1
                else:
                    failed += 1
                time.sleep(0.5)  # Rate limiting
            print(f"\n=== PODSUMOWANIE ===")
            print(f"Pobrano: {success}")
            print(f"Błędy/brak danych: {failed}")
        finally:
            db.close()

    # Persist everything fetched to the JSON cache.
    if results:
        JSON_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        output_file = args.output or str(JSON_OUTPUT_DIR / f"ceidg_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([r.to_dict() for r in results], f, ensure_ascii=False, indent=2)
        print(f"\nDane zapisane do: {output_file}")

    # Optional database import of what was fetched.
    if args.do_import and results:
        print("\n=== IMPORT DO BAZY ===")
        stats = import_to_database(results)
        print(f"\nZaimportowano: {stats['imported']}")
        print(f"Zaktualizowano: {stats['updated']}")
        print(f"Pominięto: {stats['skipped']}")
        print(f"Błędy: {stats['errors']}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""
Import danych osób z odpisu KRS do bazy danych.
Używa parse_krs_pdf.py do wyciągania danych z PDF i importuje je do tabel:
- people: osoby (zarząd, wspólnicy, prokurenci)
- company_people: relacje osoba-firma
Usage:
python scripts/import_krs_people.py --file /path/to/odpis.pdf --company-id 26
python scripts/import_krs_people.py --dir /path/to/pdfs/
"""
import os
import sys
import argparse
from pathlib import Path
from datetime import datetime
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
# Load environment variables
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / '.env')
from database import SessionLocal, Company, Person, CompanyPerson
from parse_krs_pdf import parse_krs_pdf, KRSData
def get_or_create_person(db, nazwisko: str, imiona: str, pesel: str = None) -> Person:
    """Return an existing Person or create a new one.

    Lookup order: by PESEL when provided (it is unique), then by the
    surname/given-names pair.  A name-matched person that lacks a PESEL is
    back-filled with the supplied one.
    """
    # 1) PESEL is the strongest key — try it first.
    if pesel:
        by_pesel = db.query(Person).filter(Person.pesel == pesel).first()
        if by_pesel:
            return by_pesel
    # 2) Fall back to the full-name match.
    by_name = db.query(Person).filter(
        Person.nazwisko == nazwisko,
        Person.imiona == imiona
    ).first()
    if by_name:
        # We may now know a PESEL for a previously PESEL-less record.
        if pesel and not by_name.pesel:
            by_name.pesel = pesel
            db.flush()
        return by_name
    # 3) Nothing matched — create a fresh row (flush to obtain its ID).
    new_person = Person(
        nazwisko=nazwisko,
        imiona=imiona,
        pesel=pesel
    )
    db.add(new_person)
    db.flush()
    return new_person
def find_company_by_krs(db, krs: str) -> Company:
    """Return the Company with the given KRS number, or None when absent."""
    return db.query(Company).filter(Company.krs == krs).first()
def find_company_by_nip(db, nip: str) -> Company:
    """Return the Company with the given NIP number, or None when absent."""
    return db.query(Company).filter(Company.nip == nip).first()
def _link_people(db, people, company, pdf_filename, now, role_category,
                 fixed_role=None, default_role=None, match_role=False):
    """Create CompanyPerson links for *people*; return (added, skipped).

    fixed_role:   role stored for every person (wspolnicy/prokurenci).
    default_role: fallback when the parsed person carries no role (zarzad).
    match_role:   include the parsed role in the duplicate check, so a
                  person can hold one link per distinct board function.
    """
    added = 0
    skipped = 0
    for p in people:
        person = get_or_create_person(db, p.nazwisko, p.imiona, p.pesel)
        filters = [
            CompanyPerson.company_id == company.id,
            CompanyPerson.person_id == person.id,
            CompanyPerson.role_category == role_category,
        ]
        if match_role:
            filters.append(CompanyPerson.role == p.rola)
        if db.query(CompanyPerson).filter(*filters).first():
            # Link already present — count as a duplicate.
            skipped += 1
            continue
        db.add(CompanyPerson(
            company_id=company.id,
            person_id=person.id,
            role=fixed_role if fixed_role is not None else (p.rola or default_role),
            role_category=role_category,
            source='ekrs.ms.gov.pl',
            source_document=pdf_filename,
            fetched_at=now
        ))
        added += 1
    return added, skipped


def import_krs_data(db, krs_data: KRSData, company: Company, pdf_filename: str) -> dict:
    """Import people from a parsed KRS extract into the database.

    Creates Person rows as needed and links them to *company* in three role
    categories: zarzad (board members, keeping their parsed function),
    wspolnik (shareholders) and prokurent (proxies).  Pre-existing links are
    counted as skipped; nothing is committed here — the caller owns the
    transaction.

    Returns:
        dict with counters: zarzad_added, wspolnicy_added, prokurenci_added,
        people_created, people_updated, skipped.  (people_created and
        people_updated are retained for interface compatibility; they were
        never incremented by the original code either.)
    """
    stats = {
        'zarzad_added': 0,
        'wspolnicy_added': 0,
        'prokurenci_added': 0,
        'people_created': 0,
        'people_updated': 0,
        'skipped': 0
    }
    now = datetime.now()
    # Board members keep their parsed function and are de-duplicated per role.
    added, skipped = _link_people(db, krs_data.zarzad, company, pdf_filename, now,
                                  'zarzad', default_role='CZŁONEK ZARZĄDU', match_role=True)
    stats['zarzad_added'] += added
    stats['skipped'] += skipped
    # Shareholders always carry the fixed WSPÓLNIK role.
    added, skipped = _link_people(db, krs_data.wspolnicy, company, pdf_filename, now,
                                  'wspolnik', fixed_role='WSPÓLNIK')
    stats['wspolnicy_added'] += added
    stats['skipped'] += skipped
    # Proxies always carry the fixed PROKURENT role.
    added, skipped = _link_people(db, krs_data.prokurenci, company, pdf_filename, now,
                                  'prokurent', fixed_role='PROKURENT')
    stats['prokurenci_added'] += added
    stats['skipped'] += skipped
    return stats
def import_from_file(pdf_path: str, company_id: int = None, dry_run: bool = False):
    """Parse one KRS extract PDF and import its people into the database.

    Returns the parsed KRSData on success, None when parsing failed or the
    company could not be located.  With dry_run=True nothing is written.
    """
    print(f"\n{'='*60}")
    print(f"Przetwarzanie: {pdf_path}")
    print('='*60)
    # Parse the PDF first — a broken file should not open a DB session.
    try:
        krs_data = parse_krs_pdf(pdf_path)
    except Exception as e:
        print(f" [ERROR] Błąd parsowania: {e}")
        return None
    print(f" Nazwa: {krs_data.nazwa}")
    print(f" KRS: {krs_data.krs}")
    print(f" NIP: {krs_data.nip}")
    print(f" Zarząd: {len(krs_data.zarzad)} osób")
    print(f" Wspólnicy: {len(krs_data.wspolnicy)} osób")
    print(f" Prokurenci: {len(krs_data.prokurenci)} osób")
    if dry_run:
        print(" [DRY-RUN] Pomijam zapis do bazy")
        return krs_data
    db = SessionLocal()
    try:
        # Locate the company: forced ID first, then KRS, then NIP fallback.
        company = None
        if company_id:
            company = db.query(Company).filter(Company.id == company_id).first()
            if not company:
                print(f" [ERROR] Firma o ID {company_id} nie istnieje")
                return None
        elif krs_data.krs:
            company = find_company_by_krs(db, krs_data.krs)
        if not company and krs_data.nip:
            company = find_company_by_nip(db, krs_data.nip)
        if not company:
            print(f" [ERROR] Nie znaleziono firmy w bazie (KRS: {krs_data.krs}, NIP: {krs_data.nip})")
            return None
        print(f" Firma w bazie: {company.name} (ID: {company.id})")
        stats = import_krs_data(db, krs_data, company, Path(pdf_path).name)
        db.commit()
        print(f"\n [OK] Import zakończony:")
        print(f" Zarząd: +{stats['zarzad_added']}")
        print(f" Wspólnicy: +{stats['wspolnicy_added']}")
        print(f" Prokurenci: +{stats['prokurenci_added']}")
        print(f" Pominięto (duplikaty): {stats['skipped']}")
        return krs_data
    except Exception as e:
        db.rollback()
        print(f" [ERROR] Błąd importu: {e}")
        raise
    finally:
        db.close()
def import_from_directory(dir_path: str, dry_run: bool = False):
    """Run import_from_file over every odpis_*.pdf in *dir_path* and summarize."""
    pdf_files = sorted(Path(dir_path).glob("odpis_*.pdf"))
    print(f"Znaleziono {len(pdf_files)} plików PDF")
    success = 0
    errors = 0
    for pdf_file in pdf_files:
        try:
            # A truthy return value means the file was imported (or dry-parsed).
            if import_from_file(str(pdf_file), dry_run=dry_run):
                success += 1
            else:
                errors += 1
        except Exception as e:
            print(f" [ERROR] {e}")
            errors += 1
    print(f"\n{'='*60}")
    print("PODSUMOWANIE")
    print('='*60)
    print(f" Sukces: {success}")
    print(f" Błędy: {errors}")
    print(f" Łącznie: {len(pdf_files)}")
def main():
    """CLI entry point: import people from a single PDF or a directory of them."""
    parser = argparse.ArgumentParser(description="Import KRS people data to database")
    parser.add_argument("--file", type=str, help="Single PDF file to import")
    parser.add_argument("--dir", type=str, help="Directory with PDF files")
    parser.add_argument("--company-id", type=int, help="Force company ID (for --file only)")
    parser.add_argument("--dry-run", action="store_true", help="Parse only, don't save to database")
    args = parser.parse_args()

    # Nothing to do without a file or a directory.
    if not args.file and not args.dir:
        parser.print_help()
        return
    if args.file:
        import_from_file(args.file, company_id=args.company_id, dry_run=args.dry_run)
    else:
        import_from_directory(args.dir, dry_run=args.dry_run)


if __name__ == "__main__":
    main()

279
scripts/parse_krs_pdf.py Normal file
View File

@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
KRS PDF Parser - wyciąga dane zarządu i wspólników z odpisu KRS
Parsuje odpisy pełne pobrane z ekrs.ms.gov.pl i wyciąga:
- Członków zarządu (funkcja, imię, nazwisko, PESEL)
- Wspólników (imię, nazwisko, PESEL, udziały)
- Prokurentów
Usage:
python scripts/parse_krs_pdf.py --file /path/to/odpis.pdf
python scripts/parse_krs_pdf.py --dir /path/to/pdfs/
"""
import argparse
import json
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import pdfplumber
except ImportError:
print("Wymagana biblioteka pdfplumber. Zainstaluj: pip install pdfplumber")
exit(1)
@dataclass
class Person:
    """A person linked to a company, as parsed from a KRS extract."""
    nazwisko: str                  # surname
    imiona: str                    # given name(s)
    pesel: Optional[str] = None
    rola: str = ""                 # PREZES ZARZĄDU, CZŁONEK ZARZĄDU, WSPÓLNIK, PROKURENT
    udzialy: Optional[str] = None  # shareholders only: share description

    def full_name(self) -> str:
        """Given name(s) followed by the surname."""
        return " ".join((self.imiona, self.nazwisko))
@dataclass
class KRSData:
    """Data extracted from a full KRS extract (odpis pełny)."""
    krs: str
    nazwa: str
    nip: Optional[str] = None
    regon: Optional[str] = None
    # default_factory gives every instance its own list — the idiomatic fix
    # for the shared-mutable-default pitfall the previous None-sentinel
    # pattern worked around by hand.
    zarzad: List["Person"] = field(default_factory=list)
    wspolnicy: List["Person"] = field(default_factory=list)
    prokurenci: List["Person"] = field(default_factory=list)
    zrodlo: str = "ekrs.ms.gov.pl"  # provenance marker

    def __post_init__(self):
        # Kept for backward compatibility: callers that explicitly pass None
        # for a people list still get an empty list, as before.
        if self.zarzad is None:
            self.zarzad = []
        if self.wspolnicy is None:
            self.wspolnicy = []
        if self.prokurenci is None:
            self.prokurenci = []

    def to_dict(self) -> Dict[str, Any]:
        """JSON-serializable dict with people flattened via asdict()."""
        return {
            'krs': self.krs,
            'nazwa': self.nazwa,
            'nip': self.nip,
            'regon': self.regon,
            'zarzad': [asdict(p) for p in self.zarzad],
            'wspolnicy': [asdict(p) for p in self.wspolnicy],
            'prokurenci': [asdict(p) for p in self.prokurenci],
            'zrodlo': self.zrodlo
        }
def extract_text_from_pdf(pdf_path: str) -> str:
    """Concatenate the text of every page of *pdf_path*, newline after each page.

    Pages from which pdfplumber extracts no text are skipped.
    """
    with pdfplumber.open(pdf_path) as pdf:
        page_texts = (page.extract_text() for page in pdf.pages)
        return "".join(chunk + "\n" for chunk in page_texts if chunk)
def parse_person_block(lines: List[str], start_idx: int) -> Optional[Person]:
    """Parse one person's data block from the flattened PDF text lines.

    Expected layout (values follow " - "):
        1.Nazwisko / Nazwa lub firma 1 - SURNAME
        2.Imiona 1 - GIVEN NAMES
        3.Numer PESEL/REGON lub data 1 - 12345678901, ------

    Scans at most 10 lines starting at *start_idx*, stopping early when the
    next person's header appears.  Returns None when no surname + given
    names pair was found.
    """
    person = Person(nazwisko="", imiona="")
    window = lines[start_idx:start_idx + 10]
    for offset, raw in enumerate(window):
        line = raw.strip()
        # A fresh "Nazwisko" header after the first line starts the next person.
        if offset > 0 and ('1.Nazwisko' in line or 'Nazwisko / Nazwa' in line):
            break
        has_value = ' - ' in line
        # Surname: keep only the first successful capture.
        if has_value and not person.nazwisko and 'Nazwisko' in line:
            m = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ\-]+)$', line)
            if m:
                person.nazwisko = m.group(1)
        # Given names: likewise only the first capture.
        if has_value and not person.imiona and 'Imiona' in line:
            m = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
            if m:
                person.imiona = m.group(1).strip()
        # PESEL: 11 digits, first capture only.
        if has_value and person.pesel is None and 'PESEL' in line:
            m = re.search(r' - (\d{11})', line)
            if m:
                person.pesel = m.group(1)
        # Board function (no first-only guard, mirroring the original: a
        # later "Funkcja" line overwrites an earlier one).
        if has_value and 'Funkcja' in line:
            m = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', line)
            if m:
                person.rola = m.group(1).strip()
    return person if (person.nazwisko and person.imiona) else None
def parse_krs_pdf(pdf_path: str) -> KRSData:
    """Parse a KRS excerpt PDF into a :class:`KRSData` record.

    Extracts the KRS number, company name, NIP and REGON, plus the people
    listed in the management board (zarząd), shareholders (wspólnicy) and
    proxies (prokurenci) sections.

    Args:
        pdf_path: Path to a downloaded KRS excerpt PDF.

    Returns:
        A populated ``KRSData``; identifier fields may be empty strings or
        ``None`` when not found in the document.
    """
    text = extract_text_from_pdf(pdf_path)
    lines = text.split('\n')

    # --- Basic identifiers -------------------------------------------------
    krs_match = re.search(r'Numer KRS:\s*(\d{10})', text)
    krs = krs_match.group(1) if krs_match else ""

    # Company name - format: "3.Firma, pod którą spółka działa 1 - NAZWA FIRMY"
    nazwa = ""
    nazwa_match = re.search(r'3\.Firma,?\s+pod którą spółka działa\s+\d+\s+-\s+([^\n]+)', text)
    if nazwa_match:
        nazwa = nazwa_match.group(1).strip()

    # NIP and REGON - format: "REGON: 369796786, NIP: 5862329746"
    nip_match = re.search(r'NIP:\s*(\d{10})', text)
    nip = nip_match.group(1) if nip_match else None
    regon_match = re.search(r'REGON:\s*(\d{9,14})', text)
    regon = regon_match.group(1) if regon_match else None

    data = KRSData(krs=krs, nazwa=nazwa, nip=nip, regon=regon)

    # --- Section state machine ---------------------------------------------
    # At most one flag is active at a time; a section header switches state.
    in_zarzad = False
    in_wspolnicy = False
    in_prokurenci = False

    for i, line in enumerate(lines):
        line_stripped = line.strip()

        # Management board header; requiring "Nazwa organu" on the same line
        # guards against incidental mentions of ZARZĄD elsewhere.
        if 'ZARZĄD' in line_stripped.upper() and 'Nazwa organu' in line_stripped:
            in_zarzad = True
            in_wspolnicy = False
            in_prokurenci = False
            continue

        # Shareholders: match "Dane wspólników" so phrases such as
        # "Wspólnik może mieć:" do not trigger the section switch.
        if 'Dane wspólników' in line_stripped or 'WSPÓLNICY' in line_stripped.upper():
            in_wspolnicy = True
            in_zarzad = False
            in_prokurenci = False

        if 'PROKURENCI' in line_stripped.upper() or 'Prokurent' in line_stripped:
            in_prokurenci = True
            in_zarzad = False
            in_wspolnicy = False

        # A person's block starts at its "Nazwisko" field.
        if '1.Nazwisko' in line_stripped or 'Nazwisko / Nazwa' in line_stripped:
            person = parse_person_block(lines, i)
            if person:
                if in_zarzad:
                    # The "Funkcja" line sits a few lines below the name.
                    for j in range(i, min(i + 8, len(lines))):
                        if 'Funkcja' in lines[j] and ' - ' in lines[j]:
                            func_match = re.search(r' - ([A-ZĄĆĘŁŃÓŚŹŻ ]+)$', lines[j])
                            if func_match:
                                person.rola = func_match.group(1).strip()
                                break
                    if not person.rola:
                        person.rola = "CZŁONEK ZARZĄDU"
                    data.zarzad.append(person)
                elif in_wspolnicy:
                    person.rola = "WSPÓLNIK"
                    # BUGFIX: the previous share-info scan appended the person
                    # on both the match and the no-match path ('udział' is a
                    # substring of 'udziałów', so the extra check was also
                    # redundant), risking duplicate entries.  A shareholder is
                    # now recorded exactly once.
                    data.wspolnicy.append(person)
                elif in_prokurenci:
                    person.rola = "PROKURENT"
                    data.prokurenci.append(person)

    return data
def main():
    """CLI entry point: parse one KRS PDF or a directory of PDFs.

    Exactly one of ``--file`` / ``--dir`` must be supplied.  Results are
    written as JSON to ``--output`` when given, otherwise printed to
    stdout.
    """
    parser = argparse.ArgumentParser(description="Parse KRS PDF files")
    parser.add_argument("--file", type=str, help="Single PDF file to parse")
    parser.add_argument("--dir", type=str, help="Directory with PDF files")
    parser.add_argument("--output", type=str, help="Output JSON file")
    args = parser.parse_args()

    results = []
    if args.file:
        print(f"Parsing: {args.file}")
        data = parse_krs_pdf(args.file)
        results.append(data.to_dict())
        _print_summary(data)
    elif args.dir:
        pdf_dir = Path(args.dir)
        pdf_files = list(pdf_dir.glob("*.pdf"))
        print(f"Found {len(pdf_files)} PDF files")
        for pdf_file in pdf_files:
            print(f"Parsing: {pdf_file.name}...")
            try:
                data = parse_krs_pdf(str(pdf_file))
                results.append(data.to_dict())
                print(f"  OK: {data.nazwa}")
            except Exception as e:
                # Best-effort batch mode: report the failure and continue.
                print(f"  ERROR: {e}")
    else:
        # BUGFIX: previously the script exited silently when invoked with
        # neither --file nor --dir; report correct usage instead.
        parser.error("one of --file or --dir is required")

    # Save results to file, or dump them to stdout.
    if args.output and results:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"\nResults saved to: {args.output}")
    elif results:
        print("\n=== JSON OUTPUT ===")
        print(json.dumps(results, ensure_ascii=False, indent=2))


def _print_summary(data):
    """Print a human-readable summary of a single parsed KRS record."""
    print(f"\n=== {data.nazwa} (KRS: {data.krs}) ===")
    print(f"NIP: {data.nip}, REGON: {data.regon}")
    print(f"\nZarząd ({len(data.zarzad)} osób):")
    for p in data.zarzad:
        print(f"  - {p.full_name()} - {p.rola}")
    print(f"\nWspólnicy ({len(data.wspolnicy)} osób):")
    for p in data.wspolnicy:
        print(f"  - {p.full_name()}")
    if data.prokurenci:
        print(f"\nProkurenci ({len(data.prokurenci)} osób):")
        for p in data.prokurenci:
            print(f"  - {p.full_name()}")


if __name__ == "__main__":
    main()

View File

@ -2261,7 +2261,7 @@
</div>
{% endif %}
<!-- Company Events -->
{# Company Events - UKRYTE (2026-01-11) - do przywrócenia w przyszłości
{% if events %}
<div class="company-section">
<h2 class="section-title">
@ -2320,6 +2320,7 @@
{% endfor %}
</div>
{% endif %}
#}
<!-- Universal Confirm Modal -->
<div class="modal-overlay" id="confirmModal">

View File

@ -0,0 +1,589 @@
{% extends "base.html" %}
{% block title %}Mapa Powiązań - Norda Biznes Hub{% endblock %}
{% block extra_css %}
<style>
.connections-container {
width: 100%;
height: calc(100vh - 200px);
min-height: 600px;
background: #1a1a2e;
border-radius: 12px;
position: relative;
overflow: hidden;
}
#connections-graph {
width: 100%;
height: 100%;
}
.node-company {
cursor: pointer;
}
.node-person {
cursor: pointer;
}
.link {
stroke: #4a90d9;
stroke-opacity: 0.4;
}
.link.zarzad {
stroke: #e74c3c;
}
.link.wspolnik {
stroke: #2ecc71;
}
.link.prokurent {
stroke: #f39c12;
}
.link.wlasciciel_jdg {
stroke: #9b59b6;
}
.node-label {
font-size: 10px;
fill: #fff;
pointer-events: none;
text-shadow: 0 1px 2px rgba(0,0,0,0.8);
}
.tooltip {
position: absolute;
padding: 12px 16px;
background: rgba(26, 26, 46, 0.95);
border: 1px solid #4a90d9;
border-radius: 8px;
color: #fff;
font-size: 13px;
pointer-events: none;
z-index: 1000;
max-width: 300px;
box-shadow: 0 4px 12px rgba(0,0,0,0.3);
}
.tooltip h4 {
margin: 0 0 8px 0;
color: #4a90d9;
font-size: 14px;
}
.tooltip p {
margin: 4px 0;
}
.tooltip .role-badge {
display: inline-block;
padding: 2px 6px;
border-radius: 4px;
font-size: 11px;
margin: 2px;
}
.tooltip .role-badge.zarzad {
background: #e74c3c;
}
.tooltip .role-badge.wspolnik {
background: #2ecc71;
}
.tooltip .role-badge.prokurent {
background: #f39c12;
}
.tooltip .role-badge.wlasciciel_jdg {
background: #9b59b6;
}
.controls {
position: absolute;
top: 16px;
right: 16px;
display: flex;
flex-direction: column;
gap: 8px;
z-index: 100;
}
.control-btn {
padding: 8px 16px;
background: rgba(74, 144, 217, 0.9);
border: none;
border-radius: 6px;
color: #fff;
cursor: pointer;
font-size: 13px;
transition: background 0.2s;
}
.control-btn:hover {
background: rgba(74, 144, 217, 1);
}
.legend {
position: absolute;
bottom: 16px;
left: 16px;
background: rgba(26, 26, 46, 0.9);
padding: 12px 16px;
border-radius: 8px;
z-index: 100;
}
.legend h4 {
margin: 0 0 8px 0;
color: #fff;
font-size: 13px;
}
.legend-item {
display: flex;
align-items: center;
gap: 8px;
margin: 4px 0;
font-size: 12px;
color: #ccc;
}
.legend-color {
width: 16px;
height: 16px;
border-radius: 50%;
}
.legend-line {
width: 24px;
height: 3px;
border-radius: 2px;
}
.stats-panel {
position: absolute;
top: 16px;
left: 16px;
background: rgba(26, 26, 46, 0.9);
padding: 12px 16px;
border-radius: 8px;
z-index: 100;
}
.stats-panel h4 {
margin: 0 0 8px 0;
color: #4a90d9;
font-size: 14px;
}
.stats-panel .stat {
display: flex;
justify-content: space-between;
gap: 16px;
margin: 4px 0;
font-size: 13px;
color: #ccc;
}
.stats-panel .stat-value {
color: #fff;
font-weight: 600;
}
.page-header {
margin-bottom: 20px;
}
.page-header h1 {
font-size: 28px;
margin: 0;
}
.page-header p {
color: #888;
margin: 8px 0 0 0;
}
.loading-overlay {
position: absolute;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: rgba(26, 26, 46, 0.9);
display: flex;
align-items: center;
justify-content: center;
z-index: 1000;
}
.loading-text {
color: #fff;
font-size: 16px;
}
.source-info {
margin-top: 16px;
font-size: 12px;
color: #666;
}
.source-info a {
color: #4a90d9;
}
</style>
{% endblock %}
{% block content %}
<div class="page-header">
<h1>Mapa Powiązań Norda Biznes</h1>
<p>Wizualizacja powiązań między firmami członkowskimi a osobami (zarząd, wspólnicy, prokurenci)</p>
</div>
<div class="connections-container" id="container">
<div class="loading-overlay" id="loading">
<div class="loading-text">Ładowanie danych...</div>
</div>
<svg id="connections-graph"></svg>
<div class="stats-panel" id="stats">
<h4>Statystyki</h4>
<div class="stat">
<span>Firmy:</span>
<span class="stat-value" id="stat-companies">-</span>
</div>
<div class="stat">
<span>Osoby:</span>
<span class="stat-value" id="stat-people">-</span>
</div>
<div class="stat">
<span>Powiązania:</span>
<span class="stat-value" id="stat-connections">-</span>
</div>
</div>
<div class="controls">
<button class="control-btn" onclick="fitToScreen()" style="background: rgba(34, 197, 94, 0.9);">Dopasuj widok</button>
<button class="control-btn" onclick="resetZoom()">Reset widoku</button>
<button class="control-btn" onclick="toggleLabels()">Pokaż/ukryj etykiety</button>
</div>
<div class="legend">
<h4>Legenda</h4>
<div class="legend-item">
<div class="legend-color" style="background: #4a90d9;"></div>
<span>Firma</span>
</div>
<div class="legend-item">
<div class="legend-color" style="background: #f39c12;"></div>
<span>Osoba</span>
</div>
<div class="legend-item">
<div class="legend-line" style="background: #e74c3c;"></div>
<span>Zarząd</span>
</div>
<div class="legend-item">
<div class="legend-line" style="background: #2ecc71;"></div>
<span>Wspólnik</span>
</div>
<div class="legend-item">
<div class="legend-line" style="background: #f39c12;"></div>
<span>Prokurent</span>
</div>
<div class="legend-item">
<div class="legend-line" style="background: #9b59b6;"></div>
<span>Właściciel JDG</span>
</div>
</div>
<div class="tooltip" id="tooltip" style="display: none;"></div>
</div>
<div class="source-info">
Źródło danych: <a href="https://ekrs.ms.gov.pl" target="_blank">ekrs.ms.gov.pl</a> (KRS),
<a href="https://dane.biznes.gov.pl" target="_blank">dane.biznes.gov.pl</a> (CEIDG)
</div>
<script src="{{ url_for('static', filename='js/vendor/d3.v7.min.js') }}"></script>
<script>
// D3.js Force-Directed Graph for Company-Person Connections
// Module-level state shared across the graph lifecycle (init, zoom, resize).
let simulation, svg, g, zoom, showLabels = true;
let graphData = { nodes: [], links: [] };
// Wait for D3 to be available: the vendored d3 bundle is loaded via a
// separate <script> tag, so poll (every 100 ms) until the global `d3`
// symbol exists, then invoke the callback.
function waitForD3(callback) {
    if (typeof d3 !== 'undefined') {
        callback();
    } else {
        setTimeout(() => waitForD3(callback), 100);
    }
}
// Fetch the graph payload from the backend and bootstrap the visualization.
// Expects /api/connections to return {success, nodes, links, stats}.
// On any failure the loading overlay is repurposed as an error message.
async function loadData() {
    try {
        const response = await fetch('/api/connections');
        const data = await response.json();
        if (!data.success) {
            throw new Error('Failed to load data');
        }
        graphData = data;
        // Update the stats panel counters.
        document.getElementById('stat-companies').textContent = data.stats.companies;
        document.getElementById('stat-people').textContent = data.stats.people;
        document.getElementById('stat-connections').textContent = data.stats.connections;
        // Hide the loading overlay now that data has arrived.
        document.getElementById('loading').style.display = 'none';
        // Initialize the force-directed graph.
        initGraph(data.nodes, data.links);
    } catch (error) {
        console.error('Error loading data:', error);
        document.querySelector('#loading .loading-text').textContent = 'Błąd ładowania danych';
    }
}
/**
 * Build the force-directed graph inside #connections-graph.
 *
 * Re-entrant: clears any previous SVG content first, so it is also used
 * as the resize handler. Mutates the module-level simulation/svg/g/zoom
 * state. Node objects get x/y positions assigned in place by the d3
 * simulation; d3.forceLink resolves link source/target ids into node
 * references in place.
 *
 * @param {Array} nodes - node objects ({id, type, name, ...}).
 * @param {Array} links - link objects ({source, target, category, role}).
 */
function initGraph(nodes, links) {
    const container = document.getElementById('container');
    const width = container.clientWidth;
    const height = container.clientHeight;
    svg = d3.select('#connections-graph')
        .attr('width', width)
        .attr('height', height);
    // Clear previous content (supports re-initialization on resize).
    svg.selectAll('*').remove();
    // Add zoom behavior (0.1x .. 4x); pans/zooms the inner <g> as a unit.
    zoom = d3.zoom()
        .scaleExtent([0.1, 4])
        .on('zoom', (event) => {
            g.attr('transform', event.transform);
        });
    svg.call(zoom);
    // Create container for graph elements.
    g = svg.append('g');
    // Create simulation - optimized for large graph: repulsion range is
    // capped (distanceMax) and weak x/y forces pull stray clusters back
    // toward the viewport center.
    simulation = d3.forceSimulation(nodes)
        .force('link', d3.forceLink(links).id(d => d.id).distance(80).strength(0.5))
        .force('charge', d3.forceManyBody().strength(-150).distanceMax(300))
        .force('center', d3.forceCenter(width / 2, height / 2))
        .force('collision', d3.forceCollide().radius(20))
        .force('x', d3.forceX(width / 2).strength(0.05))
        .force('y', d3.forceY(height / 2).strength(0.05));
    // Auto-fit the view after the initial layout has mostly settled (2 seconds).
    setTimeout(() => {
        fitToScreen();
    }, 2000);
    // Create links; the relation category maps to a CSS class for color.
    const link = g.append('g')
        .selectAll('line')
        .data(links)
        .join('line')
        .attr('class', d => `link ${d.category}`)
        .attr('stroke-width', 2);
    // Create node groups (circle + label), draggable.
    const node = g.append('g')
        .selectAll('g')
        .data(nodes)
        .join('g')
        .attr('class', d => d.type === 'company' ? 'node-company' : 'node-person')
        .call(drag(simulation));
    // Add circles: fixed radius for companies; a person's radius grows
    // with the number of companies they are connected to.
    node.append('circle')
        .attr('r', d => {
            if (d.type === 'company') return 12;
            return 6 + (d.company_count || 1) * 2;
        })
        .attr('fill', d => d.type === 'company' ? '#4a90d9' : '#f39c12')
        .attr('stroke', '#fff')
        .attr('stroke-width', 1.5);
    // Add labels; long company names are truncated to 20 characters.
    node.append('text')
        .attr('class', 'node-label')
        .attr('dx', 15)
        .attr('dy', 4)
        .text(d => {
            if (d.type === 'company') return d.name.length > 20 ? d.name.substring(0, 20) + '...' : d.name;
            return d.name;
        });
    // Tooltip interaction: content is rebuilt on each mouseover from the
    // hovered node plus its incident links (forceLink has already replaced
    // link endpoints with node object references).
    const tooltip = d3.select('#tooltip');
    node.on('mouseover', (event, d) => {
        let html = `<h4>${d.name}</h4>`;
        if (d.type === 'company') {
            html += `<p>Kategoria: ${d.category}</p>`;
            if (d.city) html += `<p>Miasto: ${d.city}</p>`;
            // Find people connected to this company.
            const connected = links.filter(l => l.target.id === d.id || l.source.id === d.id);
            if (connected.length > 0) {
                html += `<p>Powiązania:</p>`;
                connected.forEach(l => {
                    const person = l.source.id === d.id ? l.target : l.source;
                    if (person.type === 'person') {
                        html += `<span class="role-badge ${l.category}">${l.role}</span> ${person.name}<br>`;
                    }
                });
            }
        } else {
            html += `<p>Powiązany z ${d.company_count} firmami</p>`;
            // Find companies connected to this person.
            const connected = links.filter(l => l.source.id === d.id || l.target.id === d.id);
            if (connected.length > 0) {
                connected.forEach(l => {
                    const company = l.source.id === d.id ? l.target : l.source;
                    if (company.type === 'company') {
                        html += `<span class="role-badge ${l.category}">${l.role}</span> ${company.name}<br>`;
                    }
                });
            }
        }
        tooltip.html(html)
            .style('display', 'block')
            .style('left', (event.pageX + 15) + 'px')
            .style('top', (event.pageY - 10) + 'px');
    })
    .on('mousemove', (event) => {
        tooltip
            .style('left', (event.pageX + 15) + 'px')
            .style('top', (event.pageY - 10) + 'px');
    })
    .on('mouseout', () => {
        tooltip.style('display', 'none');
    })
    .on('click', (event, d) => {
        // Companies link through to their profile page.
        if (d.type === 'company' && d.slug) {
            window.location.href = `/company/${d.slug}`;
        }
    });
    // Sync DOM positions with simulation coordinates on every tick.
    simulation.on('tick', () => {
        link
            .attr('x1', d => d.source.x)
            .attr('y1', d => d.source.y)
            .attr('x2', d => d.target.x)
            .attr('y2', d => d.target.y);
        node.attr('transform', d => `translate(${d.x},${d.y})`);
    });
}
/**
 * Build the d3 drag behavior used by graph nodes: while dragging, the
 * node is pinned to the pointer (fx/fy) and the simulation is reheated;
 * on release the pin is cleared so forces take over again.
 */
function drag(simulation) {
    const onStart = (event) => {
        if (!event.active) simulation.alphaTarget(0.3).restart();
        event.subject.fx = event.subject.x;
        event.subject.fy = event.subject.y;
    };
    const onDrag = (event) => {
        event.subject.fx = event.x;
        event.subject.fy = event.y;
    };
    const onEnd = (event) => {
        if (!event.active) simulation.alphaTarget(0);
        event.subject.fx = null;
        event.subject.fy = null;
    };
    return d3.drag()
        .on('start', onStart)
        .on('drag', onDrag)
        .on('end', onEnd);
}
// Animate the viewport back to the identity transform (no pan, zoom = 1).
function resetZoom() {
    const transition = svg.transition().duration(750);
    transition.call(zoom.transform, d3.zoomIdentity);
}
// Zoom/pan so the whole graph's bounding box fits inside the container,
// with 50px padding and a maximum zoom-in of 1.5x. No-op until the
// simulation has assigned coordinates to at least one node.
function fitToScreen() {
    if (!graphData.nodes || graphData.nodes.length === 0) return;
    const container = document.getElementById('container');
    const width = container.clientWidth;
    const height = container.clientHeight;
    // Calculate bounding box of all nodes that have coordinates.
    let minX = Infinity, maxX = -Infinity;
    let minY = Infinity, maxY = -Infinity;
    graphData.nodes.forEach(n => {
        if (n.x !== undefined && n.y !== undefined) {
            minX = Math.min(minX, n.x);
            maxX = Math.max(maxX, n.x);
            minY = Math.min(minY, n.y);
            maxY = Math.max(maxY, n.y);
        }
    });
    // No node had coordinates yet (simulation not started) -> nothing to fit.
    if (minX === Infinity) return;
    // Add padding around the bounding box.
    const padding = 50;
    minX -= padding;
    maxX += padding;
    minY -= padding;
    maxY += padding;
    // Scale to fit both axes; cap at 1.5 so sparse graphs don't over-zoom.
    const graphWidth = maxX - minX;
    const graphHeight = maxY - minY;
    const scale = Math.min(width / graphWidth, height / graphHeight, 1.5);
    const centerX = (minX + maxX) / 2;
    const centerY = (minY + maxY) / 2;
    // Apply transform: move the bbox midpoint to the viewport center,
    // then scale around it (translate-scale-translate composition).
    svg.transition()
        .duration(750)
        .call(zoom.transform, d3.zoomIdentity
            .translate(width / 2, height / 2)
            .scale(scale)
            .translate(-centerX, -centerY));
}
// Flip the global label-visibility flag and apply it to every node label.
function toggleLabels() {
    showLabels = !showLabels;
    const display = showLabels ? 'block' : 'none';
    d3.selectAll('.node-label').style('display', display);
}
// Handle window resize: rebuild the graph so the SVG matches the new
// container dimensions (only once data has actually been loaded).
window.addEventListener('resize', () => {
    if (graphData.nodes.length > 0) {
        initGraph(graphData.nodes, graphData.links);
    }
});
// Load data on page load - wait for the vendored D3 bundle first.
document.addEventListener('DOMContentLoaded', () => {
    waitForD3(loadData);
});
</script>
{% endblock %}

View File

@ -48,6 +48,31 @@
transform: translateY(-2px);
}
.company-logo {
width: 100%;
height: 120px;
background: var(--background);
border-radius: var(--radius);
display: flex;
align-items: center;
justify-content: center;
margin-bottom: var(--spacing-md);
overflow: hidden;
}
.company-logo img {
max-width: 80%;
max-height: 80%;
object-fit: contain;
}
.company-logo-placeholder {
font-size: 32px;
font-weight: 700;
color: var(--primary);
opacity: 0.3;
}
.company-header {
margin-bottom: var(--spacing-md);
}
@ -262,6 +287,11 @@
<div class="companies-grid">
{% for company in companies %}
<div class="company-card">
<a href="{{ url_for('company_detail', company_id=company.id) }}" class="company-logo">
<img src="{{ url_for('static', filename='img/companies/' ~ company.slug ~ '.webp') }}"
alt="{{ company.name }}"
onerror="this.parentElement.innerHTML='<span class=\'company-logo-placeholder\'>{{ company.name[:2]|upper }}</span>'">
</a>
<div class="company-header">
{% if company.category %}
<span class="company-category">{{ company.category.name }}</span>