Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1666 lines
64 KiB
Python
1666 lines
64 KiB
Python
"""
|
|
Admin Audit Routes
|
|
==================
|
|
|
|
SEO and GBP audit dashboards for admin panel.
|
|
"""
|
|
|
|
import fcntl
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
from flask import abort, render_template, request, redirect, url_for, flash, jsonify
|
|
from flask_login import login_required, current_user
|
|
|
|
from . import bp
|
|
from database import (
|
|
SessionLocal, Company, Category, CompanyWebsiteAnalysis, GBPAudit,
|
|
CompanyDigitalMaturity, KRSAudit, CompanyPKD, CompanyPerson,
|
|
ITAudit, ITCollaborationMatch, SystemRole
|
|
)
|
|
from utils.decorators import role_required, is_audit_owner
|
|
|
|
# Module-level logger, namespaced by this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================
# GBP BATCH AUDIT STATE (shared file for multi-worker safety)
# ============================================================

# JSON state file in the system temp dir — shared across worker processes,
# so progress written by the background thread in one worker is visible to
# status polls served by another.
_GBP_BATCH_STATE_FILE = os.path.join(tempfile.gettempdir(), 'nordabiz_gbp_batch_state.json')

# Baseline state returned when no state file exists (or it is unreadable).
# NOTE: 'results' and 'pending_changes' are mutable lists — copies of this
# dict must not share them (see _read_gbp_batch_state).
_GBP_BATCH_DEFAULT = {
    'running': False,       # batch thread currently active
    'progress': 0,          # 0-100 percentage
    'total': 0,             # number of companies in the batch
    'completed': 0,         # companies processed so far
    'errors': 0,            # companies whose audit raised
    'results': [],          # live per-company progress rows
    'pending_changes': [],  # audit payloads awaiting admin approval
    'approved': False,      # True once results were saved to the DB
}
|
|
|
|
|
|
def _read_gbp_batch_state():
    """Read the shared GBP batch-audit state from disk.

    Takes a shared (read) flock so concurrent workers see a consistent
    snapshot of the JSON file.

    Returns:
        dict: The persisted state, or a fresh default state when the file
        is missing or unreadable.
    """
    try:
        with open(_GBP_BATCH_STATE_FILE, 'r') as f:
            fcntl.flock(f, fcntl.LOCK_SH)
            data = json.load(f)
            fcntl.flock(f, fcntl.LOCK_UN)
            return data
    except (FileNotFoundError, json.JSONDecodeError, IOError):
        # BUGFIX: dict(_GBP_BATCH_DEFAULT) was a *shallow* copy, so callers
        # appending to 'results'/'pending_changes' mutated the shared default
        # lists across calls.  Build fresh list objects for the mutable values.
        return {**_GBP_BATCH_DEFAULT, 'results': [], 'pending_changes': []}
|
|
|
|
|
|
def _write_gbp_batch_state(state):
    """Atomically persist the GBP batch-audit state to the shared file.

    Serializes to a sibling ``.tmp`` file under an exclusive flock, then
    swaps it into place with ``os.replace`` so concurrent readers never
    observe a half-written file.  Write failures are logged, not raised.
    """
    tmp_path = _GBP_BATCH_STATE_FILE + '.tmp'
    try:
        with open(tmp_path, 'w') as handle:
            fcntl.flock(handle, fcntl.LOCK_EX)
            # default=str stringifies non-JSON values (e.g. datetimes).
            json.dump(state, handle, default=str)
            fcntl.flock(handle, fcntl.LOCK_UN)
        os.replace(tmp_path, _GBP_BATCH_STATE_FILE)
    except IOError as exc:
        logger.error(f"Failed to write GBP batch state: {exc}")
|
|
|
|
|
|
def _get_old_audit(db, company_id):
    """Get latest existing GBP audit for comparison.

    Args:
        db: SQLAlchemy session.
        company_id: Primary key of the company to look up.

    Returns:
        dict | None: Snapshot of the most recent GBPAudit row (score,
        rating, counts, per-field booleans, audit date as a string),
        or None when the company has never been audited.
    """
    # NOTE: removed an unused local `from sqlalchemy import func` import.
    audit = (
        db.query(GBPAudit)
        .filter_by(company_id=company_id)
        .order_by(GBPAudit.audit_date.desc())
        .first()
    )
    if not audit:
        return None
    return {
        'score': audit.completeness_score,
        'rating': float(audit.average_rating) if audit.average_rating else None,
        'review_count': audit.review_count or 0,
        'photo_count': audit.photo_count or 0,
        'has_name': audit.has_name,
        'has_address': audit.has_address,
        'has_phone': audit.has_phone,
        'has_website': audit.has_website,
        'has_hours': audit.has_hours,
        'has_categories': audit.has_categories,
        'has_photos': audit.has_photos,
        'has_description': audit.has_description,
        'has_services': audit.has_services,
        'has_reviews': audit.has_reviews,
        'audit_date': str(audit.audit_date) if audit.audit_date else None,
    }
|
|
|
|
|
|
# Polish UI labels for the boolean presence fields on GBPAudit.  Keys mirror
# the `has_*` column names and are iterated when diffing old vs. new audits
# in the batch runner (the `has_` prefix is stripped to look up result.fields).
_GBP_FIELD_LABELS = {
    'has_name': 'Nazwa firmy',
    'has_address': 'Adres',
    'has_phone': 'Telefon',
    'has_website': 'Strona WWW',
    'has_hours': 'Godziny otwarcia',
    'has_categories': 'Kategorie',
    'has_photos': 'Zdjęcia',
    'has_description': 'Opis',
    'has_services': 'Usługi',
    'has_reviews': 'Opinie',
}
|
|
|
|
|
|
def _run_gbp_batch_background(company_ids, fetch_google):
    """Background thread: audit all companies, collect changes without saving.

    For each company: optionally force-refresh Google Business data, run the
    GBP audit, diff it against the latest stored audit, and append both a
    pending-change entry (for the review/approve step) and a live progress
    row to the shared state file.  Nothing is written to the GBPAudit table
    here — saving happens only after explicit admin approval.

    Args:
        company_ids: IDs of the companies to audit, in order.
        fetch_google: When True, refresh Google data before each audit.
    """
    from gbp_audit_service import GBPAuditService, fetch_google_business_data

    db = SessionLocal()
    try:
        service = GBPAuditService(db)
        state = _read_gbp_batch_state()
        # ROBUSTNESS: if the state file was lost, the default total is 0 and
        # the progress division below would raise ZeroDivisionError.  Fall
        # back to the actual work-list length.
        total = state.get('total') or len(company_ids)

        for i, company_id in enumerate(company_ids):
            company_name = '?'
            try:
                company = db.get(Company, company_id)
                company_name = company.name if company else f'ID {company_id}'

                # Get old audit for comparison
                old_audit = _get_old_audit(db, company_id)

                if fetch_google:
                    fetch_google_business_data(db, company_id, force_refresh=True)

                result = service.audit_company(company_id)
                # DO NOT save — collect for review

                # Build new audit data
                new_audit = {
                    'score': result.completeness_score,
                    'rating': float(result.average_rating) if result.average_rating else None,
                    'review_count': result.review_count or 0,
                    'photo_count': result.photo_count or 0,
                }
                for field_key in _GBP_FIELD_LABELS:
                    field_name = field_key.replace('has_', '')
                    field_status = result.fields.get(field_name)
                    # 'partial' counts as present for the boolean columns.
                    new_audit[field_key] = field_status.status in ('complete', 'partial') if field_status else False

                # Detect changes vs. the previous audit
                changes = []
                if old_audit:
                    if old_audit['score'] != new_audit['score']:
                        changes.append({
                            'field': 'score', 'label': 'Kompletność',
                            'old': f"{old_audit['score']}%" if old_audit['score'] is not None else 'brak',
                            'new': f"{new_audit['score']}%",
                        })
                    if old_audit['rating'] != new_audit['rating']:
                        changes.append({
                            'field': 'rating', 'label': 'Ocena Google',
                            'old': str(old_audit['rating'] or '-'),
                            'new': str(new_audit['rating'] or '-'),
                        })
                    if old_audit['review_count'] != new_audit['review_count']:
                        changes.append({
                            'field': 'review_count', 'label': 'Liczba opinii',
                            'old': str(old_audit['review_count']),
                            'new': str(new_audit['review_count']),
                        })
                    if old_audit['photo_count'] != new_audit['photo_count']:
                        changes.append({
                            'field': 'photo_count', 'label': 'Liczba zdjęć',
                            'old': str(old_audit['photo_count']),
                            'new': str(new_audit['photo_count']),
                        })
                    for field_key, label in _GBP_FIELD_LABELS.items():
                        old_val = old_audit.get(field_key)
                        new_val = new_audit.get(field_key)
                        if old_val != new_val:
                            changes.append({
                                'field': field_key, 'label': label,
                                'old': 'Tak' if old_val else 'Nie',
                                'new': 'Tak' if new_val else 'Nie',
                            })
                else:
                    # First audit — mark as new
                    changes.append({
                        'field': 'score', 'label': 'Kompletność',
                        'old': 'brak audytu',
                        'new': f"{new_audit['score']}%",
                    })

                has_changes = len(changes) > 0

                # Store pending audit result for later save
                # Serialize the AuditResult fields we need to reconstruct it
                pending_entry = {
                    'company_id': company_id,
                    'company_name': company_name,
                    'old_score': old_audit['score'] if old_audit else None,
                    'new_score': new_audit['score'],
                    'changes': changes,
                    'has_changes': has_changes,
                    # Store full result data for saving on approve
                    'audit_data': {
                        'completeness_score': result.completeness_score,
                        'review_count': result.review_count,
                        'average_rating': float(result.average_rating) if result.average_rating else None,
                        'photo_count': result.photo_count,
                        'logo_present': result.logo_present,
                        'cover_photo_present': result.cover_photo_present,
                        'google_place_id': result.google_place_id,
                        'google_maps_url': result.google_maps_url,
                        'has_name': new_audit['has_name'],
                        'has_address': new_audit['has_address'],
                        'has_phone': new_audit['has_phone'],
                        'has_website': new_audit['has_website'],
                        'has_hours': new_audit['has_hours'],
                        'has_categories': new_audit['has_categories'],
                        'has_photos': new_audit['has_photos'],
                        'has_description': new_audit['has_description'],
                        'has_services': new_audit['has_services'],
                        'has_reviews': new_audit['has_reviews'],
                    },
                }
                state['pending_changes'].append(pending_entry)

                state['results'].append({
                    'company_id': company_id,
                    'company_name': company_name,
                    'score': result.completeness_score,
                    'old_score': old_audit['score'] if old_audit else None,
                    'status': 'changes' if has_changes else 'no_changes',
                    'changes_count': len(changes),
                })
            except Exception as e:
                logger.error(f"GBP batch audit failed for company {company_id}: {e}")
                # BUGFIX: roll back so a failed query/flush does not leave the
                # session in a poisoned state for the remaining companies.
                db.rollback()
                state['errors'] += 1
                state['results'].append({
                    'company_id': company_id,
                    'company_name': company_name,
                    'score': None,
                    'status': 'error',
                    'error': str(e)[:100],
                })

            # Persist progress after every company so polling sees live data.
            state['completed'] = i + 1
            state['progress'] = round((i + 1) / total * 100)
            _write_gbp_batch_state(state)

        state['running'] = False
        _write_gbp_batch_state(state)
    except Exception as e:
        logger.error(f"GBP batch audit thread crashed: {e}")
        # Best effort: clear the running flag so the UI does not hang forever.
        state = _read_gbp_batch_state()
        state['running'] = False
        _write_gbp_batch_state(state)
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SEO ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/seo')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_seo():
    """
    Admin dashboard for SEO metrics overview.

    Displays:
    - Summary stats (score distribution, average score)
    - Sortable table of all companies with SEO scores
    - Color-coded score badges (green 90-100, yellow 50-89, red 0-49)
    - Filtering by category, score range, and search text
    - Last audit date with staleness indicator
    - Actions: view profile, trigger single company audit

    Query Parameters:
    - company: Slug of company to highlight/filter (optional)
    """
    # Hidden (404) rather than forbidden for non-owners, to avoid leaking
    # the route's existence.
    if not is_audit_owner():
        abort(404)
    # Get optional company filter from URL
    filter_company_slug = request.args.get('company', '')

    db = SessionLocal()
    try:
        # NOTE: removed an unused local `from sqlalchemy import func` import.

        # Get all active companies with their latest SEO analysis data.
        # Outer joins keep companies that have no category or no analysis row.
        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            Category.name.label('category_name'),
            CompanyWebsiteAnalysis.pagespeed_seo_score,
            CompanyWebsiteAnalysis.pagespeed_performance_score,
            CompanyWebsiteAnalysis.pagespeed_accessibility_score,
            CompanyWebsiteAnalysis.pagespeed_best_practices_score,
            CompanyWebsiteAnalysis.seo_audited_at
        ).outerjoin(
            Category,
            Company.category_id == Category.id
        ).outerjoin(
            CompanyWebsiteAnalysis,
            Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            Company.status == 'active'
        ).order_by(
            Company.name
        ).all()

        # Build companies list with named attributes for template
        companies = []
        for row in companies_query:
            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'seo_score': row.pagespeed_seo_score,
                'performance_score': row.pagespeed_performance_score,
                'accessibility_score': row.pagespeed_accessibility_score,
                'best_practices_score': row.pagespeed_best_practices_score,
                'seo_audited_at': row.seo_audited_at
            })

        # Calculate statistics (None score == never audited)
        audited_companies = [c for c in companies if c['seo_score'] is not None]
        not_audited = [c for c in companies if c['seo_score'] is None]

        good_count = len([c for c in audited_companies if c['seo_score'] >= 90])
        medium_count = len([c for c in audited_companies if 50 <= c['seo_score'] < 90])
        poor_count = len([c for c in audited_companies if c['seo_score'] < 50])
        not_audited_count = len(not_audited)

        # Calculate average score (only for audited companies)
        if audited_companies:
            avg_score = round(sum(c['seo_score'] for c in audited_companies) / len(audited_companies))
        else:
            avg_score = None

        stats = {
            'good_count': good_count,
            'medium_count': medium_count,
            'poor_count': poor_count,
            'not_audited_count': not_audited_count,
            'avg_score': avg_score
        }

        # Get unique categories for filter dropdown
        categories = sorted(set(c['category'] for c in companies if c['category']))

        # Convert companies list to objects with attribute access for template
        class CompanyRow:
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        companies_objects = [CompanyRow(c) for c in companies]

        return render_template('admin_seo_dashboard.html',
                               companies=companies_objects,
                               stats=stats,
                               categories=categories,
                               now=datetime.now(),
                               filter_company=filter_company_slug
                               )

    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/seo/<int:company_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_seo_detail(company_id):
    """Detailed SEO audit view for a single company.

    Builds a recommendations list from the stored website analysis, plus
    best-effort enrichment: IP geolocation of the hosting server, category
    and all-member score benchmarks, and RDAP domain registration data.
    All enrichment lookups fail silently — the page renders without them.

    Args:
        company_id: Primary key of the company (from the URL).
    """
    if not is_audit_owner():
        abort(404)

    db = SessionLocal()
    try:
        # NOTE: removed an unused local `from sqlalchemy import func` import.

        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            flash('Firma nie istnieje.', 'error')
            return redirect(url_for('admin.admin_seo'))

        analysis = db.query(CompanyWebsiteAnalysis).filter_by(
            company_id=company_id
        ).first()

        # Build recommendations from available data
        recommendations = []

        if not analysis:
            recommendations.append({
                'severity': 'info',
                'text': 'Brak danych audytu. Uruchom audyt SEO dla tej firmy.'
            })
        else:
            # SEO score
            if analysis.pagespeed_seo_score is not None:
                if analysis.pagespeed_seo_score < 50:
                    recommendations.append({
                        'severity': 'critical',
                        'text': 'Wynik SEO jest bardzo niski. Strona ma powazne problemy z optymalizacja pod wyszukiwarki.'
                    })
                elif analysis.pagespeed_seo_score < 90:
                    recommendations.append({
                        'severity': 'warning',
                        'text': 'Wynik SEO jest sredni. Sa elementy do poprawy w optymalizacji pod wyszukiwarki.'
                    })

            # Performance
            if analysis.pagespeed_performance_score is not None:
                if analysis.pagespeed_performance_score < 50:
                    recommendations.append({
                        'severity': 'critical',
                        'text': 'Strona laduje sie bardzo wolno. Uzytkownicy moga ja opuszczac zanim sie zaladuje.'
                    })
                elif analysis.pagespeed_performance_score < 90:
                    recommendations.append({
                        'severity': 'warning',
                        'text': 'Szybkosc strony jest srednia. Mozna poprawic czas ladowania.'
                    })

            # Meta tags
            if not analysis.seo_title and not analysis.meta_title:
                recommendations.append({
                    'severity': 'critical',
                    'text': 'Brak tytulu strony (meta title). Wyszukiwarki nie wiedza, czego dotyczy strona.'
                })
            if not analysis.seo_description and not analysis.meta_description:
                recommendations.append({
                    'severity': 'critical',
                    'text': 'Brak opisu strony (meta description). To tekst wyswietlany w wynikach Google.'
                })

            # SSL — explicit `is False` so an unknown (None) value is not flagged
            if analysis.has_ssl is False:
                recommendations.append({
                    'severity': 'critical',
                    'text': 'Brak certyfikatu SSL (HTTPS). Przegladarki oznaczaja strone jako niebezpieczna.'
                })

            # Sitemap & robots
            if analysis.has_sitemap is False:
                recommendations.append({
                    'severity': 'warning',
                    'text': 'Brak pliku sitemap.xml. Google moze nie odkryc wszystkich podstron.'
                })
            if analysis.has_robots_txt is False:
                recommendations.append({
                    'severity': 'warning',
                    'text': 'Brak pliku robots.txt. Nie ma instrukcji dla robotow wyszukiwarek.'
                })

            # Structured data
            if analysis.has_structured_data is False:
                recommendations.append({
                    'severity': 'warning',
                    'text': 'Brak danych strukturalnych (Schema.org). Strona nie bedzie miala rozszerzonych wynikow w Google.'
                })

            # Images
            if analysis.images_without_alt and analysis.images_without_alt > 0:
                recommendations.append({
                    'severity': 'warning',
                    'text': f'{analysis.images_without_alt} obrazkow bez opisu (alt). Problem z dostepnoscia i SEO.'
                })

            # H1 — exactly one main heading is expected
            if analysis.h1_count is not None and analysis.h1_count != 1:
                if analysis.h1_count == 0:
                    recommendations.append({
                        'severity': 'warning',
                        'text': 'Brak naglowka H1. Strona powinna miec dokladnie jeden glowny naglowek.'
                    })
                elif analysis.h1_count > 1:
                    recommendations.append({
                        'severity': 'info',
                        'text': f'Strona ma {analysis.h1_count} naglowkow H1 (powinien byc 1).'
                    })

            # Open Graph
            if analysis.has_og_tags is False:
                recommendations.append({
                    'severity': 'info',
                    'text': 'Brak tagow Open Graph. Linki udostepnione na Facebooku nie beda mialy podgladu.'
                })

            # Canonical
            if analysis.has_canonical is False:
                recommendations.append({
                    'severity': 'info',
                    'text': 'Brak tagu canonical. Moze prowadzic do duplikacji tresci w wyszukiwarkach.'
                })

            # Accessibility
            if analysis.pagespeed_accessibility_score is not None and analysis.pagespeed_accessibility_score < 50:
                recommendations.append({
                    'severity': 'warning',
                    'text': 'Niska dostepnosc strony. Osoby z niepelnosprawnosciami moga miec problem z korzystaniem.'
                })

            # Mobile
            if analysis.is_responsive is False and analysis.is_mobile_friendly is False:
                recommendations.append({
                    'severity': 'critical',
                    'text': 'Strona nie jest dostosowana do urzadzen mobilnych. Google karze takie strony w wynikach.'
                })

            # Security headers
            if analysis.security_headers_count is not None and analysis.security_headers_count < 2:
                recommendations.append({
                    'severity': 'info',
                    'text': 'Brak waznych naglowkow bezpieczenstwa (HSTS, CSP). Strona jest slabiej zabezpieczona.'
                })

            # All good?
            if not recommendations:
                recommendations.append({
                    'severity': 'success',
                    'text': 'Strona wyglada dobrze! Nie znaleziono powaznych problemow.'
                })

            # Add issues from seo_issues JSONB if available
            if analysis.seo_issues:
                for issue in analysis.seo_issues:
                    if isinstance(issue, dict):
                        recommendations.append({
                            'severity': issue.get('severity', 'info'),
                            'text': issue.get('message', issue.get('text', str(issue)))
                        })

        # IP geolocation lookup for hosting details (best effort)
        ip_info = {}
        if analysis and analysis.hosting_ip:
            try:
                import requests as req
                resp = req.get(
                    f'http://ip-api.com/json/{analysis.hosting_ip}?fields=status,org,isp,city,regionName,country,as',
                    timeout=3
                )
                if resp.status_code == 200:
                    data = resp.json()
                    if data.get('status') == 'success':
                        ip_info = {
                            'org': data.get('org', ''),
                            'isp': data.get('isp', ''),
                            'city': data.get('city', ''),
                            'region': data.get('regionName', ''),
                            'country': data.get('country', ''),
                            'as_number': data.get('as', ''),
                        }
            except Exception:
                pass

        # Category benchmarks — average scores in same category
        benchmarks = {}
        if company.category_id and analysis:
            try:
                from sqlalchemy import func as sqlfunc
                cat_stats = db.query(
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_seo_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_performance_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_accessibility_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_best_practices_score),
                    sqlfunc.count(CompanyWebsiteAnalysis.id),
                ).join(Company, Company.id == CompanyWebsiteAnalysis.company_id
                ).filter(
                    Company.category_id == company.category_id,
                    Company.status == 'active',
                    CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None),
                ).first()

                # Only show a benchmark when there is at least one peer.
                if cat_stats and cat_stats[4] > 1:
                    # NOTE: removed redundant local `from database import Category`
                    # (Category is already imported at module level).
                    cat_name = db.query(Category.name).filter_by(id=company.category_id).scalar()
                    benchmarks = {
                        'category_name': cat_name or '',
                        'count': cat_stats[4],
                        'avg_seo': round(float(cat_stats[0] or 0)),
                        'avg_performance': round(float(cat_stats[1] or 0)),
                        'avg_accessibility': round(float(cat_stats[2] or 0)),
                        'avg_best_practices': round(float(cat_stats[3] or 0)),
                    }
            except Exception:
                pass

        # All-members benchmark
        all_benchmarks = {}
        if analysis:
            try:
                from sqlalchemy import func as sqlfunc
                all_stats = db.query(
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_seo_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_performance_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_accessibility_score),
                    sqlfunc.avg(CompanyWebsiteAnalysis.pagespeed_best_practices_score),
                    sqlfunc.count(CompanyWebsiteAnalysis.id),
                ).join(Company, Company.id == CompanyWebsiteAnalysis.company_id
                ).filter(
                    Company.status == 'active',
                    CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None),
                ).first()

                if all_stats and all_stats[4] > 1:
                    all_benchmarks = {
                        'count': all_stats[4],
                        'avg_seo': round(float(all_stats[0] or 0)),
                        'avg_performance': round(float(all_stats[1] or 0)),
                        'avg_accessibility': round(float(all_stats[2] or 0)),
                        'avg_best_practices': round(float(all_stats[3] or 0)),
                    }
            except Exception:
                pass

        # WHOIS domain expiry lookup via RDAP (best effort)
        domain_info = {}
        if analysis and company.website:
            try:
                import re
                import requests as req
                domain_match = re.search(r'https?://(?:www\.)?([^/]+)', company.website)
                if domain_match:
                    domain = domain_match.group(1)
                    # Map TLD to direct RDAP server (bypass rdap.org Cloudflare).
                    # BUGFIX: entries must be base URLs WITHOUT a '/domain'
                    # suffix — '/domain/{domain}' is appended below, so the
                    # old 'org'/'eu' values produced '.../domain/domain/...'.
                    tld = domain.rsplit('.', 1)[-1].lower()
                    rdap_servers = {
                        'pl': 'https://rdap.dns.pl',
                        'com': 'https://rdap.verisign.com/com/v1',
                        'net': 'https://rdap.verisign.com/net/v1',
                        'org': 'https://rdap.org',  # fallback
                        'eu': 'https://rdap.eu',
                        'de': 'https://rdap.denic.de',
                    }
                    rdap_base = rdap_servers.get(tld, 'https://rdap.org')
                    rdap_url = f'{rdap_base}/domain/{domain}'

                    resp = req.get(rdap_url, timeout=5, headers={
                        'Accept': 'application/rdap+json',
                        'User-Agent': 'NordaBiz-SEO-Audit/1.0'
                    })
                    if resp.status_code == 200:
                        rdap = resp.json()
                        # Registration lifecycle events (RFC 7483 eventAction).
                        for event in rdap.get('events', []):
                            action = event.get('eventAction')
                            date_str = event.get('eventDate', '')[:10]
                            if action == 'expiration':
                                domain_info['expires'] = date_str
                            elif action == 'registration':
                                domain_info['registered'] = date_str
                            elif action == 'last changed':
                                domain_info['updated'] = date_str
                        # Registrar from entities (vCard 'fn' = full name)
                        for entity in rdap.get('entities', []):
                            if 'registrar' in entity.get('roles', []):
                                vcard = entity.get('vcardArray', [None, []])[1]
                                for item in (vcard or []):
                                    if item[0] == 'fn':
                                        domain_info['registrar'] = item[3]
                                        break
                        domain_info['domain'] = domain
            except Exception:
                pass

        return render_template('admin/seo_detail.html',
                               company=company,
                               analysis=analysis,
                               recommendations=recommendations,
                               ip_info=ip_info,
                               benchmarks=benchmarks,
                               all_benchmarks=all_benchmarks,
                               domain_info=domain_info
                               )
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# GBP AUDIT ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/gbp-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_gbp_audit():
    """
    Admin dashboard for GBP (Google Business Profile) audit overview.

    Displays:
    - Summary stats (completeness score distribution, field coverage)
    - Sortable table of all companies with GBP audit data
    - Review metrics (avg rating, review counts)
    - Photo statistics
    """
    if not is_audit_owner():
        abort(404)
    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Subquery to get latest audit for each company
        latest_audit_subq = db.query(
            GBPAudit.company_id,
            func.max(GBPAudit.audit_date).label('max_date')
        ).group_by(GBPAudit.company_id).subquery()

        # Get all companies with their latest GBP audit data.
        # The second outerjoin restricts GBPAudit rows to the latest
        # audit_date per company from the subquery above.
        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            Category.name.label('category_name'),
            GBPAudit.completeness_score,
            GBPAudit.average_rating,
            GBPAudit.review_count,
            GBPAudit.photo_count,
            GBPAudit.has_name,
            GBPAudit.has_address,
            GBPAudit.has_phone,
            GBPAudit.has_website,
            GBPAudit.has_hours,
            GBPAudit.has_categories,
            GBPAudit.has_photos,
            GBPAudit.has_description,
            GBPAudit.has_services,
            GBPAudit.has_reviews,
            GBPAudit.audit_date
        ).outerjoin(
            Category,
            Company.category_id == Category.id
        ).outerjoin(
            latest_audit_subq,
            Company.id == latest_audit_subq.c.company_id
        ).outerjoin(
            GBPAudit,
            (Company.id == GBPAudit.company_id) &
            (GBPAudit.audit_date == latest_audit_subq.c.max_date)
        ).filter(
            Company.status == 'active'
        ).order_by(Company.name).all()

        # Build companies list
        companies = []
        for row in companies_query:
            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'completeness_score': row.completeness_score,
                'average_rating': float(row.average_rating) if row.average_rating else None,
                'review_count': row.review_count or 0,
                'photo_count': row.photo_count or 0,
                'has_name': row.has_name,
                'has_address': row.has_address,
                'has_phone': row.has_phone,
                'has_website': row.has_website,
                'has_hours': row.has_hours,
                'has_categories': row.has_categories,
                'has_photos': row.has_photos,
                'has_description': row.has_description,
                'has_services': row.has_services,
                'has_reviews': row.has_reviews,
                'audit_date': row.audit_date
            })

        # Calculate statistics
        total_companies = len(companies)
        audited = [c for c in companies if c['completeness_score'] is not None]
        not_audited = [c for c in companies if c['completeness_score'] is None]

        # Score distribution
        excellent_count = len([c for c in audited if c['completeness_score'] >= 90])
        good_count = len([c for c in audited if 70 <= c['completeness_score'] < 90])
        poor_count = len([c for c in audited if c['completeness_score'] < 70])
        not_audited_count = len(not_audited)

        # Average completeness
        avg_completeness = round(sum(c['completeness_score'] for c in audited) / len(audited)) if audited else None

        # Average rating (only for companies with reviews)
        companies_with_rating = [c for c in audited if c['average_rating']]
        avg_rating = round(sum(c['average_rating'] for c in companies_with_rating) / len(companies_with_rating), 1) if companies_with_rating else None

        # Total reviews
        total_reviews = sum(c['review_count'] for c in companies)

        # Field coverage stats (percentage of audited companies with each
        # field).  Refactored from ten copy-pasted comprehensions into one
        # dict comprehension over the shared key list.
        field_keys = ['name', 'address', 'phone', 'website', 'hours',
                      'categories', 'photos', 'description', 'services', 'reviews']
        if audited:
            field_coverage = {
                key: round(sum(1 for c in audited if c[f'has_{key}']) / len(audited) * 100)
                for key in field_keys
            }
        else:
            field_coverage = {key: 0 for key in field_keys}

        stats = {
            'total_companies': total_companies,
            'audited_count': len(audited),
            'excellent_count': excellent_count,
            'good_count': good_count,
            'poor_count': poor_count,
            'not_audited_count': not_audited_count,
            'avg_completeness': avg_completeness,
            'avg_rating': avg_rating,
            'total_reviews': total_reviews,
            'field_coverage': field_coverage
        }

        # Get unique categories
        categories = sorted(set(c['category'] for c in companies if c['category']))

        # Convert to objects for template
        class CompanyRow:
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        companies_objects = [CompanyRow(c) for c in companies]

        return render_template('admin/gbp_audit_dashboard.html',
                               companies=companies_objects,
                               stats=stats,
                               categories=categories,
                               now=datetime.now()
                               )
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/gbp-audit/run-batch', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_run_batch():
    """Start batch GBP audit for all active companies.

    Rejects the request with 409 when a batch is already running, resets
    the shared state file, then launches the worker as a daemon thread.
    Form parameter ``fetch_google=1`` forces a Google data refresh per
    company before auditing.
    """
    if not is_audit_owner():
        return jsonify({'error': 'Brak uprawnień'}), 403

    state = _read_gbp_batch_state()
    if state.get('running'):
        # A batch is already in flight — report its progress instead.
        return jsonify({
            'error': 'Audyt już działa',
            'progress': state.get('progress', 0),
            'completed': state.get('completed', 0),
            'total': state.get('total', 0),
        }), 409

    fetch_google = request.form.get('fetch_google', '0') == '1'

    session = SessionLocal()
    try:
        rows = session.query(Company.id).filter(Company.status == 'active').all()
        company_ids = [row.id for row in rows]
    finally:
        session.close()

    if not company_ids:
        return jsonify({'error': 'Brak aktywnych firm'}), 400

    # Reset the shared state before the worker starts.
    _write_gbp_batch_state({
        'running': True,
        'progress': 0,
        'total': len(company_ids),
        'completed': 0,
        'errors': 0,
        'results': [],
        'pending_changes': [],
        'approved': False,
    })

    worker = threading.Thread(
        target=_run_gbp_batch_background,
        args=(company_ids, fetch_google),
        daemon=True,
    )
    worker.start()

    return jsonify({
        'status': 'started',
        'total': len(company_ids),
        'message': f'Rozpoczęto audyt GBP dla {len(company_ids)} firm.',
    })
|
|
|
|
|
|
@bp.route('/gbp-audit/batch-status')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_gbp_audit_batch_status():
    """Get current batch GBP audit status with live results feed.

    The optional ``since`` query parameter is an offset into the results
    list; only entries at or after it are returned, so the dashboard can
    poll incrementally instead of re-downloading the whole feed.
    """
    state = _read_gbp_batch_state()
    all_results = state.get('results', [])

    offset = request.args.get('since', 0, type=int)
    fresh = all_results[offset:]

    return jsonify({
        'running': state.get('running', False),
        'progress': state.get('progress', 0),
        'completed': state.get('completed', 0),
        'total': state.get('total', 0),
        'errors': state.get('errors', 0),
        'results': fresh,
        'results_total': len(all_results),
        'pending_count': len(state.get('pending_changes', [])),
    })
|
|
|
|
|
|
@bp.route('/gbp-audit/batch-review')
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_review():
    """Review pending GBP batch audit changes before saving.

    Splits the pending entries into changed / unchanged groups, collects
    error rows from the live results feed, and renders the review page.
    Redirects back to the dashboard while the batch is still running.
    """
    if not is_audit_owner():
        abort(404)

    state = _read_gbp_batch_state()

    if state.get('running'):
        flash('Audyt jeszcze trwa. Poczekaj na zakończenie.', 'warning')
        return redirect(url_for('admin.admin_gbp_audit'))

    pending = state.get('pending_changes', [])
    results = state.get('results', [])

    # Partition pending entries and pull out failures for the summary.
    changed = [entry for entry in pending if entry.get('has_changes')]
    unchanged = [entry for entry in pending if not entry.get('has_changes')]
    failed = [row for row in results if row.get('status') == 'error']

    summary = {
        'total_companies': len(pending) + len(failed),
        'with_changes': len(changed),
        'without_changes': len(unchanged),
        'errors': len(failed),
    }

    return render_template('admin/gbp_audit_batch_review.html',
                           pending=changed,
                           no_changes=unchanged,
                           errors=failed,
                           summary=summary,
                           approved=state.get('approved', False),
                           )
|
|
|
|
|
|
@bp.route('/gbp-audit/batch-approve', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_approve():
    """Approve and save pending GBP audit results to database.

    Creates one GBPAudit row per pending entry, commits them in a single
    transaction, then marks the shared batch state as approved so the same
    results cannot be saved a second time.

    Returns:
        JSON with the number of rows applied, per-entry error count, and a
        human-readable message; 403/409/400 on permission/state problems.
    """
    if not is_audit_owner():
        return jsonify({'error': 'Brak uprawnień'}), 403

    state = _read_gbp_batch_state()
    if state.get('running'):
        return jsonify({'error': 'Audyt jeszcze trwa'}), 409
    if state.get('approved'):
        return jsonify({'error': 'Wyniki już zostały zatwierdzone'}), 409

    pending = state.get('pending_changes', [])
    if not pending:
        return jsonify({'error': 'Brak wyników do zatwierdzenia'}), 400

    db = SessionLocal()
    applied = 0
    errors = 0
    try:
        for entry in pending:
            try:
                data = entry['audit_data']
                audit = GBPAudit(
                    company_id=entry['company_id'],
                    completeness_score=data['completeness_score'],
                    review_count=data.get('review_count', 0),
                    average_rating=data.get('average_rating'),
                    photo_count=data.get('photo_count', 0),
                    logo_present=data.get('logo_present', False),
                    cover_photo_present=data.get('cover_photo_present', False),
                    google_place_id=data.get('google_place_id'),
                    google_maps_url=data.get('google_maps_url'),
                    has_name=data.get('has_name', False),
                    has_address=data.get('has_address', False),
                    has_phone=data.get('has_phone', False),
                    has_website=data.get('has_website', False),
                    has_hours=data.get('has_hours', False),
                    has_categories=data.get('has_categories', False),
                    has_photos=data.get('has_photos', False),
                    has_description=data.get('has_description', False),
                    has_services=data.get('has_services', False),
                    has_reviews=data.get('has_reviews', False),
                    audit_date=datetime.now(),
                )
                db.add(audit)
                applied += 1
            except Exception as e:
                # A single malformed entry must not abort the whole batch.
                logger.error(f"Failed to save GBP audit for company {entry.get('company_id')}: {e}")
                errors += 1

        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"GBP batch approve failed: {e}")
        return jsonify({'error': f'Błąd zapisu: {str(e)[:100]}'}), 500
    finally:
        db.close()

    # BUGFIX: persist the approved flag OUTSIDE the DB try-block. Previously a
    # failure while writing the shared state file returned a 500 even though
    # the commit had already succeeded, and left 'approved' False — so a
    # retried approve would insert duplicate audit rows. Now a state-write
    # failure is logged but the (truthful) success response is still returned.
    try:
        state['approved'] = True
        _write_gbp_batch_state(state)
    except Exception as e:
        logger.error(f"GBP batch approve: audits saved but state update failed: {e}")

    return jsonify({
        'status': 'approved',
        'applied': applied,
        'errors': errors,
        'message': f'Zapisano {applied} audytów do bazy danych.',
    })
|
|
|
|
|
|
@bp.route('/gbp-audit/batch-discard', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_audit_batch_discard():
    """Discard pending GBP batch audit results without saving."""
    if not is_audit_owner():
        return jsonify({'error': 'Brak uprawnień'}), 403

    # Count what we are about to throw away, then reset the shared state.
    discarded = len(_read_gbp_batch_state().get('pending_changes', []))
    _write_gbp_batch_state(dict(_GBP_BATCH_DEFAULT))

    return jsonify({
        'status': 'discarded',
        'count': discarded,
        'message': f'Odrzucono {discarded} wyników audytu.',
    })
|
|
|
|
|
|
# ============================================================
|
|
# GBP PLACE ID MATCHING (manual review)
|
|
# ============================================================
|
|
|
|
@bp.route('/gbp-audit/match-places')
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_match_places():
    """Show companies without google_place_id for manual matching."""
    if not is_audit_owner():
        abort(404)

    db = SessionLocal()
    try:
        # Company ids that already carry a confirmed Place ID.
        matched_ids = db.query(CompanyWebsiteAnalysis.company_id).filter(
            CompanyWebsiteAnalysis.google_place_id.isnot(None)
        ).subquery()

        # Active companies not yet matched, alphabetically.
        unmatched = (
            db.query(Company.id, Company.name, Company.address_city, Company.website)
            .filter(Company.status == 'active', ~Company.id.in_(matched_ids))
            .order_by(Company.name)
            .all()
        )

        return render_template('admin/gbp_match_places.html', companies=unmatched)
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/gbp-audit/search-place', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_search_place():
    """Search Google Places for a company (no name filter — raw results).

    Form fields:
        company_id: id of the company to search for.

    Returns:
        JSON ``{'results': [...], 'query': str}`` containing ALL raw matches
        so the admin can pick the correct listing manually; 400/403/404/500
        on validation, permission, lookup, or API-availability failures.
    """
    if not is_audit_owner():
        return jsonify({'error': 'Brak uprawnień'}), 403

    company_id = request.form.get('company_id', type=int)
    if not company_id:
        return jsonify({'error': 'Brak company_id'}), 400

    db = SessionLocal()
    try:
        company = db.get(Company, company_id)
        if not company:
            return jsonify({'error': 'Firma nie znaleziona'}), 404

        try:
            from google_places_service import GooglePlacesService
            places_service = GooglePlacesService()
        except (ImportError, ValueError) as e:
            return jsonify({'error': f'Places API niedostępne: {e}'}), 500

        # Bias results to the region around Wejherowo (50 km radius).
        city = company.address_city or 'Wejherowo'
        query = f'{company.name} {city}'
        location_bias = {'latitude': 54.6059, 'longitude': 18.2350, 'radius': 50000.0}

        # Raw API search — all results are returned for manual review.
        results = places_service.search_places_raw(query, location_bias=location_bias)

        if not results:
            # Broaden: retry with just the company name.
            results = places_service.search_places_raw(company.name, location_bias=location_bias)

        if not results:
            return jsonify({'results': [], 'query': query})

        places = []
        for p in results:
            place_id = p.get('id', '')
            if place_id.startswith('places/'):
                # BUGFIX: strip only the leading resource prefix. The old
                # .replace('places/', '') removed EVERY occurrence of the
                # substring, which could corrupt an id containing it later.
                place_id = place_id[len('places/'):]
            places.append({
                'place_id': place_id,
                'name': p.get('displayName', {}).get('text', ''),
                'address': p.get('formattedAddress', ''),
                'types': ', '.join((p.get('types') or [])[:3]),
                'rating': p.get('rating'),
                'reviews_count': p.get('userRatingCount'),
            })

        return jsonify({'results': places, 'query': query})
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/gbp-audit/confirm-place', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_gbp_confirm_place():
    """Save confirmed google_place_id for a company."""
    if not is_audit_owner():
        return jsonify({'error': 'Brak uprawnień'}), 403

    company_id = request.form.get('company_id', type=int)
    place_id = request.form.get('place_id', '').strip()
    google_name = request.form.get('google_name', '').strip()

    if not company_id or not place_id:
        return jsonify({'error': 'Brak company_id lub place_id'}), 400

    session = SessionLocal()
    try:
        company = session.get(Company, company_id)
        if not company:
            return jsonify({'error': 'Firma nie znaleziona'}), 404

        # Reuse the existing website-analysis row, or create a fresh one.
        record = session.query(CompanyWebsiteAnalysis).filter(
            CompanyWebsiteAnalysis.company_id == company_id
        ).first()
        if record is None:
            record = CompanyWebsiteAnalysis(
                company_id=company_id,
                url=company.website,
                analyzed_at=datetime.now()
            )
            session.add(record)

        record.google_place_id = place_id
        if google_name:
            record.google_name = google_name
        record.analyzed_at = datetime.now()
        session.commit()

        logger.info(f"Place ID confirmed for company {company_id} ({company.name}): {place_id}")
        return jsonify({'status': 'ok', 'message': f'Place ID zapisany dla {company.name}'})
    except Exception as e:
        session.rollback()
        logger.error(f"Failed to confirm place_id for company {company_id}: {e}")
        return jsonify({'error': str(e)[:100]}), 500
    finally:
        session.close()
|
|
|
|
|
|
# ============================================================
|
|
# DIGITAL MATURITY DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/digital-maturity')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def digital_maturity_dashboard():
    """Admin dashboard for digital maturity assessment results.

    Joins companies with their digital-maturity and website-analysis rows
    (inner joins, so only companies having both records and a positive
    overall score appear), then renders aggregate stats, top/bottom
    performers, and the highest-value opportunities.
    """
    db = SessionLocal()
    try:
        # Removed unused 'func' import — only desc() is used in this view.
        from sqlalchemy import desc

        # Get all companies with maturity data, best score first.
        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            CompanyDigitalMaturity.overall_score,
            CompanyDigitalMaturity.online_presence_score,
            CompanyDigitalMaturity.sales_readiness,
            CompanyDigitalMaturity.total_opportunity_value,
            CompanyWebsiteAnalysis.opportunity_score,
            CompanyWebsiteAnalysis.has_blog,
            CompanyWebsiteAnalysis.has_portfolio,
            CompanyWebsiteAnalysis.has_contact_form,
            CompanyWebsiteAnalysis.content_richness_score,
            CompanyDigitalMaturity.critical_gaps,
            CompanyWebsiteAnalysis.missing_features
        ).join(
            CompanyDigitalMaturity, Company.id == CompanyDigitalMaturity.company_id
        ).join(
            CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            CompanyDigitalMaturity.overall_score > 0
        ).order_by(
            desc(CompanyDigitalMaturity.overall_score)
        ).all()

        # Aggregate stats (guard against empty result set).
        total_analyzed = len(companies_query)
        avg_score = round(sum(c.overall_score for c in companies_query) / total_analyzed, 1) if total_analyzed else 0
        total_opportunity = sum(float(c.total_opportunity_value or 0) for c in companies_query)

        warm_leads = [c for c in companies_query if c.sales_readiness == 'warm']
        cold_leads = [c for c in companies_query if c.sales_readiness == 'cold']

        # Query is already sorted descending, so slicing gives the top 10.
        top_performers = companies_query[:10]
        bottom_performers = sorted(companies_query, key=lambda c: c.overall_score)[:10]

        # Highest estimated opportunity value first.
        top_opportunities = sorted(
            companies_query,
            key=lambda c: float(c.total_opportunity_value or 0),
            reverse=True
        )[:10]

        return render_template('admin/digital_maturity.html',
                               total_analyzed=total_analyzed,
                               avg_score=avg_score,
                               total_opportunity=total_opportunity,
                               warm_leads_count=len(warm_leads),
                               cold_leads_count=len(cold_leads),
                               top_performers=top_performers,
                               bottom_performers=bottom_performers,
                               top_opportunities=top_opportunities,
                               all_companies=companies_query
                               )
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# KRS AUDIT DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/krs-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_krs_audit():
    """
    Admin dashboard for KRS (Krajowy Rejestr Sądowy) audit.

    Displays:
    - Summary stats (with KRS, audited count, data extraction status)
    - List of companies with KRS numbers
    - Audit progress and status for each company
    - Links to source PDF files
    """
    # Check if KRS audit service is available; fall back to a disabled flag
    # so the dashboard still renders without the optional service module.
    try:
        from krs_audit_service import KRS_AUDIT_AVAILABLE
    except ImportError:
        KRS_AUDIT_AVAILABLE = False

    db = SessionLocal()
    try:
        # Get all active companies with KRS numbers.
        companies_query = db.query(Company).filter(
            Company.status == 'active',
            Company.krs.isnot(None),
            Company.krs != ''
        ).order_by(Company.name).all()

        # Build a per-company summary dict for the template.
        # NOTE(review): this runs three extra queries per company (N+1).
        # Acceptable for an admin page over a small dataset; batch with
        # joins if the company count grows.
        companies = []
        for company in companies_query:
            # Latest audit for this company, newest first.
            latest_audit = db.query(KRSAudit).filter(
                KRSAudit.company_id == company.id
            ).order_by(KRSAudit.audit_date.desc()).first()

            # All PKD codes, primary code(s) first.
            pkd_codes = db.query(CompanyPKD).filter(
                CompanyPKD.company_id == company.id
            ).order_by(CompanyPKD.is_primary.desc(), CompanyPKD.pkd_code).all()
            pkd_count = len(pkd_codes)

            # Number of registered persons linked to the company.
            people_count = db.query(CompanyPerson).filter(
                CompanyPerson.company_id == company.id
            ).count()

            companies.append({
                'id': company.id,
                'name': company.name,
                'slug': company.slug,
                'krs': company.krs,
                'nip': company.nip,
                'capital_amount': company.capital_amount,
                'krs_last_audit_at': company.krs_last_audit_at,
                'krs_pdf_path': company.krs_pdf_path,
                'audit': latest_audit,
                'pkd_count': pkd_count,
                'pkd_codes': [{
                    'code': pkd.pkd_code,
                    'description': pkd.pkd_description,
                    'is_primary': pkd.is_primary
                } for pkd in pkd_codes],
                'people_count': people_count,
                'capital_shares_count': company.capital_shares_count
            })

        # Aggregate stats over the companies that have a KRS number.
        total_with_krs = len(companies)
        audited_count = len([c for c in companies if c['krs_last_audit_at']])
        not_audited_count = total_with_krs - audited_count
        with_capital = len([c for c in companies if c['capital_amount']])
        with_people = len([c for c in companies if c['people_count'] > 0])
        with_pkd = len([c for c in companies if c['pkd_count'] > 0])

        # Active companies that have no KRS number at all.
        no_krs_count = db.query(Company).filter(
            Company.status == 'active',
            (Company.krs.is_(None)) | (Company.krs == '')
        ).count()

        stats = {
            'total_with_krs': total_with_krs,
            'audited_count': audited_count,
            'not_audited_count': not_audited_count,
            'no_krs_count': no_krs_count,
            'with_capital': with_capital,
            'with_people': with_people,
            'with_pkd': with_pkd
        }

        return render_template('admin/krs_audit_dashboard.html',
                               companies=companies,
                               stats=stats,
                               krs_audit_available=KRS_AUDIT_AVAILABLE,
                               now=datetime.now()
                               )
    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# IT AUDIT ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/it-audit')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_it_audit():
    """
    Admin dashboard for IT audit overview.

    Displays:
    - Summary stats (audit count, average scores, maturity distribution)
    - Technology adoption stats (Azure AD, M365, PBS, Zabbix, EDR, DR)
    - Collaboration flags distribution
    - Company table with IT audit data
    - Collaboration matches matrix

    Access: Office Manager and above
    """
    # Additionally restricted to the audit owner: non-owners get a 404
    # (not 403) so the route's existence is not revealed.
    if not is_audit_owner():
        abort(404)
    db = SessionLocal()
    try:
        from sqlalchemy import func

        # Import IT audit service helper; fall back to an inline label map
        # so the dashboard renders without the optional service module.
        try:
            from it_audit_service import get_maturity_level_label
        except ImportError:
            def get_maturity_level_label(level):
                # Polish UI labels; unknown levels pass through unchanged.
                labels = {
                    'basic': 'Podstawowy',
                    'developing': 'Rozwijający się',
                    'established': 'Ustalony',
                    'advanced': 'Zaawansowany'
                }
                return labels.get(level, level)

        # Get all active companies with their latest IT audit
        # Using subquery to get only the latest audit per company
        latest_audit_subq = db.query(
            ITAudit.company_id,
            func.max(ITAudit.audit_date).label('max_date')
        ).group_by(ITAudit.company_id).subquery()

        # Outer joins keep companies with NO audit in the result set
        # (their ITAudit columns come back as None).
        companies_query = db.query(
            Company.id,
            Company.name,
            Company.slug,
            ITAudit.id.label('audit_id'),
            ITAudit.overall_score,
            ITAudit.security_score,
            ITAudit.collaboration_score,
            ITAudit.completeness_score,
            ITAudit.maturity_level,
            ITAudit.audit_date,
            ITAudit.has_azure_ad,
            ITAudit.has_m365,
            ITAudit.has_proxmox_pbs,
            ITAudit.monitoring_solution,
            ITAudit.has_edr,
            ITAudit.has_dr_plan
        ).outerjoin(
            latest_audit_subq,
            Company.id == latest_audit_subq.c.company_id
        ).outerjoin(
            ITAudit,
            (Company.id == ITAudit.company_id) &
            (ITAudit.audit_date == latest_audit_subq.c.max_date)
        ).filter(
            Company.status == 'active'
        ).order_by(
            Company.name
        ).all()

        # Build companies list with named attributes for template
        companies = []
        for row in companies_query:
            # Detect Zabbix from monitoring_solution field
            # (free-text column; matched case-insensitively).
            has_zabbix = row.monitoring_solution and 'zabbix' in str(row.monitoring_solution).lower()

            companies.append({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'audit_id': row.audit_id,
                'overall_score': row.overall_score,
                'security_score': row.security_score,
                'collaboration_score': row.collaboration_score,
                'completeness_score': row.completeness_score,
                'maturity_level': row.maturity_level,
                'maturity_label': get_maturity_level_label(row.maturity_level) if row.maturity_level else None,
                'audit_date': row.audit_date,
                'has_azure_ad': row.has_azure_ad,
                'has_m365': row.has_m365,
                'has_proxmox_pbs': row.has_proxmox_pbs,
                'has_zabbix': has_zabbix,
                'has_edr': row.has_edr,
                'has_dr_plan': row.has_dr_plan
            })

        # Calculate statistics
        # A non-None overall_score marks a company as audited.
        audited_companies = [c for c in companies if c['overall_score'] is not None]
        not_audited = [c for c in companies if c['overall_score'] is None]

        # Maturity distribution
        maturity_counts = {
            'basic': 0,
            'developing': 0,
            'established': 0,
            'advanced': 0
        }
        for c in audited_companies:
            level = c['maturity_level']
            # Levels outside the four known buckets are silently skipped.
            if level in maturity_counts:
                maturity_counts[level] += 1

        # Calculate average scores
        # security/collaboration may be None per audit; treated as 0 here.
        if audited_companies:
            avg_overall = round(sum(c['overall_score'] for c in audited_companies) / len(audited_companies))
            avg_security = round(sum(c['security_score'] or 0 for c in audited_companies) / len(audited_companies))
            avg_collaboration = round(sum(c['collaboration_score'] or 0 for c in audited_companies) / len(audited_companies))
        else:
            avg_overall = None
            avg_security = None
            avg_collaboration = None

        # Technology adoption stats
        tech_stats = {
            'azure_ad': len([c for c in audited_companies if c['has_azure_ad']]),
            'm365': len([c for c in audited_companies if c['has_m365']]),
            'proxmox_pbs': len([c for c in audited_companies if c['has_proxmox_pbs']]),
            'zabbix': len([c for c in audited_companies if c['has_zabbix']]),
            'edr': len([c for c in audited_companies if c['has_edr']]),
            'dr_plan': len([c for c in audited_companies if c['has_dr_plan']])
        }

        # Collaboration flags stats from latest audits
        # One COUNT query per flag, restricted to the latest-audit ids.
        collab_stats = {}
        if audited_companies:
            collab_flags = [
                'open_to_shared_licensing',
                'open_to_backup_replication',
                'open_to_teams_federation',
                'open_to_shared_monitoring',
                'open_to_collective_purchasing',
                'open_to_knowledge_sharing'
            ]
            for flag in collab_flags:
                count = db.query(func.count(ITAudit.id)).filter(
                    ITAudit.id.in_([c['audit_id'] for c in audited_companies if c['audit_id']]),
                    getattr(ITAudit, flag) == True
                ).scalar()
                collab_stats[flag] = count

        # Get collaboration matches with both companies' info
        matches = db.query(ITCollaborationMatch).order_by(
            ITCollaborationMatch.match_score.desc()
        ).all()

        # Build flat list of collaboration matches with all necessary attributes
        class CollabMatchRow:
            """Helper class for template attribute access"""
            def __init__(self, **kwargs):
                for key, value in kwargs.items():
                    setattr(self, key, value)

        # NOTE(review): two extra queries per match (N+1); fine for an
        # admin page with few matches, but join Company if this grows.
        collaboration_matches = []
        for match in matches:
            # Get company A and B info
            company_a = db.query(Company).filter(Company.id == match.company_a_id).first()
            company_b = db.query(Company).filter(Company.id == match.company_b_id).first()

            collaboration_matches.append(CollabMatchRow(
                id=match.id,
                match_type=match.match_type,
                company_a_id=match.company_a_id,
                company_a_name=company_a.name if company_a else 'Nieznana',
                company_a_slug=company_a.slug if company_a else '',
                company_b_id=match.company_b_id,
                company_b_name=company_b.name if company_b else 'Nieznana',
                company_b_slug=company_b.slug if company_b else '',
                match_reason=match.match_reason,
                match_score=match.match_score,
                status=match.status,
                created_at=match.created_at
            ))

        stats = {
            # Main stats
            'total_audits': len(audited_companies),
            'total_companies': len(companies),
            'companies_without_audit': len(not_audited),

            # Score averages
            'avg_overall_score': avg_overall,
            'avg_security_score': avg_security,
            'avg_collaboration_score': avg_collaboration,

            # Maturity distribution (flattened for template)
            'maturity_basic': maturity_counts['basic'],
            'maturity_developing': maturity_counts['developing'],
            'maturity_established': maturity_counts['established'],
            'maturity_advanced': maturity_counts['advanced'],

            # Technology adoption stats (matching template naming with has_* prefix)
            'has_azure_ad': tech_stats['azure_ad'],
            'has_m365': tech_stats['m365'],
            'has_proxmox_pbs': tech_stats['proxmox_pbs'],
            'has_zabbix': tech_stats['zabbix'],
            'has_edr': tech_stats['edr'],
            'has_dr_plan': tech_stats['dr_plan'],

            # Collaboration flags
            'open_to_shared_licensing': collab_stats.get('open_to_shared_licensing', 0),
            'open_to_backup_replication': collab_stats.get('open_to_backup_replication', 0),
            'open_to_teams_federation': collab_stats.get('open_to_teams_federation', 0),
            'open_to_shared_monitoring': collab_stats.get('open_to_shared_monitoring', 0),
            'open_to_collective_purchasing': collab_stats.get('open_to_collective_purchasing', 0),
            'open_to_knowledge_sharing': collab_stats.get('open_to_knowledge_sharing', 0),

            # Legacy nested structures (for any templates that still use them)
            'maturity_counts': maturity_counts,
            'tech_stats': tech_stats,
            'collab_stats': collab_stats,
            'total_matches': len(collaboration_matches)
        }

        # Convert companies list to objects with attribute access for template
        class CompanyRow:
            # Wraps a dict so Jinja can use dotted attribute access.
            def __init__(self, data):
                for key, value in data.items():
                    setattr(self, key, value)

        companies_objects = [CompanyRow(c) for c in companies]

        return render_template('admin/it_audit_dashboard.html',
                               companies=companies_objects,
                               stats=stats,
                               collaboration_matches=collaboration_matches,
                               now=datetime.now()
                               )

    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# ACCESS OVERVIEW DASHBOARD
|
|
# ============================================================
|
|
|
|
@bp.route('/access-overview')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_access_overview():
    """Access-control overview panel: who can see what.

    Lists all active users, their role distribution, and the Rada Izby
    members. Restricted to the audit owner (others receive 404).
    """
    if not is_audit_owner():
        abort(404)

    # Removed unused 'from sqlalchemy import func' — nothing here uses it.
    from database import User

    db = SessionLocal()
    try:
        # All active users, highest role first, then alphabetically.
        all_users = db.query(User).filter(
            User.is_active == True
        ).order_by(User.role.desc(), User.name).all()

        # Role distribution; users without a role bucket as 'UNAFFILIATED'.
        role_counts = {}
        for user in all_users:
            role = user.role or 'UNAFFILIATED'
            role_counts[role] = role_counts.get(role, 0) + 1

        # Rada Izby members
        rada_members = [u for u in all_users if u.is_rada_member]

        return render_template('admin/access_overview.html',
                               all_users=all_users,
                               role_counts=role_counts,
                               rada_members=rada_members,
                               audit_owner_email=current_user.email
                               )
    finally:
        db.close()
|