Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
495 lines
14 KiB
Python
495 lines
14 KiB
Python
"""
|
|
Norda Biznes - Security Service
|
|
================================
|
|
|
|
Security utilities for NordaBiz platform:
|
|
- Audit logging (admin action tracking)
|
|
- Security alerting (email notifications)
|
|
- GeoIP blocking (block high-risk countries: RU, CN, KP, IR, BY, SY, VE, CU)
|
|
- 2FA (TOTP) helpers
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-01-14
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from flask import request, abort, flash, redirect, url_for
|
|
from flask_login import current_user
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# ============================================================
|
|
# AUDIT LOG
|
|
# ============================================================
|
|
|
|
def log_audit(db, action: str, entity_type: str, entity_id: int = None,
              entity_name: str = None, details: dict = None, user=None):
    """
    Log an administrative action to the audit log.

    The entry is added to the session but NOT committed; the caller owns
    the transaction.

    Args:
        db: Database session
        action: Action identifier (e.g., 'news.approve', 'company.edit')
        entity_type: Type of entity ('news', 'company', 'user', 'event')
        entity_id: ID of the affected entity
        entity_name: Human-readable name of the entity
        details: Additional details (old_value, new_value, reason, etc.)
        user: User performing the action (defaults to current_user)

    Example:
        log_audit(db, 'news.approve', 'news', entity_id=123,
                  entity_name='Artykuł o firmie XYZ',
                  details={'previous_status': 'pending'})
    """
    from database import AuditLog

    # The request proxy is falsy when no request is active.
    in_request = bool(request)

    # Only consult current_user while a request is active; the request
    # fields below are guarded the same way, so this function is evidently
    # meant to be callable from non-request code (entry logged as 'system').
    if user is None and in_request and current_user.is_authenticated:
        user = current_user

    audit_entry = AuditLog(
        user_id=user.id if user else None,
        user_email=user.email if user else 'system',
        action=action,
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        details=details,
        ip_address=get_client_ip(),
        # User-agent is truncated to 500 characters before storing.
        user_agent=request.user_agent.string[:500] if in_request else None,
        request_path=request.path if in_request else None,
    )

    db.add(audit_entry)
    # Don't commit here - let the caller manage the transaction
    logger.info(f"AUDIT: {audit_entry.user_email} performed {action} on {entity_type}:{entity_id}")
|
|
|
|
|
|
def get_client_ip():
    """
    Return the client IP for the current request, or None without a request.

    Prefers the first entry of the X-Forwarded-For header (set by a reverse
    proxy) and falls back to the socket peer address.
    """
    # NOTE(review): X-Forwarded-For is taken at face value here; this is only
    # safe if a trusted reverse proxy always overwrites the header - confirm.
    if not request:
        return None

    forwarded_chain = request.headers.get('X-Forwarded-For', '')
    if not forwarded_chain:
        return request.remote_addr

    # The header is a comma-separated list; the left-most entry is used.
    first_hop, _, _ = forwarded_chain.partition(',')
    return first_hop.strip()
|
|
|
|
|
|
# ============================================================
|
|
# SECURITY ALERTING
|
|
# ============================================================
|
|
|
|
# Alert thresholds, keyed by alert type
ALERT_THRESHOLDS = dict(
    brute_force=5,       # Failed logins from same IP
    honeypot_hit=3,      # Honeypot accesses from same IP
    account_locked=1,    # Always alert on account lockout
)

# Admin email for alerts (overridable through the environment)
SECURITY_ALERT_EMAIL = os.environ.get('SECURITY_ALERT_EMAIL', 'admin@nordabiznes.pl')
|
|
|
|
|
|
def create_security_alert(db, alert_type: str, severity: str = 'medium',
                          ip_address: str = None, user_email: str = None,
                          details: dict = None, send_email: bool = True):
    """
    Record a security alert and, for serious ones, notify by email.

    The alert is added to the session and flushed (so it has an ID) but not
    committed; committing is left to the caller.

    Args:
        db: Database session
        alert_type: Type of alert ('brute_force', 'honeypot_hit',
            'account_locked', 'geo_blocked')
        severity: Alert severity ('low', 'medium', 'high', 'critical')
        ip_address: Source IP address (defaults to the current client IP)
        user_email: Associated user email (if any)
        details: Additional context (stored as {} when omitted)
        send_email: Whether an email may be sent; an email is only
            attempted for 'high' and 'critical' severities

    Returns:
        SecurityAlert object
    """
    from database import SecurityAlert

    source_ip = ip_address or get_client_ip()
    context = details or {}

    alert = SecurityAlert(
        alert_type=alert_type,
        severity=severity,
        ip_address=source_ip,
        user_email=user_email,
        details=context,
    )
    db.add(alert)
    db.flush()  # Get the ID

    logger.warning(f"SECURITY_ALERT: {alert_type} ({severity}) from {alert.ip_address}")

    # Email goes out only for the two highest severities.
    wants_notification = send_email and severity in ('high', 'critical')
    if wants_notification:
        _send_alert_email(alert)

    return alert
|
|
|
|
|
|
def _send_alert_email(alert):
    """
    Send an HTML email notification for a security alert.

    Best-effort: any failure is logged and reported as False rather than
    raised. On success the alert's ``email_sent`` / ``email_sent_at`` fields
    are updated (the caller is responsible for committing).

    Args:
        alert: Alert object exposing alert_type, severity, created_at,
            ip_address, user_email, details and id.

    Returns:
        The result of email_service.send_email, or False when the email
        service is not configured or an error occurred.
    """
    try:
        from html import escape
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured, cannot send security alert email")
            return False

        severity_label = alert.severity.upper()
        subject = f"[ALERT] {severity_label}: {alert.alert_type} - NordaBiznes"

        # Fall back to "now" so a missing timestamp does not abort the alert.
        occurred_at = alert.created_at or datetime.utcnow()

        # IP address, user email and details can carry attacker-controlled
        # text (request headers, login form input), so every interpolated
        # value is HTML-escaped before it is placed in the message body.
        body = f"""
        <h2>Security Alert - NordaBiznes</h2>
        <p><strong>Type:</strong> {escape(str(alert.alert_type))}</p>
        <p><strong>Severity:</strong> {escape(severity_label)}</p>
        <p><strong>Time:</strong> {occurred_at.strftime('%Y-%m-%d %H:%M:%S')}</p>
        <p><strong>IP Address:</strong> {escape(str(alert.ip_address or 'Unknown'))}</p>
        <p><strong>User Email:</strong> {escape(str(alert.user_email or 'N/A'))}</p>
        <p><strong>Details:</strong></p>
        <pre>{escape(str(alert.details))}</pre>
        <hr>
        <p>Review alerts at: <a href="https://nordabiznes.pl/admin/security">Security Dashboard</a></p>
        """

        success = send_email(
            to_email=SECURITY_ALERT_EMAIL,
            subject=subject,
            html_content=body,
        )

        if success:
            alert.email_sent = True
            alert.email_sent_at = datetime.utcnow()
            logger.info(f"Security alert email sent for alert {alert.id}")

        return success

    except Exception as e:
        # Deliberately broad: alert emails must never break the request.
        logger.error(f"Failed to send security alert email: {e}")
        return False
|
|
|
|
|
|
# ============================================================
|
|
# GEOIP BLOCKING
|
|
# ============================================================
|
|
|
|
# GeoIP configuration (lazy loaded to respect load_dotenv timing)
|
|
def _get_geoip_enabled():
    """Return True when the GEOIP_ENABLED env var equals 'true' (any case)."""
    # Read on every call so values loaded into the environment late are seen.
    flag = os.environ.get('GEOIP_ENABLED', 'false')
    return flag.lower() == 'true'
|
|
|
|
def _get_geoip_db_path():
    """Return the GeoIP database path from GEOIP_DB_PATH, or the built-in default."""
    default_path = '/var/www/nordabiznes/geoip/GeoLite2-Country.mmdb'
    return os.environ.get('GEOIP_DB_PATH', default_path)
|
|
|
|
def _get_geoip_whitelist():
    """
    Return the set of whitelisted IPs from the GEOIP_WHITELIST env var.

    The variable is a comma-separated list. Whitespace around each entry is
    ignored (so "1.2.3.4, 5.6.7.8" works) and empty entries are dropped.
    """
    raw = os.getenv('GEOIP_WHITELIST', '')
    # Strip each entry: an unstripped ' 5.6.7.8' would never match a real IP.
    return {entry.strip() for entry in raw.split(',') if entry.strip()}
|
|
|
|
# Block high-risk countries (ISO 3166-1 alpha-2 codes)
BLOCKED_COUNTRIES = {
    'RU',  # Russia
    'CN',  # China
    'KP',  # North Korea
    'IR',  # Iran
    'BY',  # Belarus
    'SY',  # Syria
    'VE',  # Venezuela
    'CU',  # Cuba
}

# GeoIP reader (lazy loaded by get_geoip_reader)
_geoip_reader = None
|
|
|
|
|
|
def get_geoip_reader():
    """
    Return the shared GeoIP reader, creating it on first use.

    Returns None when GeoIP is disabled, the geoip2 package is missing, the
    database file does not exist, or loading fails. Failures are logged and
    not cached, so a later call tries again.
    """
    global _geoip_reader

    # Fast path: already initialised.
    if _geoip_reader is not None:
        return _geoip_reader

    if not _get_geoip_enabled():
        return None

    try:
        import geoip2.database

        mmdb_path = _get_geoip_db_path()
        if not os.path.exists(mmdb_path):
            logger.warning(f"GeoIP database not found at {mmdb_path}")
        else:
            _geoip_reader = geoip2.database.Reader(mmdb_path)
            logger.info(f"GeoIP database loaded from {mmdb_path}")
    except ImportError:
        logger.warning("geoip2 package not installed, GeoIP blocking disabled")
    except Exception as e:
        logger.error(f"Failed to load GeoIP database: {e}")

    return _geoip_reader
|
|
|
|
|
|
def get_country_code(ip_address: str) -> str:
    """
    Look up the country for an IP address.

    Returns:
        ISO 3166-1 alpha-2 country code (e.g., 'PL', 'DE'), or None when no
        GeoIP reader is available or the lookup raises.
    """
    geo_db = get_geoip_reader()
    if not geo_db:
        return None

    try:
        lookup = geo_db.country(ip_address)
        return lookup.country.iso_code
    except Exception:
        # Any lookup failure is reported as "country unknown".
        return None
|
|
|
|
|
|
def get_geoip_info(ip_address: str) -> dict:
    """
    Get full GeoIP information for an IP address.

    Args:
        ip_address: IPv4 or IPv6 address as a string.

    Returns:
        dict with keys: country, country_name, city, region
        (city and region are always None here; they require the
        GeoLite2-City database). Private and loopback addresses yield a
        'LOCAL' placeholder entry. Returns None when no GeoIP reader is
        available or the lookup fails.
    """
    import ipaddress

    reader = get_geoip_reader()
    if not reader:
        return None

    # Skip local/private IPs. Parse the address instead of matching string
    # prefixes: a '172.' prefix test would also catch public 172.x ranges,
    # whereas only 172.16.0.0/12 is private.
    try:
        parsed = ipaddress.ip_address(ip_address) if ip_address else None
    except ValueError:
        parsed = None  # Not a valid IP literal; let the lookup below decide
    if parsed is not None and (parsed.is_private or parsed.is_loopback):
        return {'country': 'LOCAL', 'country_name': 'Local Network', 'city': None, 'region': None}

    try:
        response = reader.country(ip_address)
        return {
            'country': response.country.iso_code,
            'country_name': response.country.name,
            'city': None,    # Requires the GeoLite2-City database
            'region': None,  # Requires the GeoLite2-City database
        }
    except Exception:
        # Any lookup failure is reported as "no information".
        return None
|
|
|
|
|
|
def is_ip_allowed(ip_address: str = None) -> bool:
    """
    Check if an IP address is allowed (not from blocked high-risk countries).

    The check fails open: it returns True when GeoIP is disabled, when no IP
    can be determined, and when the country cannot be resolved.

    Args:
        ip_address: IP to check (defaults to current request IP)

    Returns:
        True if allowed, False if blocked
    """
    import ipaddress

    if not _get_geoip_enabled():
        return True

    if ip_address is None:
        ip_address = get_client_ip()

    if not ip_address:
        return True

    # Check whitelist first
    if ip_address in _get_geoip_whitelist():
        return True

    # Local/private IPs are always allowed. Parse the address rather than
    # matching string prefixes: a '172.' prefix test would exempt public
    # 172.x addresses from geo-blocking (only 172.16.0.0/12 is private).
    try:
        parsed = ipaddress.ip_address(ip_address)
    except ValueError:
        parsed = None  # Not a valid IP literal; fall through to the lookup
    if parsed is not None and (parsed.is_private or parsed.is_loopback):
        return True

    country = get_country_code(ip_address)

    # If we can't determine country, allow (fail open)
    if country is None:
        return True

    # Block high-risk countries
    return country not in BLOCKED_COUNTRIES
|
|
|
|
|
|
def geoip_check():
    """
    Decorator factory that blocks requests from high-risk countries.

    When is_ip_allowed() rejects the current client, the attempt is logged,
    a low-severity 'geo_blocked' security alert is recorded (best-effort),
    and the request is aborted with HTTP 403.

    Usage:
        @app.route('/sensitive-endpoint')
        @geoip_check()
        def sensitive_endpoint():
            ...
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            if not is_ip_allowed():
                ip = get_client_ip()
                country = get_country_code(ip)
                logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")

                # Create alert for blocked access. Failing to record the
                # alert must never prevent the 403 below.
                try:
                    from database import SessionLocal
                    db = SessionLocal()
                    try:
                        create_security_alert(
                            db, 'geo_blocked', 'low',
                            ip_address=ip,
                            details={'country': country, 'path': request.path}
                        )
                        db.commit()
                    finally:
                        # Always release the session, even when the alert
                        # insert or the commit raised.
                        db.close()
                except Exception as e:
                    logger.error(f"Failed to create geo block alert: {e}")

                abort(403)

            return f(*args, **kwargs)
        return wrapped
    return decorator
|
|
|
|
|
|
# ============================================================
|
|
# TWO-FACTOR AUTHENTICATION (TOTP)
|
|
# ============================================================
|
|
|
|
# Issuer name passed to the TOTP provisioning URI (see get_totp_uri).
TOTP_ISSUER = 'NordaBiznes'
|
|
|
|
|
|
def generate_totp_secret() -> str:
    """
    Create a fresh base32 TOTP secret using pyotp.

    Returns:
        The new secret, or None (with an error logged) when pyotp is not
        installed.
    """
    try:
        import pyotp
    except ImportError:
        logger.error("pyotp package not installed")
        return None
    else:
        return pyotp.random_base32()
|
|
|
|
|
|
def get_totp_uri(user) -> str:
    """
    Build the TOTP provisioning URI used for QR code generation.

    Args:
        user: User object; reads ``totp_secret`` and ``email``

    Returns:
        otpauth:// URI for authenticator apps, or None when the user has no
        secret or pyotp is not installed.
    """
    secret = user.totp_secret
    if not secret:
        return None

    try:
        import pyotp
    except ImportError:
        logger.error("pyotp package not installed")
        return None
    else:
        authenticator = pyotp.TOTP(secret)
        return authenticator.provisioning_uri(name=user.email, issuer_name=TOTP_ISSUER)
|
|
|
|
|
|
def verify_totp(user, code: str) -> bool:
    """
    Check a TOTP code against the user's secret.

    Args:
        user: User object; reads ``totp_secret``
        code: 6-digit TOTP code to verify

    Returns:
        True if valid, False otherwise (including when the user has no
        secret or pyotp is not installed).
    """
    secret = user.totp_secret
    if not secret:
        return False

    try:
        import pyotp
    except ImportError:
        logger.error("pyotp package not installed")
        return False
    else:
        authenticator = pyotp.TOTP(secret)
        # valid_window=1 tolerates one time-step of clock drift.
        return authenticator.verify(code, valid_window=1)
|
|
|
|
|
|
def generate_backup_codes(count: int = 8) -> list:
    """
    Produce one-time backup codes for 2FA recovery.

    Args:
        count: Number of codes to generate

    Returns:
        List of 8-character backup codes (uppercase hexadecimal)
    """
    # token_hex(4) yields 4 random bytes rendered as 8 hex characters.
    return [secrets.token_hex(4).upper() for _ in range(count)]
|
|
|
|
|
|
def verify_backup_code(user, code: str, db) -> bool:
    """
    Verify and consume a backup code.

    The submitted code is upper-cased and stripped before comparison. On a
    match the code is removed from the user's list and the session is
    committed, so each code works only once.

    Args:
        user: User object with totp_backup_codes
        code: Backup code to verify (None / non-string input is rejected)
        db: Database session to update user

    Returns:
        True if valid (code is consumed), False otherwise
    """
    import hmac

    stored_codes = user.totp_backup_codes
    if not stored_codes:
        return False

    # Missing or malformed form input must be a clean rejection, not a crash.
    if not isinstance(code, str):
        return False

    candidate = code.upper().strip().encode('utf-8')

    # Compare against every stored code in constant time and without an
    # early exit, so timing does not reveal how much of a code matched.
    # Bytes are compared because compare_digest rejects non-ASCII str.
    remaining_codes = [
        c for c in stored_codes
        if not hmac.compare_digest(str(c).encode('utf-8'), candidate)
    ]
    if len(remaining_codes) == len(stored_codes):
        return False

    # Assign a new list (rather than mutating in place) to store the change.
    user.totp_backup_codes = remaining_codes
    db.commit()
    logger.info(f"Backup code used for user {user.email}, {len(remaining_codes)} codes remaining")
    return True
|
|
|
|
|
|
def requires_2fa(f):
    """
    Route decorator enforcing 2FA verification for the current session.

    Unauthenticated users are redirected to the login page. Users with 2FA
    enabled who have not yet verified in this session are redirected to the
    2FA verification page, with the requested URL remembered in the session.

    Usage:
        @app.route('/admin/sensitive')
        @login_required
        @requires_2fa
        def admin_sensitive():
            ...
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        from flask import session

        if not current_user.is_authenticated:
            return redirect(url_for('login'))

        # Users without 2FA, or already verified this session, pass through.
        verification_pending = current_user.totp_enabled and not session.get('2fa_verified')
        if not verification_pending:
            return f(*args, **kwargs)

        flash('Wymagana weryfikacja 2FA.', 'warning')
        # Remember where the user was heading.
        session['2fa_next'] = request.url
        return redirect(url_for('verify_2fa'))

    return wrapped
|
|
|
|
|
|
# ============================================================
|
|
# INITIALIZATION
|
|
# ============================================================
|
|
|
|
def init_security_service():
    """Warm up the security service: load the GeoIP reader when GeoIP is enabled."""
    if not _get_geoip_enabled():
        logger.info("Security service initialized (GeoIP disabled)")
        return

    # Trigger the lazy load now so problems are logged at startup.
    get_geoip_reader()
    logger.info(f"Security service initialized with GeoIP enabled, blocking: {BLOCKED_COUNTRIES}")
|