#!/usr/bin/env python3 """ Skrypt do automatycznego uzbrajania firm (enrichment) z linii poleceń. Odpowiednik przycisku "Uzbrój firmę" w panelu admina. Użycie: python3 scripts/arm_company.py [--force] python3 scripts/arm_company.py 120 121 122 --force # wiele firm naraz Opcje: --force Wymusza ponowne wykonanie wszystkich kroków (jak "Zaktualizuj dane") """ import sys import os import logging # Setup path BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) scripts_dir = os.path.join(BASE_DIR, 'scripts') if scripts_dir not in sys.path: sys.path.insert(0, scripts_dir) from database import SessionLocal, Company, CompanyWebsiteAnalysis, CompanySocialMedia, CompanyPKD, CompanyPerson from database import GBPAudit logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('arm_company') def arm_company(company_id, force=False): db = SessionLocal() try: company = db.query(Company).filter_by(id=company_id).first() if not company: print("Firma ID %d nie znaleziona!" % company_id) return False print("=" * 60) print("Uzbrajam: %s (ID: %d)" % (company.name, company.id)) print("NIP: %s | WWW: %s" % (company.nip or '-', company.website or '-')) print("Tryb: %s" % ("FORCE (wszystkie kroki)" if force else "SMART (tylko brakujące)")) print("=" * 60) results = {} # --- Krok 1: Dane urzędowe --- registry_done = bool(company.krs_fetched_at or company.ceidg_fetched_at) if force or not registry_done: if company.nip: print("\n[1/5] Pobieranie danych urzędowych...") try: from blueprints.admin.routes_membership import _enrich_company_from_krs from krs_api_service import KRSApiService krs_service = KRSApiService() # Sprawdź KRS przez Białą Listę (search_by_nip zwraca dict lub None) if not company.krs: krs_data = krs_service.search_by_nip(company.nip) if krs_data and krs_data.get('krs'): company.krs = krs_data['krs'] db.flush() logger.info("Znaleziono KRS %s dla NIP %s" % (company.krs, company.nip)) if company.krs: success = _enrich_company_from_krs(company, db) if success: db.commit() results['registry'] = 'OK (KRS)' print(" -> OK: Dane z KRS pobrane") else: results['registry'] = 'FAIL (KRS)' print(" -> FAIL: Nie udało się pobrać z KRS") else: # Próbuj CEIDG — używamy tego samego serwisu co API endpoint from ceidg_api_service import fetch_ceidg_by_nip from datetime import datetime, date as date_type ceidg_data = fetch_ceidg_by_nip(company.nip) if ceidg_data: # CEIDG identifiers & metadata if ceidg_data.get('ceidg_id'): company.ceidg_id = ceidg_data['ceidg_id'] if ceidg_data.get('status'): company.ceidg_status = ceidg_data['status'] company.ceidg_raw_data = ceidg_data.get('raw') company.ceidg_fetched_at = datetime.now() company.data_source = 'CEIDG API' company.last_verified_at = datetime.now() # Owner wlasciciel = ceidg_data.get('wlasciciel', {}) if wlasciciel.get('imie'): company.owner_first_name = wlasciciel['imie'] if wlasciciel.get('nazwisko'): company.owner_last_name = wlasciciel['nazwisko'] if ceidg_data.get('obywatelstwa'): company.owner_citizenships = ceidg_data['obywatelstwa'] # Legal name if ceidg_data.get('firma') and (not company.legal_name or company.legal_name == company.name): company.legal_name = ceidg_data['firma'] # REGON if not company.regon: regon = ceidg_data.get('regon') or wlasciciel.get('regon') if regon: company.regon = regon # Business start date if ceidg_data.get('dataRozpoczecia'): try: d = ceidg_data['dataRozpoczecia'] if isinstance(d, str): company.business_start_date = date_type.fromisoformat(d) except (ValueError, TypeError): pass # Legal form if not company.legal_form: company.legal_form = 'JEDNOOSOBOWA DZIAŁALNOŚĆ GOSPODARCZA' # PKD (main) pkd_gl = ceidg_data.get('pkdGlowny', {}) if pkd_gl and pkd_gl.get('kod'): company.pkd_code = pkd_gl['kod'] company.pkd_description = pkd_gl.get('nazwa') # PKD (full list) pkd_lista = ceidg_data.get('pkd', []) if pkd_lista: company.ceidg_pkd_list = pkd_lista pkd_main_code = pkd_gl.get('kod', '') if pkd_gl else '' for pkd_item in pkd_lista: kod = pkd_item.get('kod', '') if not kod: continue existing_pkd = db.query(CompanyPKD).filter( CompanyPKD.company_id == company.id, CompanyPKD.pkd_code == kod ).first() if not existing_pkd: db.add(CompanyPKD( company_id=company.id, pkd_code=kod, pkd_description=pkd_item.get('nazwa', ''), is_primary=(kod == pkd_main_code) )) # Business address adres = ceidg_data.get('adresDzialalnosci', {}) ulica = adres.get('ulica', '') budynek = adres.get('budynek', '') lokal = adres.get('lokal', '') if ulica or budynek: street_parts = [ulica, budynek] if lokal: street_parts[-1] = (budynek + '/' + lokal) if budynek else lokal company.address_street = ' '.join(p for p in street_parts if p) if adres.get('kod') or adres.get('kodPocztowy'): company.address_postal = adres.get('kod') or adres.get('kodPocztowy') if adres.get('miasto') or adres.get('miejscowosc'): company.address_city = adres.get('miasto') or adres.get('miejscowosc') if company.address_street and getattr(company, 'address_postal', None) and company.address_city: company.address_full = '%s, %s %s' % (company.address_street, company.address_postal, company.address_city) # Contact (only if empty) if ceidg_data.get('email') and not company.email: company.email = ceidg_data['email'] if ceidg_data.get('stronaWWW') and not company.website: company.website = ceidg_data['stronaWWW'] if ceidg_data.get('telefon') and not company.phone: company.phone = ceidg_data['telefon'] db.commit() results['registry'] = 'OK (CEIDG)' print(" -> OK: Dane z CEIDG pobrane") else: results['registry'] = 'NOT FOUND' print(" -> Nie znaleziono w żadnym rejestrze") except Exception as e: results['registry'] = 'ERROR: %s' % str(e)[:80] print(" -> ERROR: %s" % str(e)[:80]) else: results['registry'] = 'SKIP (brak NIP)' print("\n[1/5] Pominięto - brak NIP") else: results['registry'] = 'SKIP (done)' print("\n[1/5] Dane urzędowe - już wykonane") # Refresh company data po registry db.refresh(company) # --- Krok 2: Audyt SEO --- seo_done = db.query(CompanyWebsiteAnalysis).filter_by(company_id=company.id).first() is not None if force or not seo_done: if company.website: print("\n[2/5] Audyt SEO...") try: from seo_audit import SEOAuditor seo_service = SEOAuditor() company_dict = { 'id': company.id, 'name': company.name, 'slug': company.slug, 'website': company.website, 'address_city': company.address_city or '', } audit_result = seo_service.audit_company(company_dict) seo_score = audit_result.get('scores', {}).get('pagespeed_seo', '?') perf_score = audit_result.get('scores', {}).get('pagespeed_performance', '?') results['seo'] = 'OK (SEO: %s, Perf: %s)' % (seo_score, perf_score) print(" -> OK: SEO=%s, Perf=%s" % (seo_score, perf_score)) except Exception as e: results['seo'] = 'ERROR: %s' % str(e)[:80] print(" -> ERROR: %s" % str(e)[:80]) else: results['seo'] = 'SKIP (brak WWW)' print("\n[2/5] Audyt SEO - pominięto (brak strony WWW)") else: results['seo'] = 'SKIP (done)' print("\n[2/5] Audyt SEO - już wykonane") # --- Krok 3: Social Media --- social_done = db.query(CompanySocialMedia).filter_by(company_id=company.id).count() > 0 if force or not social_done: print("\n[3/5] Audyt Social Media...") try: from social_media_audit import SocialMediaAuditor auditor = SocialMediaAuditor() # uses DATABASE_URL from env company_dict = { 'id': company.id, 'name': company.name, 'slug': company.slug, 'website': company.website, 'address_city': company.address_city or '', } audit_result = auditor.audit_company(company_dict) # Save audit results to database (critical! without this profiles aren't persisted) if audit_result: auditor.save_audit_result(audit_result) # Check DB for actual saved count db.expire_all() saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count() results['social'] = 'OK (%d profili)' % saved_count print(" -> OK: %d profili zapisanych w bazie" % saved_count) except Exception as e: results['social'] = 'ERROR: %s' % str(e)[:80] print(" -> ERROR: %s" % str(e)[:80]) else: results['social'] = 'SKIP (done)' print("\n[3/5] Social Media - już wykonane") # --- Krok 4: GBP --- gbp_done = db.query(GBPAudit).filter_by(company_id=company.id).first() is not None if force or not gbp_done: print("\n[4/5] Audyt GBP...") try: from gbp_audit_service import GBPAuditService gbp_service = GBPAuditService(db) gbp_result = gbp_service.audit_company(company.id) if gbp_result: score = gbp_result.completeness_score # Save to database gbp_service.save_audit(gbp_result, source='script') results['gbp'] = 'OK (score: %s)' % score print(" -> OK: Score=%s" % score) else: results['gbp'] = 'FAIL' print(" -> FAIL: brak wyniku") except Exception as e: results['gbp'] = 'ERROR: %s' % str(e)[:80] print(" -> ERROR: %s" % str(e)[:80]) else: results['gbp'] = 'SKIP (done)' print("\n[4/5] Audyt GBP - już wykonane") # --- Krok 5: Logo --- logo_done = False for ext in ('webp', 'svg'): logo_path = os.path.join('static', 'img', 'companies', '%s.%s' % (company.slug, ext)) if os.path.isfile(logo_path): logo_done = True break if force or not logo_done: if company.website: print("\n[5/5] Pobieranie logo...") try: from logo_fetch_service import LogoFetchService service = LogoFetchService() fetch_result = service.fetch_candidates(company.website, company.slug) candidates = fetch_result.get('candidates', []) if candidates: pick = fetch_result.get('recommended_index', 0) or 0 ok = service.confirm_candidate(company.slug, pick) results['logo'] = 'OK (kandydat #%d z %d)' % (pick, len(candidates)) print(" -> OK: Wybrano kandydata #%d z %d" % (pick, len(candidates))) else: results['logo'] = 'FAIL (0 kandydatów)' print(" -> FAIL: Nie znaleziono kandydatów na logo") except Exception as e: results['logo'] = 'ERROR: %s' % str(e)[:80] print(" -> ERROR: %s" % str(e)[:80]) else: results['logo'] = 'SKIP (brak WWW)' print("\n[5/5] Logo - pominięto (brak strony WWW)") else: results['logo'] = 'SKIP (done)' print("\n[5/5] Logo - już istnieje") # Podsumowanie print("\n" + "=" * 60) print("PODSUMOWANIE: %s (ID: %d)" % (company.name, company.id)) print("-" * 60) for step, status in results.items(): print(" %-12s: %s" % (step, status)) ok_count = sum(1 for v in results.values() if v.startswith('OK') or v.startswith('SKIP (done)')) print("-" * 60) print(" Wynik: %d/5 kroków zaliczonych" % ok_count) print("=" * 60) return True except Exception as e: logger.error("Błąd uzbrajania firmy %d: %s" % (company_id, str(e))) print("\nBŁĄD KRYTYCZNY: %s" % str(e)) import traceback traceback.print_exc() return False finally: db.close() if __name__ == '__main__': if len(sys.argv) < 2: print("Użycie: python3 scripts/arm_company.py [ ...] [--force]") print(" --force Wymusza ponowne wykonanie wszystkich kroków") sys.exit(1) force = '--force' in sys.argv ids = [int(a) for a in sys.argv[1:] if a != '--force' and a.isdigit()] for cid in ids: arm_company(cid, force=force) if len(ids) > 1: print("\n")