fix: Use lazy loading for GeoIP config to respect load_dotenv timing
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
cb574851cf
commit
341ce29aa9
@ -178,12 +178,18 @@ def _send_alert_email(alert):
|
|||||||
# GEOIP BLOCKING
|
# GEOIP BLOCKING
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
# GeoIP configuration
|
# GeoIP configuration (lazy loaded to respect load_dotenv timing)
|
||||||
GEOIP_ENABLED = os.getenv('GEOIP_ENABLED', 'false').lower() == 'true'
|
def _get_geoip_enabled():
|
||||||
GEOIP_DB_PATH = os.getenv('GEOIP_DB_PATH', '/var/www/nordabiznes/geoip/GeoLite2-Country.mmdb')
|
return os.getenv('GEOIP_ENABLED', 'false').lower() == 'true'
|
||||||
|
|
||||||
|
def _get_geoip_db_path():
|
||||||
|
return os.getenv('GEOIP_DB_PATH', '/var/www/nordabiznes/geoip/GeoLite2-Country.mmdb')
|
||||||
|
|
||||||
|
def _get_geoip_whitelist():
|
||||||
|
return set(os.getenv('GEOIP_WHITELIST', '').split(',')) - {''}
|
||||||
|
|
||||||
# Block high-risk countries (Russia, China, North Korea, Iran, etc.)
|
# Block high-risk countries (Russia, China, North Korea, Iran, etc.)
|
||||||
BLOCKED_COUNTRIES = {'RU', 'CN', 'KP', 'IR', 'BY', 'SY', 'VE', 'CU'}
|
BLOCKED_COUNTRIES = {'RU', 'CN', 'KP', 'IR', 'BY', 'SY', 'VE', 'CU'}
|
||||||
GEOIP_WHITELIST = set(os.getenv('GEOIP_WHITELIST', '').split(',')) - {''} # Whitelisted IPs
|
|
||||||
|
|
||||||
# GeoIP reader (lazy loaded)
|
# GeoIP reader (lazy loaded)
|
||||||
_geoip_reader = None
|
_geoip_reader = None
|
||||||
@ -196,16 +202,17 @@ def get_geoip_reader():
|
|||||||
if _geoip_reader is not None:
|
if _geoip_reader is not None:
|
||||||
return _geoip_reader
|
return _geoip_reader
|
||||||
|
|
||||||
if not GEOIP_ENABLED:
|
if not _get_geoip_enabled():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import geoip2.database
|
import geoip2.database
|
||||||
if os.path.exists(GEOIP_DB_PATH):
|
db_path = _get_geoip_db_path()
|
||||||
_geoip_reader = geoip2.database.Reader(GEOIP_DB_PATH)
|
if os.path.exists(db_path):
|
||||||
logger.info(f"GeoIP database loaded from {GEOIP_DB_PATH}")
|
_geoip_reader = geoip2.database.Reader(db_path)
|
||||||
|
logger.info(f"GeoIP database loaded from {db_path}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"GeoIP database not found at {GEOIP_DB_PATH}")
|
logger.warning(f"GeoIP database not found at {db_path}")
|
||||||
except ImportError:
|
except ImportError:
|
||||||
logger.warning("geoip2 package not installed, GeoIP blocking disabled")
|
logger.warning("geoip2 package not installed, GeoIP blocking disabled")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -242,7 +249,7 @@ def is_ip_allowed(ip_address: str = None) -> bool:
|
|||||||
Returns:
|
Returns:
|
||||||
True if allowed, False if blocked
|
True if allowed, False if blocked
|
||||||
"""
|
"""
|
||||||
if not GEOIP_ENABLED:
|
if not _get_geoip_enabled():
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if ip_address is None:
|
if ip_address is None:
|
||||||
@ -252,7 +259,7 @@ def is_ip_allowed(ip_address: str = None) -> bool:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
# Check whitelist first
|
# Check whitelist first
|
||||||
if ip_address in GEOIP_WHITELIST:
|
if ip_address in _get_geoip_whitelist():
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Local/private IPs are always allowed
|
# Local/private IPs are always allowed
|
||||||
@ -452,6 +459,8 @@ def requires_2fa(f):
|
|||||||
|
|
||||||
def init_security_service():
|
def init_security_service():
|
||||||
"""Initialize security service (load GeoIP database, etc.)."""
|
"""Initialize security service (load GeoIP database, etc.)."""
|
||||||
if GEOIP_ENABLED:
|
if _get_geoip_enabled():
|
||||||
get_geoip_reader()
|
get_geoip_reader()
|
||||||
logger.info("Security service initialized")
|
logger.info(f"Security service initialized with GeoIP enabled, blocking: {BLOCKED_COUNTRIES}")
|
||||||
|
else:
|
||||||
|
logger.info("Security service initialized (GeoIP disabled)")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user