nordabiz/blueprints/admin/routes_security.py
Maciej Pienczyn 7073a56dc3
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: security panel - recent blocks table + top attacked paths
Added to GeoIP tab:
- Last 20 blocked requests with IP, country, path, timestamp
- Top 10 most targeted URL paths with hit counts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 10:45:29 +02:00

338 lines
11 KiB
Python

"""
Admin Security Routes
======================
Security dashboard, alerts, and account management for admin panel.
"""
import logging
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, User, AuditLog, SecurityAlert, SystemRole
from utils.decorators import role_required
logger = logging.getLogger(__name__)
# Optional integration: the security service may be absent from the
# deployment. When its import fails, expose harmless stand-ins so the routes
# below can call the same names unconditionally.
try:
    from security_service import log_audit, _get_geoip_enabled
except ImportError:
    SECURITY_SERVICE_AVAILABLE = False

    def log_audit(*args, **kwargs):
        """Fallback audit logger: accept any arguments and do nothing."""
        return None

    def _get_geoip_enabled():
        """Fallback GeoIP switch: always report the feature as disabled."""
        return False
else:
    SECURITY_SERVICE_AVAILABLE = True
# ============================================================
# SECURITY DASHBOARD
# ============================================================
@bp.route('/security')
@login_required
@role_required(SystemRole.ADMIN)
def admin_security():
    """Render the admin security dashboard.

    Gathers the 50 newest audit-log entries and security alerts, counters for
    new alerts / locked accounts / 2FA users / admins, a per-type alert
    breakdown and, when GeoIP blocking is enabled, geo-block statistics
    (per-period counts, per-country totals, the 20 most recent blocks and the
    10 most frequently requested paths).

    Returns:
        The rendered ``admin/security_dashboard.html`` template.
    """
    from collections import Counter
    from sqlalchemy import func, desc

    db = SessionLocal()
    try:
        now = datetime.now()

        # Most recent audit-log entries and alerts, newest first.
        audit_logs = db.query(AuditLog).order_by(
            desc(AuditLog.created_at)
        ).limit(50).all()
        alerts = db.query(SecurityAlert).order_by(
            desc(SecurityAlert.created_at)
        ).limit(50).all()

        new_alerts_count = db.query(SecurityAlert).filter(
            SecurityAlert.status == 'new'
        ).count()

        # Accounts whose lock has not expired yet.
        locked_accounts = db.query(User).filter(
            User.locked_until > now
        ).all()

        users_with_2fa = db.query(User).filter(
            User.totp_enabled == True  # noqa: E712 - SQL expression, not a Python comparison
        ).count()
        total_admins = db.query(User).filter(
            User.role == 'ADMIN'
        ).count()

        # Alert type breakdown. Rows are unpacked positionally on purpose:
        # SQLAlchemy 1.4+ result rows expose tuple-style ``count()`` /
        # ``index()`` methods, so a column labelled ``count`` is not reliably
        # reachable via attribute access.
        alert_breakdown = db.query(
            SecurityAlert.alert_type,
            func.count(SecurityAlert.id).label('count')
        ).group_by(SecurityAlert.alert_type).all()

        stats = {
            'new_alerts': new_alerts_count,
            'locked_accounts': len(locked_accounts),
            'users_with_2fa': users_with_2fa,
            'total_admins': total_admins,
            'alert_breakdown': {
                alert_type: total for alert_type, total in alert_breakdown
            },
        }

        # GeoIP section: defaults are what the template receives when the
        # feature is disabled.
        geoip_enabled = _get_geoip_enabled()
        geoip_stats = {'today': 0, 'this_month': 0, 'this_year': 0, 'total': 0, 'by_country': []}
        recent_geo_blocks = []
        top_paths = []

        if geoip_enabled:
            today = now.date()
            first_of_month = today.replace(day=1)
            first_of_year = today.replace(month=1, day=1)

            # Shared base query; every derived query below adds to a copy.
            geo_blocked = db.query(SecurityAlert).filter(
                SecurityAlert.alert_type == 'geo_blocked'
            )
            created_on = func.date(SecurityAlert.created_at)

            geoip_stats['today'] = geo_blocked.filter(created_on == today).count()
            geoip_stats['this_month'] = geo_blocked.filter(created_on >= first_of_month).count()
            geoip_stats['this_year'] = geo_blocked.filter(created_on >= first_of_year).count()
            geoip_stats['total'] = geo_blocked.count()

            # Known country codes -> (flag emoji, display name); unknown codes
            # fall back to a generic flag and the raw code.
            country_flags = {
                'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
                'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
                'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
            }

            # One pass over all geo-block alerts tallies both countries and
            # paths from the ``details`` payload. Missing/empty values are
            # skipped so they never show up as a "None" bucket.
            country_counts = Counter()
            path_counts = Counter()
            for alert in geo_blocked.all():
                details = alert.details or {}
                country = details.get('country')
                if country:
                    country_counts[country] += 1
                path = details.get('path')
                if path:
                    path_counts[path] += 1

            # most_common() orders by count descending, ties in first-seen order.
            for code, count in country_counts.most_common():
                flag, name = country_flags.get(code, ('🏴', code))
                geoip_stats['by_country'].append({
                    'code': code, 'flag': flag, 'name': name, 'count': count
                })

            top_paths = path_counts.most_common(10)

            # Recent geo blocks (last 20) with details.
            recent_blocks = geo_blocked.order_by(
                desc(SecurityAlert.created_at)
            ).limit(20).all()
            for block in recent_blocks:
                details = block.details or {}
                # ``or`` (rather than a .get default) also covers keys that
                # are present but null/empty.
                country_code = details.get('country') or '??'
                flag, name = country_flags.get(country_code, ('🏴', country_code))
                recent_geo_blocks.append({
                    'ip': block.ip_address,
                    'country_flag': flag,
                    'country_name': name,
                    'path': details.get('path') or '/',
                    'created_at': block.created_at
                })

        return render_template(
            'admin/security_dashboard.html',
            audit_logs=audit_logs,
            alerts=alerts,
            locked_accounts=locked_accounts,
            stats=stats,
            geoip_enabled=geoip_enabled,
            geoip_stats=geoip_stats,
            recent_geo_blocks=recent_geo_blocks,
            top_paths=top_paths,
            generated_at=datetime.now()
        )
    finally:
        db.close()
@bp.route('/security/alert/<int:alert_id>/acknowledge', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def acknowledge_security_alert(alert_id):
    """Mark a single security alert as acknowledged by the current user.

    Responds with JSON: ``{'success': True}`` after the change is committed,
    or an error payload with HTTP 404 when no alert has the given id.
    """
    session = SessionLocal()
    try:
        target = session.query(SecurityAlert).get(alert_id)
        if not target:
            missing = {'success': False, 'error': 'Alert not found'}
            return jsonify(missing), 404

        # Record the new state together with who acknowledged it and when.
        target.status = 'acknowledged'
        target.acknowledged_by = current_user.id
        target.acknowledged_at = datetime.now()

        # The audit call receives the still-open session, before the commit.
        if SECURITY_SERVICE_AVAILABLE:
            audit_details = {'alert_type': target.alert_type}
            log_audit(
                session, 'alert.acknowledge', 'security_alert', alert_id,
                details=audit_details,
            )

        session.commit()
        return jsonify({'success': True})
    finally:
        session.close()
@bp.route('/security/alert/<int:alert_id>/resolve', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def resolve_security_alert(alert_id):
    """Mark a security alert as resolved, storing the optional form ``note``.

    If nobody acknowledged the alert before, the current user is recorded as
    the acknowledger as well. On success the user is redirected back to the
    security dashboard with a flash message; an unknown id yields a JSON
    error with HTTP 404.
    """
    resolution_text = request.form.get('note', '')
    session = SessionLocal()
    try:
        target = session.query(SecurityAlert).get(alert_id)
        if not target:
            missing = {'success': False, 'error': 'Alert not found'}
            return jsonify(missing), 404

        target.status = 'resolved'
        target.resolution_note = resolution_text

        # Resolving implies acknowledging when that step was skipped.
        if not target.acknowledged_by:
            target.acknowledged_by = current_user.id
            target.acknowledged_at = datetime.now()

        # The audit call receives the still-open session, before the commit.
        if SECURITY_SERVICE_AVAILABLE:
            audit_details = {'alert_type': target.alert_type, 'note': resolution_text}
            log_audit(
                session, 'alert.resolve', 'security_alert', alert_id,
                details=audit_details,
            )

        session.commit()
        flash('Alert został rozwiązany.', 'success')
        dashboard_url = url_for('admin.admin_security')
        return redirect(dashboard_url)
    finally:
        session.close()
@bp.route('/security/unlock-account/<int:user_id>', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def unlock_account(user_id):
    """Lift the lock on a user account and reset its failed-login counter.

    On success the user is redirected back to the security dashboard with a
    flash message; an unknown id yields a JSON error with HTTP 404.
    """
    session = SessionLocal()
    try:
        account = session.query(User).get(user_id)
        if not account:
            missing = {'success': False, 'error': 'User not found'}
            return jsonify(missing), 404

        # Clear both the lock expiry and the counter stored on the user.
        account.locked_until = None
        account.failed_login_attempts = 0

        # The audit call receives the still-open session, before the commit.
        if SECURITY_SERVICE_AVAILABLE:
            log_audit(session, 'user.unlock', 'user', user_id, account.email)

        session.commit()
        flash(f'Konto {account.email} zostało odblokowane.', 'success')
        dashboard_url = url_for('admin.admin_security')
        return redirect(dashboard_url)
    finally:
        session.close()
@bp.route('/security/geoip-stats')
@login_required
@role_required(SystemRole.ADMIN)
def api_geoip_stats():
    """Return GeoIP block statistics as JSON for the dashboard auto-refresh.

    When GeoIP is disabled the payload only carries ``enabled: False`` and a
    timestamp. Otherwise it contains geo-block counts for today, this month,
    this year and overall, plus a per-country list sorted by count descending.
    """
    from sqlalchemy import func

    session = SessionLocal()
    try:
        moment = datetime.now()
        if not _get_geoip_enabled():
            return jsonify({
                'enabled': False,
                'timestamp': moment.isoformat()
            })

        current_day = moment.date()
        month_start = current_day.replace(day=1)
        year_start = current_day.replace(month=1, day=1)

        # Every query below narrows this same geo-block selection.
        geo_blocked = session.query(SecurityAlert).filter(
            SecurityAlert.alert_type == 'geo_blocked'
        )
        created_on = func.date(SecurityAlert.created_at)

        payload = {
            'enabled': True,
            'timestamp': moment.isoformat(),
            'today': geo_blocked.filter(created_on == current_day).count(),
            'this_month': geo_blocked.filter(created_on >= month_start).count(),
            'this_year': geo_blocked.filter(created_on >= year_start).count(),
            'total': geo_blocked.count(),
        }

        # Country code -> (flag emoji, display name) for the known codes.
        flag_names = {
            'RU': ('🇷🇺', 'Rosja'), 'CN': ('🇨🇳', 'Chiny'), 'KP': ('🇰🇵', 'Korea Płn.'),
            'IR': ('🇮🇷', 'Iran'), 'BY': ('🇧🇾', 'Białoruś'), 'SY': ('🇸🇾', 'Syria'),
            'VE': ('🇻🇪', 'Wenezuela'), 'CU': ('🇨🇺', 'Kuba')
        }

        # Tally alerts per country code taken from each alert's details.
        tally = {}
        for row in geo_blocked.all():
            info = row.details
            if not info or 'country' not in info:
                continue
            code = info['country']
            if code:
                tally[code] = tally.get(code, 0) + 1

        ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
        breakdown = []
        for code, hits in ranked:
            # Codes outside the table get a generic flag and the raw code.
            flag, label = flag_names.get(code, ('🏴', code))
            breakdown.append({'code': code, 'flag': flag, 'name': label, 'count': hits})
        payload['by_country'] = breakdown

        return jsonify(payload)
    finally:
        session.close()