nordabiz/nordabiz_chat.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

2057 lines
90 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Norda Biznes AI Chat Engine
============================
Multi-turn conversational AI for company directory queries.
Features:
- Answer questions about member companies
- Find companies by service, competency, or need
- Concise, helpful responses
- Full conversation history tracking
- Cost tracking per message
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2025-11-23
"""
import os
import time
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
import gemini_service
from search_service import search_companies
# Module logger
logger = logging.getLogger(__name__)
from database import (
SessionLocal,
Company,
Category,
Service,
CompanyService,
Competency,
CompanyCompetency,
Certification,
Award,
CompanyEvent,
AIChatConversation,
AIChatMessage,
CompanyRecommendation,
ZOPKNews,
# Etap 2: Tablica B2B, Kalendarz, Forum
Classified,
NordaEvent,
ForumTopic,
ForumReply,
# Etap 3: Osoby, Social Media, Audyty
Person,
CompanyPerson,
CompanySocialMedia,
GBPAudit,
CompanyWebsiteAnalysis,
User
)
# Optional-service import guards.
# Each auxiliary service degrades gracefully when its module is missing: a
# module-level *_AVAILABLE flag records availability so call sites can branch
# cheaply instead of re-attempting imports.
# Consistency fix: the first two guards previously failed silently; they now
# log a warning like the other four.

# Import feedback learning service for few-shot learning
try:
    from feedback_learning_service import get_feedback_learning_service
    FEEDBACK_LEARNING_AVAILABLE = True
except ImportError:
    FEEDBACK_LEARNING_AVAILABLE = False
    logger.warning("Feedback learning service not available - few-shot learning disabled")
# Import ZOPK knowledge service for semantic search
try:
    from zopk_knowledge_service import search_knowledge, get_relevant_facts
    ZOPK_KNOWLEDGE_AVAILABLE = True
except ImportError:
    ZOPK_KNOWLEDGE_AVAILABLE = False
    logger.warning("ZOPK knowledge service not available - semantic knowledge search disabled")
# Import sensitive data sanitization service (RODO compliance)
try:
    from sensitive_data_service import sanitize_message, SensitiveDataType
    SENSITIVE_DATA_SERVICE_AVAILABLE = True
except ImportError:
    SENSITIVE_DATA_SERVICE_AVAILABLE = False
    logger.warning("Sensitive data service not available - messages will not be sanitized")
# Import Smart Router and Context Builder
try:
    from smart_router import route_query
    from context_builder import build_selective_context
    SMART_ROUTER_AVAILABLE = True
except ImportError:
    SMART_ROUTER_AVAILABLE = False
    logger.warning("Smart Router or Context Builder not available - using full context fallback")
# Import company matcher for pre-selecting relevant companies (eliminates hallucinations)
try:
    from company_matcher import match_companies
    COMPANY_MATCHER_AVAILABLE = True
except ImportError:
    COMPANY_MATCHER_AVAILABLE = False
    logger.warning("Company matcher not available - using full company list")
# Import memory service for user fact storage and prompt injection
try:
    from memory_service import format_memory_for_prompt, extract_facts_async, summarize_conversation_async
    MEMORY_SERVICE_AVAILABLE = True
except ImportError:
    MEMORY_SERVICE_AVAILABLE = False
    logger.warning("Memory service not available - user memory will not be injected")
class NordaBizChatEngine:
"""
AI Chat Assistant for Norda Biznes company directory
Helps users find companies, services, and business partners.
"""
def __init__(self, gemini_api_key: Optional[str] = None, use_global_service: bool = True, model: str = None):
    """
    Initialize Norda Biznes Chat Engine.

    Args:
        gemini_api_key: Google Gemini API key (not used directly here; the
            underlying GeminiService reads its key from the environment).
        gemini_api_key: Google Gemini API key (uses env var if not provided)
        use_global_service: Use global gemini_service for automatic cost
            tracking to the ai_api_costs table (default: True).
        model: Model key ('3-flash', '3-pro') - if provided, creates a new
            dedicated service with this model instead of the global one.
    """
    self.use_global_service = use_global_service
    self.requested_model = model
    if use_global_service:
        if model:
            # Create new service pinned to the requested model
            from gemini_service import GeminiService
            self.gemini_service = GeminiService(model=model)
            self.model_name = self.gemini_service.model_name
        else:
            # Use global gemini_service for automatic cost tracking to ai_api_costs table
            self.gemini_service = gemini_service.get_gemini_service()
            # Get model name from global service (currently Gemini 3 Flash Preview)
            self.model_name = self.gemini_service.model_name if self.gemini_service else "gemini-3-flash-preview"
        # BUGFIX: always initialize self.model — previously the explicit-model
        # branch above skipped this assignment, leaving the attribute
        # undefined and raising AttributeError on later access.
        self.model = None
    else:
        # Legacy: direct API access via gemini_service
        from gemini_service import GeminiService
        self.gemini_service = GeminiService()
        self.model_name = self.gemini_service.model_name
        self.model = None
@staticmethod
def _get_valid_company_slugs() -> Dict[str, str]:
    """
    Load all active companies' slugs from the DB.

    Returns:
        Dict mapping company slug -> company name; companies without a slug
        are skipped.

    NOTE(review): despite the original "Cached per-request" claim, no caching
    is visible here — every call issues a fresh DB query.
    """
    db = SessionLocal()
    try:
        # Local import is redundant (Company is imported at module level)
        # but kept to preserve behavior byte-for-byte.
        from database import Company
        rows = db.query(Company.slug, Company.name).filter(Company.status == 'active').all()
        return {r.slug: r.name for r in rows if r.slug}
    finally:
        # Always release the session, even if the query raises.
        db.close()
@staticmethod
def _find_correct_slug(attempted_slug: str, valid_slugs: set, name_to_slug: dict) -> Optional[str]:
"""Try to find the correct slug for a hallucinated one."""
# Direct match
if attempted_slug in valid_slugs:
return attempted_slug
# AI often appends legal form to slug: "inpi-sp-z-o-o" instead of "inpi"
# Try prefix match: if any valid slug is a prefix of the attempted slug
for vs in valid_slugs:
if attempted_slug.startswith(vs + '-') or attempted_slug == vs:
return vs
# Try if attempted slug is a prefix of a valid slug
for vs in valid_slugs:
if vs.startswith(attempted_slug + '-') or vs.startswith(attempted_slug):
return vs
return None
@staticmethod
def _validate_company_references(text: str) -> str:
    """
    Post-process an AI response: fix or remove references to companies that
    don't exist in the DB.

    Runs five passes over the text:
      1. markdown links to /company/<slug> (and legacy /firma/),
      2. HTML "pill" anchor links,
      3. bold **Name** mentions,
      4. plain-text mentions after trigger words / at bullet starts,
      5. cleanup of artifacts left by the removals.

    This is the ONLY reliable way to prevent hallucinated company names.

    Args:
        text: Raw AI response (markdown, possibly with HTML anchors).

    Returns:
        Sanitized text with invalid links corrected or stripped.
    """
    import re
    valid_companies = NordaBizChatEngine._get_valid_company_slugs()
    valid_slugs = set(valid_companies.keys())
    # Map: lowercase name → slug
    name_to_slug = {}
    for slug, name in valid_companies.items():
        name_to_slug[name.lower()] = slug

    # 1. Validate markdown links to /company/slug — fix or remove
    def replace_link(match):
        link_text = match.group(1)
        slug = match.group(2)
        # Try exact match first
        if slug in valid_slugs:
            return match.group(0)
        # Try fuzzy slug match (AI often adds legal suffix)
        correct_slug = NordaBizChatEngine._find_correct_slug(slug, valid_slugs, name_to_slug)
        if correct_slug:
            # NOTE(review): fused quotes below look like a stripped arrow
            # character from the original source — message kept verbatim.
            logger.info(f"NordaGPT slug corrected: '{slug}''{correct_slug}'")
            return f'[{link_text}](/company/{correct_slug})'
        # Try matching by link text (company name)
        name_slug = name_to_slug.get(link_text.lower().strip())
        if name_slug:
            logger.info(f"NordaGPT slug resolved by name: '{link_text}''{name_slug}'")
            return f'[{link_text}](/company/{name_slug})'
        # No match — hallucination, keep just the text without link
        logger.warning(f"NordaGPT hallucination blocked: '{link_text}' (slug: '{slug}') not in DB")
        return link_text  # Keep text, remove link

    # Match both relative /company/slug AND absolute https://nordabiznes.pl/company/slug
    text = re.sub(r'\[([^\]]+)\]\((?:https?://nordabiznes\.pl)?/company/([a-z0-9-]+)\)', replace_link, text)
    # Also catch /firma/ in case AI uses old format
    text = re.sub(r'\[([^\]]+)\]\((?:https?://nordabiznes\.pl)?/firma/([a-z0-9-]+)\)', replace_link, text)

    # 2. Validate pill-style links (HTML anchors)
    def replace_pill_link(match):
        full_match = match.group(0)
        slug = match.group(1)
        link_text = match.group(2) if match.lastindex >= 2 else slug
        if slug in valid_slugs:
            return full_match
        correct_slug = NordaBizChatEngine._find_correct_slug(slug, valid_slugs, name_to_slug)
        if correct_slug:
            # Rewrite only the href path, preserve the rest of the tag
            return full_match.replace(f'/company/{slug}', f'/company/{correct_slug}')
        logger.warning(f"NordaGPT hallucination blocked: pill link '{slug}' not in DB")
        return link_text

    text = re.sub(r'<a[^>]*href=["\'](?:https?://nordabiznes\.pl)?/company/([a-z0-9-]+)["\'][^>]*>(.*?)</a>', replace_pill_link, text)
    text = re.sub(r'<a[^>]*href=["\'](?:https?://nordabiznes\.pl)?/firma/([a-z0-9-]+)["\'][^>]*>(.*?)</a>', replace_pill_link, text)

    # 3. Remove plain-text mentions of fake companies (bold or plain)
    # Catch patterns like "**Baumar**" or "Baumar" that appear as company names
    # but aren't in the valid list
    valid_names_set = set(n.lower() for n in valid_companies.values())

    def replace_bold_company(match):
        bold_name = match.group(1).strip()
        if bold_name.lower() in valid_names_set:
            return match.group(0)  # Keep valid company
        # Check if it's likely a company name (capitalized, not a common word)
        common_words = {'budownictwo', 'infrastruktura', 'technologia', 'energia', 'bezpieczeństwo',
                        'doradztwo', 'networking', 'konsorcja', 'aktualności', 'przygotowanie',
                        'zatrudnienie', 'wniosek', 'kontakt', 'local content', 'projekt',
                        'chłodzenie', 'elektryka', 'telekomunikacja', 'ochrona', 'zarządzanie',
                        'hvac', 'instalacje', 'oze', 'it', 'inwestycje'}
        if bold_name.lower() in common_words or len(bold_name) < 3:
            return match.group(0)  # Not a company name
        # Check if valid company name contains this as substring
        for vn in valid_names_set:
            if bold_name.lower() in vn or vn in bold_name.lower():
                return match.group(0)  # Partial match — keep it
        logger.warning(f"NordaGPT hallucination blocked: bold text '{bold_name}' not a known company")
        return ''

    text = re.sub(r'\*\*([^*]{2,40})\*\*', replace_bold_company, text)

    # 4. Remove ALL plain-text company name mentions that aren't linked
    # Catches: "firma Baumar", "również Pro-Bud", "* Pro-Bud to..."
    def check_company_name(name: str) -> bool:
        """Check if a name is a valid company (exact or substring match)."""
        name_clean = name.strip().rstrip('.,;:')
        if name_clean.lower() in valid_names_set:
            return True
        for vn in valid_names_set:
            if name_clean.lower() in vn or vn in name_clean.lower():
                return True
        return False

    def replace_plain_company(match):
        prefix = match.group(1)
        name = match.group(2).strip().rstrip('.,;:')
        if check_company_name(name):
            return match.group(0)
        logger.warning(f"NordaGPT hallucination blocked: plain text '{name}' after '{prefix}' not in DB")
        return ''

    # Pattern 1: "firma X", "również X", "oraz X", "lub X", "i X"
    # NOTE(review): the comment mentions "i X" but the pattern below does not
    # include 'i' as a trigger word — confirm whether that is intentional.
    text = re.sub(
        r'(firma|firmą|firmę|firmy|również|oraz|lub|czy)\s+([A-ZĄĘÓŁŹŻŚĆŃ][a-zA-ZąęółźżśćńĄĘÓŁŹŻŚĆŃ-]{2,25}(?:\s+[A-ZĄĘÓŁŹŻŚĆŃ][a-zA-ZąęółźżśćńĄĘÓŁŹŻŚĆŃ-]+)?)',
        replace_plain_company, text
    )

    # Pattern 2: "* CompanyName to//specjalizuje" at start of bullet point
    def replace_bullet_company(match):
        bullet = match.group(1)  # "* " or "- "
        name = match.group(2).strip()
        suffix = match.group(3)  # "to", "", "specjalizuje" etc.
        if check_company_name(name):
            return match.group(0)
        logger.warning(f"NordaGPT hallucination blocked: bullet company '{name}' not in DB")
        return f'{bullet}{suffix}'  # Keep bullet and suffix, remove company name

    text = re.sub(
        r'(\*\s+)([A-ZĄĘÓŁŹŻŚĆŃ][a-zA-ZąęółźżśćńĄĘÓŁŹŻŚĆŃ-]{2,25}(?:\s+[A-ZĄĘÓŁŹŻŚĆŃ][a-zA-ZąęółźżśćńĄĘÓŁŹŻŚĆŃ-]+)?)\s+(to | |specjali|oferuj|zajmuj|zapewni|posiada|świadcz)',
        replace_bullet_company, text
    )

    # 5. Clean up artifacts left by removals
    text = re.sub(r':\s*oraz\s*to\b', ': to', text)  # ": oraz to" → ": to"
    text = re.sub(r':\s*,', ':', text)  # ": ," → ":"
    text = re.sub(r'\*\s*\s*\n', '\n', text)  # "* "
    text = re.sub(r'\*\s*oraz\s*', '*', text)  # "* oraz "
    text = re.sub(r'\n\s*\*\s*\n', '\n', text)  # empty bullet points
    text = re.sub(r'\n\s*-\s*\n', '\n', text)  # empty list items
    text = re.sub(r' +', ' ', text)  # double spaces
    text = re.sub(r'\n{3,}', '\n\n', text)  # triple+ newlines
    return text.strip()
def start_conversation(
    self,
    user_id: int,
    title: Optional[str] = None,
    conversation_type: str = 'general'
) -> AIChatConversation:
    """
    Create and persist a new chat conversation for a user.

    Args:
        user_id: Owner of the new conversation.
        title: Optional conversation title; a timestamped default is
            generated when omitted.
        conversation_type: Conversation category (default: 'general').

    Returns:
        AIChatConversation: The freshly committed conversation row.
    """
    session = SessionLocal()
    try:
        # Fall back to an auto-generated, timestamped title
        if not title:
            title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        convo = AIChatConversation(
            user_id=user_id,
            started_at=datetime.now(),
            conversation_type=conversation_type,
            title=title,
            is_active=True,
            message_count=0,
            model_name=self.model_name
        )
        session.add(convo)
        session.commit()
        # Refresh so the returned object carries DB-generated fields (e.g. id)
        session.refresh(convo)
        return convo
    finally:
        session.close()
def send_message(
    self,
    conversation_id: int,
    user_message: str,
    user_id: int,
    thinking_level: str = 'high',
    user_context: Optional[Dict[str, Any]] = None
) -> AIChatMessage:
    """
    Send a user message and return the persisted AI response.

    SECURITY: Validates that user_id owns the conversation before allowing messages.
    This is defense-in-depth - API routes should also validate ownership.

    Pipeline: ownership check → RODO sanitization → store user message →
    route query / build context → company matching → AI call → hallucination
    validation → store AI message with cost metrics → async memory extraction.

    Args:
        conversation_id: Conversation ID
        user_message: User's message text
        user_id: User ID (required for ownership validation and cost tracking)
        thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high');
            may be overridden by the Smart Router's decision when available
        user_context: Optional dict with user info (e.g. 'user_name') passed to
            the router, context builder, company matcher and memory extraction

    Returns:
        AIChatMessage: AI response message

    Raises:
        ValueError: If conversation not found
        PermissionError: If user doesn't own the conversation
    """
    db = SessionLocal()
    start_time = time.time()
    try:
        # SECURITY: Get conversation with ownership check
        conversation = db.query(AIChatConversation).filter_by(
            id=conversation_id
        ).first()
        if not conversation:
            raise ValueError(f"Conversation {conversation_id} not found")
        # SECURITY: Verify user owns this conversation (defense in depth)
        if conversation.user_id != user_id:
            logger.warning(
                f"SECURITY: User {user_id} attempted to access conversation {conversation_id} "
                f"owned by user {conversation.user_id}"
            )
            raise PermissionError("Access denied: You don't own this conversation")
        # RODO/GDPR: Sanitize user message - remove sensitive data before storage
        # Note: NIP and email are NOT considered sensitive (public business data)
        sanitized_message = user_message
        sensitive_data_found = []
        if SENSITIVE_DATA_SERVICE_AVAILABLE:
            sanitized_message, sensitive_data_found = sanitize_message(user_message)
            if sensitive_data_found:
                logger.info(
                    f"RODO: Sanitized {len(sensitive_data_found)} sensitive items in message "
                    f"from user {user_id}: {[m.data_type.value for m in sensitive_data_found]}"
                )
        # Save user message (sanitized for storage, original for AI context)
        user_msg = AIChatMessage(
            conversation_id=conversation_id,
            created_at=datetime.now(),
            role='user',
            content=sanitized_message,  # Store sanitized version
            edited=False,
            regenerated=False
        )
        db.add(user_msg)
        db.commit()
        # Build context from conversation history and relevant companies
        # Use ORIGINAL message for AI (so it can understand the question)
        # but the sanitized version is what gets stored in DB
        # Smart Router — classify query and select data + model
        if SMART_ROUTER_AVAILABLE:
            route_decision = route_query(
                message=user_message,
                user_context=user_context,
                gemini_service=self.gemini_service
            )
            logger.info(
                f"NordaGPT Router: user={user_context.get('user_name') if user_context else '?'}, "
                f"complexity={route_decision['complexity']}, model={route_decision.get('model')}, "
                f"thinking={route_decision.get('thinking')}, data={route_decision['data_needed']}, "
                f"routed_by={route_decision.get('routed_by')}"
            )
            # Build context — selective (via router) or full (fallback)
            if route_decision.get('routed_by') != 'fallback':
                context = build_selective_context(
                    data_needed=route_decision.get('data_needed', []),
                    conversation_id=conversation.id,
                    current_message=user_message,
                    user_context=user_context
                )
            else:
                context = self._build_conversation_context(db, conversation, user_message)
            context['_route_decision'] = route_decision
            # Use router-determined thinking level
            effective_thinking = route_decision.get('thinking', thinking_level)
        else:
            # No router available — full context and caller-supplied thinking level
            context = self._build_conversation_context(db, conversation, user_message)
            effective_thinking = thinking_level
        # Company Matcher — replace full company list with matched subset
        if COMPANY_MATCHER_AVAILABLE:
            try:
                matched = match_companies(user_message, user_context=user_context, max_results=15)
                if matched:  # Only use matcher results if non-empty
                    context['matched_companies'] = matched
                    context['all_companies'] = []  # Clear full list — use matched only
                else:
                    logger.info("Company matcher returned 0 results — keeping full company list as fallback")
                    # Don't clear all_companies — AI will use the full list
                logger.info(f"Company matcher found {len(matched)} companies for query")
            except Exception as e:
                # Best-effort: matcher failure must never break the chat
                logger.warning(f"Company matcher failed: {e}, using full company list")
        # Get AI response with cost tracking
        response = self._query_ai(
            context,
            user_message,
            user_id=user_id,
            thinking_level=effective_thinking,
            user_context=user_context
        )
        # CRITICAL: Validate all company references — remove hallucinated firms
        response = self._validate_company_references(response)
        # Calculate metrics for per-message tracking in AIChatMessage table
        latency_ms = int((time.time() - start_time) * 1000)
        if self.gemini_service:
            input_tokens = self.gemini_service.count_tokens(user_message)
            output_tokens = self.gemini_service.count_tokens(response)
            cost_usd = self._calculate_cost(input_tokens, output_tokens)
        else:
            # Fallback estimate (~4 chars per token)
            input_tokens = len(user_message) // 4
            output_tokens = len(response) // 4
            cost_usd = 0.0
        # Save AI response
        ai_msg = AIChatMessage(
            conversation_id=conversation_id,
            created_at=datetime.now(),
            role='assistant',
            content=response,
            tokens_input=input_tokens,
            tokens_output=output_tokens,
            cost_usd=cost_usd,
            latency_ms=latency_ms,
            edited=False,
            regenerated=False
        )
        db.add(ai_msg)
        # Update conversation (user + assistant message = 2)
        conversation.message_count += 2
        conversation.updated_at = datetime.now()
        db.commit()
        db.refresh(ai_msg)
        # Async memory extraction in background thread
        if MEMORY_SERVICE_AVAILABLE:
            import threading
            # Snapshot plain values so the worker thread never touches the
            # session (which is closed in the finally block below).
            _conv_id = conversation_id
            _user_id = user_id
            _msg_count = conversation.message_count or 0
            _uctx = user_context
            _gsvc = self.gemini_service
            def _extract_memory():
                try:
                    extract_facts_async(_conv_id, _user_id, _uctx, _gsvc)
                    # Summarize every 5th message (but not at 0)
                    if _msg_count % 5 == 0 and _msg_count > 0:
                        summarize_conversation_async(_conv_id, _user_id, _gsvc)
                except Exception as e:
                    logger.warning(f"Async memory extraction failed: {e}")
            threading.Thread(target=_extract_memory, daemon=True).start()
        return ai_msg
    finally:
        db.close()
def get_conversation_history(
    self,
    conversation_id: int,
    user_id: Optional[int] = None
) -> List[Dict[str, Any]]:
    """
    Return every message of a conversation, oldest first.

    SECURITY: If user_id is provided, validates ownership before returning messages.
    This is defense-in-depth - API routes should also validate ownership.

    Args:
        conversation_id: Conversation ID
        user_id: User ID for ownership validation (optional for backwards
            compatibility, but strongly recommended for security)

    Returns:
        List of message dicts (id, role, content, timestamps, token/cost metrics)

    Raises:
        ValueError: If conversation not found
        PermissionError: If user_id provided and user doesn't own the conversation
    """
    db = SessionLocal()
    try:
        # SECURITY: Verify ownership if user_id provided
        if user_id is not None:
            conversation = (
                db.query(AIChatConversation)
                .filter_by(id=conversation_id)
                .first()
            )
            if not conversation:
                raise ValueError(f"Conversation {conversation_id} not found")
            if conversation.user_id != user_id:
                logger.warning(
                    f"SECURITY: User {user_id} attempted to read history of conversation {conversation_id} "
                    f"owned by user {conversation.user_id}"
                )
                raise PermissionError("Access denied: You don't own this conversation")
        ordered_messages = (
            db.query(AIChatMessage)
            .filter_by(conversation_id=conversation_id)
            .order_by(AIChatMessage.created_at)
            .all()
        )
        history: List[Dict[str, Any]] = []
        for m in ordered_messages:
            history.append({
                'id': m.id,
                'role': m.role,
                'content': m.content,
                'created_at': m.created_at.isoformat(),
                'tokens_input': m.tokens_input,
                'tokens_output': m.tokens_output,
                'cost_usd': float(m.cost_usd) if m.cost_usd else 0.0,
                'latency_ms': m.latency_ms
            })
        return history
    finally:
        db.close()
def _build_conversation_context(
    self,
    db,
    conversation: AIChatConversation,
    current_message: str
) -> Dict[str, Any]:
    """
    Build context for AI with ALL companies (not pre-filtered).

    This allows AI to intelligently select relevant companies instead of
    relying on keyword-based search pre-filtering. Besides companies and
    categories, the context aggregates: recent messages, recommendations,
    news, the ZOPK knowledge base (when relevant), upcoming events, B2B
    classifieds, forum topics with replies, company people, registered
    users, social media and GBP/SEO audit data.

    Args:
        db: Database session
        conversation: Current conversation
        current_message: User's current message (for reference only)

    Returns:
        Context dict with ALL companies and categories
    """
    # Load ALL active companies - let AI do the intelligent filtering
    all_companies = db.query(Company).filter_by(status='active').all()
    context = {
        'conversation_type': conversation.conversation_type,
        'total_companies': len(all_companies)
    }
    # Get all categories with company counts
    categories = db.query(Category).all()
    context['categories'] = [
        {
            'name': cat.name,
            'slug': cat.slug,
            'company_count': db.query(Company).filter_by(category_id=cat.id, status='active').count()
        }
        for cat in categories
    ]
    # Include ALL companies in compact format to minimize tokens
    # AI will intelligently select the most relevant ones
    context['all_companies'] = [
        self._company_to_compact_dict(c)
        for c in all_companies
    ]
    # Add conversation history (last 10 messages for context)
    messages = db.query(AIChatMessage).filter_by(
        conversation_id=conversation.id
    ).order_by(AIChatMessage.created_at.desc()).limit(10).all()
    # reversed() restores chronological order after the DESC query
    context['recent_messages'] = [
        {'role': msg.role, 'content': msg.content}
        for msg in reversed(messages)
    ]
    # === STAGE 1: Recommendations and News ===
    # Add approved recommendations (peer endorsements)
    recommendations = db.query(CompanyRecommendation).filter_by(
        status='approved'
    ).order_by(CompanyRecommendation.created_at.desc()).limit(20).all()
    context['recommendations'] = [
        {
            'company': rec.company.name if rec.company else 'Nieznana',
            'text': rec.recommendation_text[:200] if rec.recommendation_text else '',
            'service': rec.service_category or '',
            # Anonymize the author unless they opted in to show contact info
            'author': rec.user.name if rec.user and rec.show_contact else 'Członek Norda Biznes'
        }
        for rec in recommendations
    ]
    # Add recent approved news (last 30 days)
    from datetime import timedelta
    news_cutoff = datetime.now() - timedelta(days=30)
    recent_news = db.query(ZOPKNews).filter(
        ZOPKNews.status.in_(['approved', 'auto_approved']),
        ZOPKNews.published_at >= news_cutoff
    ).order_by(ZOPKNews.published_at.desc()).limit(10).all()
    context['recent_news'] = [
        {
            'title': news.title,
            'description': news.description[:400] if news.description else '',  # Description/lead
            'summary': news.ai_summary[:300] if news.ai_summary else '',  # AI summary
            'source': news.source_name or '',
            'url': news.url or '',  # Link to the article
            'date': news.published_at.strftime('%Y-%m-%d') if news.published_at else '',
            'type': news.news_type or 'news',
            'keywords': news.keywords[:5] if news.keywords else []  # Keywords
        }
        for news in recent_news
    ]
    # === ZOPK KNOWLEDGE BASE (semantic search) ===
    # Detect if question is about ZOPK topics
    if self._is_zopk_query(current_message):
        zopk_knowledge = self._get_zopk_knowledge_context(db, current_message)
        context['zopk_knowledge'] = zopk_knowledge
    # === STAGE 2: B2B classifieds board, Calendar, Forum ===
    # Add upcoming events (next 60 days)
    from datetime import date
    event_cutoff = date.today() + timedelta(days=60)
    upcoming_events = db.query(NordaEvent).filter(
        NordaEvent.event_date >= date.today(),
        NordaEvent.event_date <= event_cutoff
    ).order_by(NordaEvent.event_date).limit(15).all()
    context['upcoming_events'] = [
        {
            'title': event.title[:80] if event.title else '',
            'date': event.event_date.strftime('%Y-%m-%d') if event.event_date else '',
            'type': event.event_type or 'meeting',
            'location': event.location[:50] if event.location else '',
            'speaker': event.speaker_name[:30] if event.speaker_name else ''
        }
        for event in upcoming_events
    ]
    # Add active B2B classifieds (non-test only)
    active_classifieds = db.query(Classified).filter(
        Classified.is_active == True,
        Classified.is_test == False
    ).order_by(Classified.created_at.desc()).limit(20).all()
    context['classifieds'] = [
        {
            'type': c.listing_type,  # 'szukam'/'oferuje' (seeking/offering)
            'category': c.category,
            'title': c.title,
            'description': c.description[:400] if c.description else '',  # Full description
            'company': c.company.name if c.company else '',
            'author': c.author.name if c.author else '',
            'budget': c.budget_info or '',
            'location': c.location_info or '',
            'date': c.created_at.strftime('%Y-%m-%d') if c.created_at else '',
            'views': c.views_count or 0,
            'url': f'/classifieds/{c.id}'
        }
        for c in active_classifieds
    ]
    # Add recent forum topics with FULL content, authors and replies
    from sqlalchemy.orm import joinedload as jl
    forum_topics = db.query(ForumTopic).options(
        jl(ForumTopic.author),
        jl(ForumTopic.replies).joinedload(ForumReply.author)
    ).filter(
        ForumTopic.category != 'test'
    ).order_by(ForumTopic.created_at.desc()).limit(15).all()
    context['forum_topics'] = []
    for topic in forum_topics:
        topic_data = {
            'title': topic.title,
            'content': topic.content[:500] if topic.content else '',  # Topic body
            'author': topic.author.name if topic.author else 'Anonim',
            'category': topic.category_label,
            'status': topic.status_label,
            'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '',
            'url': f'/forum/{topic.id}',  # Link to the topic
            'views': topic.views_count or 0,  # Popularity
            'pinned': topic.is_pinned,  # Pinned (important)
            'replies_count': topic.reply_count,
            'has_attachments': len(topic.attachments) > 0 if topic.attachments else False
        }
        # Attach replies (max 5 most recent per topic)
        if topic.replies:
            topic_data['replies'] = [
                {
                    'author': reply.author.name if reply.author else 'Anonim',
                    'content': reply.content[:300] if reply.content else '',
                    'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else ''
                }
                for reply in sorted(topic.replies, key=lambda r: r.created_at, reverse=True)[:5]
            ]
        context['forum_topics'].append(topic_data)
    # === STAGE 3: People (management/shareholders), Social Media, Audits ===
    # Add company people (management, shareholders) - grouped by company
    from sqlalchemy.orm import joinedload
    company_people = db.query(CompanyPerson).options(
        joinedload(CompanyPerson.person),
        joinedload(CompanyPerson.company)
    ).order_by(CompanyPerson.company_id).all()
    # Group people by company for compact representation
    people_by_company = {}
    for cp in company_people:
        company_name = cp.company.name if cp.company else 'Nieznana'
        company_profile = f"https://nordabiznes.pl/company/{cp.company.slug}" if cp.company and cp.company.slug else None
        if company_name not in people_by_company:
            people_by_company[company_name] = {'profile': company_profile, 'people': []}
        person_info = {
            'name': cp.person.full_name() if cp.person else '',
            'profile': f"https://nordabiznes.pl/osoba/{cp.person.id}" if cp.person else None,
            'role': cp.role[:30] if cp.role else ''
        }
        # Include ownership share only when present
        if cp.shares_percent:
            person_info['shares'] = f"{cp.shares_percent}%"
        people_by_company[company_name]['people'].append(person_info)
    context['company_people'] = people_by_company
    # Add registered portal users grouped by company (Option C)
    registered_users = db.query(User).filter(
        User.is_active == True,
        User.company_id.isnot(None)
    ).options(joinedload(User.company)).all()
    users_by_company = {}
    for u in registered_users:
        company_name = u.company.name if u.company else 'Nieznana'
        company_profile = f"https://nordabiznes.pl/company/{u.company.slug}" if u.company and u.company.slug else None
        if company_name not in users_by_company:
            users_by_company[company_name] = {'profile': company_profile, 'users': []}
        # Map portal role codes to human-readable Polish labels
        role_labels = {'MANAGER': 'administrator profilu', 'EMPLOYEE': 'pracownik', 'VIEWER': 'obserwator'}
        users_by_company[company_name]['users'].append({
            'name': u.name,
            'email': u.email,
            'portal_role': role_labels.get(u.company_role, ''),
            'member': u.is_norda_member,
            'verified': u.is_verified
        })
    context['registered_users'] = users_by_company
    # Add social media summary per company (platforms and followers)
    social_media = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.is_valid == True
    ).options(joinedload(CompanySocialMedia.company)).all()
    # Group social media by company
    social_by_company = {}
    for sm in social_media:
        company_name = sm.company.name if sm.company else 'Nieznana'
        if company_name not in social_by_company:
            social_by_company[company_name] = []
        social_by_company[company_name].append({
            'platform': sm.platform,
            'url': sm.url or '',
            'followers': sm.followers_count or 0
        })
    context['company_social_media'] = social_by_company
    # Add latest GBP audit scores (one per company, most recent)
    from sqlalchemy import func
    # Subquery to get max audit_date per company
    latest_audit_subq = db.query(
        GBPAudit.company_id,
        func.max(GBPAudit.audit_date).label('max_date')
    ).group_by(GBPAudit.company_id).subquery()
    latest_audits = db.query(GBPAudit).join(
        latest_audit_subq,
        (GBPAudit.company_id == latest_audit_subq.c.company_id) &
        (GBPAudit.audit_date == latest_audit_subq.c.max_date)
    ).options(joinedload(GBPAudit.company)).all()
    context['gbp_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'score': audit.completeness_score or 0,
            'reviews': audit.review_count or 0,
            'rating': float(audit.average_rating) if audit.average_rating else 0,
            'maps_url': audit.google_maps_url or '',
            'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else ''
        }
        for audit in latest_audits
    ]
    # Add SEO audits (PageSpeed scores) for companies with website analysis
    seo_audits = db.query(CompanyWebsiteAnalysis).filter(
        CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None)
    ).options(joinedload(CompanyWebsiteAnalysis.company)).all()
    context['seo_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'seo': audit.pagespeed_seo_score or 0,
            'performance': audit.pagespeed_performance_score or 0,
            'accessibility': audit.pagespeed_accessibility_score or 0,
            'best_practices': audit.pagespeed_best_practices_score or 0,
            'overall': audit.seo_overall_score or 0,
            'url': audit.company.website if audit.company else '',
            'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else ''
        }
        for audit in seo_audits
    ]
    return context
def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]:
"""
Convert company to compact dictionary for AI context.
Optimized to minimize tokens while keeping all important data.
Args:
c: Company object
Returns:
Compact dict with essential company info
"""
compact = {
'name': c.name,
'cat': c.category.name if c.category else None,
'profile': f'https://nordabiznes.pl/company/{c.slug}',
}
# Only include non-empty fields to save tokens
if c.description_short:
compact['desc'] = c.description_short
if c.description_full:
compact['about'] = c.description_full
if c.founding_history:
compact['history'] = c.founding_history
if c.core_values:
compact['values'] = c.core_values
if c.services_offered:
compact['offerings'] = c.services_offered
if c.technologies_used:
compact['tech'] = c.technologies_used
if c.services:
services = [cs.service.name for cs in c.services if cs.service]
if services:
compact['svc'] = services
if c.competencies:
competencies = [cc.competency.name for cc in c.competencies if cc.competency]
if competencies:
compact['comp'] = competencies
if c.website:
compact['web'] = c.website
if c.phone:
compact['tel'] = c.phone
if c.email:
compact['mail'] = c.email
if c.address_city:
compact['city'] = c.address_city
if c.year_established:
compact['year'] = c.year_established
if c.certifications:
certs = [cert.name for cert in c.certifications if cert.is_active]
if certs:
compact['cert'] = certs[:3] # Limit to 3 certs
return compact
# Dictionary of synonyms and related terms for better search recall.
# Keys and values are Polish search terms and must stay in Polish —
# they are matched against user queries at runtime.
KEYWORD_SYNONYMS = {
    # IT / Web
    'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
    'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
    'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
    'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
    'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
    # Construction
    'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
    'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
    'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
    # Transport / Logistics
    'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
    'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
    # Services
    'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
    'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
    'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
    # Manufacturing
    'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
    'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
    'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
}
def _find_relevant_companies(self, db, message: str) -> List[Company]:
    """
    Return companies relevant to the user's message.

    Thin wrapper around the unified SearchService, which already handles:
    - synonym expansion for better keyword matching,
    - direct NIP/REGON lookup,
    - PostgreSQL FTS with fuzzy matching (when available),
    - a fallback scoring path for SQLite.

    Args:
        db: Database session
        message: User's message

    Returns:
        List of relevant Company objects (at most 10)
    """
    # Unwrap SearchResult objects into plain Company instances.
    return [hit.company for hit in search_companies(db, message, limit=10)]
def _is_zopk_query(self, message: str) -> bool:
"""
Check if the message is related to ZOPK (Zielony Okręg Przemysłowy Kaszubia).
ZOPK topics include:
- Offshore wind energy (Baltic Power, Baltica)
- Nuclear power plant (Lubiatowo-Kopalino)
- Kongsberg investment in Rumia
- Infrastructure (Via Pomerania, S6, Droga Czerwona)
- Hydrogen, data centers
"""
zopk_keywords = [
# Main project
'zopk', 'zielony okręg', 'okręg przemysłowy', 'kaszubia', 'kaszub',
'projekt kaszubia', 'przemysłowy kaszubia',
# Offshore wind (Polish forms + English)
'offshore', 'farmy wiatrowe', 'energetyka wiatrowa', 'bałtyk', 'baltic power',
'baltica', 'orsted', 'morska energia', 'wiatraki morskie', 'farma wiatrowa',
# Nuclear - all Polish grammatical forms
'elektrownia jądrowa', 'elektrowni jądrowej', 'elektrownie jądrowe',
'jądrowa', 'jądrowej', 'jądrowe', 'jądrowy', # adjective forms
'atomowa', 'atomowej', 'atomowe', 'atom',
'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie',
'westinghouse', 'bechtel', 'turbiny', 'arabelle',
# Kongsberg defense industry
'kongsberg', 'inwestycje norweskie', 'przemysł obronny', 'zbrojeniow',
# Infrastructure
'via pomerania', 'droga czerwona', 's6', 'port gdynia',
# Energy transition
'wodór', 'centra danych', 'samsonowicz', 'transformacja energetyczna',
'energetyka', 'energetyczny', 'energetyczna',
# Organizations
'norda biznes', 'izba przedsiębiorców', 'rumia invest', 'rumia',
# Roadmap and milestones
'kamienie milowe', 'roadmapa', 'timeline', 'harmonogram',
'inwestycje pomorze', 'inwestycje pomorskie', 'rozwój pomorza'
]
message_lower = message.lower()
return any(kw in message_lower for kw in zopk_keywords)
def _get_zopk_knowledge_context(self, db, message: str) -> Dict[str, Any]:
    """
    Get ZOPK knowledge base context for the current message.

    Uses semantic search to find relevant:
    - Knowledge chunks (text fragments with embeddings)
    - Facts (structured information)
    - Entities (companies, people, projects)

    Args:
        db: Database session
        message: User's question

    Returns:
        Dict with keys 'chunks', 'facts', 'entities' (each a list).
        All lists are empty when the knowledge service is unavailable;
        on a mid-query error the dict may be partially filled.
    """
    # Fix: the previous local import also pulled in ZOPKKnowledgeChunk,
    # which is never referenced in this method. (ZOPKNews is re-imported
    # locally to keep this block self-contained.)
    from database import ZOPKKnowledgeEntity, ZOPKNews

    context = {
        'chunks': [],
        'facts': [],
        'entities': []
    }

    # Optional dependency — degrade gracefully when the service is missing.
    if not ZOPK_KNOWLEDGE_AVAILABLE:
        logger.warning("ZOPK knowledge service not available")
        return context

    try:
        # Semantic search in knowledge chunks
        chunks = search_knowledge(
            db,
            query=message,
            limit=5,
            min_similarity=0.3,
            user_id=None  # Don't track cost for context building
        )
        # Enrich chunks with source information
        for c in chunks:
            chunk_data = {
                'content': c['content'][:400],  # Limit length
                'summary': c.get('summary', ''),
                'similarity': c.get('similarity', 0),
                'source': 'nieznane',
                'date': ''
            }
            # Get source news info if available
            if c.get('source_news_id'):
                news = db.query(ZOPKNews).filter(
                    ZOPKNews.id == c['source_news_id']
                ).first()
                if news:
                    chunk_data['source'] = news.source_name or news.source_domain or 'nieznane'
                    chunk_data['source_url'] = news.url or ''
                    if news.published_at:
                        chunk_data['date'] = news.published_at.strftime('%Y-%m-%d')
            context['chunks'].append(chunk_data)

        # Get relevant facts with source information
        facts = get_relevant_facts(db, query=message, limit=5)
        context['facts'] = [
            {
                'fact': f['full_text'],
                'type': f['fact_type'],
                'confidence': f.get('confidence', 0),
                'value': f.get('numeric_value'),
                'unit': f.get('numeric_unit'),
                'source_url': f.get('source_url', ''),
                'source_name': f.get('source_name', ''),
                'source_date': f.get('source_date', '')
            }
            for f in facts
        ]

        # Get top mentioned entities (always include for context)
        top_entities = db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.mentions_count > 1
        ).order_by(
            ZOPKKnowledgeEntity.mentions_count.desc()
        ).limit(10).all()
        context['entities'] = [
            {
                'name': e.name,
                'type': e.entity_type,
                'description': e.short_description or '',
                'mentions': e.mentions_count
            }
            for e in top_entities
        ]
    except Exception as e:
        # Never break the chat because of a knowledge-base failure;
        # whatever was gathered before the error is still returned.
        logger.error(f"Error getting ZOPK knowledge context: {e}")

    return context
def _build_full_prompt(
    self,
    context: Dict[str, Any],
    user_message: str,
    user_context: Optional[Dict[str, Any]] = None,
    thinking_level: str = 'high'
) -> str:
    """
    Build the complete prompt string for the AI.

    Extracted from _query_ai() for reuse in streaming.

    Assembles, in order: optional user-identity block, user memory,
    the core system prompt (role + chamber background + data legend +
    anti-hallucination rules), thinking-level instructions, feedback
    few-shot examples, company data, recommendations, news, the ZOPK
    knowledge section, Etap 2/3 data sections, and finally the
    conversation history with the new user message appended.

    Args:
        context: Context dict with ALL companies
        user_message: User's message
        user_context: Optional user identity/context dict
        thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')

    Returns:
        Full prompt string ready to send to Gemini
    """
    import json
    # Build system prompt with ALL companies.
    # Pre-compute section counts so the prompt header can summarize the data.
    recommendations_count = len(context.get('recommendations', []))
    news_count = len(context.get('recent_news', []))
    events_count = len(context.get('upcoming_events', []))
    classifieds_count = len(context.get('classifieds', []))
    forum_count = len(context.get('forum_topics', []))
    people_companies_count = len(context.get('company_people', {}))
    social_companies_count = len(context.get('company_social_media', {}))
    gbp_audits_count = len(context.get('gbp_audits', []))
    seo_audits_count = len(context.get('seo_audits', []))
    # Optional personalization header — only when the caller supplied identity.
    user_identity = ""
    if user_context:
        first_name = user_context.get('user_name', 'Nieznany').split()[0] if user_context.get('user_name') else 'Nieznany'
        user_identity = f"""
# AKTUALNY UŻYTKOWNIK
Rozmawiasz z: {user_context.get('user_name', 'Nieznany')}
Firma: {user_context.get('company_name', 'brak')} — kategoria: {user_context.get('company_category', 'brak')}
Rola w firmie: {user_context.get('company_role', 'MEMBER')}
Członek Izby Norda Biznes: {'tak' if user_context.get('is_norda_member') else 'nie'}
Rola w Izbie: {user_context.get('chamber_role') or ''}
Na portalu od: {user_context.get('member_since', 'nieznana data')}
ZASADY PERSONALIZACJI:
- Zwracaj się do użytkownika po imieniu ({first_name})
- W pierwszej wiadomości konwersacji przywitaj się krótko po imieniu
- Na pytania "co wiesz o mnie?" / "kim jestem?" — wypisz powyższe dane + powiązania firmowe z bazy
- Uwzględniaj kontekst firmy użytkownika w odpowiedziach
- NIE ujawniaj danych technicznych (user_id, company_id, rola systemowa)
ABSOLUTNY ZAKAZ HALUCYNACJI — FIRM I DANYCH KONTAKTOWYCH:
- NIGDY nie wymyślaj nazw firm. To jest NAJWAŻNIEJSZA zasada.
- Wymieniaj WYŁĄCZNIE firmy z sekcji "FIRMY W BAZIE" poniżej — żadnych innych.
- Każdą firmę podawaj WYŁĄCZNIE jako link: [Nazwa Firmy](/company/slug) — używając dokładnego slug z bazy.
- Jeśli żadna firma z bazy nie pasuje do zapytania, napisz wprost: "W bazie Izby nie znalazłem firmy o takim profilu."
- NIE WYMIENIAJ firm jako zwykły tekst (bold, kursywa) — TYLKO jako link [Nazwa](/company/slug).
- NIE UŻYWAJ nazw firm ze swojej wiedzy ogólnej — TYLKO z dostarczonej bazy.
DANE KONTAKTOWE — TYLKO Z DOSTARCZONEJ BAZY:
- Numery telefonów podawaj DOKŁADNIE tak jak są w danych firmy (pole "Tel:").
- Adresy stron WWW podawaj DOKŁADNIE tak jak są w danych firmy (pole "WWW:").
- NIGDY nie wymyślaj numerów telefonów ani adresów URL z pamięci.
- Jeśli dana firma nie ma telefonu lub strony WWW w bazie — NIE PODAWAJ ich, napisz "brak danych kontaktowych w bazie".
- Podanie fałszywego numeru telefonu jest GORSZE niż niepodanie żadnego.
- Złamanie tej zasady oznacza linkowanie do nieistniejących stron (404) co jest niedopuszczalne.
PYTANIA FOLLOW-UP (dane kontaktowe, porównania):
- Gdy użytkownik prosi o dane kontaktowe — podaj TYLKO firmy wymienione w Twojej POPRZEDNIEJ odpowiedzi, nie wszystkie z bazy.
- Używaj danych kontaktowych DOKŁADNIE z pól "Tel:" i "WWW:" przypisanych do danej firmy w kontekście.
- Przy każdej firmie w liście kontaktowej podaj KRÓTKIE uzasadnienie dlaczego ją poleciłeś (np. "budownictwo", "HVAC", "IT").
"""
    # Inject user memory (facts + conversation summaries) into prompt
    user_memory_text = ""
    if MEMORY_SERVICE_AVAILABLE and user_context and user_context.get('user_id'):
        try:
            user_memory_text = format_memory_for_prompt(user_context['user_id'])
        except Exception as e:
            logger.warning(f"Memory load failed: {e}")
    # Core system prompt: assistant role, chamber background, data-format
    # legend and hard anti-hallucination / RODO rules.
    system_prompt = user_identity + user_memory_text + f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.
📊 MASZ DOSTĘP DO BAZY WIEDZY:
- Liczba firm: {context['total_companies']}
- Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])}
- Rekomendacje członków: {recommendations_count}
- Ostatnie aktualności: {news_count}
- Nadchodzące wydarzenia: {events_count}
- Ogłoszenia B2B: {classifieds_count}
- Tematy na forum: {forum_count}
- Firmy z danymi KRS (zarząd/wspólnicy): {people_companies_count}
- Firmy z Social Media: {social_companies_count}
- Audyty Google Business: {gbp_audits_count}
- Audyty SEO (PageSpeed): {seo_audits_count}
🎯 TWOJA ROLA:
- Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika
- Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły
- Podajesz konkretne nazwy firm z kontaktem
- Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście
- Możesz cytować rekomendacje innych członków
- Możesz informować o aktualnych newsach, wydarzeniach, ogłoszeniach i dyskusjach na forum
🏛️ O IZBIE NORDA BIZNES:
Izba Przedsiebiorców NORDA Biznes to stowarzyszenie non-profit z Wejherowa, dzialajace od blisko 30 lat (jubileusz 30-lecia w 2027). NORDA to kaszubskie okreslenie "polnocy" i jeden z 5 regionow Kaszub.
MISJA: Laczenie przedsiebiorcow z regionu, reprezentowanie interesow biznesu wobec samorzadu, tworzenie przestrzeni do networkingu i wymiany doswiadczen, wspieranie rozwoju kompetencji czlonkow.
WARTOSCI: Regionalizm (tozsamosc kaszubska), Wspolpraca (partnerstwo nie konkurencja), Otwartosc (firmy wszystkich wielkosci), Roznorodnosc (wielobranzowosci), Apolitycznosc (neutralnosc polityczna).
STRUKTURA: Zrzesza glownie male firmy (do 50 osob) z roznych branz - budownictwo, IT, handel, uslugi profesjonalne, media, produkcja. Rada Izby liczy 16 osob, Zarzad 4 osoby.
RADA IZBY (kadencja do 2025): Leszek Glaza (prowadzacy), Iwona Musial, Andrzej Gorczycki, Pawel Kwidzinski, Dariusz Schmidtke, Artur Wiertel, Aureliusz Jedrzejewski, Krzysztof Kubis, Angelika Piechocka, Janusz Masiak, Jakub Bornowski, Pawel Piechota, Jacek Pomieczynski, Radoslaw Skwarlo, Roman Wiercinski, Michal Wesierski.
DZIALANIA CYKLICZNE:
- "Chwila dla Biznesu" - networking w ostatni czwartek miesiaca, godz. 18:00, Hotel Olimp, Wejherowo
- Spotkanie Rady - pierwsza sroda miesiaca
- Akademia NORDA - kwartalne szkolenia (ksiegowosc, social media, mentoring)
- Bale Przedsiebiorcy 2-3 razy w roku
- Wizyty u czlonkow Izby (do 5/miesiac)
WYDARZENIA 2025: Piknik w Parku Wejherowo, konferencja "Zielone Okregi Przemyslowe - Kaszubia" w Rumi (200+ uczestnikow), Tydzien Przedsiebiorczosci w Filharmonii Wejherowskiej (prelegent z Microsoft). 30-40 nowych czlonkow w 2025.
STRATEGIA 2026-2031 - TRZY KIERUNKI:
1. Od zrzeszenia do ekosystemu - wspolne projekty, konsorcja, wzajemne zlecenia
2. Brama do regionu - pierwszy kontakt dla inwestorow z zewnatrz (elektrownia jadrowa, offshore, Kaszubia)
3. Profesjonalizacja komunikacji - spojna obecnosc w mediach spolecznosciowych
CEL na 30-lecie (2027): 200 czlonkow, rozpoznawalna marka, realny wplyw na rozwoj regionu.
REGION: Powiat wejherowski - jeden z najdynamiczniej rosnacych demograficznie obszarow w Polsce. 230 tys. mieszkancow (wzrost 30% w 20 lat), sredni wiek ponizej 40 lat. Projekt "Kaszubia" - Zielony Okreg Przemyslowy to ok. 250 mld PLN inwestycji.
PROJEKTY CZLONKOW: Energo Velo / Zarnowiecki Ring N59 - trasa rowerowa wokol Jeziora Zarnowieckiego (autor: Jacek Pomieczynski, EURA-TECH).
📋 FORMAT DANYCH FIRM (skróty):
- name: nazwa firmy
- cat: kategoria
- profile: link do profilu firmy na nordabiznes.pl
- desc: krótki opis
- history: historia firmy, właściciele, założyciele
- svc: usługi
- comp: kompetencje
- web/tel/mail: kontakt
- city: miasto
- cert: certyfikaty
⭐ REKOMENDACJE - opinie członków o firmach:
- company: nazwa polecanej firmy
- text: treść rekomendacji
- service: kategoria usługi
- author: kto poleca
📰 AKTUALNOŚCI - ostatnie newsy:
- title: tytuł artykułu
- source: źródło (portal)
- date: data publikacji
📅 KALENDARZ - nadchodzące wydarzenia Norda Biznes:
- title: nazwa wydarzenia
- date: data (YYYY-MM-DD)
- type: typ (meeting, networking, webinar)
- location: miejsce
- speaker: prelegent (jeśli jest)
📋 TABLICA B2B - ogłoszenia członków:
- type: "szukam" lub "oferuje"
- category: uslugi/produkty/wspolpraca/praca/inne
- title: tytuł ogłoszenia
- company: firma ogłaszająca
- location: lokalizacja
💬 FORUM - dyskusje społeczności (pełna treść!):
- title: tytuł tematu
- content: treść tematu (do 500 znaków)
- author: imię i nazwisko autora tematu
- category: Propozycja funkcji/Błąd/Pytanie/Ogłoszenie
- status: Nowy/W realizacji/Rozwiązany/Odrzucony
- date: data utworzenia
- url: link do tematu na nordabiznes.pl
- views: liczba wyświetleń (popularność)
- pinned: czy temat jest przypięty (ważny)
- replies_count: liczba odpowiedzi
- replies: lista odpowiedzi (autor, treść, data) - max 5 ostatnich
👥 ZARZĄD I WSPÓLNICY - dane KRS firm (pogrupowane po firmie):
- name: imię i nazwisko osoby
- role: funkcja (Prezes Zarządu, Członek Zarządu, Wspólnik, Prokurent)
- shares: procent udziałów (tylko dla wspólników)
📱 SOCIAL MEDIA - profile firm (pogrupowane po firmie):
- platform: facebook, instagram, linkedin, youtube, twitter, tiktok
- url: link do profilu
- followers: liczba obserwujących
🏪 AUDYT GOOGLE BUSINESS - wyniki audytu profili Google:
- company: nazwa firmy
- score: wynik kompletności profilu (0-100)
- reviews: liczba recenzji
- rating: średnia ocena (1-5)
- maps_url: link do profilu Google Maps
- profile_url: link do profilu firmy na nordabiznes.pl
🔍 AUDYT SEO (PageSpeed) - wyniki analizy stron www firm:
- company: nazwa firmy
- seo: wynik SEO (0-100)
- performance: wydajność strony (0-100)
- accessibility: dostępność (0-100)
- best_practices: najlepsze praktyki (0-100)
- overall: ogólny wynik SEO (0-100)
- url: adres strony www
- profile_url: link do profilu firmy na nordabiznes.pl
⚠️ WAŻNE:
- ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne)
🚫 DANE WRAŻLIWE - BEZWZGLĘDNY ZAKAZ:
NIE odpowiadaj na pytania o:
- PESEL (numer identyfikacyjny)
- Numery dowodów osobistych
- Numery paszportów
- Numery kart kredytowych/debetowych
- Hasła lub dane logowania
- Numery kont bankowych / IBAN
Jeśli użytkownik pyta o PESEL lub inne dane wrażliwe (nawet zamaskowane jako [PESEL UKRYTY]):
- ODPOWIEDZ: "Przepraszam, nie mogę podawać informacji o numerach PESEL ani innych danych wrażliwych. Jest to niezgodne z RODO."
- NIE wymyślaj żadnych połączeń między numerami a osobami
- NIE zgaduj czyj to może być PESEL
- NIE sugeruj żadnych osób z bazy danych
To jest wymóg prawny (RODO/GDPR) - nie ma żadnych wyjątków.
🔗 KLIKALNE LINKI (BEZWZGLĘDNIE OBOWIĄZKOWE!):
⚠️ KRYTYCZNE - KAŻDA nazwa firmy MUSI być linkiem markdown:
- ✅ JEDYNY PRAWIDŁOWY FORMAT: [Nazwa Firmy](URL z pola profile)
- ❌ NIEDOPUSZCZALNE: Nazwa Firmy (bez linku)
- ❌ NIEDOPUSZCZALNE: **Nazwa Firmy** (pogrubienie bez linku)
- ❌ NIEDOPUSZCZALNE: "Nazwa Firmy" (cudzysłowy bez linku)
Przykład: [Pixlab Softwarehouse](https://nordabiznes.pl/company/pixlab-sp-z-o-o)
👤 OSOBY - każda osoba (zarząd/wspólnik) też MUSI być linkiem:
- ✅ PRAWIDŁOWO: [Michał Bogdan Roszman](https://nordabiznes.pl/osoba/123)
- ❌ BŁĘDNIE: **Michał Bogdan Roszman** (pogrubienie bez linku)
- ❌ BŁĘDNIE: Michał Bogdan Roszman (tekst bez linku)
W sekcji ZARZĄD I WSPÓLNICY każda osoba ma pole "profile" z URL - UŻYJ GO!
Inne linki które MUSISZ dołączać gdy dostępne:
• Strona www firmy (pole "web" lub "url")
• Profil Google Maps (pole "maps_url")
• Profile social media (pole "url")
• FORUM: [tytuł tematu](/forum/ID) - np. [moje uwagi do CRM](/forum/18)
• B2B: [tytuł ogłoszenia](/ogloszenia/ID)
• AKTUALNOŚCI: [tytuł](/news/ID)
- Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w ZARZĄD I WSPÓLNICY, ZAREJESTROWANI PRZEDSTAWICIELE FIRM lub w polu "history"
- Jeśli pytanie "kto jest prezesem firmy X" - szukaj w ZARZĄD I WSPÓLNICY
- Jeśli pytanie "kto pracuje w firmie X" lub "kto reprezentuje firmę X" - szukaj w ZARZĄD I WSPÓLNICY i ZAREJESTROWANI PRZEDSTAWICIELE FIRM
- Jeśli pytanie "kto poleca firmę X" - szukaj w rekomendacjach
- Jeśli pytanie "co słychać" - sprawdź aktualności i wydarzenia
- Jeśli pytanie "kiedy następne spotkanie" - sprawdź kalendarz
- Jeśli pytanie "kto szuka/oferuje X" - sprawdź tablicę B2B
- Jeśli pytanie o dyskusje/tematy - sprawdź forum i ZAWSZE podaj link z pola "url"
- Jeśli pytanie o social media/followers - sprawdź SOCIAL MEDIA (dołącz linki!)
- Jeśli pytanie o Google opinie/recenzje - sprawdź AUDYT GOOGLE BUSINESS (dołącz link do Maps!)
- Jeśli pytanie o SEO/wydajność strony/PageSpeed - sprawdź AUDYT SEO (dołącz link do strony!)
- Odpowiadaj PO POLSKU
✍️ FORMATOWANIE ODPOWIEDZI:
🎯 PRECYZJA: Jeśli użytkownik pyta o konkretną liczbę (np. "wymień 5"), ZAWSZE podaj DOKŁADNIE tyle ile prosi!
- "wymień 5" → podaj dokładnie 5 elementów
- "podaj 3 firmy" → podaj dokładnie 3 firmy
- NIGDY nie podawaj mniej niż użytkownik prosi!
📝 FORMAT LIST - każdy element w JEDNEJ LINII ze szczegółami po przecinku:
PRAWIDŁOWO:
1. **Chwila dla Biznesu** (29.01.2026) - Hotel Olimp, Wejherowo, networking
2. **Rada Izby NORDA** (04.02.2026) - biuro Norda Biznes, spotkanie zarządu
3. **Chwila dla Biznesu** (26.02.2026) - Hotel Olimp, Wejherowo, networking
BŁĘDNIE (NIE RÓB - resetuje numerację):
1. **Chwila dla Biznesu**
Data: 29.01.2026
Miejsce: Hotel Olimp
1. **Rada Izby NORDA**
Data: 04.02.2026
- Używaj **pogrubienia** dla tytułów sekcji i nazw wydarzeń
- Nazwy firm ZAWSZE jako link [Nazwa](/company/slug), nigdy jako bold tekst
- Numeracja MUSI być ciągła w obrębie jednej listy: 1, 2, 3, 4, 5 — NIE resetuj do 1
- Gdy dzielisz odpowiedź na sekcje, używaj nagłówków ### zamiast numerowanych list
- Wszystkie szczegóły elementu w JEDNEJ linii (po myślniku lub w nawiasie)
"""
    # Add thinking level specific instructions
    if thinking_level == 'high':
        system_prompt += """
🧠 TRYB GŁĘBOKIEJ ANALIZY - WYMAGANIA:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
W tym trybie użytkownik oczekuje SZCZEGÓŁOWEJ odpowiedzi:
1. ROZBUDOWANA analiza - podaj WIĘCEJ informacji niż standardowo
2. STRUKTURA - używaj list punktowanych/numerowanych (1. 2. 3.)
3. KONTEKST - dodaj tło, wyjaśnienia, powiązania między informacjami
4. WSZYSTKIE LINKI - każda firma, osoba, temat forum MUSI mieć link markdown
5. CYTATY - jeśli są rekomendacje lub opinie, cytuj je
6. WNIOSKI - na końcu możesz dodać krótkie podsumowanie lub wniosek
Przykład odpowiedzi w trybie głębokiej analizy:
"Pomysły [Jacka Pomieczyńskiego](link) w temacie **moje uwagi do CRM** są bardzo konkretne:
1. **Rozbudowa paska nawigacji** - dodanie zakładek: NordaGPT, B2B, Lokalne Projekty
2. **Usprawnienie kalendarza** - podział na przeszłe/przyszłe wydarzenia
3. **Identyfikacja wizualna** - logo Norda Biznes w interfejsie
W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwagi do CRM](/forum/18)"
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
    elif thinking_level == 'minimal':
        system_prompt += """
⚡ TRYB SZYBKI - odpowiadaj zwięźle ale z PEŁNYMI linkami do firm i tematów.
"""
    # Add feedback-based learning context (few-shot examples)
    if FEEDBACK_LEARNING_AVAILABLE:
        try:
            feedback_service = get_feedback_learning_service()
            learning_context = feedback_service.format_for_prompt()
            if learning_context:
                system_prompt += learning_context
        except Exception as e:
            # Don't fail if feedback learning has issues
            import logging
            logging.getLogger(__name__).warning(f"Feedback learning error: {e}")
    # Add companies to prompt — matched (preferred) or full list (fallback)
    matched = context.get('matched_companies', [])
    if matched:
        system_prompt += "\n\n🏢 FIRMY Z BAZY IZBY PASUJĄCE DO ZAPYTANIA:\n"
        system_prompt += "Poniżej znajdują się JEDYNE firmy, które możesz wymienić w odpowiedzi.\n"
        system_prompt += "NIE WOLNO Ci wymyślać ani dodawać ŻADNYCH innych firm.\n"
        system_prompt += "Każdą firmę podawaj jako link: [Nazwa Firmy](/company/slug)\n\n"
        # One markdown section per matched company, listing only fields present.
        for c in matched:
            system_prompt += f"### {c['name']} → [link](/company/{c['slug']})\n"
            if c.get('category'):
                system_prompt += f"Kategoria: {c['category']}\n"
            if c.get('description'):
                system_prompt += f"Opis: {c['description'][:400]}\n"
            if c.get('services'):
                system_prompt += f"Usługi: {c['services'][:300]}\n"
            if c.get('structured_services'):
                system_prompt += f"Oferta: {', '.join(str(s) for s in c['structured_services'][:10] if s)}\n"
            if c.get('ai_insights'):
                ins = c['ai_insights']
                if ins.get('services_list'):
                    system_prompt += f"Specjalizacje: {', '.join(str(s) for s in ins['services_list'][:8] if s)}\n"
                if ins.get('unique_selling_points'):
                    system_prompt += f"Wyróżniki: {', '.join(str(s) for s in ins['unique_selling_points'][:5] if s)}\n"
            if c.get('google_rating') and c.get('google_reviews'):
                system_prompt += f"Google: {c['google_rating']}/5 ({c['google_reviews']} opinii)\n"
            if c.get('phone'):
                system_prompt += f"Tel: {c['phone']}\n"
            if c.get('website'):
                system_prompt += f"WWW: {c['website']}\n"
            if c.get('match_reasons'):
                system_prompt += f"Dopasowanie: {', '.join(str(r) for r in c['match_reasons'] if r)}\n"
            system_prompt += "\n"
        system_prompt += f"POWYŻSZE {len(matched)} FIRM TO JEDYNE, KTÓRE MOŻESZ WYMIENIĆ.\n"
    elif context.get('all_companies'):
        # Fallback: old behavior with all companies (when company_matcher unavailable)
        whitelist_lines = []
        for c in context['all_companies']:
            name = c.get('name', '')
            profile = c.get('profile', '')
            slug = profile.replace('/company/', '') if profile else ''
            if name and slug:
                whitelist_lines.append(f" {name} → [link](/company/{slug})")
        system_prompt += "\n\n⚠️ DOZWOLONE FIRMY — możesz wymieniać TYLKO te (użyj dokładnie podanego linku):\n"
        system_prompt += "\n".join(whitelist_lines)
        system_prompt += "\n\n🏢 SZCZEGÓŁY FIRM:\n"
        system_prompt += json.dumps(context['all_companies'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    else:
        # No matches at all: instruct the model to answer without naming companies.
        system_prompt += "\n\n⚠️ BRAK DOPASOWANYCH FIRM: Wyszukiwanie nie znalazło firm bezpośrednio pasujących do tego zapytania. "
        system_prompt += "Odpowiedz na pytanie ogólnie, bez wymieniania konkretnych nazw firm. "
        system_prompt += "NIE WYMYŚLAJ nazw firm. Jeśli pytanie dotyczy firm, powiedz: 'W bazie Izby nie znalazłem firmy o takim profilu.'\n"
    # Add recommendations (peer endorsements)
    if context.get('recommendations'):
        system_prompt += "\n\n⭐ REKOMENDACJE CZŁONKÓW:\n"
        system_prompt += json.dumps(context['recommendations'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add recent news
    if context.get('recent_news'):
        system_prompt += "\n\n📰 OSTATNIE AKTUALNOŚCI:\n"
        system_prompt += json.dumps(context['recent_news'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add ZOPK Knowledge Base context (semantic search results)
    if context.get('zopk_knowledge'):
        zopk = context['zopk_knowledge']
        system_prompt += "\n\n🌍 BAZA WIEDZY ZOPK (Zielony Okręg Przemysłowy Kaszubia):\n"
        system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
        # Collect all sources for citations at the end
        sources_for_citation = []
        # Add knowledge chunks (most relevant excerpts)
        if zopk.get('chunks'):
            system_prompt += "\n📄 FRAGMENTY WIEDZY (semantycznie dopasowane):\n"
            for i, chunk in enumerate(zopk['chunks'][:5], 1):
                source_name = chunk.get('source', 'nieznane')
                source_url = chunk.get('source_url', '')
                source_date = chunk.get('date', '')
                system_prompt += f"\n[{i}] {chunk.get('summary', '')}\n"
                if source_url:
                    system_prompt += f" Źródło: [{source_name}]({source_url}) ({source_date})\n"
                    if source_url and source_name:
                        sources_for_citation.append({
                            'name': source_name,
                            'url': source_url,
                            'date': source_date
                        })
                else:
                    system_prompt += f" Źródło: {source_name} ({source_date})\n"
                if chunk.get('content'):
                    content_preview = chunk['content'][:300]
                    if len(chunk['content']) > 300:
                        content_preview += "..."
                    system_prompt += f" Treść: {content_preview}\n"
        # Add verified facts with source links
        if zopk.get('facts'):
            system_prompt += "\n📌 ZWERYFIKOWANE FAKTY:\n"
            for fact in zopk['facts'][:10]:
                # NOTE(review): this multiplier string renders confidence as
                # repeated glyphs but appears empty here — confirm the original
                # glyph (likely a star emoji) survived file encoding.
                confidence_stars = "" * int(fact.get('confidence', 0) * 5)
                source_name = fact.get('source_name', '')
                source_url = fact.get('source_url', '')
                source_date = fact.get('source_date', '')
                system_prompt += f"{fact.get('fact', '')} [{confidence_stars}]"
                if source_name and source_url:
                    system_prompt += f" ([{source_name}]({source_url}), {source_date})"
                    sources_for_citation.append({
                        'name': source_name,
                        'url': source_url,
                        'date': source_date
                    })
                system_prompt += "\n"
                if fact.get('value') and fact.get('unit'):
                    system_prompt += f" Wartość: {fact['value']} {fact['unit']}\n"
        # Add key entities
        if zopk.get('entities'):
            system_prompt += "\n🏢 KLUCZOWE PODMIOTY ZOPK:\n"
            for entity in zopk['entities'][:8]:
                # Pick a display icon per entity type; unknown types get the default.
                entity_icon = {
                    'organization': '🏛️',
                    'company': '🏢',
                    'person': '👤',
                    'location': '📍',
                    'place': '📍',
                    'project': '🎯',
                    'technology': ''
                }.get(entity.get('type', ''), '')
                system_prompt += f"{entity_icon} {entity.get('name', '')} ({entity.get('type', '')})"
                if entity.get('description'):
                    system_prompt += f" - {entity['description']}"
                if entity.get('mentions'):
                    system_prompt += f" [{entity['mentions']} wzmianek]"
                system_prompt += "\n"
        # Add available sources for citation
        if sources_for_citation:
            # Deduplicate sources by URL
            unique_sources = {s['url']: s for s in sources_for_citation if s.get('url')}.values()
            system_prompt += "\n📚 DOSTĘPNE ŹRÓDŁA DO CYTOWANIA:\n"
            for src in list(unique_sources)[:5]:
                system_prompt += f"- [{src['name']}]({src['url']}) ({src['date']})\n"
        system_prompt += "\n🎯 ZASADY ODPOWIEDZI O ZOPK:\n"
        system_prompt += "1. Odpowiadaj na podstawie bazy wiedzy (NIE WYMYŚLAJ faktów)\n"
        system_prompt += "2. FORMATUJ odpowiedzi używając:\n"
        system_prompt += " - **Pogrubienia** dla kluczowych informacji\n"
        system_prompt += " - Listy punktowane dla wielu faktów\n"
        system_prompt += " - Nagłówki dla sekcji (## Inwestycje, ## Terminarz)\n"
        system_prompt += "3. CYTUJ źródła w tekście: \"Według [nazwa portalu](URL) z dnia RRRR-MM-DD...\"\n"
        system_prompt += "4. NA KOŃCU odpowiedzi DODAJ sekcję:\n"
        system_prompt += " 📚 **Źródła:**\n"
        system_prompt += " - [Nazwa portalu](URL) - krótki opis (data)\n"
        system_prompt += "5. Podawaj konkretne daty i liczby gdy dostępne\n"
        system_prompt += "6. Jeśli brak informacji w bazie - powiedz wprost: \"Nie mam tej informacji w bazie wiedzy ZOPK\"\n"
        system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
    # Add upcoming events (Etap 2)
    if context.get('upcoming_events'):
        system_prompt += "\n\n📅 KALENDARZ WYDARZEŃ:\n"
        system_prompt += json.dumps(context['upcoming_events'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add B2B classifieds (Etap 2)
    if context.get('classifieds'):
        system_prompt += "\n\n📋 TABLICA B2B:\n"
        system_prompt += json.dumps(context['classifieds'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add forum topics (Etap 2)
    if context.get('forum_topics'):
        system_prompt += "\n\n💬 FORUM:\n"
        system_prompt += json.dumps(context['forum_topics'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add company people - zarząd/wspólnicy (Etap 3)
    if context.get('company_people'):
        system_prompt += "\n\n👥 ZARZĄD I WSPÓLNICY:\n"
        system_prompt += json.dumps(context['company_people'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add registered portal users (Option C)
    if context.get('registered_users'):
        system_prompt += "\n\n👤 ZAREJESTROWANI PRZEDSTAWICIELE FIRM:\n"
        system_prompt += json.dumps(context['registered_users'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add social media per company (Etap 3)
    if context.get('company_social_media'):
        system_prompt += "\n\n📱 SOCIAL MEDIA:\n"
        system_prompt += json.dumps(context['company_social_media'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add GBP audits (Etap 3)
    if context.get('gbp_audits'):
        system_prompt += "\n\n🏪 AUDYTY GOOGLE BUSINESS:\n"
        system_prompt += json.dumps(context['gbp_audits'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add SEO audits (PageSpeed scores)
    if context.get('seo_audits'):
        system_prompt += "\n\n🔍 AUDYTY SEO (PageSpeed):\n"
        system_prompt += json.dumps(context['seo_audits'], ensure_ascii=False, indent=None)
        system_prompt += "\n"
    # Add conversation history, then the new user turn with an open "Ty:" cue
    # so the model continues as the assistant.
    full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n"
    for msg in context.get('recent_messages', []):
        role_name = "Użytkownik" if msg['role'] == 'user' else "Ty"
        full_prompt += f"{role_name}: {msg['content']}\n"
    full_prompt += f"\nUżytkownik: {user_message}\nTy: "
    return full_prompt
def _query_ai(
    self,
    context: Dict[str, Any],
    user_message: str,
    user_id: Optional[int] = None,
    thinking_level: str = 'high',
    user_context: Optional[Dict[str, Any]] = None
) -> str:
    """
    Query Gemini AI with full company database context.

    Args:
        context: Context dict with ALL companies
        user_message: User's message
        user_id: User ID for cost tracking
        thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')
        user_context: Optional user identity/context dict

    Returns:
        AI response text, post-processed so company/person links are present
    """
    prompt = self._build_full_prompt(context, user_message, user_context, thinking_level)

    if not (self.use_global_service and self.gemini_service):
        # Legacy path: direct API call, no centralized cost tracking.
        legacy_response = self.model.generate_content(prompt)
        return self._postprocess_links(legacy_response.text, context)

    # Honour the smart router's model choice, if one was recorded in context.
    model_id = None
    alias = context.get('_route_decision', {}).get('model')
    if alias:
        from gemini_service import GEMINI_MODELS
        model_id = GEMINI_MODELS.get(alias)

    # Global service path: costs are tracked automatically (ai_api_costs table).
    answer = self.gemini_service.generate_text(
        prompt=prompt,
        feature='ai_chat',
        user_id=user_id,
        temperature=0.7,
        thinking_level=thinking_level,
        model=model_id
    )
    # Ensure links are present even if the model skipped formatting them.
    return self._postprocess_links(answer, context)
def send_message_stream(
self,
conversation_id: int,
user_message: str,
user_id: int,
user_context: Optional[Dict[str, Any]] = None
):
"""
Generator yielding streaming chunks as dicts for SSE.
Yields dicts:
{'type': 'token', 'content': text_chunk}
{'type': 'done', 'message_id': int, 'latency_ms': int, 'model': str}
{'type': 'error', 'content': error_message}
"""
import json as json_module
from gemini_service import GEMINI_MODELS
db = SessionLocal()
start_time = time.time()
try:
# SECURITY: Validate ownership
conversation = db.query(AIChatConversation).filter_by(
id=conversation_id
).first()
if not conversation:
yield {'type': 'error', 'content': 'Rozmowa nie istnieje'}
return
if conversation.user_id != user_id:
logger.warning(
f"SECURITY: User {user_id} attempted to stream conversation {conversation_id} "
f"owned by user {conversation.user_id}"
)
yield {'type': 'error', 'content': 'Brak dostępu do tej rozmowy'}
return
# RODO/GDPR: Sanitize user message before storage
sanitized_message = user_message
if SENSITIVE_DATA_SERVICE_AVAILABLE:
sanitized_message, _ = sanitize_message(user_message)
# Save user message
user_msg = AIChatMessage(
conversation_id=conversation_id,
created_at=datetime.now(),
role='user',
content=sanitized_message,
edited=False,
regenerated=False
)
db.add(user_msg)
db.commit()
# Smart Router — classify query, select data + model
thinking_level = 'high'
effective_model_id = None
if SMART_ROUTER_AVAILABLE:
route_decision = route_query(
message=user_message,
user_context=user_context,
gemini_service=self.gemini_service
)
logger.info(
f"NordaGPT Stream Router: complexity={route_decision['complexity']}, "
f"model={route_decision.get('model')}, thinking={route_decision.get('thinking')}"
)
# Build selective or full context
if route_decision.get('routed_by') != 'fallback':
context = build_selective_context(
data_needed=route_decision.get('data_needed', []),
conversation_id=conversation.id,
current_message=user_message,
user_context=user_context
)
else:
context = self._build_conversation_context(db, conversation, user_message)
context['_route_decision'] = route_decision
thinking_level = route_decision.get('thinking', 'high')
model_alias = route_decision.get('model')
if model_alias:
effective_model_id = GEMINI_MODELS.get(model_alias)
else:
context = self._build_conversation_context(db, conversation, user_message)
# Company Matcher — replace full company list with matched subset
if COMPANY_MATCHER_AVAILABLE:
try:
matched = match_companies(user_message, user_context=user_context, max_results=15)
if matched: # Only use matcher results if non-empty
context['matched_companies'] = matched
context['all_companies'] = [] # Clear full list — use matched only
else:
logger.info("Company matcher (stream) returned 0 results — keeping full company list as fallback")
# Don't clear all_companies — AI will use the full list
logger.info(f"Company matcher (stream) found {len(matched)} companies for query")
except Exception as e:
logger.warning(f"Company matcher failed: {e}, using full company list")
# Build full prompt
full_prompt = self._build_full_prompt(context, user_message, user_context, thinking_level)
# Determine model name for logging
primary_model = self.gemini_service.model_name if self.gemini_service else 'gemini-3-flash-preview'
actual_model = effective_model_id or primary_model
# Stream from Gemini using generate_content_stream
full_response_text = ""
try:
from gemini_service import THINKING_MODELS, THINKING_LEVELS
from google.genai import types as genai_types
config_params = {'temperature': 0.7}
if actual_model in THINKING_MODELS:
config_params['thinking_config'] = genai_types.ThinkingConfig(
thinking_level=THINKING_LEVELS.get(thinking_level, 'HIGH'),
include_thoughts=False
)
safety_settings = self.gemini_service.safety_settings if self.gemini_service else []
generation_config = genai_types.GenerateContentConfig(
**config_params,
safety_settings=safety_settings
)
stream_response = self.gemini_service.client.models.generate_content_stream(
model=actual_model,
contents=full_prompt,
config=generation_config
)
for chunk in stream_response:
chunk_text = None
try:
chunk_text = chunk.text
except Exception:
pass
if chunk_text:
full_response_text += chunk_text
yield {'type': 'token', 'content': chunk_text}
except Exception as e:
logger.error(f"Streaming error: {e}")
yield {'type': 'error', 'content': f'Błąd generowania odpowiedzi: {str(e)}'}
return
# Post-process links in full response
full_response_text = self._postprocess_links(full_response_text, context)
# CRITICAL: Validate all company references — remove hallucinated firms
full_response_text = self._validate_company_references(full_response_text)
# Calculate metrics
latency_ms = int((time.time() - start_time) * 1000)
input_tokens = len(full_prompt) // 4
output_tokens = len(full_response_text) // 4
cost_usd = self._calculate_cost(input_tokens, output_tokens)
# Save AI response to DB
ai_msg = AIChatMessage(
conversation_id=conversation_id,
created_at=datetime.now(),
role='assistant',
content=full_response_text,
tokens_input=input_tokens,
tokens_output=output_tokens,
cost_usd=cost_usd,
latency_ms=latency_ms,
edited=False,
regenerated=False
)
db.add(ai_msg)
# Update conversation stats
conversation.message_count = (conversation.message_count or 0) + 2
conversation.updated_at = datetime.now()
db.commit()
db.refresh(ai_msg)
# Async memory extraction in background thread
if MEMORY_SERVICE_AVAILABLE:
import threading
_conv_id = conversation_id
_user_id = user_id
_msg_count = conversation.message_count or 0
_uctx = user_context
_gsvc = self.gemini_service
def _extract_memory_stream():
try:
extract_facts_async(_conv_id, _user_id, _uctx, _gsvc)
if _msg_count % 5 == 0 and _msg_count > 0:
summarize_conversation_async(_conv_id, _user_id, _gsvc)
except Exception as e:
logger.warning(f"Async memory extraction failed: {e}")
threading.Thread(target=_extract_memory_stream, daemon=True).start()
yield {
'type': 'done',
'message_id': ai_msg.id,
'latency_ms': latency_ms,
'model': actual_model,
'cost_usd': round(cost_usd, 6),
'full_text': full_response_text,
'complexity': route_decision.get('complexity', '?'),
'thinking': route_decision.get('thinking', '?'),
'routed_by': route_decision.get('routed_by', '?'),
}
except Exception as e:
logger.error(f"send_message_stream error: {e}")
yield {'type': 'error', 'content': f'Błąd: {str(e)}'}
finally:
db.close()
def _postprocess_links(self, text: str, context: Dict) -> str:
"""
Post-process AI response to add markdown links for companies and people.
This ensures consistent linking regardless of AI behavior.
Args:
text: AI response text
context: Context dict with company_people data
Returns:
Text with names replaced by markdown links
"""
import re
# Build lookup dict: name -> url
name_to_url = {}
# Extract companies and people from company_people context
company_people = context.get('company_people', {})
for company_name, data in company_people.items():
# Add company
if data.get('profile'):
name_to_url[company_name] = data['profile']
# Add people - normalize to Title Case (DB stores UPPERCASE)
for person in data.get('people', []):
if person.get('name') and person.get('profile'):
# Convert "MICHAŁ BOGDAN ROSZMAN" to "Michał Bogdan Roszman"
normalized_name = person['name'].title()
name_to_url[normalized_name] = person['profile']
# Also extract from companies list (context['companies'] has profile URLs)
# Companies format: list of dicts with 'name' and 'profile'
# This is populated by _company_to_compact_dict
# Sort by name length (longest first) to avoid partial replacements
sorted_names = sorted(name_to_url.keys(), key=len, reverse=True)
for name in sorted_names:
url = name_to_url[name]
if not name or not url:
continue
# Skip if already a markdown link
# Pattern: [Name](url) - already linked
already_linked = re.search(r'\[' + re.escape(name) + r'\]\([^)]+\)', text)
if already_linked:
continue
# Replace **Name** (bold) with [Name](url)
bold_pattern = r'\*\*' + re.escape(name) + r'\*\*'
if re.search(bold_pattern, text):
text = re.sub(bold_pattern, f'[{name}]({url})', text, count=1)
continue
# Replace plain "Name" at word boundaries (but not if already in link)
# Be careful not to replace inside existing markdown
plain_pattern = r'(?<!\[)(?<!\()' + re.escape(name) + r'(?!\])(?!\))'
if re.search(plain_pattern, text):
# Only replace first occurrence to avoid over-linking
text = re.sub(plain_pattern, f'[{name}]({url})', text, count=1)
return text
def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""
Calculate cost in USD
Args:
input_tokens: Number of input tokens
output_tokens: Number of output tokens
Returns:
Total cost in USD
"""
# Gemini 2.5 Flash pricing (per 1M tokens)
input_cost = (input_tokens / 1_000_000) * 0.075
output_cost = (output_tokens / 1_000_000) * 0.30
return input_cost + output_cost