Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- Extract 12-field completeness scoring to utils/data_quality.py service - Auto-update data_quality_score and data_quality label on company data changes - Add /admin/data-quality dashboard with field coverage stats, quality distribution, and sortable company table - Add bulk enrichment with background processing, step selection, and progress tracking - Flow GBP phone/website to Company record when company fields are empty - Display Google opening hours on public company profile - Add BulkEnrichmentJob model and migration 075 - Refactor arm_company.py to support selective steps and progress callbacks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
385 lines
18 KiB
Python
385 lines
18 KiB
Python
#!/usr/bin/env python3
"""
Command-line script for automatically "arming" (enriching) companies.

Equivalent of the "Uzbrój firmę" button in the admin panel.

Usage:
    python3 scripts/arm_company.py <company_id> [--force]
    python3 scripts/arm_company.py 120 121 122 --force   # several companies at once

Options:
    --force   Force re-running all steps (like "Zaktualizuj dane")
"""
import sys
import os
import logging

# Set up the import path. BASE_DIR is the parent of the directory containing
# this file; it must be on sys.path BEFORE the project imports below run, so
# the order of these statements matters.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)

# Also expose BASE_DIR/scripts, but only if it is not already on the path.
scripts_dir = os.path.join(BASE_DIR, 'scripts')
if scripts_dir not in sys.path:
    sys.path.insert(0, scripts_dir)

# Project-local imports (presumably resolved through the sys.path entries
# added above -- verify against the repository layout).
from database import SessionLocal, Company, CompanyWebsiteAnalysis, CompanySocialMedia, CompanyPKD, CompanyPerson
from database import GBPAudit
from utils.data_quality import update_company_data_quality

# NOTE: logging is configured at import time, so this also runs when the
# module is imported rather than executed as a script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('arm_company')


# Names of every enrichment step, in the order arm_company() executes them.
ALL_STEPS = ['registry', 'seo', 'social', 'gbp', 'logo']
|
|
|
|
|
|
def _apply_ceidg_data(company, ceidg_data, db):
    """Copy the fields of a CEIDG payload onto ``company`` (no commit).

    Args:
        company: Company row being enriched; mutated in place.
        ceidg_data: Dict returned by ``fetch_ceidg_by_nip``. Nested values
            that are missing or ``None`` are treated as empty.
        db: Open session, used to look up and add ``CompanyPKD`` rows.
    """
    from datetime import datetime, date as date_type

    # CEIDG identifiers & metadata
    if ceidg_data.get('ceidg_id'):
        company.ceidg_id = ceidg_data['ceidg_id']
    if ceidg_data.get('status'):
        company.ceidg_status = ceidg_data['status']
    company.ceidg_raw_data = ceidg_data.get('raw')
    company.ceidg_fetched_at = datetime.now()
    company.data_source = 'CEIDG API'
    company.last_verified_at = datetime.now()

    # Owner ("or {}" also covers a key that is present but None)
    wlasciciel = ceidg_data.get('wlasciciel') or {}
    if wlasciciel.get('imie'):
        company.owner_first_name = wlasciciel['imie']
    if wlasciciel.get('nazwisko'):
        company.owner_last_name = wlasciciel['nazwisko']
    if ceidg_data.get('obywatelstwa'):
        company.owner_citizenships = ceidg_data['obywatelstwa']

    # Legal name: only when unset or merely a copy of the display name
    if ceidg_data.get('firma') and (not company.legal_name or company.legal_name == company.name):
        company.legal_name = ceidg_data['firma']

    # REGON (only if empty)
    if not company.regon:
        regon = ceidg_data.get('regon') or wlasciciel.get('regon')
        if regon:
            company.regon = regon

    # Business start date: best effort, a malformed value is logged and skipped
    start_raw = ceidg_data.get('dataRozpoczecia')
    if start_raw:
        try:
            if isinstance(start_raw, str):
                company.business_start_date = date_type.fromisoformat(start_raw)
        except (ValueError, TypeError):
            logger.warning("Nieprawidłowa data rozpoczęcia działalności z CEIDG: %r", start_raw)

    # Legal form (only if empty)
    if not company.legal_form:
        company.legal_form = 'JEDNOOSOBOWA DZIAŁALNOŚĆ GOSPODARCZA'

    # PKD (main)
    pkd_gl = ceidg_data.get('pkdGlowny') or {}
    if pkd_gl.get('kod'):
        company.pkd_code = pkd_gl['kod']
        company.pkd_description = pkd_gl.get('nazwa')

    # PKD (full list): add only the codes this company does not have yet
    pkd_lista = ceidg_data.get('pkd') or []
    if pkd_lista:
        company.ceidg_pkd_list = pkd_lista
        pkd_main_code = pkd_gl.get('kod', '')
        for pkd_item in pkd_lista:
            kod = pkd_item.get('kod', '')
            if not kod:
                continue
            existing_pkd = db.query(CompanyPKD).filter(
                CompanyPKD.company_id == company.id,
                CompanyPKD.pkd_code == kod
            ).first()
            if not existing_pkd:
                db.add(CompanyPKD(
                    company_id=company.id,
                    pkd_code=kod,
                    pkd_description=pkd_item.get('nazwa', ''),
                    is_primary=(kod == pkd_main_code)
                ))

    # Business address
    adres = ceidg_data.get('adresDzialalnosci') or {}
    ulica = adres.get('ulica', '')
    budynek = adres.get('budynek', '')
    lokal = adres.get('lokal', '')
    if ulica or budynek:
        street_parts = [ulica, budynek]
        if lokal:
            # "building/flat", or just the flat when no building is given
            street_parts[-1] = (budynek + '/' + lokal) if budynek else lokal
        company.address_street = ' '.join(p for p in street_parts if p)
    if adres.get('kod') or adres.get('kodPocztowy'):
        company.address_postal = adres.get('kod') or adres.get('kodPocztowy')
    if adres.get('miasto') or adres.get('miejscowosc'):
        company.address_city = adres.get('miasto') or adres.get('miejscowosc')
    if company.address_street and getattr(company, 'address_postal', None) and company.address_city:
        company.address_full = '%s, %s %s' % (company.address_street, company.address_postal, company.address_city)

    # Contact (only if empty)
    if ceidg_data.get('email') and not company.email:
        company.email = ceidg_data['email']
    if ceidg_data.get('stronaWWW') and not company.website:
        company.website = ceidg_data['stronaWWW']
    if ceidg_data.get('telefon') and not company.phone:
        company.phone = ceidg_data['telefon']


def arm_company(company_id, force=False, steps=None, progress_callback=None):
    """Arm a company with enrichment data.

    Runs up to five steps in order -- registry (KRS/CEIDG), SEO audit,
    social media audit, GBP audit, logo fetch -- prints a summary, then
    updates and commits the company's data quality score.

    Args:
        company_id: Company ID to enrich.
        force: Re-run selected steps even when their data already exists.
        steps: List of steps to run (default: all). Options: registry, seo,
            social, gbp, logo.
        progress_callback: Optional ``callback(company_id, results)``, called
            once after all steps finish with the dict of per-step status
            strings. It is not called when the run fails.

    Returns:
        A dict mapping step name to a status string ('OK ...', 'SKIP ...',
        'FAIL ...', 'ERROR: ...', 'NOT FOUND') on completion, or ``False``
        when the company does not exist or an unexpected error aborted the run.
    """
    if steps is None:
        steps = ALL_STEPS

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            # %s rather than %d so a non-int id cannot raise while reporting
            print("Firma ID %s nie znaleziona!" % company_id)
            return False

        print("=" * 60)
        print("Uzbrajam: %s (ID: %d)" % (company.name, company.id))
        print("NIP: %s | WWW: %s" % (company.nip or '-', company.website or '-'))
        print("Tryb: %s" % ("FORCE (wszystkie kroki)" if force else "SMART (tylko brakujące)"))
        if steps != ALL_STEPS:
            print("Kroki: %s" % ', '.join(steps))
        print("=" * 60)

        results = {}

        # --- Step 1: official registry data (KRS, falling back to CEIDG) ---
        registry_done = bool(company.krs_fetched_at or company.ceidg_fetched_at)
        if 'registry' not in steps:
            results['registry'] = 'SKIP (nie wybrano)'
        elif force or not registry_done:
            if company.nip:
                print("\n[1/5] Pobieranie danych urzędowych...")
                try:
                    from blueprints.admin.routes_membership import _enrich_company_from_krs
                    from krs_api_service import KRSApiService

                    krs_service = KRSApiService()

                    # Look the KRS number up via the "Biała Lista"
                    # (search_by_nip returns a dict or None)
                    if not company.krs:
                        krs_data = krs_service.search_by_nip(company.nip)
                        if krs_data and krs_data.get('krs'):
                            company.krs = krs_data['krs']
                            db.flush()
                            logger.info("Znaleziono KRS %s dla NIP %s", company.krs, company.nip)

                    if company.krs:
                        success = _enrich_company_from_krs(company, db)
                        if success:
                            db.commit()
                            results['registry'] = 'OK (KRS)'
                            print(" -> OK: Dane z KRS pobrane")
                        else:
                            results['registry'] = 'FAIL (KRS)'
                            print(" -> FAIL: Nie udało się pobrać z KRS")
                    else:
                        # Try CEIDG -- same service the API endpoint uses
                        from ceidg_api_service import fetch_ceidg_by_nip
                        ceidg_data = fetch_ceidg_by_nip(company.nip)
                        if ceidg_data:
                            _apply_ceidg_data(company, ceidg_data, db)
                            db.commit()
                            results['registry'] = 'OK (CEIDG)'
                            print(" -> OK: Dane z CEIDG pobrane")
                        else:
                            results['registry'] = 'NOT FOUND'
                            print(" -> Nie znaleziono w żadnym rejestrze")
                except Exception as e:
                    # Discard half-applied changes and reset the session so
                    # later steps and the final commit can still use it.
                    db.rollback()
                    results['registry'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
            else:
                results['registry'] = 'SKIP (brak NIP)'
                print("\n[1/5] Pominięto - brak NIP")
        else:
            results['registry'] = 'SKIP (done)'
            print("\n[1/5] Dane urzędowe - już wykonane")

        # Reload company data after the registry step
        db.refresh(company)

        # --- Step 2: SEO audit ---
        seo_done = db.query(CompanyWebsiteAnalysis).filter_by(company_id=company.id).first() is not None
        if 'seo' not in steps:
            results['seo'] = 'SKIP (nie wybrano)'
        elif force or not seo_done:
            if company.website:
                print("\n[2/5] Audyt SEO...")
                try:
                    from seo_audit import SEOAuditor
                    seo_service = SEOAuditor()
                    company_dict = {
                        'id': company.id,
                        'name': company.name,
                        'slug': company.slug,
                        'website': company.website,
                        'address_city': company.address_city or '',
                    }
                    audit_result = seo_service.audit_company(company_dict)
                    seo_score = audit_result.get('scores', {}).get('pagespeed_seo', '?')
                    perf_score = audit_result.get('scores', {}).get('pagespeed_performance', '?')
                    results['seo'] = 'OK (SEO: %s, Perf: %s)' % (seo_score, perf_score)
                    print(" -> OK: SEO=%s, Perf=%s" % (seo_score, perf_score))
                except Exception as e:
                    results['seo'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
            else:
                results['seo'] = 'SKIP (brak WWW)'
                print("\n[2/5] Audyt SEO - pominięto (brak strony WWW)")
        else:
            results['seo'] = 'SKIP (done)'
            print("\n[2/5] Audyt SEO - już wykonane")

        # --- Step 3: social media ---
        social_done = db.query(CompanySocialMedia).filter_by(company_id=company.id).count() > 0
        if 'social' not in steps:
            results['social'] = 'SKIP (nie wybrano)'
        elif force or not social_done:
            print("\n[3/5] Audyt Social Media...")
            try:
                from social_media_audit import SocialMediaAuditor
                auditor = SocialMediaAuditor()  # uses DATABASE_URL from env
                company_dict = {
                    'id': company.id,
                    'name': company.name,
                    'slug': company.slug,
                    'website': company.website,
                    'address_city': company.address_city or '',
                }
                audit_result = auditor.audit_company(company_dict)
                # Save audit results to database (critical! without this
                # profiles aren't persisted)
                if audit_result:
                    auditor.save_audit_result(audit_result)
                # Drop cached state so the count below reflects what is
                # actually stored in the database
                db.expire_all()
                saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count()
                results['social'] = 'OK (%d profili)' % saved_count
                print(" -> OK: %d profili zapisanych w bazie" % saved_count)
            except Exception as e:
                results['social'] = 'ERROR: %s' % str(e)[:80]
                print(" -> ERROR: %s" % str(e)[:80])
        else:
            results['social'] = 'SKIP (done)'
            print("\n[3/5] Social Media - już wykonane")

        # --- Step 4: GBP ---
        gbp_done = db.query(GBPAudit).filter_by(company_id=company.id).first() is not None
        if 'gbp' not in steps:
            results['gbp'] = 'SKIP (nie wybrano)'
        elif force or not gbp_done:
            print("\n[4/5] Audyt GBP...")
            try:
                from gbp_audit_service import GBPAuditService
                gbp_service = GBPAuditService(db)
                gbp_result = gbp_service.audit_company(company.id)
                if gbp_result:
                    score = gbp_result.completeness_score
                    # Save to database
                    gbp_service.save_audit(gbp_result, source='script')
                    results['gbp'] = 'OK (score: %s)' % score
                    print(" -> OK: Score=%s" % score)
                else:
                    results['gbp'] = 'FAIL'
                    print(" -> FAIL: brak wyniku")
            except Exception as e:
                # The GBP service shares this session; reset it so the final
                # data-quality commit is not blocked by a failed flush.
                db.rollback()
                results['gbp'] = 'ERROR: %s' % str(e)[:80]
                print(" -> ERROR: %s" % str(e)[:80])
        else:
            results['gbp'] = 'SKIP (done)'
            print("\n[4/5] Audyt GBP - już wykonane")

        # --- Step 5: logo ---
        # Look for an existing logo relative to the current directory (the
        # original behaviour) and also under BASE_DIR, so the check does not
        # depend on where the process was started.
        logo_done = False
        for ext in ('webp', 'svg'):
            logo_path = os.path.join('static', 'img', 'companies', '%s.%s' % (company.slug, ext))
            if os.path.isfile(logo_path) or os.path.isfile(os.path.join(BASE_DIR, logo_path)):
                logo_done = True
                break

        if 'logo' not in steps:
            results['logo'] = 'SKIP (nie wybrano)'
        elif force or not logo_done:
            if company.website:
                print("\n[5/5] Pobieranie logo...")
                try:
                    from logo_fetch_service import LogoFetchService
                    service = LogoFetchService()
                    fetch_result = service.fetch_candidates(company.website, company.slug)
                    candidates = fetch_result.get('candidates', [])
                    if candidates:
                        pick = fetch_result.get('recommended_index', 0) or 0
                        ok = service.confirm_candidate(company.slug, pick)
                        # NOTE(review): confirm_candidate's return contract is
                        # not visible here; only an explicit False is treated
                        # as failure -- confirm against LogoFetchService.
                        if ok is False:
                            results['logo'] = 'FAIL (nie zapisano)'
                            print(" -> FAIL: Nie udało się zapisać logo")
                        else:
                            results['logo'] = 'OK (kandydat #%d z %d)' % (pick, len(candidates))
                            print(" -> OK: Wybrano kandydata #%d z %d" % (pick, len(candidates)))
                    else:
                        results['logo'] = 'FAIL (0 kandydatów)'
                        print(" -> FAIL: Nie znaleziono kandydatów na logo")
                except Exception as e:
                    results['logo'] = 'ERROR: %s' % str(e)[:80]
                    print(" -> ERROR: %s" % str(e)[:80])
            else:
                results['logo'] = 'SKIP (brak WWW)'
                print("\n[5/5] Logo - pominięto (brak strony WWW)")
        else:
            results['logo'] = 'SKIP (done)'
            print("\n[5/5] Logo - już istnieje")

        # Summary
        print("\n" + "=" * 60)
        print("PODSUMOWANIE: %s (ID: %d)" % (company.name, company.id))
        print("-" * 60)
        for step, status in results.items():
            print(" %-12s: %s" % (step, status))

        # A step counts as passed when it succeeded now or was already done
        ok_count = sum(1 for v in results.values() if v.startswith(('OK', 'SKIP (done)')))
        print("-" * 60)
        print(" Wynik: %d/5 kroków zaliczonych" % ok_count)
        print("=" * 60)

        # Update data quality score
        dq = update_company_data_quality(company, db)
        db.commit()
        print("\n Data quality: %s (%d%%)" % (company.data_quality, dq['score']))

        if progress_callback:
            progress_callback(company_id, results)

        return results

    except Exception as e:
        # Lazy %s arguments: reporting the error must never raise itself
        logger.error("Błąd uzbrajania firmy %s: %s", company_id, e)
        print("\nBŁĄD KRYTYCZNY: %s" % str(e))
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # CLI entry point: every numeric argument is a company ID; --force re-runs
    # all steps. Anything else is reported instead of being silently dropped.
    cli_args = sys.argv[1:]
    force = '--force' in cli_args
    positional = [a for a in cli_args if a != '--force']

    # isdecimal() (not isdigit()) so every accepted token is parseable by int()
    ids = [int(a) for a in positional if a.isdecimal()]
    ignored = [a for a in positional if not a.isdecimal()]

    if ignored:
        print("Pominięto nieprawidłowe argumenty: %s" % ', '.join(ignored))

    # No usable company ID (no arguments, only --force, or only invalid
    # tokens): show usage and fail rather than exit successfully doing nothing.
    if not ids:
        print("Użycie: python3 scripts/arm_company.py <company_id> [<id2> ...] [--force]")
        print(" --force Wymusza ponowne wykonanie wszystkich kroków")
        sys.exit(1)

    for cid in ids:
        arm_company(cid, force=force)
        # Blank separator between reports when several companies are processed
        if len(ids) > 1:
            print("\n")
|