nordabiz/utils/analytics.py
Maciej Pienczyn 164b9c925e
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
debug: log mobile cookies to /tmp/pwa_debug.log
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 10:16:16 +01:00

317 lines
10 KiB
Python

"""
Analytics Helpers
=================
Functions for tracking page views, API usage, and user analytics.
"""
import json
import logging
import uuid
from datetime import date, datetime

from flask import g, request, session
from flask_login import current_user
from sqlalchemy import func, extract
from user_agents import parse as parse_user_agent

from database import (
    SessionLocal, UserSession, PageView, AIAPICostLog, Company
)
# Logger named after this module.
logger = logging.getLogger(__name__)
# Global variable to store current page_view_id for templates.
# NOTE(review): this is module-level mutable state, so it is shared by every
# request handled in the same process - confirm that is intended.
_current_page_view_id = {}
# Lower-case substrings that mark a User-Agent as automated traffic even when
# the user_agents library does not classify it as a bot.
_BOT_UA_MARKERS = (
    'curl/', 'python-requests', 'axios/', 'wget/', 'scrapy',
    'werkzeug', 'leakix', 'nuclei', 'masscan', 'zgrab', 'httpx',
    'googleassociationservice', 'censysinspect', 'paloaltonetworks',
    'cortex', 'netcraft', 'fasthttp', 'cms-checker',
    'wp-safe-scanner', 'notebooklm', 'ruby/', 'skypeuri',
    'com.apple.webkit', 'networkingextension',
)

# Destination of the temporary PWA cookie diagnostics.
# NOTE(review): debug aid only - the file grows without bound and lives at a
# predictable path; remove once PWA detection has been verified.
_PWA_DEBUG_LOG_PATH = '/tmp/pwa_debug.log'


def _looks_like_bot(ua_string):
    """Return True when the raw User-Agent matches the heuristic bot rules."""
    stripped = ua_string.strip()
    # Empty or bare "Mozilla/5.0" user agents are flagged as bots.
    if not stripped or stripped == 'Mozilla/5.0':
        return True
    lowered = ua_string.lower()
    return any(marker in lowered for marker in _BOT_UA_MARKERS)


def _describe_user_agent(ua_string):
    """
    Derive the device/browser/OS fields stored on a UserSession.
    Returns:
        Dict with device_type, browser, browser_version, os, os_version
        and is_bot. Generic placeholders are used when parsing fails.
    """
    try:
        ua = parse_user_agent(ua_string)
        if ua.is_mobile:
            device_type = 'mobile'
        elif ua.is_tablet:
            device_type = 'tablet'
        else:
            device_type = 'desktop'
        return {
            'device_type': device_type,
            'browser': ua.browser.family,
            'browser_version': ua.browser.version_string,
            'os': ua.os.family,
            'os_version': ua.os.version_string,
            'is_bot': bool(ua.is_bot) or _looks_like_bot(ua_string),
        }
    except Exception:
        # Parsing is best-effort: never let a malformed header break the
        # request. The string heuristics still apply, so scanners sending a
        # broken User-Agent are not recorded as human traffic.
        logger.debug("User-Agent parsing failed", exc_info=True)
        return {
            'device_type': 'desktop',
            'browser': 'Unknown',
            'browser_version': '',
            'os': 'Unknown',
            'os_version': '',
            'is_bot': _looks_like_bot(ua_string),
        }


def _debug_log_mobile_cookies(session_db_id):
    """Append the current request's PWA cookie state to the debug log (best-effort)."""
    try:
        record = {
            'session_id': session_db_id,
            'path': request.path,
            'pwa_mode': request.cookies.get('pwa_mode'),
            'pwa_display': request.cookies.get('pwa_display'),
            'all_cookies': list(request.cookies.keys()),
        }
        with open(_PWA_DEBUG_LOG_PATH, 'a', encoding='utf-8') as debug_file:
            debug_file.write(json.dumps(record) + '\n')
    except (OSError, TypeError, ValueError) as exc:
        # Diagnostics must never affect the request itself.
        logger.debug("Could not write PWA debug log: %s", exc)


def get_or_create_analytics_session():
    """
    Get existing analytics session or create new one.

    The analytics session is identified by a UUID kept in the Flask session
    under 'analytics_session_id'. On first sight a UserSession row is created
    from the request's User-Agent, remote address and 'pwa_mode' cookie; on
    later requests the row's last activity, user id and PWA flag are updated.

    Returns:
        The database session ID (integer) or None on error.
    """
    analytics_session_id = session.get('analytics_session_id')
    if not analytics_session_id:
        analytics_session_id = str(uuid.uuid4())
        session['analytics_session_id'] = analytics_session_id

    db = SessionLocal()
    try:
        user_session = db.query(UserSession).filter_by(
            session_id=analytics_session_id
        ).first()
        # PWA detection from cookie (set by JS in standalone mode)
        pwa_cookie = request.cookies.get('pwa_mode') == '1'

        if not user_session:
            ua_string = request.headers.get('User-Agent', '')
            ua_info = _describe_user_agent(ua_string)
            browser = ua_info['browser']
            browser_version = ua_info['browser_version']
            os_name = ua_info['os']
            os_version = ua_info['os_version']

            # String fields are truncated to 2000/50/20 characters before storing.
            user_session = UserSession(
                session_id=analytics_session_id,
                user_id=current_user.id if current_user.is_authenticated else None,
                ip_address=request.remote_addr,
                user_agent=ua_string[:2000] if ua_string else None,
                device_type=ua_info['device_type'],
                browser=browser[:50] if browser else None,
                browser_version=browser_version[:20] if browser_version else None,
                os=os_name[:50] if os_name else None,
                os_version=os_version[:20] if os_version else None,
                is_bot=ua_info['is_bot']
            )
            user_session.is_pwa = pwa_cookie
            db.add(user_session)
            db.commit()
            db.refresh(user_session)
        else:
            # Update last activity
            user_session.last_activity_at = datetime.now()
            # Attach the user once they log in during an anonymous session.
            if current_user.is_authenticated and not user_session.user_id:
                user_session.user_id = current_user.id
            # DEBUG: log cookies for mobile sessions
            if user_session.device_type == 'mobile':
                _debug_log_mobile_cookies(user_session.id)
            # The PWA flag is only ever switched on, never back off.
            if not user_session.is_pwa and pwa_cookie:
                user_session.is_pwa = True
            db.commit()

        return user_session.id
    except Exception as e:
        logger.error(f"Analytics session error: {e}")
        db.rollback()
        return None
    finally:
        db.close()
def track_page_view_for_request():
    """
    Record a PageView row for the request currently being handled.
    Called from before_request middleware.

    Returns:
        The id of the stored page view, or None when no analytics session
        is available or any error occurs.
    """
    try:
        analytics_session_pk = get_or_create_analytics_session()
        if not analytics_session_pk:
            return None

        db = SessionLocal()
        try:
            if current_user.is_authenticated:
                viewer_id = current_user.id
            else:
                viewer_id = None

            view = PageView(
                session_id=analytics_session_pk,
                user_id=viewer_id,
                url=request.url[:2000] if request.url else '',
                path=request.path[:500] if request.path else '/',
                referrer=request.referrer[:2000] if request.referrer else None,
            )

            # On company pages, resolve the slug from the path to a company id.
            if request.path.startswith('/company/'):
                try:
                    path_parts = request.path.split('/')
                    company_slug = path_parts[2].split('?')[0]
                    matched_company = (
                        db.query(Company).filter_by(slug=company_slug).first()
                    )
                    if matched_company:
                        view.company_id = matched_company.id
                except Exception:
                    # Company attribution is optional; the view is still recorded.
                    pass

            db.add(view)

            # Keep the per-session page counter in step with the new row.
            owning_session = (
                db.query(UserSession).filter_by(id=analytics_session_pk).first()
            )
            if owning_session:
                previous_count = owning_session.page_views_count or 0
                owning_session.page_views_count = previous_count + 1

            db.commit()
            return view.id
        except Exception as e:
            logger.error(f"Page view tracking error: {e}")
            db.rollback()
            return None
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Page view tracking outer error: {e}")
        return None
# Attribute name under which the current page_view_id is kept on ``flask.g``.
#
# The value used to live in the module-level ``_current_page_view_id`` dict
# keyed by ``id(request)``. ``request`` is Flask's context-local proxy - one
# module-level object - so ``id(request)`` is identical for every request and
# concurrent requests overwrote (or popped) each other's value. ``flask.g`` is
# scoped to the active application context, so each request gets its own slot.
_PAGE_VIEW_ID_ATTR = '_analytics_page_view_id'


def get_current_page_view_id():
    """
    Get page_view_id for current request.

    Must be called with an application context active.

    Returns:
        The value stored by set_current_page_view_id() during this request,
        or '' when nothing has been stored.
    """
    return g.get(_PAGE_VIEW_ID_ATTR, '')


def set_current_page_view_id(page_view_id):
    """
    Set page_view_id for current request.

    Must be called with an application context active.

    Args:
        page_view_id: Value to expose through get_current_page_view_id().
    """
    setattr(g, _PAGE_VIEW_ID_ATTR, page_view_id)


def cleanup_page_view_id():
    """
    Discard the page_view_id stored for the current request.

    Safe to call when nothing was stored. Must be called with an application
    context active.
    """
    g.pop(_PAGE_VIEW_ID_ATTR, None)
def get_free_tier_usage():
    """
    Summarise today's Gemini API usage for free tier tracking.

    Counts AIAPICostLog rows whose provider is 'gemini' and whose timestamp
    falls on today's date, and sums their total_tokens.

    Returns:
        Dict with requests_today and tokens_today; both are 0 when the
        query fails.
    """
    db = SessionLocal()
    try:
        today = date.today()
        request_count = func.count(AIAPICostLog.id).label('requests')
        token_total = func.coalesce(
            func.sum(AIAPICostLog.total_tokens), 0
        ).label('tokens')

        gemini_today = db.query(request_count, token_total).filter(
            func.date(AIAPICostLog.timestamp) == today,
            AIAPICostLog.api_provider == 'gemini',
        )
        row = gemini_today.first()

        usage = {}
        usage['requests_today'] = row.requests or 0
        usage['tokens_today'] = int(row.tokens or 0)
        return usage
    except Exception as e:
        logger.warning(f"Failed to get free tier usage: {e}")
        return {'requests_today': 0, 'tokens_today': 0}
    finally:
        db.close()
def get_brave_api_usage(monthly_limit=2000):
    """
    Get Brave Search API usage for current month.

    Usage is counted from AIAPICostLog rows whose provider is 'brave'.

    Args:
        monthly_limit: Monthly request quota that usage is measured against.
            Defaults to 2000 (Brave free tier: 2000 requests/month).

    Returns:
        Dict with usage stats and limits: requests_today,
        requests_this_month, monthly_limit, remaining, usage_percent, tier
        and is_limit_reached. When the query fails, zero usage is reported
        against the same limit.
    """
    db = SessionLocal()
    try:
        today = date.today()

        # Monthly usage
        monthly_result = db.query(
            func.count(AIAPICostLog.id).label('requests')
        ).filter(
            extract('month', AIAPICostLog.timestamp) == today.month,
            extract('year', AIAPICostLog.timestamp) == today.year,
            AIAPICostLog.api_provider == 'brave'
        ).first()

        # Today's usage
        daily_result = db.query(
            func.count(AIAPICostLog.id).label('requests')
        ).filter(
            func.date(AIAPICostLog.timestamp) == today,
            AIAPICostLog.api_provider == 'brave'
        ).first()

        monthly_used = monthly_result.requests or 0
        daily_used = daily_result.requests or 0

        # Guard against a zero/negative quota being passed in.
        if monthly_limit > 0:
            usage_percent = round((monthly_used / monthly_limit) * 100, 1)
        else:
            usage_percent = 0

        return {
            'requests_today': daily_used,
            'requests_this_month': monthly_used,
            'monthly_limit': monthly_limit,
            'remaining': max(0, monthly_limit - monthly_used),
            'usage_percent': usage_percent,
            'tier': 'free',
            'is_limit_reached': monthly_used >= monthly_limit
        }
    except Exception as e:
        logger.warning(f"Failed to get Brave API usage: {e}")
        # Report "nothing used" so a failed stats query does not present the
        # quota as exhausted.
        return {
            'requests_today': 0,
            'requests_this_month': 0,
            'monthly_limit': monthly_limit,
            'remaining': max(0, monthly_limit),
            'usage_percent': 0,
            'tier': 'free',
            'is_limit_reached': False
        }
    finally:
        db.close()
def log_brave_api_call(user_id=None, feature='news_search', company_name=None):
    """
    Record one Brave API call as an AIAPICostLog row for usage tracking.

    Args:
        user_id: User who triggered the call (optional)
        feature: Feature name (news_search, etc.)
        company_name: Company being searched; only used in the debug log
            message, it is not stored on the row.
    """
    db = SessionLocal()
    try:
        # Token counts are recorded as zero for Brave calls.
        entry_fields = {
            'api_provider': 'brave',
            'model_name': 'search_api',
            'feature': feature,
            'user_id': user_id,
            'input_tokens': 0,
            'output_tokens': 0,
            'total_tokens': 0,
        }
        db.add(AIAPICostLog(**entry_fields))
        db.commit()
        logger.debug(f"Logged Brave API call: {feature} for {company_name}")
    except Exception as exc:
        logger.error(f"Failed to log Brave API call: {exc}")
        db.rollback()
    finally:
        db.close()