nordabiz/scripts/arm_company.py
Maciej Pienczyn 93e90b2c72
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: add data quality dashboard, auto-scoring, bulk enrichment and GBP data flow
- Extract 12-field completeness scoring to utils/data_quality.py service
- Auto-update data_quality_score and data_quality label on company data changes
- Add /admin/data-quality dashboard with field coverage stats, quality distribution, and sortable company table
- Add bulk enrichment with background processing, step selection, and progress tracking
- Flow GBP phone/website to Company record when company fields are empty
- Display Google opening hours on public company profile
- Add BulkEnrichmentJob model and migration 075
- Refactor arm_company.py to support selective steps and progress callbacks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 07:02:45 +01:00

385 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Command-line script for automatically "arming" (enriching) companies.
Equivalent of the "Uzbrój firmę" button in the admin panel.

Usage:
    python3 scripts/arm_company.py <company_id> [--force]
    python3 scripts/arm_company.py 120 121 122 --force  # several companies at once

Options:
    --force  Force every step to be executed again (like "Zaktualizuj dane")
"""
import sys
import os
import logging

# Path setup: BASE_DIR is the parent of the directory containing this file.
# It is pushed to the front of sys.path so the project-level imports below
# (database, utils, ...) resolve when the script is started directly.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)
# The scripts/ directory is added as well (only once) so that modules living
# next to this file can be imported by their bare name.
scripts_dir = os.path.join(BASE_DIR, 'scripts')
if scripts_dir not in sys.path:
    sys.path.insert(0, scripts_dir)

from database import SessionLocal, Company, CompanyWebsiteAnalysis, CompanySocialMedia, CompanyPKD, CompanyPerson
from database import GBPAudit
from utils.data_quality import update_company_data_quality

# Root logging configuration applied at import time; this module logs under
# the 'arm_company' logger name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('arm_company')

# Names of all enrichment steps, in the order arm_company() executes them.
ALL_STEPS = ['registry', 'seo', 'social', 'gbp', 'logo']
def _audit_payload(company):
    """Build the plain-dict company description handed to the SEO and social auditors."""
    return {
        'id': company.id,
        'name': company.name,
        'slug': company.slug,
        'website': company.website,
        'address_city': company.address_city or '',
    }


def _record_step_error(results, step, exc):
    """Store a truncated (80 chars) error message for ``step`` and echo it to stdout."""
    message = str(exc)[:80]
    results[step] = 'ERROR: %s' % message
    print(" -> ERROR: %s" % message)


def _apply_ceidg_data(company, ceidg_data, db):
    """Copy the fields of a CEIDG payload onto ``company`` (does not commit).

    Nested sections that are missing *or* explicitly ``None`` are treated as
    empty, so a sparse payload cannot abort the mapping half-way through.
    New ``CompanyPKD`` rows are added to ``db`` for PKD codes the company
    does not have yet.
    """
    from datetime import datetime, date as date_type

    now = datetime.now()

    # CEIDG identifiers & metadata
    if ceidg_data.get('ceidg_id'):
        company.ceidg_id = ceidg_data['ceidg_id']
    if ceidg_data.get('status'):
        company.ceidg_status = ceidg_data['status']
    company.ceidg_raw_data = ceidg_data.get('raw')
    company.ceidg_fetched_at = now
    company.data_source = 'CEIDG API'
    company.last_verified_at = now

    # Owner
    owner = ceidg_data.get('wlasciciel') or {}
    if owner.get('imie'):
        company.owner_first_name = owner['imie']
    if owner.get('nazwisko'):
        company.owner_last_name = owner['nazwisko']
    if ceidg_data.get('obywatelstwa'):
        company.owner_citizenships = ceidg_data['obywatelstwa']

    # Legal name: only replace a missing one or one that merely mirrors the display name
    if ceidg_data.get('firma') and (not company.legal_name or company.legal_name == company.name):
        company.legal_name = ceidg_data['firma']

    # REGON (only if empty)
    if not company.regon:
        regon = ceidg_data.get('regon') or owner.get('regon')
        if regon:
            company.regon = regon

    # Business start date: non-string or malformed values are ignored on purpose
    start_date = ceidg_data.get('dataRozpoczecia')
    if start_date and isinstance(start_date, str):
        try:
            company.business_start_date = date_type.fromisoformat(start_date)
        except (ValueError, TypeError):
            pass

    # Legal form: this branch is only reached for companies without a KRS number
    if not company.legal_form:
        company.legal_form = 'JEDNOOSOBOWA DZIAŁALNOŚĆ GOSPODARCZA'

    # PKD (main)
    main_pkd = ceidg_data.get('pkdGlowny') or {}
    if main_pkd.get('kod'):
        company.pkd_code = main_pkd['kod']
        company.pkd_description = main_pkd.get('nazwa')

    # PKD (full list)
    pkd_items = ceidg_data.get('pkd') or []
    if pkd_items:
        company.ceidg_pkd_list = pkd_items
        main_code = main_pkd.get('kod', '')
        for pkd_item in pkd_items:
            code = pkd_item.get('kod', '')
            if not code:
                continue
            already_stored = db.query(CompanyPKD).filter(
                CompanyPKD.company_id == company.id,
                CompanyPKD.pkd_code == code
            ).first()
            if not already_stored:
                db.add(CompanyPKD(
                    company_id=company.id,
                    pkd_code=code,
                    pkd_description=pkd_item.get('nazwa', ''),
                    is_primary=(code == main_code)
                ))

    # Business address
    address = ceidg_data.get('adresDzialalnosci') or {}
    street = address.get('ulica') or ''
    building = address.get('budynek') or ''
    flat = address.get('lokal') or ''
    if street or building:
        street_parts = [street, building]
        if flat:
            # "building/flat" when a building number exists, otherwise just the flat
            street_parts[-1] = (building + '/' + flat) if building else flat
        company.address_street = ' '.join(p for p in street_parts if p)
    postal = address.get('kod') or address.get('kodPocztowy')
    if postal:
        company.address_postal = postal
    city = address.get('miasto') or address.get('miejscowosc')
    if city:
        company.address_city = city
    if company.address_street and getattr(company, 'address_postal', None) and company.address_city:
        company.address_full = '%s, %s %s' % (company.address_street, company.address_postal, company.address_city)

    # Contact (only if empty)
    if ceidg_data.get('email') and not company.email:
        company.email = ceidg_data['email']
    if ceidg_data.get('stronaWWW') and not company.website:
        company.website = ceidg_data['stronaWWW']
    if ceidg_data.get('telefon') and not company.phone:
        company.phone = ceidg_data['telefon']


def arm_company(company_id, force=False, steps=None, progress_callback=None):
    """Arm a company with enrichment data.

    Runs up to five steps in order: registry (KRS/CEIDG), seo, social, gbp and
    logo. Each step records a short status string in the result dict
    (``OK ...``, ``FAIL ...``, ``SKIP ...``, ``NOT FOUND`` or ``ERROR: ...``);
    a failing step does not stop the following ones. Progress is printed to
    stdout. Afterwards the company's data-quality score is recalculated and
    committed.

    Args:
        company_id: Company ID to enrich.
        force: Re-run selected steps even if they appear to be done already.
        steps: List of steps to run (default: all). Options: registry, seo,
            social, gbp, logo.
        progress_callback: Optional callable invoked once, after all steps
            and the data-quality update, as
            ``progress_callback(company_id, results)`` where ``results`` is
            the step -> status dict.

    Returns:
        The step -> status dict on completion, or ``False`` when the company
        does not exist or an unexpected error aborted the run.
    """
    if steps is None:
        steps = ALL_STEPS
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            # %s (not %d) so a non-integer ID is reported instead of raising TypeError
            print("Firma ID %s nie znaleziona!" % company_id)
            return False

        print("=" * 60)
        print("Uzbrajam: %s (ID: %d)" % (company.name, company.id))
        print("NIP: %s | WWW: %s" % (company.nip or '-', company.website or '-'))
        print("Tryb: %s" % ("FORCE (wszystkie kroki)" if force else "SMART (tylko brakujące)"))
        if steps != ALL_STEPS:
            print("Kroki: %s" % ', '.join(steps))
        print("=" * 60)

        results = {}

        # --- Step 1: official registry data (KRS, falling back to CEIDG) ---
        registry_done = bool(company.krs_fetched_at or company.ceidg_fetched_at)
        if 'registry' not in steps:
            results['registry'] = 'SKIP (nie wybrano)'
        elif force or not registry_done:
            if company.nip:
                print("\n[1/5] Pobieranie danych urzędowych...")
                try:
                    from blueprints.admin.routes_membership import _enrich_company_from_krs
                    from krs_api_service import KRSApiService
                    krs_service = KRSApiService()
                    # Look the KRS number up by NIP (search_by_nip returns a dict or None)
                    if not company.krs:
                        krs_data = krs_service.search_by_nip(company.nip)
                        if krs_data and krs_data.get('krs'):
                            company.krs = krs_data['krs']
                            db.flush()
                            logger.info("Znaleziono KRS %s dla NIP %s", company.krs, company.nip)
                    if company.krs:
                        if _enrich_company_from_krs(company, db):
                            db.commit()
                            results['registry'] = 'OK (KRS)'
                            print(" -> OK: Dane z KRS pobrane")
                        else:
                            results['registry'] = 'FAIL (KRS)'
                            print(" -> FAIL: Nie udało się pobrać z KRS")
                    else:
                        # No KRS number: try CEIDG via the shared CEIDG service
                        from ceidg_api_service import fetch_ceidg_by_nip
                        ceidg_data = fetch_ceidg_by_nip(company.nip)
                        if ceidg_data:
                            _apply_ceidg_data(company, ceidg_data, db)
                            db.commit()
                            results['registry'] = 'OK (CEIDG)'
                            print(" -> OK: Dane z CEIDG pobrane")
                        else:
                            results['registry'] = 'NOT FOUND'
                            print(" -> Nie znaleziono w żadnym rejestrze")
                except Exception as e:
                    # Roll back so that (a) a failed flush/commit does not leave
                    # the session unusable for the remaining steps and (b)
                    # half-applied registry fields are not persisted by the
                    # final commit.
                    db.rollback()
                    _record_step_error(results, 'registry', e)
            else:
                results['registry'] = 'SKIP (brak NIP)'
                print("\n[1/5] Pominięto - brak NIP")
        else:
            results['registry'] = 'SKIP (done)'
            print("\n[1/5] Dane urzędowe - już wykonane")

        # Reload the company so later steps see what the registry step stored
        db.refresh(company)

        # --- Step 2: SEO audit ---
        seo_done = db.query(CompanyWebsiteAnalysis).filter_by(company_id=company.id).first() is not None
        if 'seo' not in steps:
            results['seo'] = 'SKIP (nie wybrano)'
        elif force or not seo_done:
            if company.website:
                print("\n[2/5] Audyt SEO...")
                try:
                    from seo_audit import SEOAuditor
                    seo_service = SEOAuditor()
                    audit_result = seo_service.audit_company(_audit_payload(company))
                    scores = audit_result.get('scores', {})
                    seo_score = scores.get('pagespeed_seo', '?')
                    perf_score = scores.get('pagespeed_performance', '?')
                    results['seo'] = 'OK (SEO: %s, Perf: %s)' % (seo_score, perf_score)
                    print(" -> OK: SEO=%s, Perf=%s" % (seo_score, perf_score))
                except Exception as e:
                    _record_step_error(results, 'seo', e)
            else:
                results['seo'] = 'SKIP (brak WWW)'
                print("\n[2/5] Audyt SEO - pominięto (brak strony WWW)")
        else:
            results['seo'] = 'SKIP (done)'
            print("\n[2/5] Audyt SEO - już wykonane")

        # --- Step 3: social media ---
        social_done = db.query(CompanySocialMedia).filter_by(company_id=company.id).count() > 0
        if 'social' not in steps:
            results['social'] = 'SKIP (nie wybrano)'
        elif force or not social_done:
            print("\n[3/5] Audyt Social Media...")
            try:
                from social_media_audit import SocialMediaAuditor
                auditor = SocialMediaAuditor()  # uses DATABASE_URL from env
                audit_result = auditor.audit_company(_audit_payload(company))
                # Save audit results to database (critical! without this profiles aren't persisted)
                if audit_result:
                    auditor.save_audit_result(audit_result)
                # Drop cached state and count what is actually stored now
                db.expire_all()
                saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count()
                results['social'] = 'OK (%d profili)' % saved_count
                print(" -> OK: %d profili zapisanych w bazie" % saved_count)
            except Exception as e:
                _record_step_error(results, 'social', e)
        else:
            results['social'] = 'SKIP (done)'
            print("\n[3/5] Social Media - już wykonane")

        # --- Step 4: Google Business Profile audit ---
        gbp_done = db.query(GBPAudit).filter_by(company_id=company.id).first() is not None
        if 'gbp' not in steps:
            results['gbp'] = 'SKIP (nie wybrano)'
        elif force or not gbp_done:
            print("\n[4/5] Audyt GBP...")
            try:
                from gbp_audit_service import GBPAuditService
                gbp_service = GBPAuditService(db)
                gbp_result = gbp_service.audit_company(company.id)
                if gbp_result:
                    score = gbp_result.completeness_score
                    # Save to database
                    gbp_service.save_audit(gbp_result, source='script')
                    results['gbp'] = 'OK (score: %s)' % score
                    print(" -> OK: Score=%s" % score)
                else:
                    results['gbp'] = 'FAIL'
                    print(" -> FAIL: brak wyniku")
            except Exception as e:
                # The GBP service works on this very session; roll back so a
                # failed write cannot break the data-quality commit below.
                db.rollback()
                _record_step_error(results, 'gbp', e)
        else:
            results['gbp'] = 'SKIP (done)'
            print("\n[4/5] Audyt GBP - już wykonane")

        # --- Step 5: logo ---
        # NOTE(review): the path is relative to the current working directory,
        # not to BASE_DIR - confirm the script is always started from the
        # project root.
        logo_done = any(
            os.path.isfile(os.path.join('static', 'img', 'companies', '%s.%s' % (company.slug, ext)))
            for ext in ('webp', 'svg')
        )
        if 'logo' not in steps:
            results['logo'] = 'SKIP (nie wybrano)'
        elif force or not logo_done:
            if company.website:
                print("\n[5/5] Pobieranie logo...")
                try:
                    from logo_fetch_service import LogoFetchService
                    service = LogoFetchService()
                    fetch_result = service.fetch_candidates(company.website, company.slug)
                    candidates = fetch_result.get('candidates', [])
                    if candidates:
                        pick = fetch_result.get('recommended_index', 0) or 0
                        # NOTE(review): the return value of confirm_candidate is
                        # not inspected; its contract is not visible here.
                        service.confirm_candidate(company.slug, pick)
                        results['logo'] = 'OK (kandydat #%d z %d)' % (pick, len(candidates))
                        print(" -> OK: Wybrano kandydata #%d z %d" % (pick, len(candidates)))
                    else:
                        results['logo'] = 'FAIL (0 kandydatów)'
                        print(" -> FAIL: Nie znaleziono kandydatów na logo")
                except Exception as e:
                    _record_step_error(results, 'logo', e)
            else:
                results['logo'] = 'SKIP (brak WWW)'
                print("\n[5/5] Logo - pominięto (brak strony WWW)")
        else:
            results['logo'] = 'SKIP (done)'
            print("\n[5/5] Logo - już istnieje")

        # Summary
        print("\n" + "=" * 60)
        print("PODSUMOWANIE: %s (ID: %d)" % (company.name, company.id))
        print("-" * 60)
        for step, status in results.items():
            print(" %-12s: %s" % (step, status))
        # A step counts as passed when it succeeded now or was already done earlier
        ok_count = sum(1 for v in results.values() if v.startswith(('OK', 'SKIP (done)')))
        print("-" * 60)
        print(" Wynik: %d/5 kroków zaliczonych" % ok_count)
        print("=" * 60)

        # Update data quality score
        dq = update_company_data_quality(company, db)
        db.commit()
        print("\n Data quality: %s (%d%%)" % (company.data_quality, dq['score']))

        if progress_callback:
            progress_callback(company_id, results)
        return results
    except Exception as e:
        # Lazy %s arguments: formatting can no longer raise inside this handler
        logger.error("Błąd uzbrajania firmy %s: %s", company_id, str(e))
        print("\nBŁĄD KRYTYCZNY: %s" % str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
if __name__ == '__main__':
    # CLI entry point: every numeric argument is a company ID, --force re-runs
    # all steps for each of them.
    cli_args = sys.argv[1:]
    force = '--force' in cli_args
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse,
    # so e.g. a superscript digit cannot crash the conversion.
    ids = [int(a) for a in cli_args if a.isdecimal()]
    ignored = [a for a in cli_args if a != '--force' and not a.isdecimal()]
    if ignored:
        # Previously such arguments were dropped without any notice.
        print("Pominięto nieprawidłowe argumenty: %s" % ', '.join(ignored), file=sys.stderr)
    if not ids:
        # Nothing to do: show usage and fail instead of exiting silently with 0.
        print("Użycie: python3 scripts/arm_company.py <company_id> [<id2> ...] [--force]")
        print(" --force Wymusza ponowne wykonanie wszystkich kroków")
        sys.exit(1)
    for cid in ids:
        arm_company(cid, force=force)
        # Blank separator between the reports of consecutive companies
        if len(ids) > 1:
            print("\n")