nordabiz/blueprints/chat/routes.py
Maciej Pienczyn 86c7e83886
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: handle lazy loading error for company.category in chat routes
2026-03-28 05:54:49 +01:00

693 lines
25 KiB
Python

"""
Chat Routes
===========
AI Chat interface, API, and analytics.
"""
import json as json_module
import logging
from datetime import datetime, date, timedelta
from flask import render_template, request, redirect, url_for, flash, jsonify, session, Response, stream_with_context
from flask_login import login_required, current_user
from sqlalchemy import func, desc
from . import bp
from database import (
SessionLocal, AIChatConversation, AIChatMessage, AIChatFeedback, AIAPICostLog,
SystemRole
)
from nordabiz_chat import NordaBizChatEngine
from utils.decorators import member_required
# Logger
logger = logging.getLogger(__name__)
# ============================================================
# AI COST LIMITS
# ============================================================
# Global budget: 100 PLN/month (~$25) for all users except UNLIMITED_USERS.
# Per-user limits are degressive (weekly < 7x daily, monthly < 4x weekly).
# NOTE(review): in the code visible in this file the global budget is only
# reported by get_user_usage(); check_user_limits() enforces the per-user
# limits only — confirm whether global enforcement lives elsewhere.

# Emails for which check_user_limits() returns early without checking usage;
# get_user_usage() also leaves their costs out of the global monthly total.
UNLIMITED_USERS = ['maciej.pienczyn@inpi.pl']
# Per-user limits in USD (both Flash and Pro combined)
USER_DAILY_LIMIT = 0.15 # ~2-3 Flash queries/day
USER_WEEKLY_LIMIT = 0.50 # ~9 Flash queries/week (not 7x daily)
USER_MONTHLY_LIMIT = 1.00 # ~18 Flash queries/month (not 4x weekly)
# Portal-wide monthly budget in USD, used for the global usage percentage.
GLOBAL_MONTHLY_BUDGET = 25.00 # $25 = ~100 PLN
def get_user_usage(user_id):
    """Calculate a user's AI chat cost usage and the portal-wide monthly usage.

    Only ``AIAPICostLog`` rows with ``feature == 'ai_chat'`` are counted.
    Period boundaries are derived from ``datetime.now()``: the day starts at
    midnight, the week at Monday midnight and the month on the 1st.
    NOTE(review): this assumes ``AIAPICostLog.timestamp`` is stored as naive
    local time comparable with ``datetime.now()`` — confirm against the model.

    Args:
        user_id: ID of the user whose costs are summed.

    Returns:
        dict with ``daily``/``weekly``/``monthly`` totals (rounded to 4
        places), the matching ``*_limit`` and ``*_percent`` values (percent is
        capped at 100), and ``global_monthly``/``global_monthly_limit``/
        ``global_monthly_percent`` covering every user whose email is not in
        ``UNLIMITED_USERS``.
    """
    now = datetime.now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    week_start = today_start - timedelta(days=today_start.weekday())  # Monday
    month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # The current week may have begun in the previous month. Fetching only
    # rows since the 1st would silently drop those days from the weekly
    # total, so the query window starts at whichever boundary is earlier.
    window_start = min(week_start, month_start)

    def _percent(total, limit):
        """Share of *limit* consumed, capped at 100; 0 when the limit is off."""
        return round(min(total / limit * 100, 100), 1) if limit > 0 else 0

    db = SessionLocal()
    try:
        # User's costs for the widest period we need to report on
        cost_rows = db.query(AIAPICostLog).filter(
            AIAPICostLog.user_id == user_id,
            AIAPICostLog.timestamp >= window_start,
            AIAPICostLog.feature == 'ai_chat'
        ).all()

        def _total_since(start):
            """Sum the cost of fetched rows logged at or after *start*."""
            return sum(
                float(row.total_cost or 0)
                for row in cost_rows
                if row.timestamp >= start
            )

        daily_total = _total_since(today_start)
        weekly_total = _total_since(week_start)
        monthly_total = _total_since(month_start)

        # Global portal usage this month (all users except unlimited)
        from database import User
        unlimited_ids = [
            row[0]
            for row in db.query(User.id).filter(User.email.in_(UNLIMITED_USERS)).all()
        ]
        global_q = db.query(func.coalesce(func.sum(AIAPICostLog.total_cost), 0)).filter(
            AIAPICostLog.timestamp >= month_start,
            AIAPICostLog.feature == 'ai_chat'
        )
        if unlimited_ids:
            global_q = global_q.filter(~AIAPICostLog.user_id.in_(unlimited_ids))
        global_total = float(global_q.scalar() or 0)

        return {
            'daily': round(daily_total, 4),
            'weekly': round(weekly_total, 4),
            'monthly': round(monthly_total, 4),
            'daily_limit': USER_DAILY_LIMIT,
            'weekly_limit': USER_WEEKLY_LIMIT,
            'monthly_limit': USER_MONTHLY_LIMIT,
            'daily_percent': _percent(daily_total, USER_DAILY_LIMIT),
            'weekly_percent': _percent(weekly_total, USER_WEEKLY_LIMIT),
            'monthly_percent': _percent(monthly_total, USER_MONTHLY_LIMIT),
            'global_monthly': round(global_total, 4),
            'global_monthly_limit': GLOBAL_MONTHLY_BUDGET,
            'global_monthly_percent': _percent(global_total, GLOBAL_MONTHLY_BUDGET),
        }
    finally:
        db.close()
def check_user_limits(user_id, user_email):
    """Tell whether the user has used up any of the per-user cost limits.

    Users whose email is listed in ``UNLIMITED_USERS`` are never limited and
    their usage is not even computed.

    Returns:
        ``(False, None)`` when the user may continue, otherwise
        ``(True, payload)`` where ``payload`` holds the user-facing ``error``
        text, the ``limit_exceeded`` period name and the full ``usage`` dict.
    """
    if user_email in UNLIMITED_USERS:
        return False, None

    usage = get_user_usage(user_id)

    # Checked from the longest period to the shortest, so the message always
    # names the limit that will take the longest to reset.
    period_rules = (
        (
            'monthly',
            USER_MONTHLY_LIMIT,
            'Wykorzystano miesięczny limit pytań do NordaGPT. Limit odnowi się na początku kolejnego miesiąca. Jeśli potrzebujesz więcej, kliknij przycisk poniżej.',
        ),
        (
            'weekly',
            USER_WEEKLY_LIMIT,
            'Wykorzystano tygodniowy limit pytań do NordaGPT. Nowe pytania będą dostępne od poniedziałku.',
        ),
        (
            'daily',
            USER_DAILY_LIMIT,
            'Wykorzystano dzisiejszy limit pytań do NordaGPT. Jutro będziesz mógł zadać kolejne pytania.',
        ),
    )
    for period, limit, text in period_rules:
        if usage[period] >= limit:
            return True, {
                'error': text,
                'limit_exceeded': period,
                'usage': usage,
            }

    return False, None
# ============================================================
# AI CHAT ROUTES
# ============================================================
@bp.route('/chat')
@login_required
def chat():
    """Render the AI chat page, or a members-only notice with HTTP 403."""
    # SECURITY: NordaGPT is only for members; the role comparison itself is
    # delegated to current_user.has_role(SystemRole.MEMBER).
    if current_user.has_role(SystemRole.MEMBER):
        return render_template('chat.html')
    return render_template('chat_members_only.html'), 403
@bp.route('/api/chat/settings', methods=['GET', 'POST'])
@login_required
@member_required
def chat_settings():
    """Get or update chat settings (model selection, usage limits).

    GET returns the model stored in the session (default ``'flash'``), the
    user's usage figures and whether the user is in ``UNLIMITED_USERS``. If
    usage cannot be computed, a zeroed usage dict with the same keys as
    ``get_user_usage`` is returned so the client never sees missing fields.

    POST stores the requested model in the session. An unknown, missing or
    malformed ``model`` falls back to ``'flash'``.
    """
    if request.method == 'GET':
        model = session.get('chat_model', 'flash')
        is_unlimited = current_user.email in UNLIMITED_USERS
        try:
            usage = get_user_usage(current_user.id)
        except Exception as e:
            logger.warning(f"Error calculating usage: {e}")
            # Mirror every key of the regular payload, including the global
            # budget fields, so consumers can rely on one shape.
            usage = {
                'daily': 0, 'weekly': 0, 'monthly': 0,
                'daily_limit': USER_DAILY_LIMIT,
                'weekly_limit': USER_WEEKLY_LIMIT,
                'monthly_limit': USER_MONTHLY_LIMIT,
                'daily_percent': 0, 'weekly_percent': 0, 'monthly_percent': 0,
                'global_monthly': 0,
                'global_monthly_limit': GLOBAL_MONTHLY_BUDGET,
                'global_monthly_percent': 0,
            }
        return jsonify({
            'success': True,
            'model': model,
            'monthly_cost': round(usage['monthly'], 4),
            'usage': usage,
            'is_unlimited': is_unlimited
        })

    # POST - update settings
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # JSON body; anything that is not a JSON object is treated as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        model = data.get('model', 'flash')

        # Validate model
        valid_models = ['flash', 'pro']
        if model not in valid_models:
            model = 'flash'

        # Store in session
        session['chat_model'] = model
        logger.info(f"User {current_user.id} set chat_model to: {model}")
        return jsonify({
            'success': True,
            'model': model
        })
    except Exception as e:
        logger.error(f"Error updating chat settings: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/request-higher-limits', methods=['POST'])
@login_required
@member_required
def chat_request_higher_limits():
    """Record the current user's request for higher AI limits in the log.

    Nothing is persisted here: the request, together with the user's current
    usage, is written as a single INFO log line for an administrator to pick up.
    """
    try:
        usage = get_user_usage(current_user.id)
        log_line = (
            f"HIGHER_LIMITS_REQUEST: User {current_user.id} ({current_user.name}, {current_user.email}) "
            f"requested higher AI limits. Current usage: daily=${usage['daily']}, "
            f"weekly=${usage['weekly']}, monthly=${usage['monthly']}"
        )
        logger.info(log_line)

        confirmation = {
            'success': True,
            'message': 'Twoje zgłoszenie zostało zarejestrowane. Administrator skontaktuje się z Tobą w sprawie indywidualnych limitów.'
        }
        return jsonify(confirmation)
    except Exception as e:
        logger.error(f"Error logging higher limits request: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/start', methods=['POST'])
@login_required
@member_required
def chat_start():
    """Start a new chat conversation for the current user.

    The optional JSON field ``title`` names the conversation. When the body
    is missing, is not a JSON object, or carries an empty/null title, a
    timestamped default title is used instead of failing the request.

    Returns:
        JSON with ``conversation_id`` and ``title`` on success, or
        ``success: False`` with HTTP 500 if the engine raises.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body rather
        # than raising, which previously surfaced as an HTTP 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        title = data.get('title') or f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

        chat_engine = NordaBizChatEngine()
        conversation = chat_engine.start_conversation(
            user_id=current_user.id,
            title=title
        )
        return jsonify({
            'success': True,
            'conversation_id': conversation.id,
            'title': conversation.title
        })
    except Exception as e:
        logger.error(f"Error starting chat: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/<int:conversation_id>/message', methods=['POST'])
@login_required
@member_required
def chat_send_message(conversation_id):
    """Send a message to the AI chat and return the complete reply as JSON.

    Flow: validate the message, verify the conversation belongs to the
    current user, enforce the per-user cost limits, build the user context
    and call ``NordaBizChatEngine.send_message``.

    Responses:
        400 — empty, missing or non-string ``message``.
        404 — conversation not found for this user.
        429 — a usage limit is exceeded (payload from ``check_user_limits``).
        500 — any unexpected error.
        200 — reply content plus ``tech_info`` (model, tokens, latency, cost).
    """
    try:
        # silent=True returns None instead of raising on a missing/invalid
        # body; a non-object body is treated as empty so it ends in a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_message = data.get('message')
        # A null or non-string message counts as empty (same outcome as the
        # streaming endpoint) instead of crashing on .strip().
        message = raw_message.strip() if isinstance(raw_message, str) else ''
        if not message:
            return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400

        # Verify conversation belongs to user
        db = SessionLocal()
        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        # Map model choice to actual model name and thinking level
        model_map = {
            'flash': '3-flash',  # Gemini 3 Flash - 10K RPD, thinking mode
            'pro': '3-pro'  # Gemini 3.1 Pro - premium reasoning
        }
        thinking_map = {
            'flash': 'high',
            'pro': 'high'
        }

        # Get model from request or session (flash = default with thinking, pro = premium)
        model_choice = data.get('model') or session.get('chat_model', 'flash')
        # Normalise unknown values up front so tech_info reports the model
        # that is really used, not whatever the client sent.
        if not isinstance(model_choice, str) or model_choice not in model_map:
            model_choice = 'flash'

        # Check usage limits (applies to all models)
        exceeded, limit_msg = check_user_limits(current_user.id, current_user.email)
        if exceeded:
            return jsonify({'success': False, **limit_msg}), 429

        # Build user context for AI personalization
        user_context = {
            'user_id': current_user.id,
            'user_name': current_user.name,
            'user_email': current_user.email,
            'company_name': current_user.company.name if current_user.company else None,
            'company_id': current_user.company.id if current_user.company else None,
            'company_category': None,
            'company_role': current_user.company_role or 'MEMBER',
            'is_norda_member': current_user.is_norda_member,
            'chamber_role': current_user.chamber_role,
            'member_since': current_user.created_at.strftime('%Y-%m-%d') if current_user.created_at else None,
        }
        # Best effort: loading company.category can fail (lazy loading), in
        # which case the category simply stays None — but leave a trace.
        try:
            if current_user.company and current_user.company.category:
                user_context['company_category'] = current_user.company.category.name
        except Exception:
            logger.debug("Could not load company category for chat context", exc_info=True)

        chat_engine = NordaBizChatEngine(model=model_map[model_choice])
        response = chat_engine.send_message(
            conversation_id=conversation_id,
            user_message=message,
            user_id=current_user.id,
            thinking_level=thinking_map[model_choice],
            user_context=user_context
        )

        # Get actual cost from response
        tokens_in = response.tokens_input or 0
        tokens_out = response.tokens_output or 0
        actual_cost = response.cost_usd or 0.0
        return jsonify({
            'success': True,
            'message': response.content,
            'message_id': response.id,
            'created_at': response.created_at.isoformat(),
            # Technical metadata
            'tech_info': {
                'model': model_choice,
                'tokens_input': tokens_in,
                'tokens_output': tokens_out,
                'tokens_total': tokens_in + tokens_out,
                'latency_ms': response.latency_ms or 0,
                'cost_usd': round(actual_cost, 6)
            }
        })
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/<int:conversation_id>/message/stream', methods=['POST'])
@login_required
@member_required
def chat_send_message_stream(conversation_id):
    """Send a message to the AI chat and stream the reply as Server-Sent Events.

    Validation, ownership check and limit check are the same as in the
    non-streaming endpoint and answer with plain JSON (400/404/429/500).
    Once streaming starts, every chunk yielded by
    ``NordaBizChatEngine.send_message_stream`` is emitted as one
    ``data: <json>`` SSE event; an error raised while streaming is sent as a
    final ``{'type': 'error', ...}`` event.
    """
    try:
        # silent=True returns None instead of raising on a missing/invalid
        # body; a non-object body is treated as empty so it ends in a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_message = data.get('message')
        message = raw_message.strip() if isinstance(raw_message, str) else ''
        if not message:
            return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400

        # Verify conversation belongs to user
        db = SessionLocal()
        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        model_map = {
            'flash': '3-flash',
            'pro': '3-pro'
        }
        # Get model from request or session; anything unknown (or not a
        # string) falls back to Flash.
        model_choice = data.get('model') or session.get('chat_model', 'flash')
        if not isinstance(model_choice, str) or model_choice not in model_map:
            model_choice = 'flash'

        # Check usage limits
        exceeded, limit_msg = check_user_limits(current_user.id, current_user.email)
        if exceeded:
            return jsonify({'success': False, **limit_msg}), 429

        # Resolved once here so the generator below does not need to touch
        # current_user while the response is being streamed.
        user_id = current_user.id

        # Build user context for AI personalization (same as non-streaming)
        user_context = {
            'user_id': user_id,
            'user_name': current_user.name,
            'user_email': current_user.email,
            'company_name': current_user.company.name if current_user.company else None,
            'company_id': current_user.company.id if current_user.company else None,
            'company_category': None,
            'company_role': current_user.company_role or 'MEMBER',
            'is_norda_member': current_user.is_norda_member,
            'chamber_role': current_user.chamber_role,
            'member_since': current_user.created_at.strftime('%Y-%m-%d') if current_user.created_at else None,
        }
        # Best effort: loading company.category can fail (lazy loading), in
        # which case the category simply stays None — but leave a trace.
        try:
            if current_user.company and current_user.company.category:
                user_context['company_category'] = current_user.company.category.name
        except Exception:
            logger.debug("Could not load company category for chat context", exc_info=True)

        chat_engine = NordaBizChatEngine(model=model_map[model_choice])

        def generate():
            """Yield one SSE ``data:`` event per chunk produced by the engine."""
            try:
                for chunk in chat_engine.send_message_stream(
                    conversation_id=conversation_id,
                    user_message=message,
                    user_id=user_id,
                    user_context=user_context
                ):
                    yield f"data: {json_module.dumps(chunk, ensure_ascii=False)}\n\n"
            except Exception as e:
                logger.error(f"SSE generator error: {e}")
                error_chunk = {'type': 'error', 'content': f'Błąd: {str(e)}'}
                # Encoded the same way as regular chunks.
                yield f"data: {json_module.dumps(error_chunk, ensure_ascii=False)}\n\n"

        response = Response(
            stream_with_context(generate()),
            mimetype='text/event-stream'
        )
        # Headers set to discourage caching and proxy buffering of the stream.
        response.headers['Cache-Control'] = 'no-cache'
        response.headers['X-Accel-Buffering'] = 'no'
        return response
    except Exception as e:
        logger.error(f"Error setting up streaming: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/<int:conversation_id>/history', methods=['GET'])
@login_required
@member_required
def chat_get_history(conversation_id):
    """Return the message history of one of the current user's conversations.

    Answers 404 when no conversation with this ID belongs to the user and
    500 on any unexpected error.
    """
    try:
        # Ownership check first; the session is only needed for this lookup.
        db = SessionLocal()
        try:
            owned_conversation = (
                db.query(AIChatConversation)
                .filter_by(id=conversation_id, user_id=current_user.id)
                .first()
            )
            if not owned_conversation:
                not_found = {'success': False, 'error': 'Conversation not found'}
                return jsonify(not_found), 404
        finally:
            db.close()

        # SECURITY: user_id is passed again so the engine can validate
        # ownership on its side as well (defense in depth).
        engine = NordaBizChatEngine()
        messages = engine.get_conversation_history(
            conversation_id,
            user_id=current_user.id,
        )
        return jsonify({'success': True, 'messages': messages})
    except Exception as e:
        logger.error(f"Error getting history: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/api/chat/conversations', methods=['GET'])
@login_required
@member_required
def chat_list_conversations():
    """List up to 50 of the current user's conversations for the sidebar.

    Ordering is by ``is_pinned`` descending (NULLs last), then by
    ``updated_at`` descending. The displayed ``title`` prefers the
    user-chosen ``custom_name`` over the original title.
    """
    db = SessionLocal()
    try:
        query = db.query(AIChatConversation).filter_by(user_id=current_user.id)
        query = query.order_by(
            AIChatConversation.is_pinned.desc().nullslast(),
            AIChatConversation.updated_at.desc()
        )
        rows = query.limit(50).all()

        items = []
        for conv in rows:
            items.append({
                'id': conv.id,
                'title': conv.custom_name or conv.title,
                'original_title': conv.title,
                'custom_name': conv.custom_name,
                'is_pinned': conv.is_pinned or False,
                'created_at': conv.started_at.isoformat() if conv.started_at else None,
                'updated_at': conv.updated_at.isoformat() if conv.updated_at else None,
                'message_count': len(conv.messages) if conv.messages else 0
            })

        return jsonify({'success': True, 'conversations': items})
    except Exception as e:
        logger.error(f"Error listing conversations: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/api/chat/<int:conversation_id>/rename', methods=['PATCH'])
@login_required
@member_required
def chat_rename_conversation(conversation_id):
    """Rename one of the current user's conversations.

    Expects a JSON object with a non-empty string ``name``; names longer than
    255 characters are truncated. The name is stored in ``custom_name``.

    Responses:
        404 — conversation not found for this user.
        400 — ``name`` missing, empty, not a string, or body not a JSON object.
        500 — unexpected error (the transaction is rolled back).
    """
    db = SessionLocal()
    try:
        conversation = db.query(AIChatConversation).filter_by(
            id=conversation_id,
            user_id=current_user.id
        ).first()
        if not conversation:
            return jsonify({'success': False, 'error': 'Conversation not found'}), 404

        # silent=True returns None instead of raising on a missing/invalid
        # body, so bad client input yields the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_name = data.get('name')
        name = raw_name.strip() if isinstance(raw_name, str) else ''
        if not name:
            return jsonify({'success': False, 'error': 'Nazwa nie może być pusta'}), 400

        # Truncate over-long names to 255 characters.
        name = name[:255]

        conversation.custom_name = name
        db.commit()
        return jsonify({'success': True, 'name': name})
    except Exception as e:
        logger.error(f"Error renaming conversation: {e}")
        db.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/api/chat/<int:conversation_id>/pin', methods=['PATCH'])
@login_required
@member_required
def chat_pin_conversation(conversation_id):
    """Toggle the pinned state of one of the current user's conversations.

    ``pinned_at`` is set to the current time when pinning and cleared when
    unpinning. Answers 404 when the conversation is not the user's.
    """
    db = SessionLocal()
    try:
        lookup = db.query(AIChatConversation).filter_by(
            id=conversation_id,
            user_id=current_user.id,
        )
        conversation = lookup.first()
        if not conversation:
            return jsonify({'success': False, 'error': 'Conversation not found'}), 404

        # A falsy current value (including None) flips to pinned.
        pin_now = not conversation.is_pinned
        conversation.is_pinned = pin_now
        if pin_now:
            conversation.pinned_at = datetime.now()
        else:
            conversation.pinned_at = None
        db.commit()

        return jsonify({'success': True, 'is_pinned': conversation.is_pinned})
    except Exception as e:
        logger.error(f"Error pinning conversation: {e}")
        db.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/api/chat/<int:conversation_id>/delete', methods=['DELETE'])
@login_required
@member_required
def chat_delete_conversation(conversation_id):
    """Delete one of the current user's conversations together with its messages.

    Answers 404 when the conversation is not the user's; on any other error
    the transaction is rolled back and 500 is returned.
    """
    db = SessionLocal()
    try:
        conversation = (
            db.query(AIChatConversation)
            .filter_by(id=conversation_id, user_id=current_user.id)
            .first()
        )
        if not conversation:
            return jsonify({'success': False, 'error': 'Conversation not found'}), 404

        # Messages are removed first, then the conversation row itself;
        # both happen in one commit.
        conversation_messages = db.query(AIChatMessage).filter_by(
            conversation_id=conversation_id
        )
        conversation_messages.delete()
        db.delete(conversation)
        db.commit()

        return jsonify({'success': True})
    except Exception as e:
        logger.error(f"Error deleting conversation: {e}")
        db.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
# ============================================================
# AI CHAT FEEDBACK & ANALYTICS
# ============================================================
@bp.route('/api/chat/feedback', methods=['POST'])
@login_required
@member_required
def chat_feedback():
    """API: Submit feedback for an AI response.

    Expects a JSON object with ``message_id`` and ``rating`` (1 = thumbs
    down, 2 = thumbs up). The rating, timestamp and optional ``comment`` are
    stored on the message. When ``is_helpful`` or ``comment`` is supplied, a
    detailed ``AIChatFeedback`` record is created, or the existing one for
    that message is updated.

    Responses:
        400 — body missing/not a JSON object, or invalid id/rating.
        404 — message does not exist.
        403 — message belongs to a conversation of another user.
        500 — unexpected error (the transaction is rolled back).
    """
    try:
        # silent=True returns None instead of raising on a missing/invalid
        # body, so bad client input ends in the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        message_id = data.get('message_id')
        rating = data.get('rating')  # 1 = thumbs down, 2 = thumbs up
        if not message_id or rating not in [1, 2]:
            return jsonify({'success': False, 'error': 'Invalid data'}), 400

        db = SessionLocal()
        try:
            # Verify message exists and belongs to user's conversation
            message = db.query(AIChatMessage).filter_by(id=message_id).first()
            if not message:
                return jsonify({'success': False, 'error': 'Message not found'}), 404
            conversation = db.query(AIChatConversation).filter_by(
                id=message.conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Not authorized'}), 403

            # Update message feedback
            message.feedback_rating = rating
            message.feedback_at = datetime.now()
            message.feedback_comment = data.get('comment', '')

            # Create detailed feedback record if provided
            if data.get('is_helpful') is not None or data.get('comment'):
                existing_feedback = db.query(AIChatFeedback).filter_by(message_id=message_id).first()
                if existing_feedback:
                    existing_feedback.rating = rating
                    existing_feedback.is_helpful = data.get('is_helpful')
                    existing_feedback.is_accurate = data.get('is_accurate')
                    existing_feedback.found_company = data.get('found_company')
                    existing_feedback.comment = data.get('comment')
                else:
                    feedback = AIChatFeedback(
                        message_id=message_id,
                        user_id=current_user.id,
                        rating=rating,
                        is_helpful=data.get('is_helpful'),
                        is_accurate=data.get('is_accurate'),
                        found_company=data.get('found_company'),
                        comment=data.get('comment'),
                        original_query=data.get('original_query'),
                        expected_companies=data.get('expected_companies')
                    )
                    db.add(feedback)

            db.commit()
            logger.info(f"Feedback received: message_id={message_id}, rating={rating}")
            return jsonify({'success': True})
        except Exception:
            # Undo the partial write explicitly, like the other mutating
            # routes in this module, then let the outer handler report it.
            db.rollback()
            raise
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Error saving feedback: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/admin/chat-analytics')
@login_required
def chat_analytics():
    """Forward the old chat analytics URL to the 'chat' tab of admin.user_insights."""
    insights_url = url_for('admin.user_insights', tab='chat')
    return redirect(insights_url)
# ============================================================
# MEMORY CRUD API
# ============================================================
@bp.route('/api/chat/memory', methods=['GET'])
@login_required
@member_required
def get_user_memory_api():
    """API: Return the current user's memory facts and conversation summaries.

    Requests at most 20 facts and 10 summaries from ``memory_service``.
    Any failure is logged and reported as ``{'error': ...}`` with HTTP 500.
    """
    try:
        # Imported at call time, so an import failure is handled like any
        # other error of this endpoint.
        from memory_service import get_user_memory, get_conversation_summaries

        facts = get_user_memory(current_user.id, limit=20)
        summaries = get_conversation_summaries(current_user.id, limit=10)
        return jsonify({'facts': facts, 'summaries': summaries})
    except Exception as e:
        logger.error(f"Error fetching user memory: {e}")
        return jsonify({'error': str(e)}), 500
@bp.route('/api/chat/memory/<int:fact_id>', methods=['DELETE'])
@login_required
@member_required
def delete_memory_fact(fact_id):
    """API: Remove one of the current user's memory facts.

    Delegates to ``memory_service.delete_user_fact`` (described upstream as a
    soft delete — confirm in memory_service). A falsy result is reported as
    404, any exception as 500.
    """
    try:
        from memory_service import delete_user_fact

        removed = delete_user_fact(current_user.id, fact_id)
        if not removed:
            return jsonify({'error': 'Nie znaleziono'}), 404
        return jsonify({'status': 'ok'})
    except Exception as e:
        logger.error(f"Error deleting memory fact {fact_id}: {e}")
        return jsonify({'error': str(e)}), 500