nordabiz/blueprints/admin/routes_analytics.py
Maciej Pienczyn c9d133cf90
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
refactor: simplify AI monitoring dashboard with PLN costs
Replace complex dashboard (11 stat cards, token stats, model breakdown,
recent logs, advanced filters) with clean 3-card PLN cost view,
usage by type, user ranking, company ranking, and daily history.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 04:22:27 +01:00

708 lines
27 KiB
Python

"""
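Admin Analytics Routes
=======================
User analytics dashboard - sessions, page views, clicks, conversions.
Also contains the analytics CSV export and the AI usage views
(overview dashboard, per-user and per-company detail, CSV export).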
"""
import csv
import io
import logging
from datetime import date, timedelta
from flask import render_template, request, redirect, url_for, flash, Response
from flask_login import login_required, current_user
from sqlalchemy import func, desc
from sqlalchemy.orm import joinedload
from . import bp
from database import (
SessionLocal, User, UserSession, PageView, SearchQuery,
ConversionEvent, JSError, Company, AIUsageLog, AIUsageDaily,
SystemRole
)
from utils.decorators import role_required
logger = logging.getLogger(__name__)
# ============================================================
# ANALYTICS DASHBOARD
# ============================================================
@bp.route('/analytics-old')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_analytics():
    """Redirect old analytics to consolidated view.

    The query string of the incoming request is forwarded to the target
    URL. For a repeated parameter only its first value is forwarded.

    Returns:
        A redirect response to the ``admin.user_insights`` endpoint.
    """
    # These names are url_for()'s own parameters. Splatting them from the
    # query string would either collide with the positional endpoint
    # (TypeError) or change how the URL is built instead of being forwarded
    # as ordinary query arguments.
    reserved = {'endpoint', '_anchor', '_method', '_scheme', '_external'}
    forwarded = {
        key: value
        for key, value in request.args.items()
        if key not in reserved
    }
    return redirect(url_for('admin.user_insights', **forwarded))
@bp.route('/analytics/data-export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_analytics_export():
    """Export analytics data as a CSV attachment.

    Query parameters:
        type: ``sessions`` (default), ``pageviews``, ``searches`` or
            ``conversions``.
        period: ``day``, ``week`` or ``month`` (default); any other value
            selects the last 365 days and is reported as ``year``.

    Returns:
        A ``text/csv`` response named ``analytics_<type>_<period>.csv``, or
        a redirect to ``admin.user_insights`` with a flashed error when the
        type is unknown or the export fails.
    """
    def iso(value):
        """Format a timestamp for CSV, leaving missing values blank."""
        return value.isoformat() if value else ''

    # export type -> (header row, model, timestamp column, row limit, row builder)
    # A limit of None means the query is not capped.
    exports = {
        'sessions': (
            ['ID', 'User ID', 'Started At', 'Duration (s)', 'Page Views', 'Clicks',
             'Device', 'Browser', 'OS', 'Country', 'UTM Source', 'UTM Campaign'],
            UserSession, UserSession.started_at, None,
            lambda s: [
                s.id, s.user_id, iso(s.started_at),
                s.duration_seconds or 0, s.page_views_count or 0, s.clicks_count or 0,
                s.device_type or '', s.browser or '', s.os or '',
                s.country or '', s.utm_source or '', s.utm_campaign or '',
            ],
        ),
        'pageviews': (
            ['ID', 'Session ID', 'User ID', 'Path', 'Viewed At', 'Time on Page (s)',
             'Scroll Depth (%)', 'Company ID'],
            PageView, PageView.viewed_at, 10000,
            lambda v: [
                v.id, v.session_id, v.user_id, v.path, iso(v.viewed_at),
                v.time_on_page_seconds or 0, v.scroll_depth_percent or 0,
                v.company_id or '',
            ],
        ),
        'searches': (
            ['ID', 'User ID', 'Query', 'Results Count', 'Has Results', 'Clicked Company',
             'Search Type', 'Searched At'],
            SearchQuery, SearchQuery.searched_at, 10000,
            lambda s: [
                s.id, s.user_id, s.query, s.results_count, s.has_results,
                s.clicked_company_id or '', s.search_type, iso(s.searched_at),
            ],
        ),
        'conversions': (
            ['ID', 'User ID', 'Event Type', 'Event Category', 'Company ID',
             'Target Type', 'Converted At'],
            ConversionEvent, ConversionEvent.converted_at, None,
            lambda c: [
                c.id, c.user_id, c.event_type, c.event_category or '',
                c.company_id or '', c.target_type or '', iso(c.converted_at),
            ],
        ),
    }
    period_days = {'day': 0, 'week': 7, 'month': 30, 'year': 365}

    # Both values end up in the Content-Disposition header, so only
    # whitelisted names are allowed through.
    export_type = request.args.get('type', 'sessions')
    if export_type not in exports:
        flash('Nieznany typ eksportu.', 'error')
        return redirect(url_for('admin.user_insights'))

    period = request.args.get('period', 'month')
    if period not in period_days:
        period = 'year'
    start_date = date.today() - timedelta(days=period_days[period])

    db = SessionLocal()
    try:
        header, model, stamp, limit, to_row = exports[export_type]
        query = db.query(model).filter(
            func.date(stamp) >= start_date
        ).order_by(stamp.desc())
        if limit is not None:
            query = query.limit(limit)

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(header)
        writer.writerows(to_row(record) for record in query.all())

        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename=analytics_{export_type}_{period}.csv'}
        )
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Export error: %s", e)
        flash('Blad podczas eksportu.', 'error')
        return redirect(url_for('admin.user_insights'))
    finally:
        db.close()
# ============================================================
# AI USAGE DASHBOARD
# ============================================================
@bp.route('/ai-usage')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_ai_usage():
    """Admin dashboard for AI (Gemini) API usage monitoring.

    Query parameters:
        period: ``day``, ``week``, ``month`` (default) or ``all``. It scopes
            the per-type, per-user and per-company breakdowns; unknown
            values fall back to ``month``. The today / month / all-time
            summary figures are never filtered by it.

    Returns:
        The rendered ``admin/ai_usage_dashboard.html`` template.
    """
    # PLN conversion rate (approximate, updated manually)
    USD_TO_PLN = 4.05

    def to_pln(cost_cents):
        """Convert a cost in USD cents to PLN, rounded to two decimals."""
        return round(float(cost_cents or 0) / 100 * USD_TO_PLN, 2)

    today = date.today()
    month_ago = today - timedelta(days=30)
    period_labels = {
        'day': ('Dzisiaj', today),
        'week': ('Ten tydzień', today - timedelta(days=7)),
        'month': ('Ten miesiąc', month_ago),
        'all': ('Od początku', None)
    }
    # Normalise unknown values so the period handed to the template always
    # matches the label and the filter that were actually applied.
    period = request.args.get('period', 'month')
    if period not in period_labels:
        period = 'month'
    period_label, period_start = period_labels[period]

    def apply_period(query):
        """Restrict an AIUsageLog query to the selected period, if any."""
        if period_start is not None:
            query = query.filter(func.date(AIUsageLog.created_at) >= period_start)
        return query

    request_count = func.count(AIUsageLog.id).label('requests')
    total_cost = func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents')

    type_labels = {
        'ai_chat': ('Chat AI', 'chat'),
        'zopk_news_evaluation': ('Ocena newsów ZOP Kaszubia', 'news'),
        'ai_user_parse': ('Tworzenie user', 'user'),
        'gbp_audit_ai': ('Audyt GBP', 'image'),
        'general': ('Ogólne', 'other')
    }

    db = SessionLocal()
    try:
        # Summary figures (always shown, not affected by the period filter)
        summary = db.query(request_count, total_cost)
        today_stats = summary.filter(
            func.date(AIUsageLog.created_at) == today
        ).first()
        month_stats = summary.filter(
            func.date(AIUsageLog.created_at) >= month_ago
        ).first()
        all_time_stats = summary.first()

        # Usage by type (filtered)
        type_query = apply_period(db.query(
            AIUsageLog.request_type,
            func.count(AIUsageLog.id).label('count'),
            total_cost
        ))
        type_stats = type_query.group_by(
            AIUsageLog.request_type
        ).order_by(desc('count')).all()

        total_type_count = sum(t.count for t in type_stats)
        usage_by_type = []
        for t in type_stats:
            # Unknown request types are shown under their raw name.
            label, css_class = type_labels.get(t.request_type, (t.request_type, 'other'))
            percentage = (t.count / total_type_count * 100) if total_type_count > 0 else 0
            usage_by_type.append({
                'type_label': label,
                'type_class': css_class,
                'count': t.count,
                'percentage': round(percentage, 1),
                'cost_pln': to_pln(t.cost_cents),
            })

        # User statistics (filtered), most expensive first
        user_query = apply_period(db.query(
            User.id,
            User.name.label('user_name'),
            User.email,
            Company.name.label('company_name'),
            request_count,
            total_cost
        ).join(
            AIUsageLog, AIUsageLog.user_id == User.id
        ).outerjoin(
            Company, User.company_id == Company.id
        ))
        user_stats = user_query.group_by(
            User.id, User.name, User.email, Company.name
        ).order_by(desc('cost_cents')).all()

        user_rankings = [
            {
                'id': u.id,
                'name': u.user_name or u.email,
                'company': u.company_name or '-',
                'requests': u.requests,
                'cost_pln': to_pln(u.cost_cents),
            }
            for u in user_stats
        ]

        # Company statistics (filtered), most expensive first
        company_query = apply_period(db.query(
            Company.id,
            Company.name,
            request_count,
            func.count(func.distinct(AIUsageLog.user_id)).label('unique_users'),
            total_cost
        ).join(
            User, User.company_id == Company.id
        ).join(
            AIUsageLog, AIUsageLog.user_id == User.id
        ))
        company_stats = company_query.group_by(
            Company.id, Company.name
        ).order_by(desc('cost_cents')).all()

        company_rankings = [
            {
                'id': c.id,
                'name': c.name,
                'requests': c.requests,
                'unique_users': c.unique_users,
                'cost_pln': to_pln(c.cost_cents),
            }
            for c in company_stats
        ]

        # Daily history: rows dated within the last 14 days, newest first
        daily_history = db.query(AIUsageDaily).filter(
            AIUsageDaily.date >= today - timedelta(days=14)
        ).order_by(desc(AIUsageDaily.date)).all()

        stats = {
            'today_requests': today_stats.requests or 0,
            'today_cost_pln': to_pln(today_stats.cost_cents),
            'month_requests': month_stats.requests or 0,
            'month_cost_pln': to_pln(month_stats.cost_cents),
            'all_requests': all_time_stats.requests or 0,
            'all_cost_pln': to_pln(all_time_stats.cost_cents),
            'usd_to_pln': USD_TO_PLN,
        }

        return render_template(
            'admin/ai_usage_dashboard.html',
            stats=stats,
            usage_by_type=usage_by_type,
            daily_history=daily_history,
            user_rankings=user_rankings,
            company_rankings=company_rankings,
            current_period=period,
            period_label=period_label,
        )
    finally:
        db.close()
@bp.route('/ai-usage/user/<int:user_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_ai_usage_user(user_id):
    """Detailed AI usage for a specific user.

    Args:
        user_id: Primary key of the user, taken from the URL.

    Query parameters:
        page: 1-based page of the request log (50 entries per page); values
            below 1 are treated as 1.

    Returns:
        The rendered ``admin/ai_usage_user.html`` template, or a redirect to
        the AI usage dashboard with a flashed error when the user does not
        exist.
    """
    type_labels = {
        'ai_chat': 'Chat AI',
        'zopk_news_evaluation': 'Ocena newsów ZOP Kaszubia',
        'ai_user_parse': 'Tworzenie user',
        'gbp_audit_ai': 'Audyt GBP',
        'general': 'Ogólne'
    }
    type_classes = {
        'ai_chat': 'chat',
        'zopk_news_evaluation': 'news_evaluation',
        'ai_user_parse': 'user_creation',
        'gbp_audit_ai': 'image_analysis',
        'general': 'other'
    }
    per_page = 50
    # A page below 1 would turn into a negative OFFSET, which databases such
    # as PostgreSQL reject; clamp instead of failing the request.
    page = max(request.args.get('page', 1, type=int), 1)

    # Coalesce each operand: a NULL on either side would otherwise make the
    # whole per-row sum NULL and drop the row from the total.
    tokens_per_row = (
        func.coalesce(AIUsageLog.tokens_input, 0)
        + func.coalesce(AIUsageLog.tokens_output, 0)
    )

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=user_id).first()
        if not user:
            flash('Użytkownik nie istnieje.', 'error')
            return redirect(url_for('admin.admin_ai_usage'))

        company = None
        if user.company_id:
            company = db.query(Company).filter_by(id=user.company_id).first()

        # Overall stats for this user
        stats = db.query(
            func.count(AIUsageLog.id).label('total_requests'),
            func.coalesce(func.sum(AIUsageLog.tokens_input), 0).label('tokens_input'),
            func.coalesce(func.sum(AIUsageLog.tokens_output), 0).label('tokens_output'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents'),
            # NULLIF maps success=TRUE to NULL, so COUNT only sees failures.
            func.count(func.nullif(AIUsageLog.success, True)).label('errors')
        ).filter(AIUsageLog.user_id == user_id).first()

        # Usage by type, most frequent first
        type_stats = db.query(
            AIUsageLog.request_type,
            func.count(AIUsageLog.id).label('count'),
            func.coalesce(func.sum(tokens_per_row), 0).label('tokens'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents')
        ).filter(
            AIUsageLog.user_id == user_id
        ).group_by(AIUsageLog.request_type).order_by(desc('count')).all()

        total_type_count = sum(t.count for t in type_stats)
        usage_by_type = [
            {
                'type': t.request_type,
                'type_label': type_labels.get(t.request_type, t.request_type),
                'type_class': type_classes.get(t.request_type, 'other'),
                'count': t.count,
                'tokens': int(t.tokens),
                'cost_usd': float(t.cost_cents or 0) / 100,
                'percentage': round(t.count / total_type_count * 100, 1) if total_type_count > 0 else 0
            }
            for t in type_stats
        ]

        # Request log for this user (paginated, newest first)
        requests_query = db.query(AIUsageLog).filter(
            AIUsageLog.user_id == user_id
        ).order_by(desc(AIUsageLog.created_at))
        total_requests = requests_query.count()
        total_pages = (total_requests + per_page - 1) // per_page
        logs = requests_query.offset((page - 1) * per_page).limit(per_page).all()

        # Attach display-only attributes for the template
        for log in logs:
            log.type_label = type_labels.get(log.request_type, log.request_type)
            log.cost_usd = float(log.cost_cents or 0) / 100

        tokens_input = int(stats.tokens_input or 0)
        tokens_output = int(stats.tokens_output or 0)
        user_stats = {
            'total_requests': stats.total_requests or 0,
            'tokens_total': tokens_input + tokens_output,
            'tokens_input': tokens_input,
            'tokens_output': tokens_output,
            'cost_usd': float(stats.cost_cents or 0) / 100,
            'errors': stats.errors or 0
        }

        return render_template(
            'admin/ai_usage_user.html',
            user=user,
            company=company,
            stats=user_stats,
            usage_by_type=usage_by_type,
            logs=logs,
            page=page,
            total_pages=total_pages,
            total_requests=total_requests
        )
    finally:
        db.close()
@bp.route('/ai-usage/company/<int:company_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_ai_usage_company(company_id):
    """Detailed AI usage for a specific company.

    Usage is attributed to the company through its users: every figure is
    computed over the AI usage logs of users whose ``company_id`` matches.

    Args:
        company_id: Primary key of the company, taken from the URL.

    Query parameters:
        page: 1-based page of the request log (50 entries per page); values
            below 1 are treated as 1.

    Returns:
        The rendered ``admin/ai_usage_company.html`` template, or a redirect
        to the AI usage dashboard with a flashed error when the company does
        not exist or has no users.
    """
    type_labels = {
        'ai_chat': 'Chat AI',
        'zopk_news_evaluation': 'Ocena newsów ZOP Kaszubia',
        'ai_user_parse': 'Tworzenie user',
        'gbp_audit_ai': 'Audyt GBP',
        'general': 'Ogólne'
    }
    type_classes = {
        'ai_chat': 'chat',
        'zopk_news_evaluation': 'news_evaluation',
        'ai_user_parse': 'user_creation',
        'gbp_audit_ai': 'image_analysis',
        'general': 'other'
    }
    per_page = 50
    # A page below 1 would turn into a negative OFFSET, which databases such
    # as PostgreSQL reject; clamp instead of failing the request.
    page = max(request.args.get('page', 1, type=int), 1)

    # Coalesce each operand: a NULL on either side would otherwise make the
    # whole per-row sum NULL and drop the row from the total.
    tokens_per_row = (
        func.coalesce(AIUsageLog.tokens_input, 0)
        + func.coalesce(AIUsageLog.tokens_output, 0)
    )

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            flash('Firma nie istnieje.', 'error')
            return redirect(url_for('admin.admin_ai_usage'))

        # All users of this company; also reused to name log authors below
        company_users = db.query(User).filter(User.company_id == company_id).all()
        users_by_id = {u.id: u for u in company_users}
        company_user_ids = list(users_by_id)
        if not company_user_ids:
            flash('Firma nie ma przypisanych użytkowników.', 'error')
            return redirect(url_for('admin.admin_ai_usage'))

        belongs_to_company = AIUsageLog.user_id.in_(company_user_ids)

        # Overall stats for this company
        stats = db.query(
            func.count(AIUsageLog.id).label('total_requests'),
            func.coalesce(func.sum(AIUsageLog.tokens_input), 0).label('tokens_input'),
            func.coalesce(func.sum(AIUsageLog.tokens_output), 0).label('tokens_output'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents'),
            # NULLIF maps success=TRUE to NULL, so COUNT only sees failures.
            func.count(func.nullif(AIUsageLog.success, True)).label('errors')
        ).filter(belongs_to_company).first()

        # Per-user stats, most expensive first (only users with usage appear
        # because of the inner join)
        user_stats_list = db.query(
            User.id,
            User.name,
            User.email,
            func.count(AIUsageLog.id).label('requests'),
            func.coalesce(func.sum(tokens_per_row), 0).label('tokens'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents')
        ).join(
            AIUsageLog, AIUsageLog.user_id == User.id
        ).filter(
            User.company_id == company_id
        ).group_by(User.id, User.name, User.email).order_by(desc('cost_cents')).all()

        user_rankings = [
            {
                'id': u.id,
                'name': u.name or u.email,
                'email': u.email,
                'requests': u.requests,
                'tokens': int(u.tokens),
                'cost_usd': float(u.cost_cents or 0) / 100
            }
            for u in user_stats_list
        ]

        # Usage by type, most frequent first
        type_stats = db.query(
            AIUsageLog.request_type,
            func.count(AIUsageLog.id).label('count'),
            func.coalesce(func.sum(tokens_per_row), 0).label('tokens'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents')
        ).filter(
            belongs_to_company
        ).group_by(AIUsageLog.request_type).order_by(desc('count')).all()

        total_type_count = sum(t.count for t in type_stats)
        usage_by_type = [
            {
                'type': t.request_type,
                'type_label': type_labels.get(t.request_type, t.request_type),
                'type_class': type_classes.get(t.request_type, 'other'),
                'count': t.count,
                'tokens': int(t.tokens),
                'cost_usd': float(t.cost_cents or 0) / 100,
                'percentage': round(t.count / total_type_count * 100, 1) if total_type_count > 0 else 0
            }
            for t in type_stats
        ]

        # Model breakdown, most frequent first
        model_stats = db.query(
            AIUsageLog.model,
            func.count(AIUsageLog.id).label('count'),
            func.coalesce(func.sum(AIUsageLog.cost_cents), 0).label('cost_cents')
        ).filter(
            belongs_to_company
        ).group_by(AIUsageLog.model).order_by(desc('count')).all()

        total_model_count = sum(m.count for m in model_stats)
        model_breakdown = [
            {
                'model': m.model or 'unknown',
                'count': m.count,
                'cost_usd': float(m.cost_cents or 0) / 100,
                'percentage': round(m.count / total_model_count * 100, 1) if total_model_count > 0 else 0
            }
            for m in model_stats
        ]

        # Recent logs (paginated, newest first)
        logs_query = db.query(AIUsageLog).filter(
            belongs_to_company
        ).order_by(desc(AIUsageLog.created_at))
        total_requests = logs_query.count()
        total_pages = (total_requests + per_page - 1) // per_page
        logs = logs_query.offset((page - 1) * per_page).limit(per_page).all()

        # Attach display-only attributes for the template. Every log belongs
        # to one of the already loaded company users, so the author is looked
        # up in memory instead of issuing one query per row.
        for log in logs:
            log.type_label = type_labels.get(log.request_type, log.request_type)
            log.cost_usd = float(log.cost_cents or 0) / 100
            author = users_by_id.get(log.user_id)
            log.user_name = (author.name or author.email) if author else None

        tokens_input = int(stats.tokens_input or 0)
        tokens_output = int(stats.tokens_output or 0)
        company_stats = {
            'total_requests': stats.total_requests or 0,
            'tokens_total': tokens_input + tokens_output,
            'tokens_input': tokens_input,
            'tokens_output': tokens_output,
            'cost_usd': float(stats.cost_cents or 0) / 100,
            'errors': stats.errors or 0,
            # Users with at least one logged request
            'unique_users': len(user_rankings)
        }

        return render_template(
            'admin/ai_usage_company.html',
            company=company,
            stats=company_stats,
            user_rankings=user_rankings,
            usage_by_type=usage_by_type,
            model_breakdown=model_breakdown,
            logs=logs,
            page=page,
            total_pages=total_pages,
            total_requests=total_requests
        )
    finally:
        db.close()
@bp.route('/ai-usage/export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_ai_usage_export():
    """Export AI usage logs as CSV with filters.

    Query parameters:
        period: ``day``, ``week``, ``month`` (default), ``all`` or
            ``custom``. Unknown values, and ``custom`` without
            ``date_from``, are treated as ``month``.
        date_from, date_to: ISO dates used when ``period=custom``;
            ``date_to`` defaults to today. Unparseable dates fall back to
            the last 30 days.
        user_id, company_id, model: optional filters.

    Returns:
        A ``text/csv`` response with at most 10000 rows, newest first, or a
        redirect to the AI usage dashboard with a flashed error when the
        export fails.
    """
    filter_user_id = request.args.get('user_id', type=int)
    filter_company_id = request.args.get('company_id', type=int)
    filter_model = request.args.get('model', '')
    date_from = request.args.get('date_from', '')
    date_to = request.args.get('date_to', '')

    today = date.today()
    preset_starts = {
        'day': today,
        'week': today - timedelta(days=7),
        'month': today - timedelta(days=30),
        'all': None
    }

    # Determine the date window. The period is also part of the download
    # filename, so anything outside the known names is normalised.
    period = request.args.get('period', 'month')
    if period == 'custom' and date_from:
        try:
            period_start = date.fromisoformat(date_from)
            period_end = date.fromisoformat(date_to) if date_to else today
        except ValueError:
            period_start = today - timedelta(days=30)
            period_end = today
    else:
        if period not in preset_starts:
            period = 'month'
        period_start = preset_starts[period]
        period_end = None

    db = SessionLocal()
    try:
        query = db.query(AIUsageLog).options(
            joinedload(AIUsageLog.user),
            joinedload(AIUsageLog.company)
        )
        if period_start is not None:
            query = query.filter(func.date(AIUsageLog.created_at) >= period_start)
        if period_end is not None:
            query = query.filter(func.date(AIUsageLog.created_at) <= period_end)
        if filter_user_id:
            query = query.filter(AIUsageLog.user_id == filter_user_id)
        if filter_company_id:
            company_user_ids = [
                u.id for u in db.query(User.id).filter(User.company_id == filter_company_id).all()
            ]
            if company_user_ids:
                query = query.filter(AIUsageLog.user_id.in_(company_user_ids))
            else:
                # Company without users: force an empty result set
                query = query.filter(AIUsageLog.id == -1)
        if filter_model:
            query = query.filter(AIUsageLog.model == filter_model)

        logs = query.order_by(desc(AIUsageLog.created_at)).limit(10000).all()

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow([
            'Data', 'Uzytkownik', 'Email', 'Firma', 'Model', 'Typ',
            'Tokeny_in', 'Tokeny_out', 'Koszt_USD', 'Czas_ms', 'Sukces', 'Blad'
        ])

        # Names of companies resolved through the user, cached by company id
        # so each company is queried once rather than once per exported row.
        fallback_company_names = {}

        for log in logs:
            user_name = ''
            user_email = ''
            company_name = ''
            if log.user:
                user_name = log.user.name or ''
                user_email = log.user.email or ''
            if log.company:
                company_name = log.company.name or ''
            elif log.user and log.user.company_id:
                # The log has no company of its own: use the user's company.
                cid = log.user.company_id
                if cid not in fallback_company_names:
                    comp = db.query(Company).filter_by(id=cid).first()
                    fallback_company_names[cid] = comp.name if comp else ''
                company_name = fallback_company_names[cid]

            writer.writerow([
                log.created_at.strftime('%Y-%m-%d %H:%M:%S') if log.created_at else '',
                user_name,
                user_email,
                company_name,
                log.model or '',
                log.request_type or '',
                log.tokens_input or 0,
                log.tokens_output or 0,
                f'{float(log.cost_cents or 0) / 100:.6f}',
                log.response_time_ms or 0,
                'TAK' if log.success else 'NIE',
                log.error_message or ''
            ])

        filename = f'ai_usage_{period}_{today.isoformat()}.csv'
        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename={filename}'}
        )
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("AI usage export error: %s", e)
        flash('Blad podczas eksportu.', 'error')
        return redirect(url_for('admin.admin_ai_usage'))
    finally:
        db.close()