nordabiz/blueprints/admin/routes_companies.py
Maciej Pienczyn e0bb6b718a
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: enhance data quality dashboard with filters, hints, weighted scores and contact scraping
- Add clickable field coverage bars to filter companies missing specific data
- Add quick-action buttons (Registry/SEO/GBP) per company in dashboard table
- Add stale data detection (>6 months) with yellow badges
- Implement weighted priority score (contacts 34%, audits 17%)
- Add data hints in admin company detail showing where to find missing data
- Add "Available data" section showing Google Business data ready to apply
- Add POST /api/company/<id>/apply-hint endpoint for one-click data fill
- Extend website content updater with phone/email extraction (AI + regex)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 07:25:39 +01:00

785 lines
28 KiB
Python

"""
Admin Routes - Companies
========================
CRUD operations for company management in admin panel.
"""
import os
import re
import csv
import logging
from io import StringIO
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify, Response
from flask_login import login_required, current_user
from . import bp
from database import (
SessionLocal, Company, Category, User, Person, CompanyPerson, SystemRole,
CompanyWebsiteAnalysis, CompanySocialMedia, GBPAudit
)
from utils.decorators import role_required
from utils.data_quality import compute_data_quality_score, update_company_data_quality
# Logger
logger = logging.getLogger(__name__)
def validate_nip(nip: str) -> bool:
"""Validate Polish NIP number (10 digits, checksum)"""
if not nip or not re.match(r'^\d{10}$', nip):
return False
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
return checksum == int(nip[9])
# ============================================================
# COMPANIES ADMIN ROUTES
# ============================================================
@bp.route('/companies')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_companies():
"""Admin panel for company management"""
db = SessionLocal()
try:
# Get filter parameters
status_filter = request.args.get('status', 'all')
category_filter = request.args.get('category', '')
quality_filter = request.args.get('quality', '')
search_query = request.args.get('q', '').strip()
# Base query
query = db.query(Company)
# Apply filters
if status_filter and status_filter != 'all':
query = query.filter(Company.status == status_filter)
if category_filter:
query = query.filter(Company.category_id == int(category_filter))
if quality_filter:
query = query.filter(Company.data_quality == quality_filter)
if search_query:
search_pattern = f'%{search_query}%'
query = query.filter(
(Company.name.ilike(search_pattern)) |
(Company.nip.ilike(search_pattern))
)
# Order and fetch
companies = query.order_by(Company.name).all()
# Get categories for filter dropdown
categories = db.query(Category).order_by(Category.name).all()
# Statistics
total_companies = db.query(Company).count()
active_count = db.query(Company).filter(Company.status == 'active').count()
pending_count = db.query(Company).filter(Company.status == 'pending').count()
inactive_count = db.query(Company).filter(Company.status == 'inactive').count()
logger.info(f"Admin {current_user.email} accessed companies panel - {total_companies} companies")
return render_template(
'admin/companies.html',
companies=companies,
categories=categories,
total_companies=total_companies,
active_count=active_count,
pending_count=pending_count,
inactive_count=inactive_count,
current_status=status_filter,
current_category=category_filter,
current_quality=quality_filter,
search_query=search_query,
now=datetime.utcnow()
)
finally:
db.close()
@bp.route('/companies/add', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_add():
"""Create a new company"""
db = SessionLocal()
try:
data = request.get_json() or {}
name = data.get('name', '').strip()
if not name:
return jsonify({'success': False, 'error': 'Nazwa firmy jest wymagana'}), 400
nip = data.get('nip', '').strip().replace('-', '').replace(' ', '')
if nip:
if not validate_nip(nip):
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP'}), 400
existing = db.query(Company).filter(Company.nip == nip).first()
if existing:
return jsonify({'success': False, 'error': f'Firma z NIP {nip} już istnieje'}), 400
# Generate slug from name
slug = re.sub(r'[^\w\s-]', '', name.lower())
slug = re.sub(r'[\s_]+', '-', slug)
slug = re.sub(r'-+', '-', slug).strip('-')
# Ensure unique slug
base_slug = slug
counter = 1
while db.query(Company).filter(Company.slug == slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
new_company = Company(
name=name,
slug=slug,
nip=nip if nip else None,
category_id=data.get('category_id') or None,
status=data.get('status', 'pending'),
email=data.get('email', '').strip() or None,
phone=data.get('phone', '').strip() or None,
address_city=data.get('address_city', '').strip() or None,
address_street=data.get('address_street', '').strip() or None,
address_postal=data.get('address_postal', '').strip() or None,
data_quality='basic'
)
db.add(new_company)
db.commit()
db.refresh(new_company)
update_company_data_quality(new_company, db)
db.commit()
logger.info(f"Admin {current_user.email} created new company: {name} (ID: {new_company.id})")
return jsonify({
'success': True,
'company_id': new_company.id,
'message': f'Firma "{name}" została utworzona'
})
except Exception as e:
db.rollback()
logger.error(f"Error creating company: {e}")
return jsonify({'success': False, 'error': 'Błąd podczas tworzenia firmy'}), 500
finally:
db.close()
@bp.route('/companies/<int:company_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_get(company_id):
"""Get company details (JSON)"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
return jsonify({
'success': True,
'company': {
'id': company.id,
'name': company.name,
'nip': company.nip,
'category_id': company.category_id,
'status': company.status,
'email': company.email,
'phone': company.phone,
'address_city': company.address_city,
'address_street': company.address_street,
'address_postal': company.address_postal,
'data_quality': company.data_quality,
'admin_notes': company.admin_notes
}
})
finally:
db.close()
@bp.route('/companies/<int:company_id>/update', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_update(company_id):
"""Update company data"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
data = request.get_json() or {}
if 'name' in data:
name = data['name'].strip()
if not name:
return jsonify({'success': False, 'error': 'Nazwa firmy jest wymagana'}), 400
company.name = name
if 'nip' in data:
nip = data['nip'].strip().replace('-', '').replace(' ', '') if data['nip'] else ''
if nip:
if not validate_nip(nip):
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP'}), 400
existing = db.query(Company).filter(Company.nip == nip, Company.id != company_id).first()
if existing:
return jsonify({'success': False, 'error': f'Firma z NIP {nip} już istnieje'}), 400
company.nip = nip if nip else None
if 'category_id' in data:
company.category_id = data['category_id'] if data['category_id'] else None
if 'status' in data:
if data['status'] in ['active', 'pending', 'inactive', 'archived']:
company.status = data['status']
if 'email' in data:
company.email = data['email'].strip() if data['email'] else None
if 'phone' in data:
company.phone = data['phone'].strip() if data['phone'] else None
if 'address_city' in data:
company.address_city = data['address_city'].strip() if data['address_city'] else None
if 'address_street' in data:
company.address_street = data['address_street'].strip() if data['address_street'] else None
if 'address_postal' in data:
company.address_postal = data['address_postal'].strip() if data['address_postal'] else None
if 'admin_notes' in data:
company.admin_notes = data['admin_notes'].strip() if data['admin_notes'] else None
company.last_updated = datetime.utcnow()
db.commit()
update_company_data_quality(company, db)
db.commit()
logger.info(f"Admin {current_user.email} updated company {company.name} (ID: {company_id})")
return jsonify({
'success': True,
'message': 'Dane firmy zaktualizowane'
})
except Exception as e:
db.rollback()
logger.error(f"Error updating company {company_id}: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
finally:
db.close()
@bp.route('/companies/<int:company_id>/toggle-status', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_toggle_status(company_id):
"""Toggle company status (active <-> inactive)"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
if company.status == 'active':
company.status = 'inactive'
else:
company.status = 'active'
company.last_updated = datetime.utcnow()
db.commit()
logger.info(f"Admin {current_user.email} toggled company {company.name} status to {company.status}")
return jsonify({
'success': True,
'status': company.status,
'message': f"Status zmieniony na {'aktywna' if company.status == 'active' else 'nieaktywna'}"
})
finally:
db.close()
@bp.route('/companies/<int:company_id>/delete', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_delete(company_id):
"""Soft delete company (set status to archived)"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
company.status = 'archived'
company.last_updated = datetime.utcnow()
db.commit()
logger.info(f"Admin {current_user.email} archived company {company.name} (ID: {company_id})")
return jsonify({
'success': True,
'message': f'Firma "{company.name}" została zarchiwizowana'
})
finally:
db.close()
@bp.route('/companies/<int:company_id>/hard-delete', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_company_hard_delete(company_id):
"""Permanently delete an archived company and all related data."""
from sqlalchemy import text
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
if company.status != 'archived':
return jsonify({
'success': False,
'error': 'Tylko zarchiwizowane firmy mogą być trwale usunięte. Najpierw zarchiwizuj firmę.'
}), 400
company_name = company.name
db.expunge(company)
# 1) Nullable FKs → SET NULL
nullable_fk_updates = [
"UPDATE users SET company_id = NULL WHERE company_id = :cid",
"UPDATE companies SET parent_company_id = NULL WHERE parent_company_id = :cid",
"UPDATE companies SET it_provider_company_id = NULL WHERE it_provider_company_id = :cid",
"UPDATE norda_events SET speaker_company_id = NULL WHERE speaker_company_id = :cid",
"UPDATE classifieds SET company_id = NULL WHERE company_id = :cid",
"UPDATE membership_applications SET company_id = NULL WHERE company_id = :cid",
"UPDATE zopk_knowledge_entities SET company_id = NULL WHERE company_id = :cid",
"UPDATE membership_fee_config SET company_id = NULL WHERE company_id = :cid",
"UPDATE ai_usage_logs SET company_id = NULL WHERE company_id = :cid",
]
# 2) NOT NULL FKs without CASCADE → DELETE records
not_null_fk_deletes = [
"DELETE FROM company_services WHERE company_id = :cid",
"DELETE FROM company_competencies WHERE company_id = :cid",
"DELETE FROM certifications WHERE company_id = :cid",
"DELETE FROM awards WHERE company_id = :cid",
"DELETE FROM company_events WHERE company_id = :cid",
"DELETE FROM company_digital_maturity WHERE company_id = :cid",
"DELETE FROM company_website_analysis WHERE company_id = :cid",
"DELETE FROM company_quality_tracking WHERE company_id = :cid",
"DELETE FROM company_website_content WHERE company_id = :cid",
"DELETE FROM company_ai_insights WHERE company_id = :cid",
"DELETE FROM ai_enrichment_proposals WHERE company_id = :cid",
"DELETE FROM maturity_assessments WHERE company_id = :cid",
]
# 3) CASCADE tables (auto-handled by DB, but explicit for safety)
cascade_fk_deletes = [
"DELETE FROM user_company_permissions WHERE company_id = :cid",
"DELETE FROM company_contacts WHERE company_id = :cid",
"DELETE FROM company_social_media WHERE company_id = :cid",
"DELETE FROM company_recommendations WHERE company_id = :cid",
"DELETE FROM gbp_audits WHERE company_id = :cid",
"DELETE FROM it_audits WHERE company_id = :cid",
"DELETE FROM it_collaboration_matches WHERE company_a_id = :cid OR company_b_id = :cid",
"DELETE FROM membership_fees WHERE company_id = :cid",
"DELETE FROM zopk_company_links WHERE company_id = :cid",
"DELETE FROM company_people WHERE company_id = :cid",
"DELETE FROM krs_audits WHERE company_id = :cid",
"DELETE FROM company_pkd WHERE company_id = :cid",
"DELETE FROM company_financial_reports WHERE company_id = :cid",
]
for sql in nullable_fk_updates + not_null_fk_deletes + cascade_fk_deletes:
try:
db.execute(text(sql), {"cid": company_id})
except Exception:
pass # Table may not exist yet
db.execute(text("DELETE FROM companies WHERE id = :cid"), {"cid": company_id})
db.commit()
logger.info(f"Admin {current_user.email} permanently deleted company {company_name} (ID: {company_id})")
return jsonify({
'success': True,
'message': f'Firma "{company_name}" została trwale usunięta'
})
except Exception as e:
db.rollback()
logger.error(f"Error permanently deleting company {company_id}: {e}")
return jsonify({'success': False, 'error': f'Błąd: {str(e)}'}), 500
finally:
db.close()
@bp.route('/companies/<int:company_id>/assign-user', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_assign_user(company_id):
"""Assign a user to a company"""
db = SessionLocal()
try:
data = request.get_json() or {}
user_id = data.get('user_id')
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
if user_id:
user = db.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({'success': False, 'error': 'Użytkownik nie istnieje'}), 404
user.company_id = company_id
db.commit()
logger.info(f"Admin {current_user.email} assigned user {user.email} to company {company.name}")
return jsonify({
'success': True,
'message': f'Użytkownik {user.email} przypisany do {company.name}'
})
else:
return jsonify({'success': False, 'error': 'Nie podano ID użytkownika'}), 400
finally:
db.close()
@bp.route('/companies/<int:company_id>/people')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_people(company_id):
"""Get people associated with a company"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
# Get CompanyPerson relationships
people_roles = db.query(CompanyPerson).filter(
CompanyPerson.company_id == company_id
).all()
people_list = []
for cp in people_roles:
person = cp.person
people_list.append({
'id': cp.id,
'person_id': person.id,
'imiona': person.imiona,
'nazwisko': person.nazwisko,
'pesel_masked': f"{person.pesel[:4]}******" if person.pesel else None,
'role': cp.role,
'role_category': cp.role_category,
'shares_percent': float(cp.shares_percent) if cp.shares_percent else None
})
return jsonify({
'success': True,
'company_name': company.name,
'people': people_list
})
finally:
db.close()
@bp.route('/companies/<int:company_id>/unassign-user', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_unassign_user(company_id):
"""Unassign a user from a company"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
data = request.get_json() or {}
user_id = data.get('user_id')
if not user_id:
return jsonify({'success': False, 'error': 'Nie podano ID użytkownika'}), 400
user = db.query(User).filter(User.id == user_id, User.company_id == company_id).first()
if not user:
return jsonify({'success': False, 'error': 'Użytkownik nie jest przypisany do tej firmy'}), 404
user.company_id = None
db.commit()
logger.info(f"Admin {current_user.email} unassigned user {user.email} from company {company.name}")
return jsonify({
'success': True,
'message': f'Użytkownik {user.email} odpięty od {company.name}'
})
finally:
db.close()
@bp.route('/companies/<int:company_id>/users')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_users(company_id):
"""Get users assigned to a company"""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
users = db.query(User).filter(User.company_id == company_id).all()
users_list = [{
'id': u.id,
'name': u.name,
'email': u.email,
'role': u.role,
'is_verified': u.is_verified
} for u in users]
return jsonify({
'success': True,
'company_name': company.name,
'users': users_list
})
finally:
db.close()
@bp.route('/companies/export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_companies_export():
"""Export companies to CSV"""
db = SessionLocal()
try:
companies = db.query(Company).order_by(Company.name).all()
output = StringIO()
writer = csv.writer(output)
# Header
writer.writerow([
'ID', 'Nazwa', 'NIP', 'Kategoria', 'Status',
'Email', 'Telefon', 'Miasto', 'Ulica', 'Kod pocztowy',
'Jakość danych', 'Data utworzenia'
])
# Data rows
for c in companies:
writer.writerow([
c.id,
c.name,
c.nip or '',
c.category.name if c.category else '',
c.status or '',
c.email or '',
c.phone or '',
c.address_city or '',
c.address_street or '',
c.address_postal or '',
c.data_quality or '',
c.created_at.strftime('%Y-%m-%d') if c.created_at else ''
])
output.seek(0)
logger.info(f"Admin {current_user.email} exported {len(companies)} companies to CSV")
return Response(
output.getvalue(),
mimetype='text/csv',
headers={
'Content-Disposition': f'attachment; filename=companies_{datetime.now().strftime("%Y%m%d")}.csv'
}
)
finally:
db.close()
@bp.route('/companies/<int:company_id>/settings')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def company_settings(company_id):
"""Company settings page with OAuth integrations UI."""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
flash('Firma nie istnieje', 'error')
return redirect(url_for('admin.admin_companies'))
from oauth_service import OAuthService
oauth = OAuthService()
connections = oauth.get_connected_services(db, company_id)
# Check if OAuth credentials are configured
oauth_available = {
'google': bool(oauth.google_client_id),
'meta': bool(oauth.meta_app_id),
}
return render_template(
'admin/company_settings.html',
company=company,
connections=connections,
oauth_available=oauth_available,
)
finally:
db.close()
@bp.route('/companies/<int:company_id>/detail')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_company_detail(company_id):
"""Admin company detail page with enrichment status and completeness score."""
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company:
flash('Firma nie istnieje', 'error')
return redirect(url_for('admin.admin_companies'))
# Users assigned to this company
users = db.query(User).filter(User.company_id == company_id).all()
# --- Enrichment status ---
# Registry data
registry_done = bool(company.ceidg_fetched_at or company.krs_fetched_at)
registry_source = None
registry_date = None
if company.krs_fetched_at:
registry_source = 'KRS'
registry_date = company.krs_fetched_at
elif company.ceidg_fetched_at:
registry_source = 'CEIDG'
registry_date = company.ceidg_fetched_at
# Logo check (webp or svg)
logo_exists = False
logo_ext = None
for _ext in ('webp', 'svg'):
if os.path.isfile(os.path.join('static', 'img', 'companies', f'{company.slug}.{_ext}')):
logo_exists = True
logo_ext = _ext
break
# SEO - latest website analysis
seo_analysis = db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.company_id == company_id
).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()
# Social media count and latest date
social_accounts = db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company_id
).order_by(CompanySocialMedia.verified_at.desc()).all()
social_count = len(social_accounts)
social_latest_date = social_accounts[0].verified_at if social_accounts else None
# GBP - latest audit
gbp_audit = db.query(GBPAudit).filter(
GBPAudit.company_id == company_id
).order_by(GBPAudit.audit_date.desc()).first()
registry_stale = registry_done and registry_date and (datetime.now() - registry_date).days > 180
enrichment = {
'registry': {
'done': registry_done,
'source': registry_source,
'date': registry_date,
'has_krs': bool(company.krs),
'has_nip': bool(company.nip),
'stale': registry_stale,
},
'logo': {
'done': logo_exists,
'path': f'/static/img/companies/{company.slug}.{logo_ext}' if logo_exists else None,
},
'seo': {
'done': seo_analysis is not None,
'date': seo_analysis.analyzed_at if seo_analysis else None,
'score': seo_analysis.seo_overall_score if seo_analysis else None,
},
'social': {
'done': social_count > 0,
'count': social_count,
'date': social_latest_date,
},
'gbp': {
'done': gbp_audit is not None,
'date': gbp_audit.audit_date if gbp_audit else None,
'score': gbp_audit.completeness_score if gbp_audit else None,
},
}
# --- Completeness score (12 fields) ---
completeness = compute_data_quality_score(company, db)
# --- Hints: where to find missing data ---
hints = {}
analysis = seo_analysis # CompanyWebsiteAnalysis object or None
if not company.phone:
if analysis and analysis.google_phone:
hints['Telefon'] = {'source': 'Google Business', 'value': analysis.google_phone, 'action': 'apply'}
elif analysis and analysis.nap_on_website:
nap = analysis.nap_on_website if isinstance(analysis.nap_on_website, dict) else {}
if nap.get('phone'):
hints['Telefon'] = {'source': 'Strona WWW (NAP)', 'value': nap['phone'], 'action': 'apply'}
elif company.nip:
hints['Telefon'] = {'source': 'CEIDG/KRS', 'value': None, 'action': 'fetch_registry'}
if not company.email:
if analysis and analysis.nap_on_website:
nap = analysis.nap_on_website if isinstance(analysis.nap_on_website, dict) else {}
if nap.get('email'):
hints['Email'] = {'source': 'Strona WWW (NAP)', 'value': nap['email'], 'action': 'apply'}
if not company.website:
if analysis and analysis.google_website:
hints['Strona WWW'] = {'source': 'Google Business', 'value': analysis.google_website, 'action': 'apply'}
if not company.address_city:
if analysis and analysis.google_address:
hints['Adres'] = {'source': 'Google Business', 'value': analysis.google_address, 'action': 'apply'}
if not company.description_short:
if analysis and analysis.content_summary:
hints['Opis'] = {'source': 'Analiza strony WWW', 'value': analysis.content_summary[:200], 'action': 'apply'}
logger.info(f"Admin {current_user.email} viewed company detail: {company.name} (ID: {company_id})")
return render_template(
'admin/company_detail.html',
company=company,
enrichment=enrichment,
completeness=completeness,
users=users,
hints=hints,
)
finally:
db.close()