nordabiz/blueprints/admin/routes_security.py
Maciej Pienczyn c0d60481f0
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
refactor(rbac): Migrate legacy is_admin checks to role-based has_role()/set_role()
Replace ~20 remaining is_admin references across backend, templates and scripts
with proper SystemRole checks. Column is_admin stays as deprecated (synced by
set_role()) until DB migration removes it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 21:06:22 +01:00

309 lines
10 KiB
Python

"""
Admin Security Routes
======================
Security dashboard, alerts, and account management for admin panel.
"""
import logging
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, User, AuditLog, SecurityAlert, SystemRole
from utils.decorators import role_required
logger = logging.getLogger(__name__)
# The security service is optional. When it cannot be imported we fall back to
# no-op stand-ins so the admin routes keep working, but we log a warning:
# the routes in this module only call log_audit() when
# SECURITY_SERVICE_AVAILABLE is true, so a missing service means admin actions
# are not audited.
try:
    from security_service import log_audit, _get_geoip_enabled
except ImportError:
    SECURITY_SERVICE_AVAILABLE = False
    logging.getLogger(__name__).warning(
        "security_service is not available: audit logging and GeoIP "
        "statistics are disabled for the admin security routes"
    )

    def log_audit(*args, **kwargs):
        """No-op replacement used when security_service cannot be imported."""
        return None

    def _get_geoip_enabled():
        """Report GeoIP as disabled when security_service cannot be imported."""
        return False
else:
    SECURITY_SERVICE_AVAILABLE = True
# ============================================================
# SECURITY DASHBOARD
# ============================================================
@bp.route('/security')
@login_required
@role_required(SystemRole.ADMIN)
def admin_security():
    """Render the security dashboard.

    Gathers the 50 most recent audit log entries and security alerts, the
    accounts that are currently locked, counters (new alerts, users with
    2FA, admins), a per-type alert breakdown and, when GeoIP is enabled,
    statistics about ``geo_blocked`` alerts. Everything is passed to the
    ``admin/security_dashboard.html`` template.
    """
    from collections import Counter
    from sqlalchemy import func, desc

    db = SessionLocal()
    try:
        # Get recent audit logs
        audit_logs = (
            db.query(AuditLog)
            .order_by(desc(AuditLog.created_at))
            .limit(50)
            .all()
        )

        # Get security alerts
        alerts = (
            db.query(SecurityAlert)
            .order_by(desc(SecurityAlert.created_at))
            .limit(50)
            .all()
        )

        # Alert stats
        new_alerts_count = db.query(SecurityAlert).filter(
            SecurityAlert.status == 'new'
        ).count()

        # Accounts whose lock has not expired yet
        locked_accounts = db.query(User).filter(
            User.locked_until > datetime.now()
        ).all()

        # Users with 2FA enabled ("== True" builds a SQL expression here)
        users_with_2fa = db.query(User).filter(
            User.totp_enabled == True  # noqa: E712
        ).count()

        # NOTE(review): compares the role column with the literal 'ADMIN'
        # while the decorators use SystemRole.ADMIN - confirm they match.
        total_admins = db.query(User).filter(
            User.role == 'ADMIN'
        ).count()

        # Alert type breakdown
        alert_breakdown = db.query(
            SecurityAlert.alert_type,
            func.count(SecurityAlert.id)
        ).group_by(SecurityAlert.alert_type).all()

        stats = {
            'new_alerts': new_alerts_count,
            'locked_accounts': len(locked_accounts),
            'users_with_2fa': users_with_2fa,
            'total_admins': total_admins,
            # Unpack rows positionally: attribute access by a "count" label
            # collides with the count() sequence method that SQLAlchemy 1.4+
            # Row objects expose, yielding a bound method instead of a number.
            'alert_breakdown': {
                alert_type: total for alert_type, total in alert_breakdown
            },
        }

        # GeoIP stats
        geoip_enabled = _get_geoip_enabled()
        geoip_stats = {
            'today': 0, 'this_month': 0, 'this_year': 0, 'total': 0,
            'by_country': [],
        }

        if geoip_enabled:
            today = datetime.now().date()
            first_of_month = today.replace(day=1)
            first_of_year = today.replace(month=1, day=1)

            # Every GeoIP figure is derived from the same filtered query.
            geo_blocked = db.query(SecurityAlert).filter(
                SecurityAlert.alert_type == 'geo_blocked'
            )
            alert_day = func.date(SecurityAlert.created_at)

            geoip_stats['today'] = geo_blocked.filter(alert_day == today).count()
            geoip_stats['this_month'] = geo_blocked.filter(
                alert_day >= first_of_month
            ).count()
            geoip_stats['this_year'] = geo_blocked.filter(
                alert_day >= first_of_year
            ).count()
            geoip_stats['total'] = geo_blocked.count()

            # Country breakdown (from details JSON)
            country_flags = {
                'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
                'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
                'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
            }

            country_counts = Counter()
            for alert in geo_blocked.all():
                details = alert.details
                # Skip malformed payloads instead of failing the whole page:
                # details must be a dict carrying a non-empty string country.
                if not isinstance(details, dict):
                    continue
                country = details.get('country')
                if isinstance(country, str) and country:
                    country_counts[country] += 1

            # most_common() sorts by count descending; ties keep first-seen order
            for code, count in country_counts.most_common():
                # Countries without a known flag fall back to a generic flag
                # and show their raw code as the name.
                flag, name = country_flags.get(code, ('🏴', code))
                geoip_stats['by_country'].append({
                    'code': code, 'flag': flag, 'name': name, 'count': count
                })

        # Rendered while the session is still open
        return render_template(
            'admin/security_dashboard.html',
            audit_logs=audit_logs,
            alerts=alerts,
            locked_accounts=locked_accounts,
            stats=stats,
            geoip_enabled=geoip_enabled,
            geoip_stats=geoip_stats,
            generated_at=datetime.now()
        )
    finally:
        db.close()
@bp.route('/security/alert/<int:alert_id>/acknowledge', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def acknowledge_security_alert(alert_id):
    """Mark a security alert as acknowledged by the current user.

    Returns ``{'success': True}`` as JSON, or a JSON error with status 404
    when no alert with ``alert_id`` exists.
    """
    session = SessionLocal()
    try:
        record = session.query(SecurityAlert).get(alert_id)
        if not record:
            not_found = {'success': False, 'error': 'Alert not found'}
            return jsonify(not_found), 404

        # Stamp who acknowledged the alert and when
        record.acknowledged_at = datetime.now()
        record.acknowledged_by = current_user.id
        record.status = 'acknowledged'

        # Audit entry is written only when the security service was imported
        if SECURITY_SERVICE_AVAILABLE:
            audit_details = {'alert_type': record.alert_type}
            log_audit(
                session, 'alert.acknowledge', 'security_alert', alert_id,
                details=audit_details,
            )

        session.commit()
        return jsonify({'success': True})
    finally:
        session.close()
@bp.route('/security/alert/<int:alert_id>/resolve', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def resolve_security_alert(alert_id):
    """Mark a security alert as resolved, storing the submitted note.

    The note is read from the ``note`` form field (empty string when
    absent). On success a flash message is queued and the user is
    redirected to the security dashboard; an unknown ``alert_id`` yields a
    JSON error with status 404.
    """
    resolution_note = request.form.get('note', '')

    session = SessionLocal()
    try:
        record = session.query(SecurityAlert).get(alert_id)
        if not record:
            return jsonify({'success': False, 'error': 'Alert not found'}), 404

        record.resolution_note = resolution_note
        record.status = 'resolved'

        # An alert resolved without a prior acknowledgement is credited to
        # the user resolving it.
        never_acknowledged = not record.acknowledged_by
        if never_acknowledged:
            record.acknowledged_by = current_user.id
            record.acknowledged_at = datetime.now()

        # Audit entry is written only when the security service was imported
        if SECURITY_SERVICE_AVAILABLE:
            audit_details = {
                'alert_type': record.alert_type,
                'note': resolution_note,
            }
            log_audit(
                session, 'alert.resolve', 'security_alert', alert_id,
                details=audit_details,
            )

        session.commit()

        flash('Alert został rozwiązany.', 'success')
        dashboard_url = url_for('admin.admin_security')
        return redirect(dashboard_url)
    finally:
        session.close()
@bp.route('/security/unlock-account/<int:user_id>', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def unlock_account(user_id):
    """Clear the lock and the failed-login counter of a user account.

    On success a flash message is queued and the user is redirected to the
    security dashboard; an unknown ``user_id`` yields a JSON error with
    status 404.
    """
    session = SessionLocal()
    try:
        account = session.query(User).get(user_id)
        if not account:
            not_found = {'success': False, 'error': 'User not found'}
            return jsonify(not_found), 404

        # Reset both the lock expiry and the attempt counter
        account.failed_login_attempts = 0
        account.locked_until = None

        # Audit entry is written only when the security service was imported
        if SECURITY_SERVICE_AVAILABLE:
            log_audit(session, 'user.unlock', 'user', user_id, account.email)

        session.commit()

        flash(f'Konto {account.email} zostało odblokowane.', 'success')
        dashboard_url = url_for('admin.admin_security')
        return redirect(dashboard_url)
    finally:
        session.close()
@bp.route('/security/geoip-stats')
@login_required
@role_required(SystemRole.ADMIN)
def api_geoip_stats():
    """Return GeoIP block statistics as JSON for the dashboard auto-refresh.

    When GeoIP is disabled the payload is only
    ``{'enabled': False, 'timestamp': <ISO timestamp>}``. Otherwise it
    contains counts of ``geo_blocked`` alerts for today, the current month,
    the current year and overall, plus a ``by_country`` list sorted by
    count in descending order.
    """
    from collections import Counter
    from sqlalchemy import func

    db = SessionLocal()
    try:
        now = datetime.now()

        if not _get_geoip_enabled():
            return jsonify({
                'enabled': False,
                'timestamp': now.isoformat()
            })

        today = now.date()
        first_of_month = today.replace(day=1)
        first_of_year = today.replace(month=1, day=1)

        # Every figure below is derived from the same filtered query.
        geo_blocked = db.query(SecurityAlert).filter(
            SecurityAlert.alert_type == 'geo_blocked'
        )
        alert_day = func.date(SecurityAlert.created_at)

        stats = {
            'enabled': True,
            'timestamp': now.isoformat(),
            'today': geo_blocked.filter(alert_day == today).count(),
            'this_month': geo_blocked.filter(alert_day >= first_of_month).count(),
            'this_year': geo_blocked.filter(alert_day >= first_of_year).count(),
            'total': geo_blocked.count(),
        }

        # Country breakdown
        country_flags = {
            'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
            'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
            'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
        }

        country_counts = Counter()
        for alert in geo_blocked.all():
            details = alert.details
            # Skip malformed payloads instead of failing the whole request:
            # details must be a dict carrying a non-empty string country.
            if not isinstance(details, dict):
                continue
            country = details.get('country')
            if isinstance(country, str) and country:
                country_counts[country] += 1

        # most_common() sorts by count descending; ties keep first-seen order.
        # Countries without a known flag fall back to a generic flag and show
        # their raw code as the name.
        by_country = []
        for code, count in country_counts.most_common():
            flag, name = country_flags.get(code, ('🏴', code))
            by_country.append(
                {'code': code, 'flag': flag, 'name': name, 'count': count}
            )
        stats['by_country'] = by_country

        return jsonify(stats)
    finally:
        db.close()