"""
Request Middleware
==================
Before/after request hooks for security, analytics, etc.
"""
import logging

from flask import request, abort

logger = logging.getLogger(__name__)


def register_middleware(app):
    """Register all request middleware hooks on ``app``.

    Installs four hooks using the app's decorator API:

    * ``before_request``: GeoIP blocking (``check_geoip``)
    * ``before_request``: page-view tracking (``track_page_view``)
    * ``after_request``: security response headers (``set_security_headers``)
    * ``teardown_request``: page-view bookkeeping cleanup
      (``cleanup_page_view_id``)

    Args:
        app: Application object exposing ``before_request``,
            ``after_request`` and ``teardown_request`` decorators.
    """
    # Paths that are never recorded as page views.
    untracked_prefixes = ('/static', '/api', '/.well-known/')
    untracked_paths = frozenset({
        '/health',
        '/favicon.ico',
        '/robots.txt',
        '/sitemap.xml',
        '/manifest.json',
        '/check-verification-status',
        '/resend-verification',
    })

    @app.before_request
    def check_geoip():
        """Reject requests that the security service does not allow.

        The allow/deny decision is delegated to
        ``security_service.is_ip_allowed``. The original docstring describes
        the policy as blocking high-risk countries (RU, CN, KP, IR, BY, SY,
        VE, CU); that list is not defined in this file.

        A denied request is logged, a best-effort ``geo_blocked`` security
        alert is stored, and the request is aborted with HTTP 403. Static
        files and the health check are never inspected.
        """
        # Skip static files and health checks
        if request.path.startswith('/static') or request.path == '/health':
            return

        try:
            from security_service import (
                is_ip_allowed,
                get_country_code,
                create_security_alert,
            )
            from database import SessionLocal

            if not is_ip_allowed():
                # First hop of X-Forwarded-For, falling back to the socket peer.
                ip = request.headers.get('X-Forwarded-For', request.remote_addr)
                if ip:
                    ip = ip.split(',')[0].strip()
                country = get_country_code(ip)

                logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")

                # Create alert for blocked access. Failures here must never
                # prevent the 403 below, so everything is caught and logged.
                try:
                    db = SessionLocal()
                    try:
                        create_security_alert(
                            db,
                            'geo_blocked',
                            'low',
                            ip_address=ip,
                            details={
                                'country': country,
                                'path': request.path,
                                'user_agent': request.user_agent.string[:200],
                            },
                        )
                        db.commit()
                    finally:
                        # Always release the session, even when the alert
                        # insert or the commit fails.
                        db.close()
                except Exception as e:
                    logger.error(f"Failed to create geo block alert: {e}")

                abort(403)
        except ImportError:
            # Security service not available, skip GeoIP check.
            # NOTE(review): this also swallows an ImportError raised lazily
            # inside the security helpers, letting the request through --
            # confirm that failing open is intended.
            pass

    @app.before_request
    def track_page_view():
        """Record a page view for the current request.

        Requests for static files, API endpoints, health checks, the favicon
        and bot/AJAX utility paths are skipped. Tracking is best-effort: any
        error is logged and the request continues normally.
        """
        path = request.path
        if path.startswith(untracked_prefixes) or path in untracked_paths:
            return

        try:
            from utils.analytics import (
                track_page_view_for_request,
                set_current_page_view_id,
            )

            page_view_id = track_page_view_for_request()
            if page_view_id:
                set_current_page_view_id(page_view_id)
        except Exception as e:
            logger.error(f"Page view tracking error: {e}")

    @app.after_request
    def set_security_headers(response):
        """Add security headers to every response and return it."""
        # Content Security Policy
        csp = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
            "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
            "img-src 'self' data: https:; "
            "font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com; "
            "connect-src 'self'"
        )
        security_headers = (
            ('X-Content-Type-Options', 'nosniff'),
            ('X-Frame-Options', 'SAMEORIGIN'),
            ('X-XSS-Protection', '1; mode=block'),
            ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
            ('Referrer-Policy', 'strict-origin-when-cross-origin'),
            ('Content-Security-Policy', csp),
        )
        for header_name, header_value in security_headers:
            response.headers[header_name] = header_value
        return response

    @app.teardown_request
    def cleanup_page_view_id(exception=None):
        """Clean up the request's page_view_id after the request finishes.

        Delegates to ``utils.analytics.cleanup_page_view_id``. A teardown
        hook must not raise, so failures are logged at debug level and
        otherwise ignored.

        Args:
            exception: Passed by the framework on teardown; unused here.
        """
        try:
            # Aliased so the import does not shadow this hook's own name.
            from utils.analytics import cleanup_page_view_id as _cleanup_page_view_id

            _cleanup_page_view_id()
        except Exception:
            logger.debug("Page view cleanup failed", exc_info=True)