nordabiz/blueprints/admin/routes_user_insights.py
Maciej Pienczyn c3ecd86a8c
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: add User Insights dashboard with 5 tabs and user profiles
New admin dashboard at /admin/user-insights providing:
- Problem detection tab (problem scoring, locked accounts, failed logins)
- Engagement ranking tab (engagement scoring, WoW comparison, sparklines)
- Page map tab (section heatmap, top 50 pages, unused pages)
- Paths tab (entry/exit pages, transitions, drop-off analysis)
- Overview tab (Chart.js charts, hourly heatmap, device breakdown)
- Individual user drill-down profiles with timelines and gauges

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 21:42:43 +01:00

994 lines
36 KiB
Python

"""
Admin User Insights Routes
============================
User Insights Dashboard - problem detection, engagement scoring,
page popularity, user flows, and behavioral profiles.
"""
import csv
import io
import logging
from datetime import date, timedelta, datetime
from flask import render_template, request, redirect, url_for, flash, Response
from flask_login import login_required
from sqlalchemy import func, desc, text, or_
from sqlalchemy.orm import joinedload
from . import bp
from database import (
SessionLocal, User, UserSession, PageView, SearchQuery,
ConversionEvent, JSError, EmailLog, SecurityAlert,
AnalyticsDaily, SystemRole
)
from utils.decorators import role_required
logger = logging.getLogger(__name__)
def _get_period_dates(period):
"""Return (start_date, days) for given period string."""
today = date.today()
if period == 'day':
return today, 1
elif period == 'month':
return today - timedelta(days=30), 30
else: # week (default)
return today - timedelta(days=7), 7
# ============================================================
# MAIN DASHBOARD
# ============================================================
@bp.route('/user-insights')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights():
    """User Insights Dashboard - dispatches to one of the 5 tab builders."""
    tab = request.args.get('tab', 'problems')
    period = request.args.get('period', 'week')
    start_date, days = _get_period_dates(period)
    # Each tab name maps to the helper that assembles its template data.
    builders = {
        'problems': _tab_problems,
        'engagement': _tab_engagement,
        'pages': _tab_pages,
        'paths': _tab_paths,
        'overview': _tab_overview,
    }
    db = SessionLocal()
    try:
        builder = builders.get(tab)
        # Unknown tab values render the template with empty data.
        data = builder(db, start_date, days) if builder else {}
        return render_template(
            'admin/user_insights.html',
            tab=tab,
            period=period,
            data=data
        )
    except Exception as e:
        logger.error(f"User insights error: {e}", exc_info=True)
        flash('Błąd ładowania danych insights.', 'error')
        return redirect(url_for('admin.admin_analytics'))
    finally:
        db.close()
# ============================================================
# TAB 1: PROBLEMS
# ============================================================
def _tab_problems(db, start_date, days):
    """Problem detection tab - identify users with issues.

    Builds four stat-card counters plus a per-user "problem score"
    (0-100) weighted from failed logins, password resets, JS errors,
    slow page loads, security alerts and an active account lock.

    Args:
        db: SQLAlchemy session.
        start_date: first day of the selected period.
        days: period length in days (not used directly; kept for the
            uniform tab-builder signature).

    Returns:
        dict with the stat-card counts and 'problem_users' - top 50
        users by descending problem score.
    """
    now = datetime.now()
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Password resets are always scored over a fixed 30-day window,
    # independent of the selected period.
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    # Stat cards
    locked_accounts = db.query(func.count(User.id)).filter(
        User.locked_until > now, User.is_active == True
    ).scalar() or 0
    # NOTE(review): the *_7d names are misleading - these windows follow
    # the selected period (start_dt), not a fixed 7 days.
    failed_logins_7d = db.query(func.count(SecurityAlert.id)).filter(
        SecurityAlert.alert_type.in_(['brute_force', 'account_locked']),
        SecurityAlert.created_at >= start_dt
    ).scalar() or 0
    password_resets_7d = db.query(func.count(EmailLog.id)).filter(
        EmailLog.email_type == 'password_reset',
        EmailLog.created_at >= start_dt
    ).scalar() or 0
    js_errors_7d = db.query(func.count(JSError.id)).filter(
        JSError.occurred_at >= start_dt
    ).scalar() or 0
    # Problem users - raw data per user
    # NOTE(review): 4 queries per active user (N+1 pattern); acceptable
    # for a small user base, a candidate for aggregation if it grows.
    users = db.query(User).filter(User.is_active == True).all()
    problem_users = []
    for user in users:
        # Failed logins (running counter stored on the user row)
        fl = user.failed_login_attempts or 0
        # Security alerts in the period (matched by email, not user_id)
        sa_7d = db.query(func.count(SecurityAlert.id)).filter(
            SecurityAlert.user_email == user.email,
            SecurityAlert.created_at >= start_dt
        ).scalar() or 0
        # Password resets 30d
        pr_30d = db.query(func.count(EmailLog.id)).filter(
            EmailLog.user_id == user.id,
            EmailLog.email_type == 'password_reset',
            EmailLog.created_at >= start_30d
        ).scalar() or 0
        # JS errors in the period (attributed via the user's sessions)
        je_7d = db.query(func.count(JSError.id)).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user.id,
            JSError.occurred_at >= start_dt
        ).scalar() or 0
        # Slow pages in the period (> 3s load time)
        sp_7d = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_dt,
            PageView.load_time_ms > 3000
        ).scalar() or 0
        is_locked = 1 if user.locked_until and user.locked_until > now else 0
        # Weighted score capped at 100; an active lock alone adds 40.
        score = min(100,
            fl * 10 +
            pr_30d * 15 +
            je_7d * 3 +
            sp_7d * 2 +
            sa_7d * 20 +
            is_locked * 40
        )
        if score > 0:
            problem_users.append({
                'user': user,
                'score': score,
                'failed_logins': fl,
                'password_resets': pr_30d,
                'js_errors': je_7d,
                'slow_pages': sp_7d,
                'security_alerts': sa_7d,
                'is_locked': is_locked,
                'last_login': user.last_login,
            })
    problem_users.sort(key=lambda x: x['score'], reverse=True)
    return {
        'locked_accounts': locked_accounts,
        'failed_logins': failed_logins_7d,
        'password_resets': password_resets_7d,
        'js_errors': js_errors_7d,
        'problem_users': problem_users[:50],
    }
# ============================================================
# TAB 2: ENGAGEMENT
# ============================================================
def _tab_engagement(db, start_date, days):
    """Engagement ranking tab.

    Produces stat cards (active / at-risk / dormant / new-this-month)
    and a ranked list of users with an engagement score (0-100),
    week-over-week page-view change, a status bucket and a 7-day
    page-view sparkline.

    Args:
        db: SQLAlchemy session.
        start_date: first day of the selected period.
        days: period length, used to derive the equal-length previous
            period for the WoW comparison.

    Returns:
        dict with stat-card counts and 'engagement_list' - top 50 users
        by descending engagement score.
    """
    # NOTE(review): 'now' is never used in this function - removal candidate.
    now = datetime.now()
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Score components always use a fixed 30-day window.
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    # Previous period of equal length, ending where the current one starts.
    prev_start = datetime.combine(start_date - timedelta(days=days), datetime.min.time())
    # Stat cards
    active_7d = db.query(func.count(func.distinct(UserSession.user_id))).filter(
        UserSession.user_id.isnot(None),
        UserSession.started_at >= start_dt
    ).scalar() or 0
    all_users = db.query(User).filter(User.is_active == True).all()
    at_risk = 0
    dormant = 0
    new_this_month = 0
    first_of_month = date.today().replace(day=1)
    for u in all_users:
        if u.created_at and u.created_at.date() >= first_of_month:
            new_this_month += 1
        if u.last_login:
            days_since = (date.today() - u.last_login.date()).days
            # 8-30 days since login = at risk; > 30 days = dormant.
            if 8 <= days_since <= 30:
                at_risk += 1
            elif days_since > 30:
                dormant += 1
        elif u.last_login is None:
            # Never logged in counts as dormant.
            dormant += 1
    # Engagement ranking - compute per user
    # NOTE(review): ~13 queries per user incl. the sparkline loop (N+1);
    # fine for a small user base, heavy beyond that.
    registered_users = db.query(User).filter(
        User.is_active == True, User.role != 'UNAFFILIATED'
    ).all()
    engagement_list = []
    for user in registered_users:
        # Current period
        sessions_cur = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_dt
        ).scalar() or 0
        pv_cur = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_dt
        ).scalar() or 0
        # Previous period for WoW
        # NOTE(review): sessions_prev is computed but never used - the WoW
        # figure below is based on page views only.
        sessions_prev = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= prev_start,
            UserSession.started_at < start_dt
        ).scalar() or 0
        pv_prev = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= prev_start,
            PageView.viewed_at < start_dt
        ).scalar() or 0
        # 30d engagement score components
        s30 = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d
        ).scalar() or 0
        pv30 = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user.id,
            PageView.viewed_at >= start_30d
        ).scalar() or 0
        clicks30 = db.query(func.sum(UserSession.clicks_count)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d
        ).scalar() or 0
        dur30 = db.query(func.sum(UserSession.duration_seconds)).filter(
            UserSession.user_id == user.id,
            UserSession.started_at >= start_30d
        ).scalar() or 0
        conv30 = db.query(func.count(ConversionEvent.id)).filter(
            ConversionEvent.user_id == user.id,
            ConversionEvent.converted_at >= start_30d
        ).scalar() or 0
        search30 = db.query(func.count(SearchQuery.id)).filter(
            SearchQuery.user_id == user.id,
            SearchQuery.searched_at >= start_30d
        ).scalar() or 0
        # Weighted score capped at 100: sessions x3, page views x1,
        # clicks x0.5, minutes of activity x2, conversions x10, searches x2.
        score = min(100,
            s30 * 3 + pv30 * 1 + int(clicks30) * 0.5 +
            int(dur30) / 60 * 2 + conv30 * 10 + search30 * 2
        )
        score = int(score)
        # WoW change
        wow = None
        if pv_prev > 0:
            wow = round((pv_cur - pv_prev) / pv_prev * 100)
        elif pv_cur > 0:
            # Activity now but no baseline last period -> shown as +100%.
            wow = 100
        # Status combines login recency with the engagement score.
        days_since_login = None
        if user.last_login:
            days_since_login = (date.today() - user.last_login.date()).days
        if days_since_login is not None and days_since_login <= 7 and score >= 20:
            status = 'active'
        elif (days_since_login is not None and 8 <= days_since_login <= 30) or (5 <= score < 20):
            status = 'at_risk'
        else:
            status = 'dormant'
        # Daily sparkline (7 days, oldest to newest)
        sparkline = []
        for i in range(7):
            d = date.today() - timedelta(days=6 - i)
            d_start = datetime.combine(d, datetime.min.time())
            d_end = datetime.combine(d + timedelta(days=1), datetime.min.time())
            cnt = db.query(func.count(PageView.id)).filter(
                PageView.user_id == user.id,
                PageView.viewed_at >= d_start,
                PageView.viewed_at < d_end
            ).scalar() or 0
            sparkline.append(cnt)
        if sessions_cur > 0 or score > 0:
            engagement_list.append({
                'user': user,
                'score': score,
                'sessions': sessions_cur,
                'page_views': pv_cur,
                'wow': wow,
                'status': status,
                'sparkline': sparkline,
            })
    engagement_list.sort(key=lambda x: x['score'], reverse=True)
    return {
        'active_7d': active_7d,
        'at_risk': at_risk,
        'dormant': dormant,
        'new_this_month': new_this_month,
        'engagement_list': engagement_list[:50],
    }
# ============================================================
# TAB 3: PAGE MAP
# ============================================================
def _tab_pages(db, start_date, days):
    """Page popularity map.

    Aggregates page views in the selected period into: a section-level
    heatmap (fixed prefix groups), the top 50 pages, and "ignored"
    pages with fewer than 5 views over the last 30 days.

    Args:
        db: SQLAlchemy session.
        start_date: first day of the selected period.
        days: period length in days (not used directly).

    Returns:
        dict with 'sections', 'top_pages' and 'ignored_pages'.
    """
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Page sections with grouping - display name -> list of path prefixes.
    section_map = {
        'Strona główna': ['/'],
        'Profile firm': ['/company/'],
        'Forum': ['/forum'],
        'Chat': ['/chat'],
        'Wyszukiwarka': ['/search', '/szukaj'],
        'Wydarzenia': ['/events', '/wydarzenia'],
        'Ogłoszenia': ['/classifieds', '/ogloszenia'],
        'Członkostwo': ['/membership', '/czlonkostwo'],
        'Admin': ['/admin'],
    }
    sections = []
    for name, prefixes in section_map.items():
        conditions = [PageView.path.like(p + '%') for p in prefixes]
        # The home page is matched exactly - a '/%' LIKE would match every path.
        if prefixes == ['/']:
            conditions = [PageView.path == '/']
        q = db.query(
            func.count(PageView.id).label('views'),
            func.count(func.distinct(PageView.user_id)).label('unique_users'),
            func.avg(PageView.time_on_page_seconds).label('avg_time')
        ).filter(
            or_(*conditions),
            PageView.viewed_at >= start_dt
        ).first()
        sections.append({
            'name': name,
            'views': q.views or 0,
            'unique_users': q.unique_users or 0,
            'avg_time': int(q.avg_time or 0),
        })
    # Heatmap intensity is relative to the busiest section (0-100).
    max_views = max((s['views'] for s in sections), default=1) or 1
    for s in sections:
        s['intensity'] = min(100, int(s['views'] / max_views * 100))
    # Top 50 pages
    top_pages = db.query(
        PageView.path,
        func.count(PageView.id).label('views'),
        func.count(func.distinct(PageView.user_id)).label('unique_users'),
        func.avg(PageView.time_on_page_seconds).label('avg_time'),
        func.avg(PageView.scroll_depth_percent).label('avg_scroll'),
        func.avg(PageView.load_time_ms).label('avg_load'),
    ).filter(
        PageView.viewed_at >= start_dt
    ).group_by(PageView.path).order_by(desc('views')).limit(50).all()
    # Bars are scaled against the most-viewed page.
    max_page_views = top_pages[0].views if top_pages else 1
    pages_list = []
    for p in top_pages:
        pages_list.append({
            'path': p.path,
            'views': p.views,
            'unique_users': p.unique_users,
            'avg_time': int(p.avg_time or 0),
            'avg_scroll': int(p.avg_scroll or 0),
            'avg_load': int(p.avg_load or 0),
            'bar_pct': int(p.views / max_page_views * 100),
        })
    # Ignored pages (< 5 views in 30d) - fixed window, not the period.
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    ignored = db.query(
        PageView.path,
        func.count(PageView.id).label('views'),
    ).filter(
        PageView.viewed_at >= start_30d
    ).group_by(PageView.path).having(
        func.count(PageView.id) < 5
    ).order_by('views').limit(30).all()
    return {
        'sections': sections,
        'top_pages': pages_list,
        'ignored_pages': [{'path': p.path, 'views': p.views} for p in ignored],
    }
# ============================================================
# TAB 4: PATHS
# ============================================================
def _tab_paths(db, start_date, days):
    """User flow analysis.

    Computes entry pages, exit pages, top page-to-page transitions,
    drop-off (high exit rate) pages and a session-length histogram,
    all restricted to page views with a session_id in the period.

    NOTE(review): the raw SQL relies on PostgreSQL-specific features
    (DISTINCT ON, ::numeric casts, LEAD window function) - this tab
    will not run on SQLite/MySQL.

    Args:
        db: SQLAlchemy session.
        start_date: first day of the selected period.
        days: period length in days (not used directly).

    Returns:
        dict with 'entry_pages', 'exit_pages', 'transitions',
        'dropoff' and 'session_lengths'.
    """
    start_dt = datetime.combine(start_date, datetime.min.time())
    # Entry pages - first page in each session (DISTINCT ON + ASC order)
    entry_sql = text("""
        WITH first_pages AS (
            SELECT DISTINCT ON (session_id) path
            FROM page_views
            WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
            ORDER BY session_id, viewed_at ASC
        )
        SELECT path, COUNT(*) as cnt
        FROM first_pages
        GROUP BY path ORDER BY cnt DESC LIMIT 10
    """)
    entry_pages = db.execute(entry_sql, {'start_dt': start_dt}).fetchall()
    # Exit pages - last page in each session (same pattern, DESC order)
    exit_sql = text("""
        WITH last_pages AS (
            SELECT DISTINCT ON (session_id) path
            FROM page_views
            WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
            ORDER BY session_id, viewed_at DESC
        )
        SELECT path, COUNT(*) as cnt
        FROM last_pages
        GROUP BY path ORDER BY cnt DESC LIMIT 10
    """)
    exit_pages = db.execute(exit_sql, {'start_dt': start_dt}).fetchall()
    # Bars scale against the most frequent entry/exit page respectively.
    max_entry = entry_pages[0].cnt if entry_pages else 1
    max_exit = exit_pages[0].cnt if exit_pages else 1
    # Top transitions - consecutive page pairs within a session (self-loops excluded)
    transitions_sql = text("""
        WITH ordered AS (
            SELECT session_id, path,
                   LEAD(path) OVER (PARTITION BY session_id ORDER BY viewed_at) AS next_path
            FROM page_views
            WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
        )
        SELECT path, next_path, COUNT(*) as cnt
        FROM ordered
        WHERE next_path IS NOT NULL AND path != next_path
        GROUP BY path, next_path ORDER BY cnt DESC LIMIT 30
    """)
    transitions = db.execute(transitions_sql, {'start_dt': start_dt}).fetchall()
    # Drop-off pages (high exit rate); pages with < 5 views are ignored
    # to keep the rates statistically meaningful.
    dropoff_sql = text("""
        WITH page_stats AS (
            SELECT path, COUNT(*) as total_views
            FROM page_views
            WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
            GROUP BY path HAVING COUNT(*) >= 5
        ),
        exit_stats AS (
            SELECT path, COUNT(*) as exit_count
            FROM (
                SELECT DISTINCT ON (session_id) path
                FROM page_views
                WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
                ORDER BY session_id, viewed_at DESC
            ) lp
            GROUP BY path
        )
        SELECT ps.path, ps.total_views as views,
               COALESCE(es.exit_count, 0) as exits,
               ROUND(COALESCE(es.exit_count, 0)::numeric / ps.total_views * 100, 1) as exit_rate
        FROM page_stats ps
        LEFT JOIN exit_stats es ON ps.path = es.path
        ORDER BY exit_rate DESC LIMIT 20
    """)
    dropoff = db.execute(dropoff_sql, {'start_dt': start_dt}).fetchall()
    # Session length distribution - histogram over page views per session
    session_length_sql = text("""
        SELECT
            CASE
                WHEN pv_count = 1 THEN '1 strona'
                WHEN pv_count = 2 THEN '2 strony'
                WHEN pv_count BETWEEN 3 AND 5 THEN '3-5 stron'
                WHEN pv_count BETWEEN 6 AND 10 THEN '6-10 stron'
                ELSE '10+ stron'
            END as bucket,
            COUNT(*) as cnt
        FROM (
            SELECT session_id, COUNT(*) as pv_count
            FROM page_views
            WHERE viewed_at >= :start_dt AND session_id IS NOT NULL
            GROUP BY session_id
        ) session_counts
        GROUP BY bucket
        ORDER BY MIN(pv_count)
    """)
    session_lengths = db.execute(session_length_sql, {'start_dt': start_dt}).fetchall()
    max_sl = max((r.cnt for r in session_lengths), default=1) or 1
    return {
        'entry_pages': [{'path': r.path, 'count': r.cnt, 'bar_pct': int(r.cnt / max_entry * 100)} for r in entry_pages],
        'exit_pages': [{'path': r.path, 'count': r.cnt, 'bar_pct': int(r.cnt / max_exit * 100)} for r in exit_pages],
        'transitions': [{'from': r.path, 'to': r.next_path, 'count': r.cnt} for r in transitions],
        'dropoff': [{'path': r.path, 'views': r.views, 'exits': r.exits, 'exit_rate': float(r.exit_rate)} for r in dropoff],
        'session_lengths': [{'bucket': r.bucket, 'count': r.cnt, 'bar_pct': int(r.cnt / max_sl * 100)} for r in session_lengths],
    }
# ============================================================
# TAB 5: OVERVIEW
# ============================================================
def _tab_overview(db, start_date, days):
    """Overview charts - sessions, hourly heatmap, devices.

    Assembles Chart.js-ready series: daily sessions/page views from
    pre-aggregated AnalyticsDaily rows, a day-of-week x hour session
    heatmap, a logged-in vs anonymous split and weekly device-type
    series. All widgets use a fixed 30-day window regardless of the
    selected period.

    Args:
        db: SQLAlchemy session.
        start_date: first day of the selected period (unused here; kept
            for the uniform tab-builder signature).
        days: period length in days (unused here for the same reason).

    Returns:
        dict with 'filter_type', 'chart_data', 'heatmap',
        'logged_vs_anon' and 'devices'.
    """
    filter_type = request.args.get('filter', 'all')  # all, logged, anonymous
    start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
    # Daily sessions + page views (30d) from the pre-aggregated table
    daily_data = db.query(AnalyticsDaily).filter(
        AnalyticsDaily.date >= date.today() - timedelta(days=30)
    ).order_by(AnalyticsDaily.date).all()
    chart_labels = []
    chart_sessions = []
    chart_pageviews = []
    for d in daily_data:
        chart_labels.append(d.date.strftime('%d.%m'))
        if filter_type == 'logged':
            # Guard both columns with `or 0`: either may be NULL on a
            # freshly created row, and NULL - int raises TypeError.
            chart_sessions.append((d.total_sessions or 0) - (d.anonymous_sessions or 0))
        elif filter_type == 'anonymous':
            chart_sessions.append(d.anonymous_sessions or 0)
        else:
            chart_sessions.append(d.total_sessions or 0)
        chart_pageviews.append(d.total_page_views or 0)
    # Hourly heatmap (day-of-week x 24 hours, sessions over the last 30 days)
    # PostgreSQL-specific: EXTRACT(DOW ...) returns 0=Sunday .. 6=Saturday.
    heatmap_sql = text("""
        SELECT EXTRACT(DOW FROM started_at)::int as dow,
               EXTRACT(HOUR FROM started_at)::int as hour,
               COUNT(*) as cnt
        FROM user_sessions
        WHERE started_at >= :start_dt
        GROUP BY dow, hour
    """)
    heatmap_raw = db.execute(heatmap_sql, {'start_dt': start_30d}).fetchall()
    heatmap = {}
    max_heat = 1
    for r in heatmap_raw:
        heatmap[(r.dow, r.hour)] = r.cnt
        if r.cnt > max_heat:
            max_heat = r.cnt
    # Render a full 7x24 grid; missing cells are zero-intensity.
    heatmap_grid = []
    dow_names = ['Nd', 'Pn', 'Wt', 'Śr', 'Cz', 'Pt', 'Sb']
    for dow in range(7):
        row = {'name': dow_names[dow], 'hours': []}
        for h in range(24):
            cnt = heatmap.get((dow, h), 0)
            intensity = int(cnt / max_heat * 100) if max_heat else 0
            row['hours'].append({'count': cnt, 'intensity': intensity})
        heatmap_grid.append(row)
    # Logged vs Anonymous session totals (30d)
    total_logged = db.query(func.count(UserSession.id)).filter(
        UserSession.started_at >= start_30d,
        UserSession.user_id.isnot(None)
    ).scalar() or 0
    total_anon = db.query(func.count(UserSession.id)).filter(
        UserSession.started_at >= start_30d,
        UserSession.user_id.is_(None)
    ).scalar() or 0
    # Devices over time (weekly buckets via DATE_TRUNC - PostgreSQL-specific)
    devices_sql = text("""
        SELECT DATE_TRUNC('week', started_at)::date as week,
               device_type,
               COUNT(*) as cnt
        FROM user_sessions
        WHERE started_at >= :start_dt
        GROUP BY week, device_type
        ORDER BY week
    """)
    devices_raw = db.execute(devices_sql, {'start_dt': start_30d}).fetchall()
    weeks_set = sorted(set(r.week for r in devices_raw))
    device_map = {}
    for r in devices_raw:
        if r.week not in device_map:
            device_map[r.week] = {}
        device_map[r.week][r.device_type or 'unknown'] = r.cnt
    device_labels = [w.strftime('%d.%m') for w in weeks_set]
    device_desktop = [device_map.get(w, {}).get('desktop', 0) for w in weeks_set]
    device_mobile = [device_map.get(w, {}).get('mobile', 0) for w in weeks_set]
    device_tablet = [device_map.get(w, {}).get('tablet', 0) for w in weeks_set]
    return {
        'filter_type': filter_type,
        'chart_data': {
            'labels': chart_labels,
            'sessions': chart_sessions,
            'pageviews': chart_pageviews,
        },
        'heatmap': heatmap_grid,
        'logged_vs_anon': {'logged': total_logged, 'anonymous': total_anon},
        'devices': {
            'labels': device_labels,
            'desktop': device_desktop,
            'mobile': device_mobile,
            'tablet': device_tablet,
        },
    }
# ============================================================
# USER PROFILE DRILL-DOWN
# ============================================================
@bp.route('/user-insights/user/<int:user_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights_profile(user_id):
    """Individual user behavioral profile.

    Renders a drill-down page for one user: engagement and problem
    gauges, a merged activity timeline (sessions, key page views,
    searches, conversions, password resets, security alerts), favorite
    pages, device/browser breakdowns, an hourly activity pattern and a
    30-day engagement trend for Chart.js.

    Args:
        user_id: primary key of the user to profile.
    """
    db = SessionLocal()
    try:
        # NOTE(review): Query.get() is legacy API in SQLAlchemy 2.x -
        # consider db.get(User, user_id) when upgrading.
        user = db.query(User).options(joinedload(User.company)).get(user_id)
        if not user:
            flash('Użytkownik nie znaleziony.', 'error')
            return redirect(url_for('admin.user_insights'))
        now = datetime.now()
        start_30d = datetime.combine(date.today() - timedelta(days=30), datetime.min.time())
        start_7d = datetime.combine(date.today() - timedelta(days=7), datetime.min.time())
        # Engagement score (30d) - same weights as the engagement tab.
        s30 = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        pv30 = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user_id, PageView.viewed_at >= start_30d
        ).scalar() or 0
        clicks30 = db.query(func.sum(UserSession.clicks_count)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        dur30 = db.query(func.sum(UserSession.duration_seconds)).filter(
            UserSession.user_id == user_id, UserSession.started_at >= start_30d
        ).scalar() or 0
        conv30 = db.query(func.count(ConversionEvent.id)).filter(
            ConversionEvent.user_id == user_id, ConversionEvent.converted_at >= start_30d
        ).scalar() or 0
        search30 = db.query(func.count(SearchQuery.id)).filter(
            SearchQuery.user_id == user_id, SearchQuery.searched_at >= start_30d
        ).scalar() or 0
        engagement_score = min(100, int(
            s30 * 3 + pv30 * 1 + int(clicks30) * 0.5 +
            int(dur30) / 60 * 2 + conv30 * 10 + search30 * 2
        ))
        # Problem score - same weights as the problems tab.
        fl = user.failed_login_attempts or 0
        # Alerts are matched by email (SecurityAlert carries no user_id here).
        sa_7d = db.query(func.count(SecurityAlert.id)).filter(
            SecurityAlert.user_email == user.email,
            SecurityAlert.created_at >= start_7d
        ).scalar() or 0
        pr_30d = db.query(func.count(EmailLog.id)).filter(
            EmailLog.user_id == user_id,
            EmailLog.email_type == 'password_reset',
            EmailLog.created_at >= start_30d
        ).scalar() or 0
        je_7d = db.query(func.count(JSError.id)).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user_id,
            JSError.occurred_at >= start_7d
        ).scalar() or 0
        sp_7d = db.query(func.count(PageView.id)).filter(
            PageView.user_id == user_id,
            PageView.viewed_at >= start_7d,
            PageView.load_time_ms > 3000
        ).scalar() or 0
        is_locked = 1 if user.locked_until and user.locked_until > now else 0
        problem_score = min(100,
            fl * 10 + pr_30d * 15 + je_7d * 3 + sp_7d * 2 + sa_7d * 20 + is_locked * 40
        )
        # Timeline (last 100 events) - merged from several sources, then
        # sorted by time descending and truncated.
        timeline = []
        # Recent sessions (logins)
        sessions = db.query(UserSession).filter(
            UserSession.user_id == user_id
        ).order_by(desc(UserSession.started_at)).limit(20).all()
        for s in sessions:
            timeline.append({
                'type': 'login',
                'icon': 'key',
                'time': s.started_at,
                'desc': f'Sesja ({s.device_type or "?"}, {s.browser or "?"})',
            })
        # Recent page views - only key pages and company profiles are shown
        # to keep the timeline readable.
        key_paths = ['/', '/forum', '/chat', '/search', '/admin', '/events', '/membership']
        recent_pvs = db.query(PageView).filter(
            PageView.user_id == user_id,
        ).order_by(desc(PageView.viewed_at)).limit(50).all()
        for pv in recent_pvs:
            is_key = any(pv.path == p or pv.path.startswith(p + '/') for p in key_paths)
            if is_key or '/company/' in pv.path:
                timeline.append({
                    'type': 'pageview',
                    'icon': 'eye',
                    'time': pv.viewed_at,
                    'desc': f'Odwiedzono: {pv.path}',
                })
        # Recent searches (also passed separately to the template below)
        searches = db.query(SearchQuery).filter(
            SearchQuery.user_id == user_id
        ).order_by(desc(SearchQuery.searched_at)).limit(10).all()
        for s in searches:
            timeline.append({
                'type': 'search',
                'icon': 'search',
                'time': s.searched_at,
                'desc': f'Szukano: "{s.query}"',
            })
        # Conversions
        convs = db.query(ConversionEvent).filter(
            ConversionEvent.user_id == user_id
        ).order_by(desc(ConversionEvent.converted_at)).limit(10).all()
        for c in convs:
            timeline.append({
                'type': 'conversion',
                'icon': 'check',
                'time': c.converted_at,
                'desc': f'Konwersja: {c.event_type}',
            })
        # Password resets
        resets = db.query(EmailLog).filter(
            EmailLog.user_id == user_id,
            EmailLog.email_type == 'password_reset'
        ).order_by(desc(EmailLog.created_at)).limit(5).all()
        for r in resets:
            timeline.append({
                'type': 'problem',
                'icon': 'alert',
                'time': r.created_at,
                'desc': 'Reset hasła',
            })
        # Security alerts (all-time, not windowed, unlike the score above)
        alerts = db.query(SecurityAlert).filter(
            SecurityAlert.user_email == user.email
        ).order_by(desc(SecurityAlert.created_at)).limit(10).all()
        for a in alerts:
            timeline.append({
                'type': 'problem',
                'icon': 'shield',
                'time': a.created_at,
                'desc': f'Alert: {a.alert_type} ({a.severity})',
            })
        timeline.sort(key=lambda x: x['time'], reverse=True)
        timeline = timeline[:100]
        # Favorite pages (top 10 by views in the last 30 days)
        fav_pages = db.query(
            PageView.path,
            func.count(PageView.id).label('cnt')
        ).filter(
            PageView.user_id == user_id,
            PageView.viewed_at >= start_30d
        ).group_by(PageView.path).order_by(desc('cnt')).limit(10).all()
        max_fav = fav_pages[0].cnt if fav_pages else 1
        # Device/browser breakdown (30d)
        devices = db.query(
            UserSession.device_type,
            func.count(UserSession.id).label('cnt')
        ).filter(
            UserSession.user_id == user_id,
            UserSession.started_at >= start_30d
        ).group_by(UserSession.device_type).all()
        browsers = db.query(
            UserSession.browser,
            func.count(UserSession.id).label('cnt')
        ).filter(
            UserSession.user_id == user_id,
            UserSession.started_at >= start_30d
        ).group_by(UserSession.browser).order_by(desc('cnt')).limit(5).all()
        # Hourly activity pattern (24 bars, PostgreSQL EXTRACT)
        hourly_sql = text("""
            SELECT EXTRACT(HOUR FROM started_at)::int as hour, COUNT(*) as cnt
            FROM user_sessions
            WHERE user_id = :uid AND started_at >= :start_dt
            GROUP BY hour ORDER BY hour
        """)
        hourly_raw = db.execute(hourly_sql, {'uid': user_id, 'start_dt': start_30d}).fetchall()
        hourly = {r.hour: r.cnt for r in hourly_raw}
        max_hourly = max(hourly.values(), default=1) or 1
        hourly_bars = []
        for h in range(24):
            cnt = hourly.get(h, 0)
            hourly_bars.append({'hour': h, 'count': cnt, 'pct': int(cnt / max_hourly * 100)})
        # Daily engagement trend (30d for Chart.js)
        # NOTE(review): 2 queries per day (60 total) - candidate for a
        # single grouped-by-date query if profile pages get slow.
        trend_labels = []
        trend_scores = []
        for i in range(30):
            d = date.today() - timedelta(days=29 - i)
            d_start = datetime.combine(d, datetime.min.time())
            d_end = datetime.combine(d + timedelta(days=1), datetime.min.time())
            d_sessions = db.query(func.count(UserSession.id)).filter(
                UserSession.user_id == user_id,
                UserSession.started_at >= d_start,
                UserSession.started_at < d_end
            ).scalar() or 0
            d_pv = db.query(func.count(PageView.id)).filter(
                PageView.user_id == user_id,
                PageView.viewed_at >= d_start,
                PageView.viewed_at < d_end
            ).scalar() or 0
            # Per-day score capped at 30 to keep the trend chart readable.
            daily_score = min(30, d_sessions * 3 + d_pv)
            trend_labels.append(d.strftime('%d.%m'))
            trend_scores.append(daily_score)
        # Problem history (all-time, last 10 each)
        js_errors_list = db.query(JSError).join(
            UserSession, JSError.session_id == UserSession.id
        ).filter(
            UserSession.user_id == user_id
        ).order_by(desc(JSError.occurred_at)).limit(10).all()
        slow_pages_list = db.query(PageView).filter(
            PageView.user_id == user_id,
            PageView.load_time_ms > 3000
        ).order_by(desc(PageView.viewed_at)).limit(10).all()
        # Avg sessions per week over the account's lifetime (min 1 week).
        weeks_active = max(1, (date.today() - (user.created_at.date() if user.created_at else date.today())).days / 7)
        total_sessions_all = db.query(func.count(UserSession.id)).filter(
            UserSession.user_id == user_id
        ).scalar() or 0
        avg_sessions_week = round(total_sessions_all / weeks_active, 1)
        avg_session_dur = db.query(func.avg(UserSession.duration_seconds)).filter(
            UserSession.user_id == user_id,
            UserSession.duration_seconds.isnot(None)
        ).scalar() or 0
        return render_template(
            'admin/user_insights_profile.html',
            user=user,
            engagement_score=engagement_score,
            problem_score=problem_score,
            timeline=timeline,
            fav_pages=[{'path': p.path, 'count': p.cnt, 'bar_pct': int(p.cnt / max_fav * 100)} for p in fav_pages],
            devices=[{'type': d.device_type or 'unknown', 'count': d.cnt} for d in devices],
            browsers=[{'name': b.browser or 'unknown', 'count': b.cnt} for b in browsers],
            hourly_bars=hourly_bars,
            trend_data={'labels': trend_labels, 'scores': trend_scores},
            js_errors=js_errors_list,
            slow_pages=slow_pages_list,
            password_resets=pr_30d,
            security_alerts_count=sa_7d,
            avg_sessions_week=avg_sessions_week,
            avg_session_duration=int(avg_session_dur),
            search_queries=searches,
        )
    except Exception as e:
        logger.error(f"User insights profile error: {e}", exc_info=True)
        flash('Błąd ładowania profilu użytkownika.', 'error')
        return redirect(url_for('admin.user_insights'))
    finally:
        db.close()
# ============================================================
# CSV EXPORT
# ============================================================
@bp.route('/user-insights/export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def user_insights_export():
    """Export user insights data as CSV.

    Query params:
        type: 'problems' | 'engagement' | 'pages' (default 'engagement');
            any other value produces an empty CSV.
        period: 'day' | 'week' | 'month' (default 'week').

    Returns:
        A text/csv attachment named user_insights_<type>_<period>.csv.
    """
    export_type = request.args.get('type', 'engagement')
    period = request.args.get('period', 'week')
    start_date, days = _get_period_dates(period)
    db = SessionLocal()
    try:
        output = io.StringIO()
        writer = csv.writer(output)
        if export_type == 'problems':
            data = _tab_problems(db, start_date, days)
            writer.writerow(['Użytkownik', 'Email', 'Problem Score', 'Nieudane logowania',
                             'Resety hasła', 'Błędy JS', 'Wolne strony', 'Ostatni login'])
            for p in data['problem_users']:
                writer.writerow([
                    p['user'].name, p['user'].email, p['score'],
                    p['failed_logins'], p['password_resets'], p['js_errors'],
                    p['slow_pages'], p['last_login'] or 'Nigdy'
                ])
        elif export_type == 'engagement':
            data = _tab_engagement(db, start_date, days)
            writer.writerow(['Użytkownik', 'Email', 'Score', 'Sesje', 'Odsłony',
                             'Zmiana WoW %', 'Status'])
            for e in data['engagement_list']:
                writer.writerow([
                    e['user'].name, e['user'].email, e['score'],
                    e['sessions'], e['page_views'],
                    f"{e['wow']}%" if e['wow'] is not None else 'N/A',
                    e['status']
                ])
        elif export_type == 'pages':
            data = _tab_pages(db, start_date, days)
            writer.writerow(['Ścieżka', 'Odsłony', 'Unikalni', 'Śr. czas (s)',
                             'Śr. scroll %', 'Śr. ładowanie (ms)'])
            for p in data['top_pages']:
                writer.writerow([
                    p['path'], p['views'], p['unique_users'],
                    p['avg_time'], p['avg_scroll'], p['avg_load']
                ])
        # Prepend a UTF-8 BOM so Excel detects the encoding and renders
        # the Polish diacritics in headers/values correctly.
        return Response(
            '\ufeff' + output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename=user_insights_{export_type}_{period}.csv'}
        )
    except Exception as e:
        # exc_info for parity with the other handlers in this module.
        logger.error(f"User insights export error: {e}", exc_info=True)
        flash('Błąd eksportu danych.', 'error')
        return redirect(url_for('admin.user_insights'))
    finally:
        db.close()