nordabiz/security_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

495 lines
14 KiB
Python

"""
Norda Biznes - Security Service
================================
Security utilities for NordaBiz platform:
- Audit logging (admin action tracking)
- Security alerting (email notifications)
- GeoIP blocking (block high-risk countries: RU, CN, KP, IR, BY, SY, VE, CU)
- 2FA (TOTP) helpers
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-01-14
"""
import os
import logging
import secrets
from datetime import datetime
from functools import wraps
from flask import request, abort, flash, redirect, url_for
from flask_login import current_user
logger = logging.getLogger(__name__)
# ============================================================
# AUDIT LOG
# ============================================================
def log_audit(db, action: str, entity_type: str, entity_id: int = None,
              entity_name: str = None, details: dict = None, user=None):
    """
    Record an administrative action in the audit log.

    The new ``AuditLog`` row is only added to the session; committing is
    left to the caller so the entry shares the caller's transaction.

    Args:
        db: Database session the entry is added to.
        action: Action identifier (e.g. 'news.approve', 'company.edit').
        entity_type: Type of entity ('news', 'company', 'user', 'event').
        entity_id: ID of the affected entity.
        entity_name: Human-readable name of the entity.
        details: Additional details (old_value, new_value, reason, etc.).
        user: User performing the action. When omitted, the authenticated
            ``current_user`` is used; without one the entry is stored with
            no user id and the e-mail 'system'.

    Example:
        log_audit(db, 'news.approve', 'news', entity_id=123,
                  entity_name='Article about company XYZ',
                  details={'previous_status': 'pending'})
    """
    from database import AuditLog

    # Resolve the acting user: explicit argument first, then the session user.
    actor = user
    if actor is None:
        actor = current_user if current_user.is_authenticated else None

    if actor:
        actor_id, actor_email = actor.id, actor.email
    else:
        actor_id, actor_email = None, 'system'

    client_ip = get_client_ip()

    # Request metadata is only available inside a request context.
    if request:
        agent = request.user_agent.string[:500]  # capped at 500 characters
        path = request.path
    else:
        agent = None
        path = None

    audit_entry = AuditLog(
        user_id=actor_id,
        user_email=actor_email,
        action=action,
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        details=details,
        ip_address=client_ip,
        user_agent=agent,
        request_path=path,
    )
    # Deliberately no commit: the caller owns the transaction.
    db.add(audit_entry)
    logger.info(f"AUDIT: {audit_entry.user_email} performed {action} on {entity_type}:{entity_id}")
def get_client_ip():
    """
    Return the client IP for the current request, or None outside a request.

    When an ``X-Forwarded-For`` header is present, its first non-empty entry
    is returned; otherwise the socket peer address (``request.remote_addr``)
    is used.

    NOTE(review): the left-most ``X-Forwarded-For`` entry is supplied by the
    client unless the reverse proxy overwrites the header, so it can be
    spoofed. Confirm the proxy replaces (rather than appends to) the header,
    or trust a fixed number of proxy hops instead.
    """
    if not request:
        return None

    forwarded_for = request.headers.get('X-Forwarded-For', '')
    # Skip blank entries (e.g. a header of ", 1.2.3.4") so callers never
    # receive an empty string in place of an address.
    for entry in forwarded_for.split(','):
        candidate = entry.strip()
        if candidate:
            return candidate
    return request.remote_addr
# ============================================================
# SECURITY ALERTING
# ============================================================
# Alert thresholds, keyed by alert type.
# NOTE(review): ALERT_THRESHOLDS is not referenced anywhere in the visible part
# of this module; presumably the code that counts events compares against it
# - confirm against the callers.
ALERT_THRESHOLDS = {
    'brute_force': 5,  # Failed logins from same IP
    'honeypot_hit': 3,  # Honeypot accesses from same IP
    'account_locked': 1,  # Always alert on account lockout
}
# Admin email for alerts; taken from the SECURITY_ALERT_EMAIL environment
# variable when this module is imported, with a hard-coded fallback.
SECURITY_ALERT_EMAIL = os.getenv('SECURITY_ALERT_EMAIL', 'admin@nordabiznes.pl')
def create_security_alert(db, alert_type: str, severity: str = 'medium',
                          ip_address: str = None, user_email: str = None,
                          details: dict = None, send_email: bool = True):
    """
    Persist a security alert and, for serious ones, notify by e-mail.

    The alert is added to the session and flushed (so it receives an id) but
    not committed; the caller controls the transaction. An e-mail is only
    attempted when ``send_email`` is true and the severity is 'high' or
    'critical'.

    Args:
        db: Database session.
        alert_type: Type of alert ('brute_force', 'honeypot_hit',
            'account_locked', 'geo_blocked').
        severity: Alert severity ('low', 'medium', 'high', 'critical').
        ip_address: Source IP address; defaults to the current client IP.
        user_email: Associated user e-mail (if any).
        details: Additional context; stored as an empty dict when omitted.
        send_email: Whether an e-mail notification may be sent.

    Returns:
        The newly created SecurityAlert object.
    """
    from database import SecurityAlert

    source_ip = ip_address or get_client_ip()
    context = details or {}

    alert = SecurityAlert(
        alert_type=alert_type,
        severity=severity,
        ip_address=source_ip,
        user_email=user_email,
        details=context,
    )
    db.add(alert)
    db.flush()  # assigns the primary key without committing

    logger.warning(f"SECURITY_ALERT: {alert_type} ({severity}) from {alert.ip_address}")

    # Only the two highest severities trigger an e-mail.
    notify = send_email and severity in ('high', 'critical')
    if notify:
        _send_alert_email(alert)

    return alert
def _send_alert_email(alert):
    """
    Send an e-mail notification for a security alert.

    Every alert field is HTML-escaped before it is placed in the message
    body, because the IP address, user e-mail and details can carry text
    taken from the incoming request.

    Args:
        alert: Alert object exposing alert_type, severity, created_at,
            ip_address, user_email, details and id. On success its
            email_sent / email_sent_at attributes are set.

    Returns:
        The result of ``email_service.send_email`` when a send is attempted,
        or False when the e-mail service is not configured or any error
        occurs (errors are logged, never raised).
    """
    try:
        from html import escape
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured, cannot send security alert email")
            return False

        severity_label = alert.severity.upper()
        # Fall back to the current UTC time when created_at is unset, so a
        # missing timestamp does not suppress the notification.
        timestamp = (alert.created_at or datetime.utcnow()).strftime('%Y-%m-%d %H:%M:%S')

        # Escape everything that is interpolated into the HTML body.
        type_html = escape(str(alert.alert_type))
        severity_html = escape(str(severity_label))
        ip_html = escape(str(alert.ip_address or 'Unknown'))
        email_html = escape(str(alert.user_email or 'N/A'))
        details_html = escape(str(alert.details))

        subject = f"[ALERT] {severity_label}: {alert.alert_type} - NordaBiznes"
        body = f"""
<h2>Security Alert - NordaBiznes</h2>
<p><strong>Type:</strong> {type_html}</p>
<p><strong>Severity:</strong> {severity_html}</p>
<p><strong>Time:</strong> {timestamp}</p>
<p><strong>IP Address:</strong> {ip_html}</p>
<p><strong>User Email:</strong> {email_html}</p>
<p><strong>Details:</strong></p>
<pre>{details_html}</pre>
<hr>
<p>Review alerts at: <a href="https://nordabiznes.pl/admin/security">Security Dashboard</a></p>
"""
        success = send_email(
            to_email=SECURITY_ALERT_EMAIL,
            subject=subject,
            html_content=body
        )
        if success:
            # Record delivery on the alert; the caller's transaction persists it.
            alert.email_sent = True
            alert.email_sent_at = datetime.utcnow()
            logger.info(f"Security alert email sent for alert {alert.id}")
        return success
    except Exception as e:
        # Best effort: a failed notification must not break the caller.
        logger.error(f"Failed to send security alert email: {e}")
        return False
# ============================================================
# GEOIP BLOCKING
# ============================================================
# GeoIP configuration (lazy loaded to respect load_dotenv timing)
def _get_geoip_enabled():
    """Return True when the GEOIP_ENABLED env variable is 'true' (any letter case)."""
    # Read on every call rather than at import time, so late-loaded
    # environment values are honoured.
    flag = os.environ.get('GEOIP_ENABLED', 'false')
    return flag.lower() == 'true'
def _get_geoip_db_path():
    """Return the GeoIP database path from GEOIP_DB_PATH, or the built-in default."""
    default_path = '/var/www/nordabiznes/geoip/GeoLite2-Country.mmdb'
    return os.environ.get('GEOIP_DB_PATH', default_path)
def _get_geoip_whitelist():
    """
    Return the set of whitelisted IPs from the GEOIP_WHITELIST env variable.

    The variable is a comma-separated list. Whitespace around each entry is
    stripped and blank entries are dropped, so "1.2.3.4, 5.6.7.8" yields both
    addresses. An unset or empty variable yields an empty set.
    """
    raw = os.getenv('GEOIP_WHITELIST', '')
    stripped = (entry.strip() for entry in raw.split(','))
    return {entry for entry in stripped if entry}
# Block high-risk countries (Russia, China, North Korea, Iran, etc.)
# Two-letter country codes; is_ip_allowed() rejects an IP whose looked-up
# country code is in this set.
BLOCKED_COUNTRIES = {'RU', 'CN', 'KP', 'IR', 'BY', 'SY', 'VE', 'CU'}
# GeoIP reader (lazy loaded): stays None until get_geoip_reader() manages to
# open the database, after which the same reader object is reused.
_geoip_reader = None
def get_geoip_reader():
    """
    Return the shared GeoIP reader, opening the database on first use.

    Returns None when GeoIP is disabled, the ``geoip2`` package is missing,
    the database file does not exist, or opening it fails. A successfully
    opened reader is cached in the module-level ``_geoip_reader``.
    """
    global _geoip_reader

    # Only attempt to load when nothing is cached yet and GeoIP is switched on.
    if _geoip_reader is None and _get_geoip_enabled():
        try:
            import geoip2.database

            db_path = _get_geoip_db_path()
            if not os.path.exists(db_path):
                logger.warning(f"GeoIP database not found at {db_path}")
            else:
                _geoip_reader = geoip2.database.Reader(db_path)
                logger.info(f"GeoIP database loaded from {db_path}")
        except ImportError:
            logger.warning("geoip2 package not installed, GeoIP blocking disabled")
        except Exception as e:
            logger.error(f"Failed to load GeoIP database: {e}")

    return _geoip_reader
def get_country_code(ip_address: str) -> str:
    """
    Look up the country code of an IP address.

    Args:
        ip_address: Address to look up.

    Returns:
        ISO 3166-1 alpha-2 country code (e.g. 'PL', 'DE'), or None when no
        GeoIP reader is available or the lookup raises for any reason.
    """
    geo_reader = get_geoip_reader()
    if not geo_reader:
        return None

    try:
        lookup = geo_reader.country(ip_address)
        code = lookup.country.iso_code
    except Exception:
        # Any lookup failure is treated as "country unknown".
        code = None
    return code
def get_geoip_info(ip_address: str) -> dict:
    """
    Get GeoIP information for an IP address.

    Args:
        ip_address: Address to look up.

    Returns:
        dict with keys: country, country_name, city, region
        (city and region are always None here; they require the
        GeoLite2-City database). Private and loopback addresses yield
        country 'LOCAL'. Returns None when no GeoIP reader is available or
        the lookup fails.
    """
    import ipaddress

    reader = get_geoip_reader()
    if not reader:
        return None

    # Skip local/private IPs. The address is parsed instead of matched by
    # string prefix: a '172.' prefix would also catch public addresses, since
    # only 172.16.0.0/12 is private.
    if ip_address:
        try:
            parsed = ipaddress.ip_address(str(ip_address).strip())
        except ValueError:
            parsed = None  # not a valid IP literal; let the lookup decide
        if parsed is not None and (parsed.is_private or parsed.is_loopback):
            return {'country': 'LOCAL', 'country_name': 'Local Network', 'city': None, 'region': None}

    try:
        response = reader.country(ip_address)
        return {
            'country': response.country.iso_code,
            'country_name': response.country.name,
            'city': None,  # Requires the GeoLite2-City database
            'region': None  # Requires the GeoLite2-City database
        }
    except Exception:
        # Lookup failures (unknown or invalid address) are reported as None.
        return None
def is_ip_allowed(ip_address: str = None) -> bool:
    """
    Check if an IP address is allowed (not from blocked high-risk countries).

    The check fails open: the address is allowed when GeoIP is disabled, no
    address can be determined, or its country cannot be resolved.

    Args:
        ip_address: IP to check (defaults to current request IP)

    Returns:
        True if allowed, False if blocked
    """
    import ipaddress

    if not _get_geoip_enabled():
        return True

    if ip_address is None:
        ip_address = get_client_ip()
    if not ip_address:
        return True

    # Check whitelist first
    if ip_address in _get_geoip_whitelist():
        return True

    # Local/private IPs are always allowed. The address is parsed instead of
    # matched by string prefix: a '172.' prefix would also exempt public
    # addresses, since only 172.16.0.0/12 is private. Parsing also covers the
    # IPv6 loopback '::1'.
    try:
        parsed = ipaddress.ip_address(str(ip_address).strip())
    except ValueError:
        parsed = None  # not a valid IP literal; fall through to the lookup
    if parsed is not None and (parsed.is_private or parsed.is_loopback):
        return True

    country = get_country_code(ip_address)
    # If we can't determine country, allow (fail open)
    if country is None:
        return True

    # Block high-risk countries
    return country not in BLOCKED_COUNTRIES
def geoip_check():
    """
    Decorator factory that rejects requests from blocked countries.

    The wrapped view runs only when is_ip_allowed() accepts the client IP.
    Otherwise the attempt is logged, a low-severity 'geo_blocked' security
    alert is stored on a best-effort basis, and the request is aborted
    with HTTP 403.

    Usage:
        @app.route('/sensitive-endpoint')
        @geoip_check()
        def sensitive_endpoint():
            ...
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            if not is_ip_allowed():
                ip = get_client_ip()
                country = get_country_code(ip)
                logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")

                # Create alert for blocked access. Failing to store it must
                # never stop the request from being rejected.
                try:
                    from database import SessionLocal
                    db = SessionLocal()
                    try:
                        create_security_alert(
                            db, 'geo_blocked', 'low',
                            ip_address=ip,
                            details={'country': country, 'path': request.path}
                        )
                        db.commit()
                    finally:
                        # Always release the session, even when the alert
                        # could not be created or committed.
                        db.close()
                except Exception as e:
                    logger.error(f"Failed to create geo block alert: {e}")

                abort(403)
            return f(*args, **kwargs)
        return wrapped
    return decorator
# ============================================================
# TWO-FACTOR AUTHENTICATION (TOTP)
# ============================================================
# Issuer name passed to the TOTP provisioning URI by get_totp_uri().
TOTP_ISSUER = 'NordaBiznes'
def generate_totp_secret() -> str:
    """
    Create a fresh base32 TOTP secret.

    Returns:
        The secret produced by ``pyotp.random_base32()``, or None (after
        logging an error) when the ``pyotp`` package is not installed.
    """
    secret = None
    try:
        import pyotp
        secret = pyotp.random_base32()
    except ImportError:
        logger.error("pyotp package not installed")
    return secret
def get_totp_uri(user) -> str:
    """
    Build the TOTP provisioning URI used for QR code generation.

    Args:
        user: User object; its ``totp_secret`` and ``email`` are read.

    Returns:
        The URI returned by pyotp's ``provisioning_uri`` (named after the
        user's e-mail, issued by TOTP_ISSUER), or None when the user has no
        secret or the ``pyotp`` package is not installed.
    """
    secret = user.totp_secret
    if not secret:
        return None

    uri = None
    try:
        import pyotp
        uri = pyotp.TOTP(secret).provisioning_uri(
            name=user.email,
            issuer_name=TOTP_ISSUER,
        )
    except ImportError:
        logger.error("pyotp package not installed")
    return uri
def verify_totp(user, code: str) -> bool:
    """
    Verify a TOTP code for a user.

    Whitespace is removed from the submitted code before verification, so a
    code typed as "123 456" is accepted. A missing or blank code is rejected
    without consulting pyotp.

    Args:
        user: User object with totp_secret
        code: 6-digit TOTP code to verify

    Returns:
        True if valid, False otherwise (including when the user has no
        secret or the ``pyotp`` package is not installed)
    """
    if not user.totp_secret:
        return False

    # Normalise the input: reject None/blank and drop any whitespace.
    normalized = ''.join(str(code).split()) if code is not None else ''
    if not normalized:
        return False

    try:
        import pyotp
        totp = pyotp.TOTP(user.totp_secret)
        return totp.verify(normalized, valid_window=1)  # Allow 1 step drift
    except ImportError:
        logger.error("pyotp package not installed")
        return False
def generate_backup_codes(count: int = 8) -> list:
    """
    Produce one-time backup codes for 2FA recovery.

    Args:
        count: Number of codes to generate (8 by default).

    Returns:
        List of ``count`` codes, each 8 upper-case hexadecimal characters
        (4 random bytes from the ``secrets`` module).
    """
    return [secrets.token_hex(4).upper() for _ in range(count)]
def verify_backup_code(user, code: str, db) -> bool:
    """
    Verify and consume a backup code.

    The submitted code is upper-cased and stripped, then compared with every
    stored code using a constant-time comparison. On a match the code is
    removed from the user's list and the change is committed.

    Args:
        user: User object with totp_backup_codes (assumed to be a list of
            code strings - TODO confirm against the model)
        code: Backup code to verify; None, non-string or blank input is
            rejected
        db: Database session to update user

    Returns:
        True if valid (code is consumed), False otherwise
    """
    stored_codes = user.totp_backup_codes
    if not stored_codes or not isinstance(code, str):
        return False

    code = code.upper().strip()
    if not code:
        return False

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # user input may contain any characters. Every stored code is checked
    # (no early exit) so timing does not reveal which entry matched.
    candidate = code.encode('utf-8')
    matched = False
    for stored in stored_codes:
        if secrets.compare_digest(str(stored).encode('utf-8'), candidate):
            matched = True

    if not matched:
        return False

    # Remove the used code; assigning a new list replaces the attribute
    # value rather than mutating the stored list in place.
    remaining_codes = [c for c in stored_codes if str(c) != code]
    user.totp_backup_codes = remaining_codes
    db.commit()
    logger.info(f"Backup code used for user {user.email}, {len(remaining_codes)} codes remaining")
    return True
def requires_2fa(f):
    """
    Decorator enforcing 2FA verification on a route.

    Unauthenticated users are redirected to the 'login' endpoint. A user who
    has 2FA enabled but no '2fa_verified' flag in the session is redirected
    to the 'verify_2fa' endpoint, with the requested URL remembered in the
    session under '2fa_next'. Everyone else reaches the wrapped view.

    Usage:
        @app.route('/admin/sensitive')
        @login_required
        @requires_2fa
        def admin_sensitive():
            ...
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        from flask import session

        if not current_user.is_authenticated:
            return redirect(url_for('login'))

        # The session is only consulted when the user actually has 2FA enabled.
        needs_verification = current_user.totp_enabled and not session.get('2fa_verified')
        if not needs_verification:
            return f(*args, **kwargs)

        flash('Wymagana weryfikacja 2FA.', 'warning')
        session['2fa_next'] = request.url  # where to return after verification
        return redirect(url_for('verify_2fa'))

    return guarded
# ============================================================
# INITIALIZATION
# ============================================================
def init_security_service():
    """
    Initialise the security service.

    When GeoIP is enabled this triggers loading of the GeoIP reader and logs
    the set of blocked countries; otherwise it only logs that GeoIP is
    disabled.
    """
    if not _get_geoip_enabled():
        logger.info("Security service initialized (GeoIP disabled)")
        return

    get_geoip_reader()  # load the database now rather than on the first request
    logger.info(f"Security service initialized with GeoIP enabled, blocking: {BLOCKED_COUNTRIES}")