nordabiz/utils/middleware.py
Maciej Pienczyn cca52301a6
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: filter bots from analytics, use audit_logs for failed logins, logarithmic engagement score
- Add is_bot column to user_sessions with backfill from user_agent patterns
- Update analytics_daily trigger to skip bot sessions
- Recalculate 90 days of analytics_daily without bot contamination
- Replace cumulative failed_login_attempts with time-based audit_logs queries
- Switch engagement score from linear (capped at 100) to log2 scale
- Expand section_map from 9 to 17 categories (~95% traffic coverage)
- Exclude robots.txt, sitemap.xml etc from page view tracking
- Add bot filter to all overview, pages, paths, and engagement queries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 08:14:50 +01:00

128 lines
4.4 KiB
Python

"""
Request Middleware
==================
Before/after request hooks for security, analytics, etc.
"""
import logging
from flask import request, abort
logger = logging.getLogger(__name__)
def register_middleware(app):
"""Register all middleware with the app."""
@app.before_request
def check_geoip():
"""Block requests from high-risk countries (RU, CN, KP, IR, BY, SY, VE, CU)."""
# Skip static files and health checks
if request.path.startswith('/static') or request.path == '/health':
return
try:
from security_service import is_ip_allowed, get_country_code, create_security_alert
from database import SessionLocal
if not is_ip_allowed():
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
if ip:
ip = ip.split(',')[0].strip()
country = get_country_code(ip)
logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")
# Create alert for blocked access
try:
db = SessionLocal()
create_security_alert(
db, 'geo_blocked', 'low',
ip_address=ip,
details={
'country': country,
'path': request.path,
'user_agent': request.user_agent.string[:200]
}
)
db.commit()
db.close()
except Exception as e:
logger.error(f"Failed to create geo block alert: {e}")
abort(403)
except ImportError:
# Security service not available, skip GeoIP check
pass
@app.before_request
def track_page_view():
"""Track page views (excluding static files and API calls)."""
# Skip static files
if request.path.startswith('/static'):
return
# Skip API calls
if request.path.startswith('/api'):
return
# Skip analytics tracking endpoints
if request.path in ['/api/analytics/track', '/api/analytics/heartbeat']:
return
# Skip health checks
if request.path == '/health':
return
# Skip favicon
if request.path == '/favicon.ico':
return
# Skip bot/AJAX utility paths
skip_exact = {'/robots.txt', '/sitemap.xml', '/manifest.json',
'/check-verification-status', '/resend-verification'}
skip_prefixes = ('/.well-known/',)
if request.path in skip_exact or any(request.path.startswith(p) for p in skip_prefixes):
return
try:
from utils.analytics import (
track_page_view_for_request,
set_current_page_view_id
)
page_view_id = track_page_view_for_request()
if page_view_id:
set_current_page_view_id(page_view_id)
except Exception as e:
logger.error(f"Page view tracking error: {e}")
@app.after_request
def set_security_headers(response):
"""Add security headers to all responses."""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Content Security Policy
csp = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
"img-src 'self' data: https:; "
"font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com; "
"connect-src 'self'"
)
response.headers['Content-Security-Policy'] = csp
return response
@app.teardown_request
def cleanup_page_view_id(exception=None):
"""Clean up page_view_id from global dict after request."""
try:
from utils.analytics import cleanup_page_view_id
cleanup_page_view_id()
except Exception:
pass