nordabiz/blueprints/admin/routes_audits.py
Maciej Pienczyn 1d39c9190a
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: add review-before-save workflow to GBP batch audit
Batch audit now collects changes without saving to DB. Admin must
review before/after differences and approve or discard. Mirrors the
existing social audit enrichment review pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 12:43:33 +01:00

1531 lines
59 KiB
Python

"""
Admin Audit Routes
==================
SEO and GBP audit dashboards for admin panel.
"""
import fcntl
import json
import logging
import os
import tempfile
import threading
from datetime import datetime
from flask import abort, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import (
SessionLocal, Company, Category, CompanyWebsiteAnalysis, GBPAudit,
CompanyDigitalMaturity, KRSAudit, CompanyPKD, CompanyPerson,
ITAudit, ITCollaborationMatch, SystemRole
)
from utils.decorators import role_required, is_audit_owner
logger = logging.getLogger(__name__)
# ============================================================
# GBP BATCH AUDIT STATE (shared file for multi-worker safety)
# ============================================================
_GBP_BATCH_STATE_FILE = os.path.join(tempfile.gettempdir(), 'nordabiz_gbp_batch_state.json')
_GBP_BATCH_DEFAULT = {
'running': False,
'progress': 0,
'total': 0,
'completed': 0,
'errors': 0,
'results': [],
'pending_changes': [],
'approved': False,
}
def _read_gbp_batch_state():
try:
with open(_GBP_BATCH_STATE_FILE, 'r') as f:
fcntl.flock(f, fcntl.LOCK_SH)
data = json.load(f)
fcntl.flock(f, fcntl.LOCK_UN)
return data
except (FileNotFoundError, json.JSONDecodeError, IOError):
return dict(_GBP_BATCH_DEFAULT)
def _write_gbp_batch_state(state):
try:
tmp_path = _GBP_BATCH_STATE_FILE + '.tmp'
with open(tmp_path, 'w') as f:
fcntl.flock(f, fcntl.LOCK_EX)
json.dump(state, f, default=str)
fcntl.flock(f, fcntl.LOCK_UN)
os.replace(tmp_path, _GBP_BATCH_STATE_FILE)
except IOError as e:
logger.error(f"Failed to write GBP batch state: {e}")
def _get_old_audit(db, company_id):
"""Get latest existing GBP audit for comparison."""
from sqlalchemy import func
audit = (
db.query(GBPAudit)
.filter_by(company_id=company_id)
.order_by(GBPAudit.audit_date.desc())
.first()
)
if not audit:
return None
return {
'score': audit.completeness_score,
'rating': float(audit.average_rating) if audit.average_rating else None,
'review_count': audit.review_count or 0,
'photo_count': audit.photo_count or 0,
'has_name': audit.has_name,
'has_address': audit.has_address,
'has_phone': audit.has_phone,
'has_website': audit.has_website,
'has_hours': audit.has_hours,
'has_categories': audit.has_categories,
'has_photos': audit.has_photos,
'has_description': audit.has_description,
'has_services': audit.has_services,
'has_reviews': audit.has_reviews,
'audit_date': str(audit.audit_date) if audit.audit_date else None,
}
_GBP_FIELD_LABELS = {
'has_name': 'Nazwa firmy',
'has_address': 'Adres',
'has_phone': 'Telefon',
'has_website': 'Strona WWW',
'has_hours': 'Godziny otwarcia',
'has_categories': 'Kategorie',
'has_photos': 'Zdjęcia',
'has_description': 'Opis',
'has_services': 'Usługi',
'has_reviews': 'Opinie',
}
def _run_gbp_batch_background(company_ids, fetch_google):
"""Background thread: audit all companies, collect changes without saving."""
from gbp_audit_service import GBPAuditService, fetch_google_business_data
db = SessionLocal()
try:
service = GBPAuditService(db)
state = _read_gbp_batch_state()
for i, company_id in enumerate(company_ids):
company_name = '?'
try:
company = db.get(Company, company_id)
company_name = company.name if company else f'ID {company_id}'
# Get old audit for comparison
old_audit = _get_old_audit(db, company_id)
if fetch_google:
fetch_google_business_data(db, company_id, force_refresh=True)
result = service.audit_company(company_id)
# DO NOT save — collect for review
# Build new audit data
new_audit = {
'score': result.completeness_score,
'rating': float(result.average_rating) if result.average_rating else None,
'review_count': result.review_count or 0,
'photo_count': result.photo_count or 0,
}
for field_key in _GBP_FIELD_LABELS:
field_name = field_key.replace('has_', '')
field_status = result.fields.get(field_name)
new_audit[field_key] = field_status.status in ('complete', 'partial') if field_status else False
# Detect changes
changes = []
if old_audit:
if old_audit['score'] != new_audit['score']:
changes.append({
'field': 'score', 'label': 'Kompletność',
'old': f"{old_audit['score']}%" if old_audit['score'] is not None else 'brak',
'new': f"{new_audit['score']}%",
})
if old_audit['rating'] != new_audit['rating']:
changes.append({
'field': 'rating', 'label': 'Ocena Google',
'old': str(old_audit['rating'] or '-'),
'new': str(new_audit['rating'] or '-'),
})
if old_audit['review_count'] != new_audit['review_count']:
changes.append({
'field': 'review_count', 'label': 'Liczba opinii',
'old': str(old_audit['review_count']),
'new': str(new_audit['review_count']),
})
if old_audit['photo_count'] != new_audit['photo_count']:
changes.append({
'field': 'photo_count', 'label': 'Liczba zdjęć',
'old': str(old_audit['photo_count']),
'new': str(new_audit['photo_count']),
})
for field_key, label in _GBP_FIELD_LABELS.items():
old_val = old_audit.get(field_key)
new_val = new_audit.get(field_key)
if old_val != new_val:
changes.append({
'field': field_key, 'label': label,
'old': 'Tak' if old_val else 'Nie',
'new': 'Tak' if new_val else 'Nie',
})
else:
# First audit — mark as new
changes.append({
'field': 'score', 'label': 'Kompletność',
'old': 'brak audytu',
'new': f"{new_audit['score']}%",
})
has_changes = len(changes) > 0
# Store pending audit result for later save
# Serialize the AuditResult fields we need to reconstruct it
pending_entry = {
'company_id': company_id,
'company_name': company_name,
'old_score': old_audit['score'] if old_audit else None,
'new_score': new_audit['score'],
'changes': changes,
'has_changes': has_changes,
# Store full result data for saving on approve
'audit_data': {
'completeness_score': result.completeness_score,
'review_count': result.review_count,
'average_rating': float(result.average_rating) if result.average_rating else None,
'photo_count': result.photo_count,
'logo_present': result.logo_present,
'cover_photo_present': result.cover_photo_present,
'google_place_id': result.google_place_id,
'google_maps_url': result.google_maps_url,
'has_name': new_audit['has_name'],
'has_address': new_audit['has_address'],
'has_phone': new_audit['has_phone'],
'has_website': new_audit['has_website'],
'has_hours': new_audit['has_hours'],
'has_categories': new_audit['has_categories'],
'has_photos': new_audit['has_photos'],
'has_description': new_audit['has_description'],
'has_services': new_audit['has_services'],
'has_reviews': new_audit['has_reviews'],
},
}
state['pending_changes'].append(pending_entry)
state['results'].append({
'company_id': company_id,
'company_name': company_name,
'score': result.completeness_score,
'old_score': old_audit['score'] if old_audit else None,
'status': 'changes' if has_changes else 'no_changes',
'changes_count': len(changes),
})
except Exception as e:
logger.error(f"GBP batch audit failed for company {company_id}: {e}")
state['errors'] += 1
state['results'].append({
'company_id': company_id,
'company_name': company_name,
'score': None,
'status': 'error',
'error': str(e)[:100],
})
state['completed'] = i + 1
state['progress'] = round((i + 1) / state['total'] * 100)
_write_gbp_batch_state(state)
state['running'] = False
_write_gbp_batch_state(state)
except Exception as e:
logger.error(f"GBP batch audit thread crashed: {e}")
state = _read_gbp_batch_state()
state['running'] = False
_write_gbp_batch_state(state)
finally:
db.close()
# ============================================================
# SEO ADMIN DASHBOARD
# ============================================================
@bp.route('/seo')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_seo():
"""
Admin dashboard for SEO metrics overview.
Displays:
- Summary stats (score distribution, average score)
- Sortable table of all companies with SEO scores
- Color-coded score badges (green 90-100, yellow 50-89, red 0-49)
- Filtering by category, score range, and search text
- Last audit date with staleness indicator
- Actions: view profile, trigger single company audit
Query Parameters:
- company: Slug of company to highlight/filter (optional)
"""
if not is_audit_owner():
abort(404)
# Get optional company filter from URL
filter_company_slug = request.args.get('company', '')
db = SessionLocal()
try:
from sqlalchemy import func
# Get all active companies with their latest SEO analysis data
companies_query = db.query(
Company.id,
Company.name,
Company.slug,
Company.website,
Category.name.label('category_name'),
CompanyWebsiteAnalysis.pagespeed_seo_score,
CompanyWebsiteAnalysis.pagespeed_performance_score,
CompanyWebsiteAnalysis.pagespeed_accessibility_score,
CompanyWebsiteAnalysis.pagespeed_best_practices_score,
CompanyWebsiteAnalysis.seo_audited_at
).outerjoin(
Category,
Company.category_id == Category.id
).outerjoin(
CompanyWebsiteAnalysis,
Company.id == CompanyWebsiteAnalysis.company_id
).filter(
Company.status == 'active'
).order_by(
Company.name
).all()
# Build companies list with named attributes for template
companies = []
for row in companies_query:
companies.append({
'id': row.id,
'name': row.name,
'slug': row.slug,
'website': row.website,
'category': row.category_name,
'seo_score': row.pagespeed_seo_score,
'performance_score': row.pagespeed_performance_score,
'accessibility_score': row.pagespeed_accessibility_score,
'best_practices_score': row.pagespeed_best_practices_score,
'seo_audited_at': row.seo_audited_at
})
# Calculate statistics
audited_companies = [c for c in companies if c['seo_score'] is not None]
not_audited = [c for c in companies if c['seo_score'] is None]
good_count = len([c for c in audited_companies if c['seo_score'] >= 90])
medium_count = len([c for c in audited_companies if 50 <= c['seo_score'] < 90])
poor_count = len([c for c in audited_companies if c['seo_score'] < 50])
not_audited_count = len(not_audited)
# Calculate average score (only for audited companies)
if audited_companies:
avg_score = round(sum(c['seo_score'] for c in audited_companies) / len(audited_companies))
else:
avg_score = None
stats = {
'good_count': good_count,
'medium_count': medium_count,
'poor_count': poor_count,
'not_audited_count': not_audited_count,
'avg_score': avg_score
}
# Get unique categories for filter dropdown
categories = sorted(set(c['category'] for c in companies if c['category']))
# Convert companies list to objects with attribute access for template
class CompanyRow:
def __init__(self, data):
for key, value in data.items():
setattr(self, key, value)
companies_objects = [CompanyRow(c) for c in companies]
return render_template('admin_seo_dashboard.html',
companies=companies_objects,
stats=stats,
categories=categories,
now=datetime.now(),
filter_company=filter_company_slug
)
finally:
db.close()
@bp.route('/seo/<int:company_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_seo_detail(company_id):
"""Detailed SEO audit view for a single company."""
if not is_audit_owner():
abort(404)
db = SessionLocal()
try:
from sqlalchemy import func
company = db.query(Company).filter_by(id=company_id).first()
if not company:
flash('Firma nie istnieje.', 'error')
return redirect(url_for('admin.admin_seo'))
analysis = db.query(CompanyWebsiteAnalysis).filter_by(
company_id=company_id
).first()
# Build recommendations from available data
recommendations = []
if not analysis:
recommendations.append({
'severity': 'info',
'text': 'Brak danych audytu. Uruchom audyt SEO dla tej firmy.'
})
else:
# SEO score
if analysis.pagespeed_seo_score is not None:
if analysis.pagespeed_seo_score < 50:
recommendations.append({
'severity': 'critical',
'text': 'Wynik SEO jest bardzo niski. Strona ma powazne problemy z optymalizacja pod wyszukiwarki.'
})
elif analysis.pagespeed_seo_score < 90:
recommendations.append({
'severity': 'warning',
'text': 'Wynik SEO jest sredni. Sa elementy do poprawy w optymalizacji pod wyszukiwarki.'
})
# Performance
if analysis.pagespeed_performance_score is not None:
if analysis.pagespeed_performance_score < 50:
recommendations.append({
'severity': 'critical',
'text': 'Strona laduje sie bardzo wolno. Uzytkownicy moga ja opuszczac zanim sie zaladuje.'
})
elif analysis.pagespeed_performance_score < 90:
recommendations.append({
'severity': 'warning',
'text': 'Szybkosc strony jest srednia. Mozna poprawic czas ladowania.'
})
# Meta tags
if not analysis.seo_title and not analysis.meta_title:
recommendations.append({
'severity': 'critical',
'text': 'Brak tytulu strony (meta title). Wyszukiwarki nie wiedza, czego dotyczy strona.'
})
if not analysis.seo_description and not analysis.meta_description:
recommendations.append({
'severity': 'critical',
'text': 'Brak opisu strony (meta description). To tekst wyswietlany w wynikach Google.'
})
# SSL
if analysis.has_ssl is False:
recommendations.append({
'severity': 'critical',
'text': 'Brak certyfikatu SSL (HTTPS). Przegladarki oznaczaja strone jako niebezpieczna.'
})
# Sitemap & robots
if analysis.has_sitemap is False:
recommendations.append({
'severity': 'warning',
'text': 'Brak pliku sitemap.xml. Google moze nie odkryc wszystkich podstron.'
})
if analysis.has_robots_txt is False:
recommendations.append({
'severity': 'warning',
'text': 'Brak pliku robots.txt. Nie ma instrukcji dla robotow wyszukiwarek.'
})
# Structured data
if analysis.has_structured_data is False:
recommendations.append({
'severity': 'warning',
'text': 'Brak danych strukturalnych (Schema.org). Strona nie bedzie miala rozszerzonych wynikow w Google.'
})
# Images
if analysis.images_without_alt and analysis.images_without_alt > 0:
recommendations.append({
'severity': 'warning',
'text': f'{analysis.images_without_alt} obrazkow bez opisu (alt). Problem z dostepnoscia i SEO.'
})
# H1
if analysis.h1_count is not None and analysis.h1_count != 1:
if analysis.h1_count == 0:
recommendations.append({
'severity': 'warning',
'text': 'Brak naglowka H1. Strona powinna miec dokladnie jeden glowny naglowek.'
})
elif analysis.h1_count > 1:
recommendations.append({
'severity': 'info',
'text': f'Strona ma {analysis.h1_count} naglowkow H1 (powinien byc 1).'
})
# Open Graph
if analysis.has_og_tags is False:
recommendations.append({
'severity': 'info',
'text': 'Brak tagow Open Graph. Linki udostepnione na Facebooku nie beda mialy podgladu.'
})
# Canonical
if analysis.has_canonical is False:
recommendations.append({
'severity': 'info',
'text': 'Brak tagu canonical. Moze prowadzic do duplikacji tresci w wyszukiwarkach.'
})
# Accessibility
if analysis.pagespeed_accessibility_score is not None and analysis.pagespeed_accessibility_score < 50:
recommendations.append({
'severity': 'warning',
'text': 'Niska dostepnosc strony. Osoby z niepelnosprawnosciami moga miec problem z korzystaniem.'
})
# Mobile
if analysis.is_responsive is False and analysis.is_mobile_friendly is False:
recommendations.append({
'severity': 'critical',
'text': 'Strona nie jest dostosowana do urzadzen mobilnych. Google karze takie strony w wynikach.'
})
# Security headers
if analysis.security_headers_count is not None and analysis.security_headers_count < 2:
recommendations.append({
'severity': 'info',
'text': 'Brak waznych naglowkow bezpieczenstwa (HSTS, CSP). Strona jest slabiej zabezpieczona.'
})
# All good?
if not recommendations:
recommendations.append({
'severity': 'success',
'text': 'Strona wyglada dobrze! Nie znaleziono powaznych problemow.'
})
# Add issues from seo_issues JSONB if available
if analysis.seo_issues:
for issue in analysis.seo_issues:
if isinstance(issue, dict):
recommendations.append({
'severity': issue.get('severity', 'info'),
'text': issue.get('message', issue.get('text', str(issue)))
})
# IP geolocation lookup for hosting details
ip_info = {}
if analysis and analysis.hosting_ip:
try:
import requests as req
resp = req.get(
f'http://ip-api.com/json/{analysis.hosting_ip}?fields=status,org,isp,city,regionName,country,as',
timeout=3
)
if resp.status_code == 200:
data = resp.json()
if data.get('status') == 'success':
ip_info = {
'org': data.get('org', ''),
'isp': data.get('isp', ''),
'city': data.get('city', ''),
'region': data.get('regionName', ''),
'country': data.get('country', ''),
'as_number': data.get('as', ''),
}
except Exception:
pass
# Category benchmarks — average scores in same category
benchmarks = {}
if company.category_id and analysis:
try:
from sqlalchemy import func as sqlfunc
cat_stats = db.query(
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_seo_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_performance_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_accessibility_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_best_practices_score),
sqlfunc.count(CompanyWebsiteAnalysis.id),
).join(Company, Company.id == CompanyWebsiteAnalysis.company_id
).filter(
Company.category_id == company.category_id,
Company.status == 'active',
CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None),
).first()
if cat_stats and cat_stats[4] > 1:
from database import Category
cat_name = db.query(Category.name).filter_by(id=company.category_id).scalar()
benchmarks = {
'category_name': cat_name or '',
'count': cat_stats[4],
'avg_seo': round(float(cat_stats[0] or 0)),
'avg_performance': round(float(cat_stats[1] or 0)),
'avg_accessibility': round(float(cat_stats[2] or 0)),
'avg_best_practices': round(float(cat_stats[3] or 0)),
}
except Exception:
pass
# All-members benchmark
all_benchmarks = {}
if analysis:
try:
from sqlalchemy import func as sqlfunc
all_stats = db.query(
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_seo_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_performance_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_accessibility_score),
sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_best_practices_score),
sqlfunc.count(CompanyWebsiteAnalysis.id),
).join(Company, Company.id == CompanyWebsiteAnalysis.company_id
).filter(
Company.status == 'active',
CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None),
).first()
if all_stats and all_stats[4] > 1:
all_benchmarks = {
'count': all_stats[4],
'avg_seo': round(float(all_stats[0] or 0)),
'avg_performance': round(float(all_stats[1] or 0)),
'avg_accessibility': round(float(all_stats[2] or 0)),
'avg_best_practices': round(float(all_stats[3] or 0)),
}
except Exception:
pass
# WHOIS domain expiry lookup via RDAP
domain_info = {}
if analysis and company.website:
try:
import re
import requests as req
domain_match = re.search(r'https?://(?:www\.)?([^/]+)', company.website)
if domain_match:
domain = domain_match.group(1)
# Map TLD to direct RDAP server (bypass rdap.org Cloudflare)
tld = domain.rsplit('.', 1)[-1].lower()
rdap_servers = {
'pl': 'https://rdap.dns.pl',
'com': 'https://rdap.verisign.com/com/v1',
'net': 'https://rdap.verisign.com/net/v1',
'org': 'https://rdap.org/domain', # fallback
'eu': 'https://rdap.eu/domain',
'de': 'https://rdap.denic.de',
}
rdap_base = rdap_servers.get(tld, f'https://rdap.org')
rdap_url = f'{rdap_base}/domain/{domain}'
resp = req.get(rdap_url, timeout=5, headers={
'Accept': 'application/rdap+json',
'User-Agent': 'NordaBiz-SEO-Audit/1.0'
})
if resp.status_code == 200:
rdap = resp.json()
for event in rdap.get('events', []):
action = event.get('eventAction')
date_str = event.get('eventDate', '')[:10]
if action == 'expiration':
domain_info['expires'] = date_str
elif action == 'registration':
domain_info['registered'] = date_str
elif action == 'last changed':
domain_info['updated'] = date_str
# Registrar from entities
for entity in rdap.get('entities', []):
if 'registrar' in entity.get('roles', []):
vcard = entity.get('vcardArray', [None, []])[1]
for item in (vcard or []):
if item[0] == 'fn':
domain_info['registrar'] = item[3]
break
domain_info['domain'] = domain
except Exception:
pass
return render_template('admin/seo_detail.html',
company=company,
analysis=analysis,
recommendations=recommendations,
ip_info=ip_info,
benchmarks=benchmarks,
all_benchmarks=all_benchmarks,
domain_info=domain_info
)
finally:
db.close()
# ============================================================
# GBP AUDIT ADMIN DASHBOARD
# ============================================================
@bp.route('/gbp-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_gbp_audit():
"""
Admin dashboard for GBP (Google Business Profile) audit overview.
Displays:
- Summary stats (completeness score distribution, field coverage)
- Sortable table of all companies with GBP audit data
- Review metrics (avg rating, review counts)
- Photo statistics
"""
if not is_audit_owner():
abort(404)
db = SessionLocal()
try:
from sqlalchemy import func
# Subquery to get latest audit for each company
latest_audit_subq = db.query(
GBPAudit.company_id,
func.max(GBPAudit.audit_date).label('max_date')
).group_by(GBPAudit.company_id).subquery()
# Get all companies with their latest GBP audit data
companies_query = db.query(
Company.id,
Company.name,
Company.slug,
Company.website,
Category.name.label('category_name'),
GBPAudit.completeness_score,
GBPAudit.average_rating,
GBPAudit.review_count,
GBPAudit.photo_count,
GBPAudit.has_name,
GBPAudit.has_address,
GBPAudit.has_phone,
GBPAudit.has_website,
GBPAudit.has_hours,
GBPAudit.has_categories,
GBPAudit.has_photos,
GBPAudit.has_description,
GBPAudit.has_services,
GBPAudit.has_reviews,
GBPAudit.audit_date
).outerjoin(
Category,
Company.category_id == Category.id
).outerjoin(
latest_audit_subq,
Company.id == latest_audit_subq.c.company_id
).outerjoin(
GBPAudit,
(Company.id == GBPAudit.company_id) &
(GBPAudit.audit_date == latest_audit_subq.c.max_date)
).filter(
Company.status == 'active'
).order_by(Company.name).all()
# Build companies list
companies = []
for row in companies_query:
companies.append({
'id': row.id,
'name': row.name,
'slug': row.slug,
'website': row.website,
'category': row.category_name,
'completeness_score': row.completeness_score,
'average_rating': float(row.average_rating) if row.average_rating else None,
'review_count': row.review_count or 0,
'photo_count': row.photo_count or 0,
'has_name': row.has_name,
'has_address': row.has_address,
'has_phone': row.has_phone,
'has_website': row.has_website,
'has_hours': row.has_hours,
'has_categories': row.has_categories,
'has_photos': row.has_photos,
'has_description': row.has_description,
'has_services': row.has_services,
'has_reviews': row.has_reviews,
'audit_date': row.audit_date
})
# Calculate statistics
total_companies = len(companies)
audited = [c for c in companies if c['completeness_score'] is not None]
not_audited = [c for c in companies if c['completeness_score'] is None]
# Score distribution
excellent_count = len([c for c in audited if c['completeness_score'] >= 90])
good_count = len([c for c in audited if 70 <= c['completeness_score'] < 90])
poor_count = len([c for c in audited if c['completeness_score'] < 70])
not_audited_count = len(not_audited)
# Average completeness
avg_completeness = round(sum(c['completeness_score'] for c in audited) / len(audited)) if audited else None
# Average rating (only for companies with reviews)
companies_with_rating = [c for c in audited if c['average_rating']]
avg_rating = round(sum(c['average_rating'] for c in companies_with_rating) / len(companies_with_rating), 1) if companies_with_rating else None
# Total reviews
total_reviews = sum(c['review_count'] for c in companies)
# Field coverage stats (percentage of audited companies with each field)
if audited:
field_coverage = {
'name': round(len([c for c in audited if c['has_name']]) / len(audited) * 100),
'address': round(len([c for c in audited if c['has_address']]) / len(audited) * 100),
'phone': round(len([c for c in audited if c['has_phone']]) / len(audited) * 100),
'website': round(len([c for c in audited if c['has_website']]) / len(audited) * 100),
'hours': round(len([c for c in audited if c['has_hours']]) / len(audited) * 100),
'categories': round(len([c for c in audited if c['has_categories']]) / len(audited) * 100),
'photos': round(len([c for c in audited if c['has_photos']]) / len(audited) * 100),
'description': round(len([c for c in audited if c['has_description']]) / len(audited) * 100),
'services': round(len([c for c in audited if c['has_services']]) / len(audited) * 100),
'reviews': round(len([c for c in audited if c['has_reviews']]) / len(audited) * 100),
}
else:
field_coverage = {k: 0 for k in ['name', 'address', 'phone', 'website', 'hours', 'categories', 'photos', 'description', 'services', 'reviews']}
stats = {
'total_companies': total_companies,
'audited_count': len(audited),
'excellent_count': excellent_count,
'good_count': good_count,
'poor_count': poor_count,
'not_audited_count': not_audited_count,
'avg_completeness': avg_completeness,
'avg_rating': avg_rating,
'total_reviews': total_reviews,
'field_coverage': field_coverage
}
# Get unique categories
categories = sorted(set(c['category'] for c in companies if c['category']))
# Convert to objects for template
class CompanyRow:
def __init__(self, data):
for key, value in data.items():
setattr(self, key, value)
companies_objects = [CompanyRow(c) for c in companies]
return render_template('admin/gbp_audit_dashboard.html',
companies=companies_objects,
stats=stats,
categories=categories,
now=datetime.now()
)
finally:
db.close()
@bp.route('/gbp-audit/run-batch', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_run_batch():
"""Start batch GBP audit for all active companies."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_gbp_batch_state()
if state.get('running'):
return jsonify({
'error': 'Audyt już działa',
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
}), 409
fetch_google = request.form.get('fetch_google', '0') == '1'
db = SessionLocal()
try:
company_ids = [c.id for c in db.query(Company.id).filter(Company.status == 'active').all()]
finally:
db.close()
if not company_ids:
return jsonify({'error': 'Brak aktywnych firm'}), 400
_write_gbp_batch_state({
'running': True,
'progress': 0,
'total': len(company_ids),
'completed': 0,
'errors': 0,
'results': [],
'pending_changes': [],
'approved': False,
})
thread = threading.Thread(
target=_run_gbp_batch_background,
args=(company_ids, fetch_google),
daemon=True
)
thread.start()
return jsonify({
'status': 'started',
'total': len(company_ids),
'message': f'Rozpoczęto audyt GBP dla {len(company_ids)} firm.',
})
@bp.route('/gbp-audit/batch-status')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_gbp_audit_batch_status():
"""Get current batch GBP audit status with live results feed."""
state = _read_gbp_batch_state()
results = state.get('results', [])
since = request.args.get('since', 0, type=int)
new_results = results[since:]
return jsonify({
'running': state.get('running', False),
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
'errors': state.get('errors', 0),
'results': new_results,
'results_total': len(results),
'pending_count': len(state.get('pending_changes', [])),
})
@bp.route('/gbp-audit/batch-review')
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_review():
"""Review pending GBP batch audit changes before saving."""
if not is_audit_owner():
abort(404)
state = _read_gbp_batch_state()
if state.get('running'):
flash('Audyt jeszcze trwa. Poczekaj na zakończenie.', 'warning')
return redirect(url_for('admin.admin_gbp_audit'))
pending = state.get('pending_changes', [])
results = state.get('results', [])
approved = state.get('approved', False)
# Summary stats
with_changes = [p for p in pending if p.get('has_changes')]
without_changes = [p for p in pending if not p.get('has_changes')]
errors = [r for r in results if r.get('status') == 'error']
summary = {
'total_companies': len(pending) + len(errors),
'with_changes': len(with_changes),
'without_changes': len(without_changes),
'errors': len(errors),
}
return render_template('admin/gbp_audit_batch_review.html',
pending=with_changes,
no_changes=without_changes,
errors=errors,
summary=summary,
approved=approved,
)
@bp.route('/gbp-audit/batch-approve', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_approve():
"""Approve and save pending GBP audit results to database."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_gbp_batch_state()
if state.get('running'):
return jsonify({'error': 'Audyt jeszcze trwa'}), 409
if state.get('approved'):
return jsonify({'error': 'Wyniki już zostały zatwierdzone'}), 409
pending = state.get('pending_changes', [])
if not pending:
return jsonify({'error': 'Brak wyników do zatwierdzenia'}), 400
db = SessionLocal()
applied = 0
errors = 0
try:
for entry in pending:
try:
data = entry['audit_data']
audit = GBPAudit(
company_id=entry['company_id'],
completeness_score=data['completeness_score'],
review_count=data.get('review_count', 0),
average_rating=data.get('average_rating'),
photo_count=data.get('photo_count', 0),
logo_present=data.get('logo_present', False),
cover_photo_present=data.get('cover_photo_present', False),
google_place_id=data.get('google_place_id'),
google_maps_url=data.get('google_maps_url'),
has_name=data.get('has_name', False),
has_address=data.get('has_address', False),
has_phone=data.get('has_phone', False),
has_website=data.get('has_website', False),
has_hours=data.get('has_hours', False),
has_categories=data.get('has_categories', False),
has_photos=data.get('has_photos', False),
has_description=data.get('has_description', False),
has_services=data.get('has_services', False),
has_reviews=data.get('has_reviews', False),
source='automated',
audit_date=datetime.now(),
)
db.add(audit)
applied += 1
except Exception as e:
logger.error(f"Failed to save GBP audit for company {entry.get('company_id')}: {e}")
errors += 1
db.commit()
# Mark as approved in state
state['approved'] = True
_write_gbp_batch_state(state)
return jsonify({
'status': 'approved',
'applied': applied,
'errors': errors,
'message': f'Zapisano {applied} audytów do bazy danych.',
})
except Exception as e:
db.rollback()
logger.error(f"GBP batch approve failed: {e}")
return jsonify({'error': f'Błąd zapisu: {str(e)[:100]}'}), 500
finally:
db.close()
@bp.route('/gbp-audit/batch-discard', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_discard():
"""Discard pending GBP batch audit results without saving."""
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
state = _read_gbp_batch_state()
count = len(state.get('pending_changes', []))
_write_gbp_batch_state(dict(_GBP_BATCH_DEFAULT))
return jsonify({
'status': 'discarded',
'count': count,
'message': f'Odrzucono {count} wyników audytu.',
})
# ============================================================
# DIGITAL MATURITY DASHBOARD
# ============================================================
@bp.route('/digital-maturity')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def digital_maturity_dashboard():
"""Admin dashboard for digital maturity assessment results"""
db = SessionLocal()
try:
from sqlalchemy import func, desc
# Get all companies with maturity data
companies_query = db.query(
Company.id,
Company.name,
Company.slug,
Company.website,
CompanyDigitalMaturity.overall_score,
CompanyDigitalMaturity.online_presence_score,
CompanyDigitalMaturity.sales_readiness,
CompanyDigitalMaturity.total_opportunity_value,
CompanyWebsiteAnalysis.opportunity_score,
CompanyWebsiteAnalysis.has_blog,
CompanyWebsiteAnalysis.has_portfolio,
CompanyWebsiteAnalysis.has_contact_form,
CompanyWebsiteAnalysis.content_richness_score,
CompanyDigitalMaturity.critical_gaps,
CompanyWebsiteAnalysis.missing_features
).join(
CompanyDigitalMaturity, Company.id == CompanyDigitalMaturity.company_id
).join(
CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id
).filter(
CompanyDigitalMaturity.overall_score > 0
).order_by(
desc(CompanyDigitalMaturity.overall_score)
).all()
# Calculate stats
total_analyzed = len(companies_query)
avg_score = round(sum(c.overall_score for c in companies_query) / total_analyzed, 1) if total_analyzed else 0
total_opportunity = sum(float(c.total_opportunity_value or 0) for c in companies_query)
warm_leads = [c for c in companies_query if c.sales_readiness == 'warm']
cold_leads = [c for c in companies_query if c.sales_readiness == 'cold']
# Top 10 and bottom 10
top_performers = companies_query[:10]
bottom_performers = sorted(companies_query, key=lambda c: c.overall_score)[:10]
# Top opportunities
top_opportunities = sorted(
companies_query,
key=lambda c: float(c.total_opportunity_value or 0),
reverse=True
)[:10]
return render_template('admin/digital_maturity.html',
total_analyzed=total_analyzed,
avg_score=avg_score,
total_opportunity=total_opportunity,
warm_leads_count=len(warm_leads),
cold_leads_count=len(cold_leads),
top_performers=top_performers,
bottom_performers=bottom_performers,
top_opportunities=top_opportunities,
all_companies=companies_query
)
finally:
db.close()
# ============================================================
# KRS AUDIT DASHBOARD
# ============================================================
@bp.route('/krs-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_krs_audit():
"""
Admin dashboard for KRS (Krajowy Rejestr Sądowy) audit.
Displays:
- Summary stats (with KRS, audited count, data extraction status)
- List of companies with KRS numbers
- Audit progress and status for each company
- Links to source PDF files
"""
# Check if KRS audit service is available
try:
from krs_audit_service import KRS_AUDIT_AVAILABLE
except ImportError:
KRS_AUDIT_AVAILABLE = False
db = SessionLocal()
try:
from sqlalchemy import func
# Get all active companies with KRS numbers
companies_query = db.query(Company).filter(
Company.status == 'active',
Company.krs.isnot(None),
Company.krs != ''
).order_by(Company.name).all()
# Get latest audit for each company
companies = []
for company in companies_query:
# Get latest audit
latest_audit = db.query(KRSAudit).filter(
KRSAudit.company_id == company.id
).order_by(KRSAudit.audit_date.desc()).first()
# Get PKD codes (all)
pkd_codes = db.query(CompanyPKD).filter(
CompanyPKD.company_id == company.id
).order_by(CompanyPKD.is_primary.desc(), CompanyPKD.pkd_code).all()
pkd_count = len(pkd_codes)
# Get people count
people_count = db.query(CompanyPerson).filter(
CompanyPerson.company_id == company.id
).count()
companies.append({
'id': company.id,
'name': company.name,
'slug': company.slug,
'krs': company.krs,
'nip': company.nip,
'capital_amount': company.capital_amount,
'krs_last_audit_at': company.krs_last_audit_at,
'krs_pdf_path': company.krs_pdf_path,
'audit': latest_audit,
'pkd_count': pkd_count,
'pkd_codes': [{
'code': pkd.pkd_code,
'description': pkd.pkd_description,
'is_primary': pkd.is_primary
} for pkd in pkd_codes],
'people_count': people_count,
'capital_shares_count': company.capital_shares_count
})
# Calculate stats
total_with_krs = len(companies)
audited_count = len([c for c in companies if c['krs_last_audit_at']])
not_audited_count = total_with_krs - audited_count
with_capital = len([c for c in companies if c['capital_amount']])
with_people = len([c for c in companies if c['people_count'] > 0])
with_pkd = len([c for c in companies if c['pkd_count'] > 0])
# Companies without KRS
no_krs_count = db.query(Company).filter(
Company.status == 'active',
(Company.krs.is_(None)) | (Company.krs == '')
).count()
stats = {
'total_with_krs': total_with_krs,
'audited_count': audited_count,
'not_audited_count': not_audited_count,
'no_krs_count': no_krs_count,
'with_capital': with_capital,
'with_people': with_people,
'with_pkd': with_pkd
}
return render_template('admin/krs_audit_dashboard.html',
companies=companies,
stats=stats,
krs_audit_available=KRS_AUDIT_AVAILABLE,
now=datetime.now()
)
finally:
db.close()
# ============================================================
# IT AUDIT ADMIN DASHBOARD
# ============================================================
@bp.route('/it-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_it_audit():
"""
Admin dashboard for IT audit overview.
Displays:
- Summary stats (audit count, average scores, maturity distribution)
- Technology adoption stats (Azure AD, M365, PBS, Zabbix, EDR, DR)
- Collaboration flags distribution
- Company table with IT audit data
- Collaboration matches matrix
Access: Office Manager and above
"""
if not is_audit_owner():
abort(404)
db = SessionLocal()
try:
from sqlalchemy import func
# Import IT audit service helper
try:
from it_audit_service import get_maturity_level_label
except ImportError:
def get_maturity_level_label(level):
labels = {
'basic': 'Podstawowy',
'developing': 'Rozwijający się',
'established': 'Ustalony',
'advanced': 'Zaawansowany'
}
return labels.get(level, level)
# Get all active companies with their latest IT audit
# Using subquery to get only the latest audit per company
latest_audit_subq = db.query(
ITAudit.company_id,
func.max(ITAudit.audit_date).label('max_date')
).group_by(ITAudit.company_id).subquery()
companies_query = db.query(
Company.id,
Company.name,
Company.slug,
ITAudit.id.label('audit_id'),
ITAudit.overall_score,
ITAudit.security_score,
ITAudit.collaboration_score,
ITAudit.completeness_score,
ITAudit.maturity_level,
ITAudit.audit_date,
ITAudit.has_azure_ad,
ITAudit.has_m365,
ITAudit.has_proxmox_pbs,
ITAudit.monitoring_solution,
ITAudit.has_edr,
ITAudit.has_dr_plan
).outerjoin(
latest_audit_subq,
Company.id == latest_audit_subq.c.company_id
).outerjoin(
ITAudit,
(Company.id == ITAudit.company_id) &
(ITAudit.audit_date == latest_audit_subq.c.max_date)
).filter(
Company.status == 'active'
).order_by(
Company.name
).all()
# Build companies list with named attributes for template
companies = []
for row in companies_query:
# Detect Zabbix from monitoring_solution field
has_zabbix = row.monitoring_solution and 'zabbix' in str(row.monitoring_solution).lower()
companies.append({
'id': row.id,
'name': row.name,
'slug': row.slug,
'audit_id': row.audit_id,
'overall_score': row.overall_score,
'security_score': row.security_score,
'collaboration_score': row.collaboration_score,
'completeness_score': row.completeness_score,
'maturity_level': row.maturity_level,
'maturity_label': get_maturity_level_label(row.maturity_level) if row.maturity_level else None,
'audit_date': row.audit_date,
'has_azure_ad': row.has_azure_ad,
'has_m365': row.has_m365,
'has_proxmox_pbs': row.has_proxmox_pbs,
'has_zabbix': has_zabbix,
'has_edr': row.has_edr,
'has_dr_plan': row.has_dr_plan
})
# Calculate statistics
audited_companies = [c for c in companies if c['overall_score'] is not None]
not_audited = [c for c in companies if c['overall_score'] is None]
# Maturity distribution
maturity_counts = {
'basic': 0,
'developing': 0,
'established': 0,
'advanced': 0
}
for c in audited_companies:
level = c['maturity_level']
if level in maturity_counts:
maturity_counts[level] += 1
# Calculate average scores
if audited_companies:
avg_overall = round(sum(c['overall_score'] for c in audited_companies) / len(audited_companies))
avg_security = round(sum(c['security_score'] or 0 for c in audited_companies) / len(audited_companies))
avg_collaboration = round(sum(c['collaboration_score'] or 0 for c in audited_companies) / len(audited_companies))
else:
avg_overall = None
avg_security = None
avg_collaboration = None
# Technology adoption stats
tech_stats = {
'azure_ad': len([c for c in audited_companies if c['has_azure_ad']]),
'm365': len([c for c in audited_companies if c['has_m365']]),
'proxmox_pbs': len([c for c in audited_companies if c['has_proxmox_pbs']]),
'zabbix': len([c for c in audited_companies if c['has_zabbix']]),
'edr': len([c for c in audited_companies if c['has_edr']]),
'dr_plan': len([c for c in audited_companies if c['has_dr_plan']])
}
# Collaboration flags stats from latest audits
collab_stats = {}
if audited_companies:
collab_flags = [
'open_to_shared_licensing',
'open_to_backup_replication',
'open_to_teams_federation',
'open_to_shared_monitoring',
'open_to_collective_purchasing',
'open_to_knowledge_sharing'
]
for flag in collab_flags:
count = db.query(func.count(ITAudit.id)).filter(
ITAudit.id.in_([c['audit_id'] for c in audited_companies if c['audit_id']]),
getattr(ITAudit, flag) == True
).scalar()
collab_stats[flag] = count
# Get collaboration matches with both companies' info
matches = db.query(ITCollaborationMatch).order_by(
ITCollaborationMatch.match_score.desc()
).all()
# Build flat list of collaboration matches with all necessary attributes
class CollabMatchRow:
"""Helper class for template attribute access"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
collaboration_matches = []
for match in matches:
# Get company A and B info
company_a = db.query(Company).filter(Company.id == match.company_a_id).first()
company_b = db.query(Company).filter(Company.id == match.company_b_id).first()
collaboration_matches.append(CollabMatchRow(
id=match.id,
match_type=match.match_type,
company_a_id=match.company_a_id,
company_a_name=company_a.name if company_a else 'Nieznana',
company_a_slug=company_a.slug if company_a else '',
company_b_id=match.company_b_id,
company_b_name=company_b.name if company_b else 'Nieznana',
company_b_slug=company_b.slug if company_b else '',
match_reason=match.match_reason,
match_score=match.match_score,
status=match.status,
created_at=match.created_at
))
stats = {
# Main stats
'total_audits': len(audited_companies),
'total_companies': len(companies),
'companies_without_audit': len(not_audited),
# Score averages
'avg_overall_score': avg_overall,
'avg_security_score': avg_security,
'avg_collaboration_score': avg_collaboration,
# Maturity distribution (flattened for template)
'maturity_basic': maturity_counts['basic'],
'maturity_developing': maturity_counts['developing'],
'maturity_established': maturity_counts['established'],
'maturity_advanced': maturity_counts['advanced'],
# Technology adoption stats (matching template naming with has_* prefix)
'has_azure_ad': tech_stats['azure_ad'],
'has_m365': tech_stats['m365'],
'has_proxmox_pbs': tech_stats['proxmox_pbs'],
'has_zabbix': tech_stats['zabbix'],
'has_edr': tech_stats['edr'],
'has_dr_plan': tech_stats['dr_plan'],
# Collaboration flags
'open_to_shared_licensing': collab_stats.get('open_to_shared_licensing', 0),
'open_to_backup_replication': collab_stats.get('open_to_backup_replication', 0),
'open_to_teams_federation': collab_stats.get('open_to_teams_federation', 0),
'open_to_shared_monitoring': collab_stats.get('open_to_shared_monitoring', 0),
'open_to_collective_purchasing': collab_stats.get('open_to_collective_purchasing', 0),
'open_to_knowledge_sharing': collab_stats.get('open_to_knowledge_sharing', 0),
# Legacy nested structures (for any templates that still use them)
'maturity_counts': maturity_counts,
'tech_stats': tech_stats,
'collab_stats': collab_stats,
'total_matches': len(collaboration_matches)
}
# Convert companies list to objects with attribute access for template
class CompanyRow:
def __init__(self, data):
for key, value in data.items():
setattr(self, key, value)
companies_objects = [CompanyRow(c) for c in companies]
return render_template('admin/it_audit_dashboard.html',
companies=companies_objects,
stats=stats,
collaboration_matches=collaboration_matches,
now=datetime.now()
)
finally:
db.close()
# ============================================================
# ACCESS OVERVIEW DASHBOARD
# ============================================================
@bp.route('/access-overview')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_access_overview():
"""Panel kontroli dostepu — kto widzi co."""
if not is_audit_owner():
abort(404)
from database import User
from sqlalchemy import func
db = SessionLocal()
try:
# All active users
all_users = db.query(User).filter(
User.is_active == True
).order_by(User.role.desc(), User.name).all()
# Role distribution
role_counts = {}
for user in all_users:
role = user.role or 'UNAFFILIATED'
role_counts[role] = role_counts.get(role, 0) + 1
# Rada Izby members
rada_members = [u for u in all_users if u.is_rada_member]
from utils.decorators import AUDIT_OWNER_EMAIL
return render_template('admin/access_overview.html',
all_users=all_users,
role_counts=role_counts,
rada_members=rada_members,
audit_owner_email=AUDIT_OWNER_EMAIL
)
finally:
db.close()