nordabiz/blueprints/admin/routes_company_wizard.py
Maciej Pienczyn 3a4213a077
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(wizard): use raw SQL DELETE for draft cleanup to trigger DB-level CASCADE
ORM db.delete() doesn't honor ondelete='CASCADE' on FK constraints,
causing NotNullViolation on company_financial_reports and other tables.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 12:22:48 +02:00

871 lines
29 KiB
Python

"""
Admin Routes - Company Creation Wizard
=======================================
Multi-step wizard for adding new companies with automatic
data enrichment from KRS, CEIDG, SEO, GBP, and Social Media.
"""
import re
import os
import sys
import logging
import threading
from datetime import datetime, date as date_type
from flask import render_template, request, jsonify
from flask_login import login_required, current_user
from sqlalchemy.orm.attributes import flag_modified
from . import bp
from database import (
SessionLocal, Company, Category, CompanyPKD, CompanyPerson, Person,
CompanyWebsiteAnalysis, CompanySocialMedia, GBPAudit,
SystemRole
)
from utils.decorators import role_required
logger = logging.getLogger(__name__)
def _delete_draft_company(db, company_id):
"""Delete a wizard_draft company using raw SQL to trigger DB-level CASCADE."""
from sqlalchemy import text
db.execute(text("DELETE FROM companies WHERE id = :cid AND status = 'wizard_draft'"),
{'cid': company_id})
def _validate_nip(nip: str) -> bool:
"""Validate Polish NIP number (10 digits, checksum)."""
if not nip or not re.match(r'^\d{10}$', nip):
return False
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
return checksum == int(nip[9])
def _generate_slug(name):
"""Generate URL-safe slug from company name."""
slug = re.sub(r'[^\w\s-]', '', name.lower())
slug = re.sub(r'[\s_]+', '-', slug)
slug = re.sub(r'-+', '-', slug).strip('-')
return slug
def _ensure_unique_slug(db, slug, exclude_id=None):
"""Ensure slug is unique, appending -2, -3 etc. if needed."""
base_slug = slug
counter = 2
while True:
q = db.query(Company).filter(Company.slug == slug)
if exclude_id:
q = q.filter(Company.id != exclude_id)
if not q.first():
return slug
slug = f"{base_slug}-{counter}"
counter += 1
# ============================================================
# WIZARD PAGE
# ============================================================
@bp.route('/companies/wizard')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def company_wizard():
"""Render the company creation wizard page."""
db = SessionLocal()
try:
categories = db.query(Category).order_by(Category.name).all()
categories_list = [{'id': c.id, 'name': c.name} for c in categories]
# Check for existing wizard drafts by this user
draft = db.query(Company).filter(
Company.status == 'wizard_draft',
Company.wizard_started_by == current_user.id
).first()
draft_info = None
if draft:
draft_info = {
'id': draft.id,
'name': draft.name,
'nip': draft.nip,
'step': draft.wizard_step or 1,
'created_at': draft.created_at.strftime('%Y-%m-%d %H:%M') if draft.created_at else ''
}
return render_template(
'admin/company_wizard.html',
categories=categories_list,
draft=draft_info
)
finally:
db.close()
# ============================================================
# STEP 1: NIP LOOKUP & REGISTRY FETCH
# ============================================================
@bp.route('/companies/wizard/step1', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step1():
"""Validate NIP, fetch registry data, create draft company."""
data = request.get_json()
nip = (data.get('nip') or '').strip().replace('-', '').replace(' ', '')
if not _validate_nip(nip):
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP (10 cyfr, suma kontrolna)'}), 400
db = SessionLocal()
try:
# Check for duplicates (excluding wizard_draft)
existing = db.query(Company).filter(
Company.nip == nip,
Company.status != 'wizard_draft'
).first()
if existing:
return jsonify({
'success': False,
'error': f'Firma z NIP {nip} już istnieje: {existing.name} (ID: {existing.id})',
'existing_id': existing.id,
'existing_name': existing.name
}), 409
# Delete any existing wizard_draft with this NIP by this user
old_draft = db.query(Company).filter(
Company.nip == nip,
Company.status == 'wizard_draft'
).first()
if old_draft:
_delete_draft_company(db, old_draft.id)
db.commit()
# Try KRS first (via Biala Lista → KRS Open API)
registry_source = None
registry_data = {}
company_name = ''
krs_number = None
try:
from krs_api_service import KRSApiService
krs_service = KRSApiService()
biala_lista = krs_service.search_by_nip(nip)
if biala_lista and biala_lista.get('krs'):
krs_number = biala_lista['krs']
company_name = biala_lista.get('name', '')
registry_source = 'KRS'
registry_data = biala_lista
except Exception as e:
logger.warning(f"KRS lookup failed for NIP {nip}: {e}")
# If no KRS, try CEIDG
ceidg_data = None
if not registry_source:
try:
from ceidg_api_service import fetch_ceidg_by_nip
ceidg_data = fetch_ceidg_by_nip(nip)
if ceidg_data:
registry_source = 'CEIDG'
company_name = ceidg_data.get('firma', ceidg_data.get('nazwa', ''))
registry_data = ceidg_data
except Exception as e:
logger.warning(f"CEIDG lookup failed for NIP {nip}: {e}")
# Create draft company
slug = _generate_slug(company_name or f'firma-{nip}')
slug = _ensure_unique_slug(db, slug)
company = Company(
name=company_name or f'Firma NIP {nip}',
slug=slug,
nip=nip,
status='wizard_draft',
data_quality='basic',
wizard_step=1,
wizard_started_by=current_user.id,
created_at=datetime.now(),
last_updated=datetime.now()
)
db.add(company)
db.flush() # Get company.id
# Enrich from KRS if found
if registry_source == 'KRS' and krs_number:
company.krs = krs_number
try:
from .routes_membership import _enrich_company_from_krs
_enrich_company_from_krs(company, db)
except Exception as e:
logger.warning(f"KRS enrichment failed: {e}")
# Enrich from CEIDG if found
elif registry_source == 'CEIDG' and ceidg_data:
_apply_ceidg_data(company, ceidg_data, db)
company.wizard_step = 2
db.commit()
# Build response with company data
response_data = _company_to_dict(company)
response_data['registry_source'] = registry_source or 'manual'
# Add PKD codes
pkd_codes = db.query(CompanyPKD).filter_by(company_id=company.id).all()
response_data['pkd_codes'] = [
{'code': p.pkd_code, 'description': p.pkd_description, 'is_primary': p.is_primary}
for p in pkd_codes
]
# Add people (board members)
people = db.query(CompanyPerson).filter_by(company_id=company.id).all()
response_data['people'] = [
{
'name': cp.person.full_name() if cp.person else '',
'role': cp.role_in_company or ''
}
for cp in people if cp.person
]
return jsonify({'success': True, 'company': response_data})
except Exception as e:
db.rollback()
logger.error(f"Wizard step 1 error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
def _apply_ceidg_data(company, ceidg_data, db):
"""Apply CEIDG API data to company record. Mirrors arm_company.py logic."""
company.ceidg_id = ceidg_data.get('ceidg_id')
company.ceidg_status = ceidg_data.get('status')
company.ceidg_raw_data = ceidg_data.get('raw')
company.ceidg_fetched_at = datetime.now()
company.data_source = 'CEIDG API'
company.last_verified_at = datetime.now()
# Owner
wlasciciel = ceidg_data.get('wlasciciel', {})
if wlasciciel.get('imie'):
company.owner_first_name = wlasciciel['imie']
if wlasciciel.get('nazwisko'):
company.owner_last_name = wlasciciel['nazwisko']
if ceidg_data.get('obywatelstwa'):
company.owner_citizenships = ceidg_data['obywatelstwa']
# Legal name
if ceidg_data.get('firma'):
company.legal_name = ceidg_data['firma']
# REGON
regon = ceidg_data.get('regon') or wlasciciel.get('regon')
if regon:
company.regon = regon
# Business start date
if ceidg_data.get('dataRozpoczecia'):
try:
d = ceidg_data['dataRozpoczecia']
if isinstance(d, str):
company.business_start_date = date_type.fromisoformat(d)
except (ValueError, TypeError):
pass
# Legal form
company.legal_form = 'JEDNOOSOBOWA DZIALALNOSC GOSPODARCZA'
# PKD (main)
pkd_gl = ceidg_data.get('pkdGlowny', {})
if pkd_gl and pkd_gl.get('kod'):
company.pkd_code = pkd_gl['kod']
company.pkd_description = pkd_gl.get('nazwa')
# PKD (full list)
pkd_lista = ceidg_data.get('pkd', [])
if pkd_lista:
company.ceidg_pkd_list = pkd_lista
pkd_main_code = pkd_gl.get('kod', '') if pkd_gl else ''
for pkd_item in pkd_lista:
kod = pkd_item.get('kod', '')
if not kod:
continue
existing_pkd = db.query(CompanyPKD).filter(
CompanyPKD.company_id == company.id,
CompanyPKD.pkd_code == kod
).first()
if not existing_pkd:
db.add(CompanyPKD(
company_id=company.id,
pkd_code=kod,
pkd_description=pkd_item.get('nazwa', ''),
is_primary=(kod == pkd_main_code)
))
# Business address
adres = ceidg_data.get('adresDzialalnosci', {})
ulica = adres.get('ulica', '')
budynek = adres.get('budynek', '')
lokal = adres.get('lokal', '')
if ulica or budynek:
street_parts = [ulica, budynek]
if lokal:
street_parts[-1] = (budynek + '/' + lokal) if budynek else lokal
company.address_street = ' '.join(p for p in street_parts if p)
if adres.get('kod') or adres.get('kodPocztowy'):
company.address_postal = adres.get('kod') or adres.get('kodPocztowy')
if adres.get('miasto') or adres.get('miejscowosc'):
company.address_city = adres.get('miasto') or adres.get('miejscowosc')
if company.address_street and company.address_postal and company.address_city:
company.address_full = f'{company.address_street}, {company.address_postal} {company.address_city}'
# Contact
if ceidg_data.get('email'):
company.email = ceidg_data['email']
if ceidg_data.get('stronaWWW'):
company.website = ceidg_data['stronaWWW']
if ceidg_data.get('telefon'):
company.phone = ceidg_data['telefon']
def _company_to_dict(company):
"""Convert Company to a dictionary for JSON response."""
return {
'id': company.id,
'name': company.name,
'legal_name': company.legal_name,
'slug': company.slug,
'nip': company.nip,
'regon': company.regon,
'krs': company.krs,
'website': company.website,
'email': company.email,
'phone': company.phone,
'address_street': company.address_street,
'address_city': company.address_city,
'address_postal': company.address_postal,
'address_full': company.address_full,
'legal_form': company.legal_form,
'pkd_code': company.pkd_code,
'pkd_description': company.pkd_description,
'capital_amount': float(company.capital_amount) if company.capital_amount else None,
'capital_currency': company.capital_currency,
'year_established': company.year_established,
'owner_first_name': company.owner_first_name,
'owner_last_name': company.owner_last_name,
'category_id': company.category_id,
'description_short': company.description_short,
'description_full': company.description_full,
'data_source': company.data_source,
'wizard_step': company.wizard_step,
}
# ============================================================
# STEP 2: REVIEW & EDIT REGISTRY DATA
# ============================================================
@bp.route('/companies/wizard/step2', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step2():
"""Update company with edited registry data."""
data = request.get_json()
company_id = data.get('company_id')
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
# Update editable fields
editable_fields = [
'name', 'legal_name', 'website', 'email', 'phone',
'address_street', 'address_city', 'address_postal',
'description_short', 'description_full'
]
for field in editable_fields:
if field in data:
setattr(company, field, data[field] or None)
# Category
if 'category_id' in data and data['category_id']:
company.category_id = int(data['category_id'])
# Rebuild address_full
if company.address_street and company.address_postal and company.address_city:
company.address_full = f'{company.address_street}, {company.address_postal} {company.address_city}'
# Update slug if name changed
if 'name' in data and data['name']:
new_slug = _generate_slug(data['name'])
new_slug = _ensure_unique_slug(db, new_slug, exclude_id=company.id)
company.slug = new_slug
company.wizard_step = 3
company.last_updated = datetime.now()
db.commit()
return jsonify({'success': True, 'company': _company_to_dict(company)})
except Exception as e:
db.rollback()
logger.error(f"Wizard step 2 error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
# ============================================================
# STEP 3: WEBSITE & LOGO
# ============================================================
@bp.route('/companies/wizard/step3/fetch-logos', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step3_fetch_logos():
"""Fetch logo candidates from company website."""
data = request.get_json()
company_id = data.get('company_id')
website_url = data.get('website_url', '').strip()
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
# Update website URL if provided
if website_url:
if not website_url.startswith('http'):
website_url = 'https://' + website_url
company.website = website_url
db.commit()
if not company.website:
return jsonify({'success': False, 'error': 'Brak adresu strony WWW'}), 400
from logo_fetch_service import LogoFetchService
service = LogoFetchService()
result = service.fetch_candidates(company.website, company.slug)
candidates = result.get('candidates', [])
return jsonify({
'success': True,
'candidates': candidates,
'recommended_index': result.get('recommended_index', 0),
'message': result.get('message', '')
})
except Exception as e:
logger.error(f"Logo fetch error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/companies/wizard/step3/select-logo', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step3_select_logo():
"""Confirm selected logo candidate."""
data = request.get_json()
company_id = data.get('company_id')
candidate_index = data.get('candidate_index', 0)
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
from logo_fetch_service import LogoFetchService
service = LogoFetchService()
success = service.confirm_candidate(company.slug, candidate_index)
if success:
company.wizard_step = 4
db.commit()
return jsonify({'success': True, 'logo_path': f'/static/img/companies/{company.slug}.webp'})
else:
return jsonify({'success': False, 'error': 'Nie udało się zapisać logo'}), 500
except Exception as e:
logger.error(f"Logo select error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/companies/wizard/step3/skip', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step3_skip():
"""Skip logo step."""
data = request.get_json()
company_id = data.get('company_id')
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
company.wizard_step = 4
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
logger.error(f"Wizard step3 skip error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
# ============================================================
# STEP 4: AUDITS (BACKGROUND)
# ============================================================
@bp.route('/companies/wizard/step4/start-audits', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step4_start_audits():
"""Start background audit thread for SEO, GBP, and Social Media."""
data = request.get_json()
company_id = data.get('company_id')
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
# Initialize audit status
company.wizard_audit_status = {
'seo': 'pending' if company.website else 'skipped',
'gbp': 'pending',
'social': 'pending'
}
flag_modified(company, 'wizard_audit_status')
db.commit()
# Launch background thread
thread = threading.Thread(
target=_run_wizard_audits,
args=(company_id,),
daemon=True
)
thread.start()
return jsonify({'success': True, 'audit_status': company.wizard_audit_status})
except Exception as e:
logger.error(f"Audit start error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
def _run_wizard_audits(company_id):
"""Background worker: run SEO, GBP, and Social Media audits."""
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
scripts_dir = os.path.join(base_dir, 'scripts')
if base_dir not in sys.path:
sys.path.insert(0, base_dir)
if scripts_dir not in sys.path:
sys.path.insert(0, scripts_dir)
db = SessionLocal()
try:
company = db.query(Company).filter_by(id=company_id).first()
if not company:
return
audit_status = dict(company.wizard_audit_status or {})
# --- SEO Audit ---
if company.website and audit_status.get('seo') == 'pending':
audit_status['seo'] = 'running'
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
try:
from seo_audit import SEOAuditor
seo_service = SEOAuditor()
company_dict = {
'id': company.id,
'name': company.name,
'slug': company.slug,
'website': company.website,
'address_city': company.address_city or '',
}
result = seo_service.audit_company(company_dict)
seo_score = result.get('scores', {}).get('pagespeed_seo', 0)
audit_status['seo'] = 'completed'
audit_status['seo_score'] = seo_score
except Exception as e:
audit_status['seo'] = f'error: {str(e)[:80]}'
logger.error(f"Wizard SEO audit error: {e}")
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
# --- GBP Audit ---
if audit_status.get('gbp') == 'pending':
audit_status['gbp'] = 'running'
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
try:
db.expire_all()
from gbp_audit_service import GBPAuditService
gbp_service = GBPAuditService(db)
gbp_result = gbp_service.audit_company(company.id)
if gbp_result:
gbp_service.save_audit(gbp_result, source='wizard')
audit_status['gbp'] = 'completed'
audit_status['gbp_score'] = gbp_result.completeness_score
else:
audit_status['gbp'] = 'not_found'
except Exception as e:
audit_status['gbp'] = f'error: {str(e)[:80]}'
logger.error(f"Wizard GBP audit error: {e}")
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
# --- Social Media Audit ---
if audit_status.get('social') == 'pending':
audit_status['social'] = 'running'
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
try:
from social_media_audit import SocialMediaAuditor
auditor = SocialMediaAuditor()
company_dict = {
'id': company.id,
'name': company.name,
'slug': company.slug,
'website': company.website,
'address_city': company.address_city or '',
}
result = auditor.audit_company(company_dict)
if result:
auditor.save_audit_result(result)
db.expire_all()
saved_count = db.query(CompanySocialMedia).filter_by(company_id=company.id).count()
audit_status['social'] = 'completed'
audit_status['social_count'] = saved_count
except Exception as e:
audit_status['social'] = f'error: {str(e)[:80]}'
logger.error(f"Wizard Social audit error: {e}")
company.wizard_audit_status = audit_status
flag_modified(company, 'wizard_audit_status')
db.commit()
except Exception as e:
logger.error(f"Wizard audit thread error: {e}")
finally:
db.close()
@bp.route('/companies/wizard/step4/audit-status')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step4_audit_status():
"""Poll audit progress."""
company_id = request.args.get('company_id', type=int)
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
audit_status = company.wizard_audit_status or {}
all_done = all(
v not in ('pending', 'running')
for v in [audit_status.get('seo', 'skipped'),
audit_status.get('gbp', 'skipped'),
audit_status.get('social', 'skipped')]
)
return jsonify({
'success': True,
'audit_status': audit_status,
'all_done': all_done
})
finally:
db.close()
@bp.route('/companies/wizard/step4/skip', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step4_skip():
"""Skip audits step."""
data = request.get_json()
company_id = data.get('company_id')
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
company.wizard_step = 5
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
logger.error(f"Wizard step4 skip error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
# ============================================================
# STEP 5: FINALIZE
# ============================================================
@bp.route('/companies/wizard/step5/finalize', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_step5_finalize():
"""Finalize company: change status, compute data quality."""
data = request.get_json()
company_id = data.get('company_id')
target_status = data.get('status', 'active')
category_id = data.get('category_id')
if target_status not in ('active', 'pending'):
target_status = 'active'
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
# Apply final settings
company.status = target_status
if category_id:
company.category_id = int(category_id)
# Clean up wizard fields
company.wizard_step = None
company.wizard_started_by = None
company.wizard_audit_status = None
# Update data quality
from utils.data_quality import update_company_data_quality
dq = update_company_data_quality(company, db)
db.commit()
return jsonify({
'success': True,
'company_id': company.id,
'slug': company.slug,
'name': company.name,
'status': company.status,
'data_quality': company.data_quality,
'data_quality_score': dq.get('score', 0)
})
except Exception as e:
db.rollback()
logger.error(f"Wizard finalize error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
# ============================================================
# CANCEL / CLEANUP
# ============================================================
@bp.route('/companies/wizard/cancel', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def wizard_cancel():
"""Delete wizard draft company."""
data = request.get_json()
company_id = data.get('company_id')
if not company_id:
return jsonify({'success': False, 'error': 'Brak company_id'}), 400
db = SessionLocal()
try:
company = db.query(Company).filter_by(
id=company_id, status='wizard_draft'
).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404
# Clean up logo candidates
from logo_fetch_service import LogoFetchService
LogoFetchService.cleanup_candidates(company.slug)
_delete_draft_company(db, company.id)
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
logger.error(f"Wizard cancel error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()