Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
134 lines
4.8 KiB
Python
134 lines
4.8 KiB
Python
"""
|
|
Request Middleware
|
|
==================
|
|
|
|
Before/after request hooks for security, analytics, etc.
|
|
"""
|
|
|
|
import logging
|
|
from flask import request, abort
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def register_middleware(app):
|
|
"""Register all middleware with the app."""
|
|
|
|
@app.before_request
|
|
def check_geoip():
|
|
"""Block requests from high-risk countries (RU, CN, KP, IR, BY, SY, VE, CU)."""
|
|
# Skip static files and health checks
|
|
if request.path.startswith('/static') or request.path == '/health':
|
|
return
|
|
|
|
try:
|
|
from security_service import is_ip_allowed, get_country_code, create_security_alert
|
|
from database import SessionLocal
|
|
|
|
if not is_ip_allowed():
|
|
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
if ip:
|
|
ip = ip.split(',')[0].strip()
|
|
country = get_country_code(ip)
|
|
logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")
|
|
|
|
# Create alert for blocked access
|
|
try:
|
|
db = SessionLocal()
|
|
create_security_alert(
|
|
db, 'geo_blocked', 'low',
|
|
ip_address=ip,
|
|
details={
|
|
'country': country,
|
|
'path': request.path,
|
|
'user_agent': request.user_agent.string[:200]
|
|
}
|
|
)
|
|
db.commit()
|
|
db.close()
|
|
except Exception as e:
|
|
logger.error(f"Failed to create geo block alert: {e}")
|
|
|
|
abort(403)
|
|
except ImportError:
|
|
# Security service not available, skip GeoIP check
|
|
pass
|
|
|
|
@app.before_request
|
|
def track_page_view():
|
|
"""Track page views (excluding static files and API calls)."""
|
|
# Skip static files
|
|
if request.path.startswith('/static'):
|
|
return
|
|
|
|
# Skip API calls
|
|
if request.path.startswith('/api'):
|
|
return
|
|
|
|
# Skip analytics tracking endpoints
|
|
if request.path in ['/api/analytics/track', '/api/analytics/heartbeat']:
|
|
return
|
|
|
|
# Skip health checks
|
|
if request.path == '/health':
|
|
return
|
|
|
|
# Skip favicon
|
|
if request.path == '/favicon.ico':
|
|
return
|
|
|
|
# Skip bot/AJAX utility paths
|
|
skip_exact = {'/robots.txt', '/sitemap.xml', '/manifest.json',
|
|
'/check-verification-status', '/resend-verification'}
|
|
skip_prefixes = ('/.well-known/',)
|
|
if request.path in skip_exact or any(request.path.startswith(p) for p in skip_prefixes):
|
|
return
|
|
|
|
try:
|
|
# DEBUG: middleware-level mobile cookie check
|
|
_ua = request.headers.get('User-Agent', '').lower()
|
|
if 'mobile' in _ua:
|
|
with open('/tmp/pwa_debug.log', 'a') as _mf:
|
|
_mf.write(f"MIDDLEWARE: path={request.path} pwa_mode={request.cookies.get('pwa_mode')} pwa_display={request.cookies.get('pwa_display')}\n")
|
|
|
|
from utils.analytics import (
|
|
track_page_view_for_request,
|
|
set_current_page_view_id
|
|
)
|
|
page_view_id = track_page_view_for_request()
|
|
if page_view_id:
|
|
set_current_page_view_id(page_view_id)
|
|
except Exception as e:
|
|
logger.error(f"Page view tracking error: {e}")
|
|
|
|
@app.after_request
|
|
def set_security_headers(response):
|
|
"""Add security headers to all responses."""
|
|
response.headers['X-Content-Type-Options'] = 'nosniff'
|
|
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
|
|
response.headers['X-XSS-Protection'] = '1; mode=block'
|
|
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
|
|
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
|
|
|
|
# Content Security Policy
|
|
csp = (
|
|
"default-src 'self'; "
|
|
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
|
|
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
|
|
"img-src 'self' data: https:; "
|
|
"font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com; "
|
|
"connect-src 'self'"
|
|
)
|
|
response.headers['Content-Security-Policy'] = csp
|
|
|
|
return response
|
|
|
|
@app.teardown_request
|
|
def cleanup_page_view_id(exception=None):
|
|
"""Clean up page_view_id from global dict after request."""
|
|
try:
|
|
from utils.analytics import cleanup_page_view_id
|
|
cleanup_page_view_id()
|
|
except Exception:
|
|
pass
|