#!/usr/bin/env python3
"""
Norda Biznes AI Chat Engine
============================

Multi-turn conversational AI for company directory queries.

Features:
- Answer questions about member companies
- Find companies by service, competency, or need
- Concise, helpful responses
- Full conversation history tracking
- Cost tracking per message

Author: Norda Biznes Development Team
Created: 2025-11-23
"""

import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import google.generativeai as genai

import gemini_service
from database import (
    SessionLocal, Company, Category, Service, CompanyService,
    Competency, CompanyCompetency, Certification, Award, CompanyEvent,
    AIChatConversation, AIChatMessage
)
from search_service import search_companies

# Import feedback learning service for few-shot learning; the chat engine
# degrades gracefully when the optional module is absent.
try:
    from feedback_learning_service import get_feedback_learning_service
    FEEDBACK_LEARNING_AVAILABLE = True
except ImportError:
    FEEDBACK_LEARNING_AVAILABLE = False


class NordaBizChatEngine:
    """
    AI Chat Assistant for the Norda Biznes company directory.

    Helps users find companies, services, and business partners.
    Persists conversations/messages via SQLAlchemy models and tracks
    per-message token usage, latency, and estimated USD cost.
    """

    def __init__(
        self,
        gemini_api_key: Optional[str] = None,
        use_global_service: bool = True
    ):
        """
        Initialize Norda Biznes Chat Engine.

        Args:
            gemini_api_key: Google Gemini API key (uses env var if not provided)
            use_global_service: Use global gemini_service for automatic cost
                tracking (default: True)

        Raises:
            ValueError: In legacy mode, when no usable API key is available.
        """
        self.use_global_service = use_global_service

        if use_global_service:
            # Use global gemini_service for automatic cost tracking to the
            # ai_api_costs table; direct model access is not needed here.
            self.gemini_service = gemini_service.get_gemini_service()
            self.model_name = "gemini-2.5-flash"
            self.model = None

            # Initialize tokenizer for cost calculation (still needed for
            # per-message tracking stored on each AIChatMessage row).
            api_key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY')
            # 'TWOJ_KLUCZ_API_TUTAJ' is the unreplaced placeholder value from
            # the sample config — treat it the same as a missing key.
            if api_key and api_key != 'TWOJ_KLUCZ_API_TUTAJ':
                genai.configure(api_key=api_key)
                self.tokenizer = genai.GenerativeModel(self.model_name)
            else:
                self.tokenizer = None
        else:
            # Legacy: direct API access (no centralized cost tracking).
            api_key = gemini_api_key or os.getenv('GOOGLE_GEMINI_API_KEY')
            if not api_key or api_key == 'TWOJ_KLUCZ_API_TUTAJ':
                raise ValueError("GOOGLE_GEMINI_API_KEY not found in environment")
            genai.configure(api_key=api_key)
            self.model_name = "gemini-2.5-flash"
            self.model = genai.GenerativeModel(self.model_name)
            self.tokenizer = self.model
            self.gemini_service = None

    def start_conversation(
        self,
        user_id: int,
        title: Optional[str] = None,
        conversation_type: str = 'general'
    ) -> AIChatConversation:
        """
        Start a new conversation.

        Args:
            user_id: User ID
            title: Optional conversation title
            conversation_type: Type of conversation (default: 'general')

        Returns:
            AIChatConversation: New conversation object
        """
        db = SessionLocal()
        try:
            # Auto-generate a timestamped title if the caller did not supply one.
            if not title:
                title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

            conversation = AIChatConversation(
                user_id=user_id,
                started_at=datetime.now(),
                conversation_type=conversation_type,
                title=title,
                is_active=True,
                message_count=0,
                model_name=self.model_name
            )
            db.add(conversation)
            db.commit()
            # Refresh so the returned object carries the DB-assigned id.
            db.refresh(conversation)
            return conversation
        finally:
            db.close()

    def send_message(
        self,
        conversation_id: int,
        user_message: str,
        user_id: Optional[int] = None
    ) -> AIChatMessage:
        """
        Send a message and get the AI response.

        Persists the user message, queries the model with full company
        context plus recent history, then persists the assistant reply
        with token/cost/latency metrics.

        Args:
            conversation_id: Conversation ID
            user_message: User's message text
            user_id: User ID for cost tracking (optional)

        Returns:
            AIChatMessage: AI response message

        Raises:
            ValueError: If the conversation does not exist.
        """
        db = SessionLocal()
        start_time = time.time()
        try:
            # Get conversation
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id
            ).first()
            if not conversation:
                raise ValueError(f"Conversation {conversation_id} not found")

            # Save the user message first so it is part of the stored history
            # even if the AI call below fails.
            user_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='user',
                content=user_message,
                edited=False,
                regenerated=False
            )
            db.add(user_msg)
            db.commit()

            # Build context from conversation history and the company database.
            context = self._build_conversation_context(db, conversation, user_message)

            # Get AI response with cost tracking.
            response = self._query_ai(
                context,
                user_message,
                user_id=user_id
            )

            # Calculate metrics for per-message tracking in the AIChatMessage table.
            latency_ms = int((time.time() - start_time) * 1000)

            if self.tokenizer:
                # NOTE(review): this counts tokens of user_message only, while
                # the actual API prompt includes the full company DB and history
                # — so cost_usd here understates the real request cost. Confirm
                # whether per-message (vs per-request) accounting is intended.
                input_tokens = self.tokenizer.count_tokens(user_message).total_tokens
                output_tokens = self.tokenizer.count_tokens(response).total_tokens
                cost_usd = self._calculate_cost(input_tokens, output_tokens)
            else:
                # Fallback if tokenizer not available
                input_tokens = len(user_message.split()) * 2  # Rough estimate
                output_tokens = len(response.split()) * 2
                cost_usd = 0.0

            # Save AI response
            ai_msg = AIChatMessage(
                conversation_id=conversation_id,
                created_at=datetime.now(),
                role='assistant',
                content=response,
                tokens_input=input_tokens,
                tokens_output=output_tokens,
                cost_usd=cost_usd,
                latency_ms=latency_ms,
                edited=False,
                regenerated=False
            )
            db.add(ai_msg)

            # Update conversation bookkeeping: one user + one assistant message.
            conversation.message_count += 2
            conversation.updated_at = datetime.now()
            db.commit()
            db.refresh(ai_msg)
            return ai_msg
        finally:
            db.close()

    def get_conversation_history(
        self,
        conversation_id: int
    ) -> List[Dict[str, Any]]:
        """
        Get all messages in a conversation, oldest first.

        Args:
            conversation_id: Conversation ID

        Returns:
            List of message dicts (id, role, content, timestamps and metrics)
        """
        db = SessionLocal()
        try:
            messages = db.query(AIChatMessage).filter_by(
                conversation_id=conversation_id
            ).order_by(AIChatMessage.created_at).all()

            return [
                {
                    'id': msg.id,
                    'role': msg.role,
                    'content': msg.content,
                    'created_at': msg.created_at.isoformat(),
                    'tokens_input': msg.tokens_input,
                    'tokens_output': msg.tokens_output,
                    # cost_usd may be NULL (user messages carry no cost).
                    'cost_usd': float(msg.cost_usd) if msg.cost_usd else 0.0,
                    'latency_ms': msg.latency_ms
                }
                for msg in messages
            ]
        finally:
            db.close()

    def _build_conversation_context(
        self,
        db,
        conversation: AIChatConversation,
        current_message: str
    ) -> Dict[str, Any]:
        """
        Build context for the AI with ALL companies (not pre-filtered).

        This allows the AI to intelligently select relevant companies
        instead of relying on keyword-based search pre-filtering.

        Args:
            db: Database session
            conversation: Current conversation
            current_message: User's current message (for reference only)

        Returns:
            Context dict with ALL companies and categories
        """
        # Load ALL active companies — let the AI do the intelligent filtering.
        all_companies = db.query(Company).filter_by(status='active').all()

        context = {
            'conversation_type': conversation.conversation_type,
            'total_companies': len(all_companies)
        }

        # Get all categories with company counts.
        # NOTE(review): this issues one COUNT query per category (N+1);
        # acceptable for a small directory, revisit if categories grow.
        categories = db.query(Category).all()
        context['categories'] = [
            {
                'name': cat.name,
                'slug': cat.slug,
                'company_count': db.query(Company).filter_by(
                    category_id=cat.id, status='active').count()
            }
            for cat in categories
        ]

        # Include ALL companies in compact format to minimize tokens.
        # The AI will intelligently select the most relevant ones.
        context['all_companies'] = [
            self._company_to_compact_dict(c) for c in all_companies
        ]

        # Add conversation history (last 10 messages for context), restored
        # to chronological order after the DESC/limit query.
        messages = db.query(AIChatMessage).filter_by(
            conversation_id=conversation.id
        ).order_by(AIChatMessage.created_at.desc()).limit(10).all()

        context['recent_messages'] = [
            {'role': msg.role, 'content': msg.content}
            for msg in reversed(messages)
        ]

        return context

    def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]:
        """
        Convert a company to a compact dictionary for the AI context.

        Optimized to minimize tokens while keeping all important data.

        Args:
            c: Company object

        Returns:
            Compact dict with essential company info
        """
        compact = {
            'name': c.name,
            'cat': c.category.name if c.category else None,
        }

        # Only include non-empty fields to save tokens.
        if c.description_short:
            compact['desc'] = c.description_short
        if c.founding_history:
            compact['history'] = c.founding_history  # Owners, founders, history
        if c.services:
            services = [cs.service.name for cs in c.services if cs.service]
            if services:
                compact['svc'] = services
        if c.competencies:
            competencies = [cc.competency.name for cc in c.competencies if cc.competency]
            if competencies:
                compact['comp'] = competencies
        if c.website:
            compact['web'] = c.website
        if c.phone:
            compact['tel'] = c.phone
        if c.email:
            compact['mail'] = c.email
        if c.address_city:
            compact['city'] = c.address_city
        if c.year_established:
            compact['year'] = c.year_established
        if c.certifications:
            certs = [cert.name for cert in c.certifications if cert.is_active]
            if certs:
                compact['cert'] = certs[:3]  # Limit to 3 certs

        return compact

    # Dictionary of synonyms and related terms for better search matching
    # (keys and values are Polish search terms — do not translate).
    KEYWORD_SYNONYMS = {
        # IT / Web
        'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce',
                   'ecommerce', 'sklep', 'portal'],
        'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
        'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
        'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
        'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
        # Construction
        'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia',
                   'dach', 'elewacja'],
        'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
        'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
        # Transport / Logistics
        'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
        'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
        # Services
        'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
        'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
        'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
        # Manufacturing
        'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
        'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
        'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
    }

    def _find_relevant_companies(self, db, message: str) -> List[Company]:
        """
        Find companies relevant to the user's message.

        Uses the unified SearchService with:
        - Synonym expansion for better keyword matching
        - NIP/REGON direct lookup
        - PostgreSQL FTS with fuzzy matching (when available)
        - Fallback scoring for SQLite

        Note: the current context builder sends ALL companies to the AI,
        so this helper is retained for other callers / legacy use.

        Args:
            db: Database session
            message: User's message

        Returns:
            List of relevant Company objects
        """
        # Use unified SearchService for better search results.
        results = search_companies(db, message, limit=10)

        # Extract Company objects from SearchResult wrappers.
        return [result.company for result in results]

    def _query_ai(
        self,
        context: Dict[str, Any],
        user_message: str,
        user_id: Optional[int] = None
    ) -> str:
        """
        Query Gemini AI with full company database context.

        Args:
            context: Context dict with ALL companies
            user_message: User's message
            user_id: User ID for cost tracking

        Returns:
            AI response text
        """
        import json

        # Build system prompt with ALL companies (Polish — user-facing model
        # instructions, must stay in Polish).
        system_prompt = f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.

📊 MASZ DOSTĘP DO PEŁNEJ BAZY DANYCH:
- Liczba firm: {context['total_companies']}
- Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])}

🎯 TWOJA ROLA:
- Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika
- Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły
- Podajesz konkretne nazwy firm z kontaktem
- Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście

📋 FORMAT DANYCH (skróty):
- name: nazwa firmy
- cat: kategoria
- desc: krótki opis
- history: historia firmy, właściciele, założyciele
- svc: usługi
- comp: kompetencje
- web/tel/mail: kontakt
- city: miasto
- cert: certyfikaty

⚠️ WAŻNE:
- ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne)
- Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w polu "history"
- Odpowiadaj PO POLSKU
"""

        # Add feedback-based learning context (few-shot examples).
        if FEEDBACK_LEARNING_AVAILABLE:
            try:
                feedback_service = get_feedback_learning_service()
                learning_context = feedback_service.format_for_prompt()
                if learning_context:
                    system_prompt += learning_context
            except Exception as e:
                # Don't fail the chat if feedback learning has issues.
                import logging
                logging.getLogger(__name__).warning(f"Feedback learning error: {e}")

        # Add ALL companies in compact JSON format (indent=None keeps it dense).
        if context.get('all_companies'):
            system_prompt += "\n\n🏢 PEŁNA BAZA FIRM (wybierz najlepsze):\n"
            system_prompt += json.dumps(context['all_companies'],
                                        ensure_ascii=False, indent=None)
            system_prompt += "\n"

        # Add conversation history as a simple transcript.
        full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n"
        for msg in context.get('recent_messages', []):
            role_name = "Użytkownik" if msg['role'] == 'user' else "Ty"
            full_prompt += f"{role_name}: {msg['content']}\n"

        full_prompt += f"\nUżytkownik: {user_message}\nTy: "

        # Get response with automatic cost tracking to the ai_api_costs table.
        if self.use_global_service and self.gemini_service:
            response_text = self.gemini_service.generate_text(
                prompt=full_prompt,
                feature='ai_chat',
                user_id=user_id,
                temperature=0.7
            )
            return response_text
        else:
            # Legacy: direct API call (no centralized cost tracking).
            response = self.model.generate_content(full_prompt)
            return response.text

    def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """
        Calculate cost in USD.

        Args:
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens

        Returns:
            Total cost in USD
        """
        # Gemini 2.5 Flash pricing (per 1M tokens).
        input_cost = (input_tokens / 1_000_000) * 0.075
        output_cost = (output_tokens / 1_000_000) * 0.30
        return input_cost + output_cost