AI Learning System: - Add FeedbackLearningService for few-shot learning from user feedback - Integrate learning context into chat prompts (nordabiz_chat.py) - Add seed examples for cold start (when insufficient real feedback) - Add /api/admin/ai-learning-status endpoint - Add learning status section to chat analytics panel Other Changes: - Update release notes to v1.12.0 - Remove old password references from documentation (CLAUDE.md) - Fix password masking in run_migration.py (use regex for any password) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
517 lines
17 KiB
Python
517 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Norda Biznes AI Chat Engine
|
|
============================
|
|
|
|
Multi-turn conversational AI for company directory queries.
|
|
|
|
Features:
|
|
- Answer questions about member companies
|
|
- Find companies by service, competency, or need
|
|
- Concise, helpful responses
|
|
- Full conversation history tracking
|
|
- Cost tracking per message
|
|
|
|
Author: Norda Biznes Development Team
|
|
Created: 2025-11-23
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional
|
|
import google.generativeai as genai
|
|
import gemini_service
|
|
from search_service import search_companies
|
|
|
|
from database import (
|
|
SessionLocal,
|
|
Company,
|
|
Category,
|
|
Service,
|
|
CompanyService,
|
|
Competency,
|
|
CompanyCompetency,
|
|
Certification,
|
|
Award,
|
|
CompanyEvent,
|
|
AIChatConversation,
|
|
AIChatMessage
|
|
)
|
|
|
|
# Optional few-shot learning support: when feedback_learning_service is
# deployed alongside this module, user-feedback examples are appended to
# the chat prompt. The availability flag lets _query_ai skip the feature
# cleanly when the service module is absent.
try:
    from feedback_learning_service import get_feedback_learning_service
    FEEDBACK_LEARNING_AVAILABLE = True
except ImportError:
    # Service not installed — chat works without the learning context.
    FEEDBACK_LEARNING_AVAILABLE = False
|
|
|
|
|
|
class NordaBizChatEngine:
    """
    AI chat assistant for the Norda Biznes company directory.

    Provides multi-turn conversations that help users find member
    companies, services, and business partners. Conversations and
    individual messages are persisted via the AIChatConversation /
    AIChatMessage models, together with per-message token counts,
    cost (USD) and latency.
    """

    # Synonym / related-term dictionary for better keyword search.
    # NOTE(review): no method in this class reads it anymore —
    # _find_relevant_companies delegates to the unified SearchService.
    # Kept as a public class attribute for backward compatibility.
    KEYWORD_SYNONYMS = {
        # IT / Web
        'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
        'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
        'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
        'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
        'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
        # Construction
        'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
        'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
        'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
        # Transport / logistics
        'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
        'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
        # Services
        'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
        'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
        'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
        # Manufacturing
        'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
        'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
        'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
    }

    def __init__(self, gemini_api_key: Optional[str] = None, use_global_service: bool = True):
        """
        Initialize the Norda Biznes chat engine.

        Args:
            gemini_api_key: Google Gemini API key (falls back to the
                GOOGLE_GEMINI_API_KEY environment variable).
            use_global_service: Use the global gemini_service for automatic
                cost tracking to the ai_api_costs table (default: True).

        Raises:
            ValueError: In legacy mode (use_global_service=False) when no
                usable API key is available.
        """
        self.use_global_service = use_global_service
        self.model_name = "gemini-2.5-flash"

        api_key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY')
        # 'TWOJ_KLUCZ_API_TUTAJ' is the placeholder from the sample config
        # and must be treated the same as a missing key.
        has_key = bool(api_key) and api_key != 'TWOJ_KLUCZ_API_TUTAJ'

        if use_global_service:
            # The global service performs the generation and records costs
            # centrally; a local model is only needed as a tokenizer for
            # per-message token counting.
            self.gemini_service = gemini_service.get_gemini_service()
            self.model = None
            if has_key:
                genai.configure(api_key=api_key)
                self.tokenizer = genai.GenerativeModel(self.model_name)
            else:
                # Without a key we fall back to rough word-count estimates
                # in send_message.
                self.tokenizer = None
        else:
            # Legacy path: direct API access, no centralized cost tracking.
            if not has_key:
                raise ValueError("GOOGLE_GEMINI_API_KEY not found in environment")
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel(self.model_name)
            self.tokenizer = self.model
            self.gemini_service = None

    def start_conversation(
        self,
        user_id: int,
        title: Optional[str] = None,
        conversation_type: str = 'general'
    ) -> AIChatConversation:
        """
        Create and persist a new conversation.

        Args:
            user_id: User ID owning the conversation.
            title: Optional conversation title; a timestamped default is
                generated when omitted.
            conversation_type: Type of conversation (default: 'general').

        Returns:
            AIChatConversation: The freshly committed conversation row.
        """
        db = SessionLocal()
        try:
            if not title:
                # Default title carries the start timestamp ("Rozmowa" =
                # "Conversation" in Polish — user-facing, do not translate).
                title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

            conversation = AIChatConversation(
                user_id=user_id,
                started_at=datetime.now(),
                conversation_type=conversation_type,
                title=title,
                is_active=True,
                message_count=0,
                model_name=self.model_name
            )
            db.add(conversation)
            db.commit()
            db.refresh(conversation)
            return conversation
        finally:
            db.close()

    def send_message(
        self,
        conversation_id: int,
        user_message: str,
        user_id: Optional[int] = None
    ) -> AIChatMessage:
        """
        Send a user message and persist the AI response.

        The user message is committed first so it survives a failed model
        call; the assistant reply is then stored with token/cost/latency
        metrics and the conversation counters are updated.

        Args:
            conversation_id: Conversation ID.
            user_message: User's message text.
            user_id: User ID for cost tracking (optional).

        Returns:
            AIChatMessage: The stored assistant message.

        Raises:
            ValueError: If the conversation does not exist.
        """
        db = SessionLocal()
        start_time = time.time()

        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id
            ).first()
            if not conversation:
                raise ValueError(f"Conversation {conversation_id} not found")

            # Persist the user message immediately.
            user_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='user',
                content=user_message,
                edited=False,
                regenerated=False
            )
            db.add(user_msg)
            db.commit()

            # Full directory context + recent history, then query the model
            # (cost tracked centrally when the global service is in use).
            context = self._build_conversation_context(db, conversation, user_message)
            response = self._query_ai(context, user_message, user_id=user_id)

            # Per-message metrics stored on the AIChatMessage row.
            latency_ms = int((time.time() - start_time) * 1000)
            if self.tokenizer:
                input_tokens = self.tokenizer.count_tokens(user_message).total_tokens
                output_tokens = self.tokenizer.count_tokens(response).total_tokens
                cost_usd = self._calculate_cost(input_tokens, output_tokens)
            else:
                # No tokenizer available: rough word-based estimate; the
                # true cost is unknown, so record 0.
                input_tokens = len(user_message.split()) * 2
                output_tokens = len(response.split()) * 2
                cost_usd = 0.0

            ai_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='assistant',
                content=response,
                tokens_input=input_tokens,
                tokens_output=output_tokens,
                cost_usd=cost_usd,
                latency_ms=latency_ms,
                edited=False,
                regenerated=False
            )
            db.add(ai_msg)

            # Two new rows: the user message and the assistant reply.
            conversation.message_count += 2
            conversation.updated_at = datetime.now()
            db.commit()
            db.refresh(ai_msg)

            return ai_msg

        except Exception:
            # Discard any uncommitted work (the user message is already
            # committed) so the session is not closed mid-transaction.
            # The exception still propagates to the caller.
            db.rollback()
            raise
        finally:
            db.close()

    def get_conversation_history(
        self,
        conversation_id: int
    ) -> List[Dict[str, Any]]:
        """
        Return all messages of a conversation, oldest first.

        Args:
            conversation_id: Conversation ID.

        Returns:
            List of message dicts with id, role, content, created_at
            (ISO format), token counts, cost_usd and latency_ms.
        """
        db = SessionLocal()
        try:
            messages = db.query(AIChatMessage).filter_by(
                conversation_id=conversation_id
            ).order_by(AIChatMessage.created_at).all()

            return [
                {
                    'id': msg.id,
                    'role': msg.role,
                    'content': msg.content,
                    'created_at': msg.created_at.isoformat(),
                    'tokens_input': msg.tokens_input,
                    'tokens_output': msg.tokens_output,
                    'cost_usd': float(msg.cost_usd) if msg.cost_usd else 0.0,
                    'latency_ms': msg.latency_ms
                }
                for msg in messages
            ]
        finally:
            db.close()

    def _build_conversation_context(
        self,
        db,
        conversation: AIChatConversation,
        current_message: str
    ) -> Dict[str, Any]:
        """
        Build AI context containing ALL active companies (not pre-filtered).

        This lets the model select relevant companies itself instead of
        relying on keyword-based search pre-filtering.

        Args:
            db: Database session.
            conversation: Current conversation.
            current_message: User's current message (for reference only).

        Returns:
            Context dict with all companies, category counts and the last
            10 messages of the conversation.
        """
        from collections import Counter

        # Load ALL active companies — the AI does the intelligent filtering.
        all_companies = db.query(Company).filter_by(status='active').all()

        # Count active companies per category in one pass over the rows we
        # already loaded, instead of issuing one COUNT(*) query per
        # category (previous N+1 query pattern).
        active_per_category = Counter(c.category_id for c in all_companies)

        context = {
            'conversation_type': conversation.conversation_type,
            'total_companies': len(all_companies)
        }

        categories = db.query(Category).all()
        context['categories'] = [
            {
                'name': cat.name,
                'slug': cat.slug,
                'company_count': active_per_category.get(cat.id, 0)
            }
            for cat in categories
        ]

        # Compact per-company dicts to minimize prompt tokens.
        context['all_companies'] = [
            self._company_to_compact_dict(c)
            for c in all_companies
        ]

        # Last 10 messages, restored to chronological order.
        messages = db.query(AIChatMessage).filter_by(
            conversation_id=conversation.id
        ).order_by(AIChatMessage.created_at.desc()).limit(10).all()

        context['recent_messages'] = [
            {'role': msg.role, 'content': msg.content}
            for msg in reversed(messages)
        ]

        return context

    def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]:
        """
        Convert a company to a compact dict for the AI context.

        Only non-empty fields are emitted, and abbreviated keys are used
        (cat/desc/svc/comp/web/tel/mail/cert) to minimize prompt tokens.
        The prompt in _query_ai documents these abbreviations to the model.

        Args:
            c: Company object.

        Returns:
            Compact dict with the essential company info.
        """
        compact = {
            'name': c.name,
            'cat': c.category.name if c.category else None,
        }

        if c.description_short:
            compact['desc'] = c.description_short
        if c.founding_history:
            # Free-text history: owners, founders, company background.
            compact['history'] = c.founding_history
        if c.services:
            services = [cs.service.name for cs in c.services if cs.service]
            if services:
                compact['svc'] = services
        if c.competencies:
            competencies = [cc.competency.name for cc in c.competencies if cc.competency]
            if competencies:
                compact['comp'] = competencies
        if c.website:
            compact['web'] = c.website
        if c.phone:
            compact['tel'] = c.phone
        if c.email:
            compact['mail'] = c.email
        if c.address_city:
            compact['city'] = c.address_city
        if c.year_established:
            compact['year'] = c.year_established
        if c.certifications:
            certs = [cert.name for cert in c.certifications if cert.is_active]
            if certs:
                # Cap at 3 certifications to bound token usage.
                compact['cert'] = certs[:3]

        return compact

    def _find_relevant_companies(self, db, message: str) -> List[Company]:
        """
        Find companies relevant to the user's message.

        Delegates to the unified SearchService, which provides synonym
        expansion, NIP/REGON direct lookup, PostgreSQL FTS with fuzzy
        matching (when available) and fallback scoring for SQLite.

        Args:
            db: Database session.
            message: User's message.

        Returns:
            List of relevant Company objects (at most 10).
        """
        results = search_companies(db, message, limit=10)
        # SearchService returns SearchResult wrappers; unwrap the companies.
        return [result.company for result in results]

    def _query_ai(
        self,
        context: Dict[str, Any],
        user_message: str,
        user_id: Optional[int] = None
    ) -> str:
        """
        Query Gemini with the full company-database context.

        Args:
            context: Context dict with ALL companies (see
                _build_conversation_context).
            user_message: User's message.
            user_id: User ID for cost tracking.

        Returns:
            AI response text (Polish, per the system prompt).
        """
        import json

        # System prompt is intentionally in Polish — it is the assistant's
        # operating language for this portal.
        system_prompt = f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.

📊 MASZ DOSTĘP DO PEŁNEJ BAZY DANYCH:
- Liczba firm: {context['total_companies']}
- Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])}

🎯 TWOJA ROLA:
- Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika
- Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły
- Podajesz konkretne nazwy firm z kontaktem
- Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście

📋 FORMAT DANYCH (skróty):
- name: nazwa firmy
- cat: kategoria
- desc: krótki opis
- history: historia firmy, właściciele, założyciele
- svc: usługi
- comp: kompetencje
- web/tel/mail: kontakt
- city: miasto
- cert: certyfikaty

⚠️ WAŻNE:
- ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne)
- Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w polu "history"
- Odpowiadaj PO POLSKU
"""

        # Append feedback-based few-shot examples when the learning
        # service is deployed; any failure here must not break chat.
        if FEEDBACK_LEARNING_AVAILABLE:
            try:
                feedback_service = get_feedback_learning_service()
                learning_context = feedback_service.format_for_prompt()
                if learning_context:
                    system_prompt += learning_context
            except Exception as e:
                import logging
                logging.getLogger(__name__).warning(f"Feedback learning error: {e}")

        # Append the full company database as compact JSON.
        if context.get('all_companies'):
            system_prompt += "\n\n🏢 PEŁNA BAZA FIRM (wybierz najlepsze):\n"
            system_prompt += json.dumps(context['all_companies'], ensure_ascii=False, indent=None)
            system_prompt += "\n"

        # Append recent conversation history and the current turn.
        full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n"
        for msg in context.get('recent_messages', []):
            role_name = "Użytkownik" if msg['role'] == 'user' else "Ty"
            full_prompt += f"{role_name}: {msg['content']}\n"

        full_prompt += f"\nUżytkownik: {user_message}\nTy: "

        if self.use_global_service and self.gemini_service:
            # Preferred path: costs are recorded to ai_api_costs.
            response_text = self.gemini_service.generate_text(
                prompt=full_prompt,
                feature='ai_chat',
                user_id=user_id,
                temperature=0.7
            )
            return response_text
        else:
            # Legacy: direct API call, no centralized cost tracking.
            response = self.model.generate_content(full_prompt)
            return response.text

    def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """
        Calculate the request cost in USD.

        Args:
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens.

        Returns:
            Total cost in USD.
        """
        # Gemini 2.5 Flash pricing (USD per 1M tokens):
        # $0.075 input, $0.30 output.
        input_cost = (input_tokens / 1_000_000) * 0.075
        output_cost = (output_tokens / 1_000_000) * 0.30
        return input_cost + output_cost