nordabiz/nordabiz_chat.py
Maciej Pienczyn 917d686a10
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: calendar year validation, rate limit exemption for polling, migrate old Gemini SDK
- Add year range validation (2020-2100) on /kalendarz/ to prevent ValueError crash
- Exempt notification/message unread-count endpoints from rate limiting (shared IP via NAT)
- Replace deprecated google.generativeai SDK with google-genai in nordabiz_chat.py
- Remove dead news_service import that logged warnings on every worker startup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 19:24:46 +01:00

1437 lines
60 KiB
Python

#!/usr/bin/env python3
"""
Norda Biznes AI Chat Engine
============================
Multi-turn conversational AI for company directory queries.
Features:
- Answer questions about member companies
- Find companies by service, competency, or need
- Concise, helpful responses
- Full conversation history tracking
- Cost tracking per message
Author: Norda Biznes Development Team
Created: 2025-11-23
"""
import os
import time
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
import gemini_service
from search_service import search_companies
# Module logger
logger = logging.getLogger(__name__)
from database import (
SessionLocal,
Company,
Category,
Service,
CompanyService,
Competency,
CompanyCompetency,
Certification,
Award,
CompanyEvent,
AIChatConversation,
AIChatMessage,
CompanyRecommendation,
ZOPKNews,
# Etap 2: Tablica B2B, Kalendarz, Forum
Classified,
NordaEvent,
ForumTopic,
ForumReply,
# Etap 3: Osoby, Social Media, Audyty
Person,
CompanyPerson,
CompanySocialMedia,
GBPAudit,
CompanyWebsiteAnalysis,
User
)
# Import feedback learning service for few-shot learning
try:
from feedback_learning_service import get_feedback_learning_service
FEEDBACK_LEARNING_AVAILABLE = True
except ImportError:
FEEDBACK_LEARNING_AVAILABLE = False
# Import ZOPK knowledge service for semantic search
try:
from zopk_knowledge_service import search_knowledge, get_relevant_facts
ZOPK_KNOWLEDGE_AVAILABLE = True
except ImportError:
ZOPK_KNOWLEDGE_AVAILABLE = False
# Import sensitive data sanitization service (RODO compliance)
try:
from sensitive_data_service import sanitize_message, SensitiveDataType
SENSITIVE_DATA_SERVICE_AVAILABLE = True
except ImportError:
SENSITIVE_DATA_SERVICE_AVAILABLE = False
logger.warning("Sensitive data service not available - messages will not be sanitized")
class NordaBizChatEngine:
"""
AI Chat Assistant for Norda Biznes company directory
Helps users find companies, services, and business partners.
"""
def __init__(self, gemini_api_key: Optional[str] = None, use_global_service: bool = True, model: str = None):
"""
Initialize Norda Biznes Chat Engine
Args:
gemini_api_key: Google Gemini API key (uses env var if not provided)
use_global_service: Use global gemini_service for automatic cost tracking (default: True)
model: Model key ('3-flash', '3-pro') - if provided, creates new service with this model
"""
self.use_global_service = use_global_service
self.requested_model = model
if use_global_service:
if model:
# Create new service with requested model
from gemini_service import GeminiService
self.gemini_service = GeminiService(model=model)
self.model_name = self.gemini_service.model_name
else:
# Use global gemini_service for automatic cost tracking to ai_api_costs table
self.gemini_service = gemini_service.get_gemini_service()
# Get model name from global service (currently Gemini 3 Flash Preview)
self.model_name = self.gemini_service.model_name if self.gemini_service else "gemini-3-flash-preview"
self.model = None
else:
# Legacy: direct API access via gemini_service
from gemini_service import GeminiService
self.gemini_service = GeminiService()
self.model_name = self.gemini_service.model_name
self.model = None
def start_conversation(
self,
user_id: int,
title: Optional[str] = None,
conversation_type: str = 'general'
) -> AIChatConversation:
"""
Start new conversation
Args:
user_id: User ID
title: Optional conversation title
conversation_type: Type of conversation (default: 'general')
Returns:
AIChatConversation: New conversation object
"""
db = SessionLocal()
try:
# Auto-generate title if not provided
if not title:
title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
conversation = AIChatConversation(
user_id=user_id,
started_at=datetime.now(),
conversation_type=conversation_type,
title=title,
is_active=True,
message_count=0,
model_name=self.model_name
)
db.add(conversation)
db.commit()
db.refresh(conversation)
return conversation
finally:
db.close()
def send_message(
self,
conversation_id: int,
user_message: str,
user_id: int,
thinking_level: str = 'high'
) -> AIChatMessage:
"""
Send message and get AI response
SECURITY: Validates that user_id owns the conversation before allowing messages.
This is defense-in-depth - API routes should also validate ownership.
Args:
conversation_id: Conversation ID
user_message: User's message text
user_id: User ID (required for ownership validation and cost tracking)
thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')
Returns:
AIChatMessage: AI response message
Raises:
ValueError: If conversation not found
PermissionError: If user doesn't own the conversation
"""
db = SessionLocal()
start_time = time.time()
try:
# SECURITY: Get conversation with ownership check
conversation = db.query(AIChatConversation).filter_by(
id=conversation_id
).first()
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")
# SECURITY: Verify user owns this conversation (defense in depth)
if conversation.user_id != user_id:
logger.warning(
f"SECURITY: User {user_id} attempted to access conversation {conversation_id} "
f"owned by user {conversation.user_id}"
)
raise PermissionError("Access denied: You don't own this conversation")
# RODO/GDPR: Sanitize user message - remove sensitive data before storage
# Note: NIP and email are NOT considered sensitive (public business data)
sanitized_message = user_message
sensitive_data_found = []
if SENSITIVE_DATA_SERVICE_AVAILABLE:
sanitized_message, sensitive_data_found = sanitize_message(user_message)
if sensitive_data_found:
logger.info(
f"RODO: Sanitized {len(sensitive_data_found)} sensitive items in message "
f"from user {user_id}: {[m.data_type.value for m in sensitive_data_found]}"
)
# Save user message (sanitized for storage, original for AI context)
user_msg = AIChatMessage(
conversation_id=conversation_id,
created_at=datetime.now(),
role='user',
content=sanitized_message, # Store sanitized version
edited=False,
regenerated=False
)
db.add(user_msg)
db.commit()
# Build context from conversation history and relevant companies
# Use ORIGINAL message for AI (so it can understand the question)
# but the sanitized version is what gets stored in DB
context = self._build_conversation_context(db, conversation, user_message)
# Get AI response with cost tracking
response = self._query_ai(
context,
user_message,
user_id=user_id,
thinking_level=thinking_level
)
# Calculate metrics for per-message tracking in AIChatMessage table
latency_ms = int((time.time() - start_time) * 1000)
if self.gemini_service:
input_tokens = self.gemini_service.count_tokens(user_message)
output_tokens = self.gemini_service.count_tokens(response)
cost_usd = self._calculate_cost(input_tokens, output_tokens)
else:
# Fallback estimate (~4 chars per token)
input_tokens = len(user_message) // 4
output_tokens = len(response) // 4
cost_usd = 0.0
# Save AI response
ai_msg = AIChatMessage(
conversation_id=conversation_id,
created_at=datetime.now(),
role='assistant',
content=response,
tokens_input=input_tokens,
tokens_output=output_tokens,
cost_usd=cost_usd,
latency_ms=latency_ms,
edited=False,
regenerated=False
)
db.add(ai_msg)
# Update conversation
conversation.message_count += 2
conversation.updated_at = datetime.now()
db.commit()
db.refresh(ai_msg)
return ai_msg
finally:
db.close()
def get_conversation_history(
self,
conversation_id: int,
user_id: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Get all messages in conversation
SECURITY: If user_id is provided, validates ownership before returning messages.
This is defense-in-depth - API routes should also validate ownership.
Args:
conversation_id: Conversation ID
user_id: User ID for ownership validation (optional for backwards compatibility,
but strongly recommended for security)
Returns:
List of message dicts
Raises:
ValueError: If conversation not found
PermissionError: If user_id provided and user doesn't own the conversation
"""
db = SessionLocal()
try:
# SECURITY: Verify ownership if user_id provided
if user_id is not None:
conversation = db.query(AIChatConversation).filter_by(
id=conversation_id
).first()
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")
if conversation.user_id != user_id:
logger.warning(
f"SECURITY: User {user_id} attempted to read history of conversation {conversation_id} "
f"owned by user {conversation.user_id}"
)
raise PermissionError("Access denied: You don't own this conversation")
messages = db.query(AIChatMessage).filter_by(
conversation_id=conversation_id
).order_by(AIChatMessage.created_at).all()
return [
{
'id': msg.id,
'role': msg.role,
'content': msg.content,
'created_at': msg.created_at.isoformat(),
'tokens_input': msg.tokens_input,
'tokens_output': msg.tokens_output,
'cost_usd': float(msg.cost_usd) if msg.cost_usd else 0.0,
'latency_ms': msg.latency_ms
}
for msg in messages
]
finally:
db.close()
def _build_conversation_context(
self,
db,
conversation: AIChatConversation,
current_message: str
) -> Dict[str, Any]:
"""
Build context for AI with ALL companies (not pre-filtered)
This allows AI to intelligently select relevant companies instead of
relying on keyword-based search pre-filtering.
Args:
db: Database session
conversation: Current conversation
current_message: User's current message (for reference only)
Returns:
Context dict with ALL companies and categories
"""
# Load ALL active companies - let AI do the intelligent filtering
all_companies = db.query(Company).filter_by(status='active').all()
context = {
'conversation_type': conversation.conversation_type,
'total_companies': len(all_companies)
}
# Get all categories with company counts
categories = db.query(Category).all()
context['categories'] = [
{
'name': cat.name,
'slug': cat.slug,
'company_count': db.query(Company).filter_by(category_id=cat.id, status='active').count()
}
for cat in categories
]
# Include ALL companies in compact format to minimize tokens
# AI will intelligently select the most relevant ones
context['all_companies'] = [
self._company_to_compact_dict(c)
for c in all_companies
]
# Add conversation history (last 10 messages for context)
messages = db.query(AIChatMessage).filter_by(
conversation_id=conversation.id
).order_by(AIChatMessage.created_at.desc()).limit(10).all()
context['recent_messages'] = [
{'role': msg.role, 'content': msg.content}
for msg in reversed(messages)
]
# === ETAP 1: Rekomendacje i Newsy ===
# Add approved recommendations (peer endorsements)
recommendations = db.query(CompanyRecommendation).filter_by(
status='approved'
).order_by(CompanyRecommendation.created_at.desc()).limit(20).all()
context['recommendations'] = [
{
'company': rec.company.name if rec.company else 'Nieznana',
'text': rec.recommendation_text[:200] if rec.recommendation_text else '',
'service': rec.service_category or '',
'author': rec.user.name if rec.user and rec.show_contact else 'Członek Norda Biznes'
}
for rec in recommendations
]
# Add recent approved news (last 30 days)
from datetime import timedelta
news_cutoff = datetime.now() - timedelta(days=30)
recent_news = db.query(ZOPKNews).filter(
ZOPKNews.status.in_(['approved', 'auto_approved']),
ZOPKNews.published_at >= news_cutoff
).order_by(ZOPKNews.published_at.desc()).limit(10).all()
context['recent_news'] = [
{
'title': news.title,
'description': news.description[:400] if news.description else '', # Opis/lead
'summary': news.ai_summary[:300] if news.ai_summary else '', # AI streszczenie
'source': news.source_name or '',
'url': news.url or '', # Link do artykułu
'date': news.published_at.strftime('%Y-%m-%d') if news.published_at else '',
'type': news.news_type or 'news',
'keywords': news.keywords[:5] if news.keywords else [] # Słowa kluczowe
}
for news in recent_news
]
# === ZOPK KNOWLEDGE BASE (semantic search) ===
# Detect if question is about ZOPK topics
if self._is_zopk_query(current_message):
zopk_knowledge = self._get_zopk_knowledge_context(db, current_message)
context['zopk_knowledge'] = zopk_knowledge
# === ETAP 2: Tablica B2B, Kalendarz, Forum ===
# Add upcoming events (next 60 days)
from datetime import date
event_cutoff = date.today() + timedelta(days=60)
upcoming_events = db.query(NordaEvent).filter(
NordaEvent.event_date >= date.today(),
NordaEvent.event_date <= event_cutoff
).order_by(NordaEvent.event_date).limit(15).all()
context['upcoming_events'] = [
{
'title': event.title[:80] if event.title else '',
'date': event.event_date.strftime('%Y-%m-%d') if event.event_date else '',
'type': event.event_type or 'meeting',
'location': event.location[:50] if event.location else '',
'speaker': event.speaker_name[:30] if event.speaker_name else ''
}
for event in upcoming_events
]
# Add active B2B classifieds (non-test only)
active_classifieds = db.query(Classified).filter(
Classified.is_active == True,
Classified.is_test == False
).order_by(Classified.created_at.desc()).limit(20).all()
context['classifieds'] = [
{
'type': c.listing_type, # szukam/oferuje
'category': c.category,
'title': c.title,
'description': c.description[:400] if c.description else '', # Pełny opis
'company': c.company.name if c.company else '',
'author': c.author.name if c.author else '',
'budget': c.budget_info or '',
'location': c.location_info or '',
'date': c.created_at.strftime('%Y-%m-%d') if c.created_at else '',
'views': c.views_count or 0,
'url': f'/classifieds/{c.id}'
}
for c in active_classifieds
]
# Add recent forum topics with FULL content, authors and replies
from sqlalchemy.orm import joinedload as jl
forum_topics = db.query(ForumTopic).options(
jl(ForumTopic.author),
jl(ForumTopic.replies).joinedload(ForumReply.author)
).filter(
ForumTopic.category != 'test'
).order_by(ForumTopic.created_at.desc()).limit(15).all()
context['forum_topics'] = []
for topic in forum_topics:
topic_data = {
'title': topic.title,
'content': topic.content[:500] if topic.content else '', # Treść tematu
'author': topic.author.name if topic.author else 'Anonim',
'category': topic.category_label,
'status': topic.status_label,
'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '',
'url': f'/forum/{topic.id}', # Link do tematu
'views': topic.views_count or 0, # Popularność
'pinned': topic.is_pinned, # Czy przypięty (ważny)
'replies_count': topic.reply_count,
'has_attachments': len(topic.attachments) > 0 if topic.attachments else False
}
# Dodaj odpowiedzi (max 5 ostatnich per temat)
if topic.replies:
topic_data['replies'] = [
{
'author': reply.author.name if reply.author else 'Anonim',
'content': reply.content[:300] if reply.content else '',
'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else ''
}
for reply in sorted(topic.replies, key=lambda r: r.created_at, reverse=True)[:5]
]
context['forum_topics'].append(topic_data)
# === ETAP 3: Osoby (zarząd/wspólnicy), Social Media, Audyty ===
# Add company people (zarząd, wspólnicy) - grouped by company
from sqlalchemy.orm import joinedload
company_people = db.query(CompanyPerson).options(
joinedload(CompanyPerson.person),
joinedload(CompanyPerson.company)
).order_by(CompanyPerson.company_id).all()
# Group people by company for compact representation
people_by_company = {}
for cp in company_people:
company_name = cp.company.name if cp.company else 'Nieznana'
company_profile = f"https://nordabiznes.pl/company/{cp.company.slug}" if cp.company and cp.company.slug else None
if company_name not in people_by_company:
people_by_company[company_name] = {'profile': company_profile, 'people': []}
person_info = {
'name': cp.person.full_name() if cp.person else '',
'profile': f"https://nordabiznes.pl/osoba/{cp.person.id}" if cp.person else None,
'role': cp.role[:30] if cp.role else ''
}
if cp.shares_percent:
person_info['shares'] = f"{cp.shares_percent}%"
people_by_company[company_name]['people'].append(person_info)
context['company_people'] = people_by_company
# Add registered portal users grouped by company (Option C)
registered_users = db.query(User).filter(
User.is_active == True,
User.company_id.isnot(None)
).options(joinedload(User.company)).all()
users_by_company = {}
for u in registered_users:
company_name = u.company.name if u.company else 'Nieznana'
company_profile = f"https://nordabiznes.pl/company/{u.company.slug}" if u.company and u.company.slug else None
if company_name not in users_by_company:
users_by_company[company_name] = {'profile': company_profile, 'users': []}
# Map portal role codes to human-readable Polish labels
role_labels = {'MANAGER': 'administrator profilu', 'EMPLOYEE': 'pracownik', 'VIEWER': 'obserwator'}
users_by_company[company_name]['users'].append({
'name': u.name,
'email': u.email,
'portal_role': role_labels.get(u.company_role, ''),
'member': u.is_norda_member,
'verified': u.is_verified
})
context['registered_users'] = users_by_company
# Add social media summary per company (platforms and followers)
social_media = db.query(CompanySocialMedia).filter(
CompanySocialMedia.is_valid == True
).options(joinedload(CompanySocialMedia.company)).all()
# Group social media by company
social_by_company = {}
for sm in social_media:
company_name = sm.company.name if sm.company else 'Nieznana'
if company_name not in social_by_company:
social_by_company[company_name] = []
social_by_company[company_name].append({
'platform': sm.platform,
'url': sm.url or '',
'followers': sm.followers_count or 0
})
context['company_social_media'] = social_by_company
# Add latest GBP audit scores (one per company, most recent)
from sqlalchemy import func
# Subquery to get max audit_date per company
latest_audit_subq = db.query(
GBPAudit.company_id,
func.max(GBPAudit.audit_date).label('max_date')
).group_by(GBPAudit.company_id).subquery()
latest_audits = db.query(GBPAudit).join(
latest_audit_subq,
(GBPAudit.company_id == latest_audit_subq.c.company_id) &
(GBPAudit.audit_date == latest_audit_subq.c.max_date)
).options(joinedload(GBPAudit.company)).all()
context['gbp_audits'] = [
{
'company': audit.company.name if audit.company else '',
'score': audit.completeness_score or 0,
'reviews': audit.review_count or 0,
'rating': float(audit.average_rating) if audit.average_rating else 0,
'maps_url': audit.google_maps_url or '',
'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else ''
}
for audit in latest_audits
]
# Add SEO audits (PageSpeed scores) for companies with website analysis
seo_audits = db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None)
).options(joinedload(CompanyWebsiteAnalysis.company)).all()
context['seo_audits'] = [
{
'company': audit.company.name if audit.company else '',
'seo': audit.pagespeed_seo_score or 0,
'performance': audit.pagespeed_performance_score or 0,
'accessibility': audit.pagespeed_accessibility_score or 0,
'best_practices': audit.pagespeed_best_practices_score or 0,
'overall': audit.seo_overall_score or 0,
'url': audit.company.website if audit.company else '',
'profile_url': f'https://nordabiznes.pl/company/{audit.company.slug}' if audit.company else ''
}
for audit in seo_audits
]
return context
def _company_to_compact_dict(self, c: Company) -> Dict[str, Any]:
"""
Convert company to compact dictionary for AI context.
Optimized to minimize tokens while keeping all important data.
Args:
c: Company object
Returns:
Compact dict with essential company info
"""
compact = {
'name': c.name,
'cat': c.category.name if c.category else None,
'profile': f'https://nordabiznes.pl/company/{c.slug}',
}
# Only include non-empty fields to save tokens
if c.description_short:
compact['desc'] = c.description_short
if c.founding_history:
compact['history'] = c.founding_history # Owners, founders, history
if c.services:
services = [cs.service.name for cs in c.services if cs.service]
if services:
compact['svc'] = services
if c.competencies:
competencies = [cc.competency.name for cc in c.competencies if cc.competency]
if competencies:
compact['comp'] = competencies
if c.website:
compact['web'] = c.website
if c.phone:
compact['tel'] = c.phone
if c.email:
compact['mail'] = c.email
if c.address_city:
compact['city'] = c.address_city
if c.year_established:
compact['year'] = c.year_established
if c.certifications:
certs = [cert.name for cert in c.certifications if cert.is_active]
if certs:
compact['cert'] = certs[:3] # Limit to 3 certs
return compact
# Słownik synonimów i powiązanych terminów dla lepszego wyszukiwania
KEYWORD_SYNONYMS = {
# IT / Web
'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
# Budownictwo
'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
# Transport / Logistyka
'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
# Usługi
'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
# Produkcja
'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
}
def _find_relevant_companies(self, db, message: str) -> List[Company]:
"""
Find companies relevant to user's message
Uses unified SearchService with:
- Synonym expansion for better keyword matching
- NIP/REGON direct lookup
- PostgreSQL FTS with fuzzy matching (when available)
- Fallback scoring for SQLite
Args:
db: Database session
message: User's message
Returns:
List of relevant Company objects
"""
# Use unified SearchService for better search results
results = search_companies(db, message, limit=10)
# Extract Company objects from SearchResult
return [result.company for result in results]
def _is_zopk_query(self, message: str) -> bool:
"""
Check if the message is related to ZOPK (Zielony Okręg Przemysłowy Kaszubia).
ZOPK topics include:
- Offshore wind energy (Baltic Power, Baltica)
- Nuclear power plant (Lubiatowo-Kopalino)
- Kongsberg investment in Rumia
- Infrastructure (Via Pomerania, S6, Droga Czerwona)
- Hydrogen, data centers
"""
zopk_keywords = [
# Main project
'zopk', 'zielony okręg', 'okręg przemysłowy', 'kaszubia', 'kaszub',
'projekt kaszubia', 'przemysłowy kaszubia',
# Offshore wind (Polish forms + English)
'offshore', 'farmy wiatrowe', 'energetyka wiatrowa', 'bałtyk', 'baltic power',
'baltica', 'orsted', 'morska energia', 'wiatraki morskie', 'farma wiatrowa',
# Nuclear - all Polish grammatical forms
'elektrownia jądrowa', 'elektrowni jądrowej', 'elektrownie jądrowe',
'jądrowa', 'jądrowej', 'jądrowe', 'jądrowy', # adjective forms
'atomowa', 'atomowej', 'atomowe', 'atom',
'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie',
'westinghouse', 'bechtel', 'turbiny', 'arabelle',
# Kongsberg defense industry
'kongsberg', 'inwestycje norweskie', 'przemysł obronny', 'zbrojeniow',
# Infrastructure
'via pomerania', 'droga czerwona', 's6', 'port gdynia',
# Energy transition
'wodór', 'centra danych', 'samsonowicz', 'transformacja energetyczna',
'energetyka', 'energetyczny', 'energetyczna',
# Organizations
'norda biznes', 'izba przedsiębiorców', 'rumia invest', 'rumia',
# Roadmap and milestones
'kamienie milowe', 'roadmapa', 'timeline', 'harmonogram',
'inwestycje pomorze', 'inwestycje pomorskie', 'rozwój pomorza'
]
message_lower = message.lower()
return any(kw in message_lower for kw in zopk_keywords)
def _get_zopk_knowledge_context(self, db, message: str) -> Dict[str, Any]:
"""
Get ZOPK knowledge base context for the current message.
Uses semantic search to find relevant:
- Knowledge chunks (text fragments with embeddings)
- Facts (structured information)
- Entities (companies, people, projects)
Args:
db: Database session
message: User's question
Returns:
Dict with chunks, facts, entities
"""
from database import ZOPKKnowledgeEntity, ZOPKKnowledgeChunk, ZOPKNews
context = {
'chunks': [],
'facts': [],
'entities': []
}
# Check if knowledge service is available
if not ZOPK_KNOWLEDGE_AVAILABLE:
logger.warning("ZOPK knowledge service not available")
return context
try:
# Semantic search in knowledge chunks
chunks = search_knowledge(
db,
query=message,
limit=5,
min_similarity=0.3,
user_id=None # Don't track cost for context building
)
# Enrich chunks with source information
for c in chunks:
chunk_data = {
'content': c['content'][:400], # Limit length
'summary': c.get('summary', ''),
'similarity': c.get('similarity', 0),
'source': 'nieznane',
'date': ''
}
# Get source news info if available
if c.get('source_news_id'):
news = db.query(ZOPKNews).filter(
ZOPKNews.id == c['source_news_id']
).first()
if news:
chunk_data['source'] = news.source_name or news.source_domain or 'nieznane'
chunk_data['source_url'] = news.url or ''
if news.published_at:
chunk_data['date'] = news.published_at.strftime('%Y-%m-%d')
context['chunks'].append(chunk_data)
# Get relevant facts with source information
facts = get_relevant_facts(db, query=message, limit=5)
context['facts'] = [
{
'fact': f['full_text'],
'type': f['fact_type'],
'confidence': f.get('confidence', 0),
'value': f.get('numeric_value'),
'unit': f.get('numeric_unit'),
'source_url': f.get('source_url', ''),
'source_name': f.get('source_name', ''),
'source_date': f.get('source_date', '')
}
for f in facts
]
# Get top mentioned entities (always include for context)
top_entities = db.query(ZOPKKnowledgeEntity).filter(
ZOPKKnowledgeEntity.mentions_count > 1
).order_by(
ZOPKKnowledgeEntity.mentions_count.desc()
).limit(10).all()
context['entities'] = [
{
'name': e.name,
'type': e.entity_type,
'description': e.short_description or '',
'mentions': e.mentions_count
}
for e in top_entities
]
except Exception as e:
logger.error(f"Error getting ZOPK knowledge context: {e}")
# Return empty context on error, don't break chat
return context
def _query_ai(
self,
context: Dict[str, Any],
user_message: str,
user_id: Optional[int] = None,
thinking_level: str = 'high'
) -> str:
"""
Query Gemini AI with full company database context
Args:
context: Context dict with ALL companies
user_message: User's message
user_id: User ID for cost tracking
thinking_level: AI reasoning depth ('minimal', 'low', 'medium', 'high')
Returns:
AI response text
"""
import json
# Build system prompt with ALL companies
recommendations_count = len(context.get('recommendations', []))
news_count = len(context.get('recent_news', []))
events_count = len(context.get('upcoming_events', []))
classifieds_count = len(context.get('classifieds', []))
forum_count = len(context.get('forum_topics', []))
people_companies_count = len(context.get('company_people', {}))
social_companies_count = len(context.get('company_social_media', {}))
gbp_audits_count = len(context.get('gbp_audits', []))
seo_audits_count = len(context.get('seo_audits', []))
system_prompt = f"""Jesteś pomocnym asystentem portalu Norda Biznes - katalogu firm zrzeszonych w stowarzyszeniu Norda Biznes z Wejherowa.
📊 MASZ DOSTĘP DO BAZY WIEDZY:
- Liczba firm: {context['total_companies']}
- Kategorie: {', '.join([f"{cat['name']} ({cat['company_count']})" for cat in context.get('categories', [])])}
- Rekomendacje członków: {recommendations_count}
- Ostatnie aktualności: {news_count}
- Nadchodzące wydarzenia: {events_count}
- Ogłoszenia B2B: {classifieds_count}
- Tematy na forum: {forum_count}
- Firmy z danymi KRS (zarząd/wspólnicy): {people_companies_count}
- Firmy z Social Media: {social_companies_count}
- Audyty Google Business: {gbp_audits_count}
- Audyty SEO (PageSpeed): {seo_audits_count}
🎯 TWOJA ROLA:
- Analizujesz CAŁĄ bazę firm i wybierasz najlepsze dopasowania do pytania użytkownika
- Odpowiadasz zwięźle (2-3 zdania), chyba że użytkownik prosi o szczegóły
- Podajesz konkretne nazwy firm z kontaktem
- Możesz wyszukiwać po: nazwie firmy, usługach, kompetencjach, właścicielach (w history), mieście
- Możesz cytować rekomendacje innych członków
- Możesz informować o aktualnych newsach, wydarzeniach, ogłoszeniach i dyskusjach na forum
🏛️ O IZBIE NORDA BIZNES:
Izba Przedsiebiorców NORDA Biznes to stowarzyszenie non-profit z Wejherowa, dzialajace od blisko 30 lat (jubileusz 30-lecia w 2027). NORDA to kaszubskie okreslenie "polnocy" i jeden z 5 regionow Kaszub.
MISJA: Laczenie przedsiebiorcow z regionu, reprezentowanie interesow biznesu wobec samorzadu, tworzenie przestrzeni do networkingu i wymiany doswiadczen, wspieranie rozwoju kompetencji czlonkow.
WARTOSCI: Regionalizm (tozsamosc kaszubska), Wspolpraca (partnerstwo nie konkurencja), Otwartosc (firmy wszystkich wielkosci), Roznorodnosc (wielobranzowosci), Apolitycznosc (neutralnosc polityczna).
STRUKTURA: Zrzesza glownie male firmy (do 50 osob) z roznych branz - budownictwo, IT, handel, uslugi profesjonalne, media, produkcja. Rada Izby liczy 16 osob, Zarzad 4 osoby.
RADA IZBY (kadencja do 2025): Leszek Glaza (prowadzacy), Iwona Musial, Andrzej Gorczycki, Pawel Kwidzinski, Dariusz Schmidtke, Artur Wiertel, Aureliusz Jedrzejewski, Krzysztof Kubis, Angelika Piechocka, Janusz Masiak, Jakub Bornowski, Pawel Piechota, Jacek Pomieczynski, Radoslaw Skwarlo, Roman Wiercinski, Michal Wesierski.
DZIALANIA CYKLICZNE:
- "Chwila dla Biznesu" - networking w ostatni czwartek miesiaca, godz. 18:00, Hotel Olimp, Wejherowo
- Spotkanie Rady - pierwsza sroda miesiaca
- Akademia NORDA - kwartalne szkolenia (ksiegowosc, social media, mentoring)
- Bale Przedsiebiorcy 2-3 razy w roku
- Wizyty u czlonkow Izby (do 5/miesiac)
WYDARZENIA 2025: Piknik w Parku Wejherowo, konferencja "Zielone Okregi Przemyslowe - Kaszubia" w Rumi (200+ uczestnikow), Tydzien Przedsiebiorczosci w Filharmonii Wejherowskiej (prelegent z Microsoft). 30-40 nowych czlonkow w 2025.
STRATEGIA 2026-2031 - TRZY KIERUNKI:
1. Od zrzeszenia do ekosystemu - wspolne projekty, konsorcja, wzajemne zlecenia
2. Brama do regionu - pierwszy kontakt dla inwestorow z zewnatrz (elektrownia jadrowa, offshore, Kaszubia)
3. Profesjonalizacja komunikacji - spojna obecnosc w mediach spolecznosciowych
CEL na 30-lecie (2027): 200 czlonkow, rozpoznawalna marka, realny wplyw na rozwoj regionu.
REGION: Powiat wejherowski - jeden z najdynamiczniej rosnacych demograficznie obszarow w Polsce. 230 tys. mieszkancow (wzrost 30% w 20 lat), sredni wiek ponizej 40 lat. Projekt "Kaszubia" - Zielony Okreg Przemyslowy to ok. 250 mld PLN inwestycji.
PROJEKTY CZLONKOW: Energo Velo / Zarnowiecki Ring N59 - trasa rowerowa wokol Jeziora Zarnowieckiego (autor: Jacek Pomieczynski, EURA-TECH).
📋 FORMAT DANYCH FIRM (skróty):
- name: nazwa firmy
- cat: kategoria
- profile: link do profilu firmy na nordabiznes.pl
- desc: krótki opis
- history: historia firmy, właściciele, założyciele
- svc: usługi
- comp: kompetencje
- web/tel/mail: kontakt
- city: miasto
- cert: certyfikaty
⭐ REKOMENDACJE - opinie członków o firmach:
- company: nazwa polecanej firmy
- text: treść rekomendacji
- service: kategoria usługi
- author: kto poleca
📰 AKTUALNOŚCI - ostatnie newsy:
- title: tytuł artykułu
- source: źródło (portal)
- date: data publikacji
📅 KALENDARZ - nadchodzące wydarzenia Norda Biznes:
- title: nazwa wydarzenia
- date: data (YYYY-MM-DD)
- type: typ (meeting, networking, webinar)
- location: miejsce
- speaker: prelegent (jeśli jest)
📋 TABLICA B2B - ogłoszenia członków:
- type: "szukam" lub "oferuje"
- category: uslugi/produkty/wspolpraca/praca/inne
- title: tytuł ogłoszenia
- company: firma ogłaszająca
- location: lokalizacja
💬 FORUM - dyskusje społeczności (pełna treść!):
- title: tytuł tematu
- content: treść tematu (do 500 znaków)
- author: imię i nazwisko autora tematu
- category: Propozycja funkcji/Błąd/Pytanie/Ogłoszenie
- status: Nowy/W realizacji/Rozwiązany/Odrzucony
- date: data utworzenia
- url: link do tematu na nordabiznes.pl
- views: liczba wyświetleń (popularność)
- pinned: czy temat jest przypięty (ważny)
- replies_count: liczba odpowiedzi
- replies: lista odpowiedzi (autor, treść, data) - max 5 ostatnich
👥 ZARZĄD I WSPÓLNICY - dane KRS firm (pogrupowane po firmie):
- name: imię i nazwisko osoby
- role: funkcja (Prezes Zarządu, Członek Zarządu, Wspólnik, Prokurent)
- shares: procent udziałów (tylko dla wspólników)
📱 SOCIAL MEDIA - profile firm (pogrupowane po firmie):
- platform: facebook, instagram, linkedin, youtube, twitter, tiktok
- url: link do profilu
- followers: liczba obserwujących
🏪 AUDYT GOOGLE BUSINESS - wyniki audytu profili Google:
- company: nazwa firmy
- score: wynik kompletności profilu (0-100)
- reviews: liczba recenzji
- rating: średnia ocena (1-5)
- maps_url: link do profilu Google Maps
- profile_url: link do profilu firmy na nordabiznes.pl
🔍 AUDYT SEO (PageSpeed) - wyniki analizy stron www firm:
- company: nazwa firmy
- seo: wynik SEO (0-100)
- performance: wydajność strony (0-100)
- accessibility: dostępność (0-100)
- best_practices: najlepsze praktyki (0-100)
- overall: ogólny wynik SEO (0-100)
- url: adres strony www
- profile_url: link do profilu firmy na nordabiznes.pl
⚠️ WAŻNE:
- ZAWSZE podawaj nazwę firmy i kontakt (tel/web/mail jeśli dostępne)
🚫 DANE WRAŻLIWE - BEZWZGLĘDNY ZAKAZ:
NIE odpowiadaj na pytania o:
- PESEL (numer identyfikacyjny)
- Numery dowodów osobistych
- Numery paszportów
- Numery kart kredytowych/debetowych
- Hasła lub dane logowania
- Numery kont bankowych / IBAN
Jeśli użytkownik pyta o PESEL lub inne dane wrażliwe (nawet zamaskowane jako [PESEL UKRYTY]):
- ODPOWIEDZ: "Przepraszam, nie mogę podawać informacji o numerach PESEL ani innych danych wrażliwych. Jest to niezgodne z RODO."
- NIE wymyślaj żadnych połączeń między numerami a osobami
- NIE zgaduj czyj to może być PESEL
- NIE sugeruj żadnych osób z bazy danych
To jest wymóg prawny (RODO/GDPR) - nie ma żadnych wyjątków.
🔗 KLIKALNE LINKI (BEZWZGLĘDNIE OBOWIĄZKOWE!):
⚠️ KRYTYCZNE - KAŻDA nazwa firmy MUSI być linkiem markdown:
- ✅ JEDYNY PRAWIDŁOWY FORMAT: [Nazwa Firmy](URL z pola profile)
- ❌ NIEDOPUSZCZALNE: Nazwa Firmy (bez linku)
- ❌ NIEDOPUSZCZALNE: **Nazwa Firmy** (pogrubienie bez linku)
- ❌ NIEDOPUSZCZALNE: "Nazwa Firmy" (cudzysłowy bez linku)
Przykład: [Pixlab Softwarehouse](https://nordabiznes.pl/company/pixlab-sp-z-o-o)
👤 OSOBY - każda osoba (zarząd/wspólnik) też MUSI być linkiem:
- ✅ PRAWIDŁOWO: [Michał Bogdan Roszman](https://nordabiznes.pl/osoba/123)
- ❌ BŁĘDNIE: **Michał Bogdan Roszman** (pogrubienie bez linku)
- ❌ BŁĘDNIE: Michał Bogdan Roszman (tekst bez linku)
W sekcji ZARZĄD I WSPÓLNICY każda osoba ma pole "profile" z URL - UŻYJ GO!
Inne linki które MUSISZ dołączać gdy dostępne:
• Strona www firmy (pole "web" lub "url")
• Profil Google Maps (pole "maps_url")
• Profile social media (pole "url")
• FORUM: [tytuł tematu](/forum/ID) - np. [moje uwagi do CRM](/forum/18)
• B2B: [tytuł ogłoszenia](/ogloszenia/ID)
• AKTUALNOŚCI: [tytuł](/news/ID)
- Jeśli pytanie o osobę (np. "kto to Roszman") - szukaj w ZARZĄD I WSPÓLNICY, ZAREJESTROWANI PRZEDSTAWICIELE FIRM lub w polu "history"
- Jeśli pytanie "kto jest prezesem firmy X" - szukaj w ZARZĄD I WSPÓLNICY
- Jeśli pytanie "kto pracuje w firmie X" lub "kto reprezentuje firmę X" - szukaj w ZARZĄD I WSPÓLNICY i ZAREJESTROWANI PRZEDSTAWICIELE FIRM
- Jeśli pytanie "kto poleca firmę X" - szukaj w rekomendacjach
- Jeśli pytanie "co słychać" - sprawdź aktualności i wydarzenia
- Jeśli pytanie "kiedy następne spotkanie" - sprawdź kalendarz
- Jeśli pytanie "kto szuka/oferuje X" - sprawdź tablicę B2B
- Jeśli pytanie o dyskusje/tematy - sprawdź forum i ZAWSZE podaj link z pola "url"
- Jeśli pytanie o social media/followers - sprawdź SOCIAL MEDIA (dołącz linki!)
- Jeśli pytanie o Google opinie/recenzje - sprawdź AUDYT GOOGLE BUSINESS (dołącz link do Maps!)
- Jeśli pytanie o SEO/wydajność strony/PageSpeed - sprawdź AUDYT SEO (dołącz link do strony!)
- Odpowiadaj PO POLSKU
✍️ FORMATOWANIE ODPOWIEDZI:
🎯 PRECYZJA: Jeśli użytkownik pyta o konkretną liczbę (np. "wymień 5"), ZAWSZE podaj DOKŁADNIE tyle ile prosi!
- "wymień 5" → podaj dokładnie 5 elementów
- "podaj 3 firmy" → podaj dokładnie 3 firmy
- NIGDY nie podawaj mniej niż użytkownik prosi!
📝 FORMAT LIST - każdy element w JEDNEJ LINII ze szczegółami po przecinku:
PRAWIDŁOWO:
1. **Chwila dla Biznesu** (29.01.2026) - Hotel Olimp, Wejherowo, networking
2. **Rada Izby NORDA** (04.02.2026) - biuro Norda Biznes, spotkanie zarządu
3. **Chwila dla Biznesu** (26.02.2026) - Hotel Olimp, Wejherowo, networking
BŁĘDNIE (NIE RÓB - resetuje numerację):
1. **Chwila dla Biznesu**
Data: 29.01.2026
Miejsce: Hotel Olimp
1. **Rada Izby NORDA**
Data: 04.02.2026
- Używaj **pogrubienia** dla nazw firm i tytułów
- Wszystkie szczegóły elementu w JEDNEJ linii (po myślniku lub w nawiasie)
- Numeracja MUSI być sekwencyjna: 1. 2. 3. 4. 5. (nie 1. 1. 1. 1.)
"""
# Add thinking level specific instructions
if thinking_level == 'high':
system_prompt += """
🧠 TRYB GŁĘBOKIEJ ANALIZY - WYMAGANIA:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
W tym trybie użytkownik oczekuje SZCZEGÓŁOWEJ odpowiedzi:
1. ROZBUDOWANA analiza - podaj WIĘCEJ informacji niż standardowo
2. STRUKTURA - używaj list punktowanych/numerowanych (1. 2. 3.)
3. KONTEKST - dodaj tło, wyjaśnienia, powiązania między informacjami
4. WSZYSTKIE LINKI - każda firma, osoba, temat forum MUSI mieć link markdown
5. CYTATY - jeśli są rekomendacje lub opinie, cytuj je
6. WNIOSKI - na końcu możesz dodać krótkie podsumowanie lub wniosek
Przykład odpowiedzi w trybie głębokiej analizy:
"Pomysły [Jacka Pomieczyńskiego](link) w temacie **moje uwagi do CRM** są bardzo konkretne:
1. **Rozbudowa paska nawigacji** - dodanie zakładek: NordaGPT, B2B, Lokalne Projekty
2. **Usprawnienie kalendarza** - podział na przeszłe/przyszłe wydarzenia
3. **Identyfikacja wizualna** - logo Norda Biznes w interfejsie
W dyskusji [Artur Wiertel](link) pytał o moderację. Pełna treść: [moje uwagi do CRM](/forum/18)"
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
elif thinking_level == 'minimal':
system_prompt += """
⚡ TRYB SZYBKI - odpowiadaj zwięźle ale z PEŁNYMI linkami do firm i tematów.
"""
# Add feedback-based learning context (few-shot examples)
if FEEDBACK_LEARNING_AVAILABLE:
try:
feedback_service = get_feedback_learning_service()
learning_context = feedback_service.format_for_prompt()
if learning_context:
system_prompt += learning_context
except Exception as e:
# Don't fail if feedback learning has issues
import logging
logging.getLogger(__name__).warning(f"Feedback learning error: {e}")
# Add ALL companies in compact JSON format
if context.get('all_companies'):
system_prompt += "\n\n🏢 PEŁNA BAZA FIRM (wybierz najlepsze):\n"
system_prompt += json.dumps(context['all_companies'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add recommendations (peer endorsements)
if context.get('recommendations'):
system_prompt += "\n\n⭐ REKOMENDACJE CZŁONKÓW:\n"
system_prompt += json.dumps(context['recommendations'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add recent news
if context.get('recent_news'):
system_prompt += "\n\n📰 OSTATNIE AKTUALNOŚCI:\n"
system_prompt += json.dumps(context['recent_news'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add ZOPK Knowledge Base context (semantic search results)
if context.get('zopk_knowledge'):
zopk = context['zopk_knowledge']
system_prompt += "\n\n🌍 BAZA WIEDZY ZOPK (Zielony Okręg Przemysłowy Kaszubia):\n"
system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
# Collect all sources for citations at the end
sources_for_citation = []
# Add knowledge chunks (most relevant excerpts)
if zopk.get('chunks'):
system_prompt += "\n📄 FRAGMENTY WIEDZY (semantycznie dopasowane):\n"
for i, chunk in enumerate(zopk['chunks'][:5], 1):
source_name = chunk.get('source', 'nieznane')
source_url = chunk.get('source_url', '')
source_date = chunk.get('date', '')
system_prompt += f"\n[{i}] {chunk.get('summary', '')}\n"
if source_url:
system_prompt += f" Źródło: [{source_name}]({source_url}) ({source_date})\n"
if source_url and source_name:
sources_for_citation.append({
'name': source_name,
'url': source_url,
'date': source_date
})
else:
system_prompt += f" Źródło: {source_name} ({source_date})\n"
if chunk.get('content'):
content_preview = chunk['content'][:300]
if len(chunk['content']) > 300:
content_preview += "..."
system_prompt += f" Treść: {content_preview}\n"
# Add verified facts with source links
if zopk.get('facts'):
system_prompt += "\n📌 ZWERYFIKOWANE FAKTY:\n"
for fact in zopk['facts'][:10]:
confidence_stars = "" * int(fact.get('confidence', 0) * 5)
source_name = fact.get('source_name', '')
source_url = fact.get('source_url', '')
source_date = fact.get('source_date', '')
system_prompt += f"{fact.get('fact', '')} [{confidence_stars}]"
if source_name and source_url:
system_prompt += f" ([{source_name}]({source_url}), {source_date})"
sources_for_citation.append({
'name': source_name,
'url': source_url,
'date': source_date
})
system_prompt += "\n"
if fact.get('value') and fact.get('unit'):
system_prompt += f" Wartość: {fact['value']} {fact['unit']}\n"
# Add key entities
if zopk.get('entities'):
system_prompt += "\n🏢 KLUCZOWE PODMIOTY ZOPK:\n"
for entity in zopk['entities'][:8]:
entity_icon = {
'organization': '🏛️',
'company': '🏢',
'person': '👤',
'location': '📍',
'place': '📍',
'project': '🎯',
'technology': ''
}.get(entity.get('type', ''), '')
system_prompt += f"{entity_icon} {entity.get('name', '')} ({entity.get('type', '')})"
if entity.get('description'):
system_prompt += f" - {entity['description']}"
if entity.get('mentions'):
system_prompt += f" [{entity['mentions']} wzmianek]"
system_prompt += "\n"
# Add available sources for citation
if sources_for_citation:
# Deduplicate sources by URL
unique_sources = {s['url']: s for s in sources_for_citation if s.get('url')}.values()
system_prompt += "\n📚 DOSTĘPNE ŹRÓDŁA DO CYTOWANIA:\n"
for src in list(unique_sources)[:5]:
system_prompt += f"- [{src['name']}]({src['url']}) ({src['date']})\n"
system_prompt += "\n🎯 ZASADY ODPOWIEDZI O ZOPK:\n"
system_prompt += "1. Odpowiadaj na podstawie bazy wiedzy (NIE WYMYŚLAJ faktów)\n"
system_prompt += "2. FORMATUJ odpowiedzi używając:\n"
system_prompt += " - **Pogrubienia** dla kluczowych informacji\n"
system_prompt += " - Listy punktowane dla wielu faktów\n"
system_prompt += " - Nagłówki dla sekcji (## Inwestycje, ## Terminarz)\n"
system_prompt += "3. CYTUJ źródła w tekście: \"Według [nazwa portalu](URL) z dnia RRRR-MM-DD...\"\n"
system_prompt += "4. NA KOŃCU odpowiedzi DODAJ sekcję:\n"
system_prompt += " 📚 **Źródła:**\n"
system_prompt += " - [Nazwa portalu](URL) - krótki opis (data)\n"
system_prompt += "5. Podawaj konkretne daty i liczby gdy dostępne\n"
system_prompt += "6. Jeśli brak informacji w bazie - powiedz wprost: \"Nie mam tej informacji w bazie wiedzy ZOPK\"\n"
system_prompt += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
# Add upcoming events (Etap 2)
if context.get('upcoming_events'):
system_prompt += "\n\n📅 KALENDARZ WYDARZEŃ:\n"
system_prompt += json.dumps(context['upcoming_events'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add B2B classifieds (Etap 2)
if context.get('classifieds'):
system_prompt += "\n\n📋 TABLICA B2B:\n"
system_prompt += json.dumps(context['classifieds'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add forum topics (Etap 2)
if context.get('forum_topics'):
system_prompt += "\n\n💬 FORUM:\n"
system_prompt += json.dumps(context['forum_topics'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add company people - zarząd/wspólnicy (Etap 3)
if context.get('company_people'):
system_prompt += "\n\n👥 ZARZĄD I WSPÓLNICY:\n"
system_prompt += json.dumps(context['company_people'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add registered portal users (Option C)
if context.get('registered_users'):
system_prompt += "\n\n👤 ZAREJESTROWANI PRZEDSTAWICIELE FIRM:\n"
system_prompt += json.dumps(context['registered_users'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add social media per company (Etap 3)
if context.get('company_social_media'):
system_prompt += "\n\n📱 SOCIAL MEDIA:\n"
system_prompt += json.dumps(context['company_social_media'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add GBP audits (Etap 3)
if context.get('gbp_audits'):
system_prompt += "\n\n🏪 AUDYTY GOOGLE BUSINESS:\n"
system_prompt += json.dumps(context['gbp_audits'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add SEO audits (PageSpeed scores)
if context.get('seo_audits'):
system_prompt += "\n\n🔍 AUDYTY SEO (PageSpeed):\n"
system_prompt += json.dumps(context['seo_audits'], ensure_ascii=False, indent=None)
system_prompt += "\n"
# Add conversation history
full_prompt = system_prompt + "\n\n# HISTORIA ROZMOWY:\n"
for msg in context.get('recent_messages', []):
role_name = "Użytkownik" if msg['role'] == 'user' else "Ty"
full_prompt += f"{role_name}: {msg['content']}\n"
full_prompt += f"\nUżytkownik: {user_message}\nTy: "
# Get response with automatic cost tracking to ai_api_costs table
if self.use_global_service and self.gemini_service:
response_text = self.gemini_service.generate_text(
prompt=full_prompt,
feature='ai_chat',
user_id=user_id,
temperature=0.7,
thinking_level=thinking_level
)
# Post-process to ensure links are added even if AI didn't format them
return self._postprocess_links(response_text, context)
else:
# Legacy: direct API call (no centralized cost tracking)
response = self.model.generate_content(full_prompt)
# Post-process to ensure links are added even if AI didn't format them
return self._postprocess_links(response.text, context)
def _postprocess_links(self, text: str, context: Dict) -> str:
"""
Post-process AI response to add markdown links for companies and people.
This ensures consistent linking regardless of AI behavior.
Args:
text: AI response text
context: Context dict with company_people data
Returns:
Text with names replaced by markdown links
"""
import re
# Build lookup dict: name -> url
name_to_url = {}
# Extract companies and people from company_people context
company_people = context.get('company_people', {})
for company_name, data in company_people.items():
# Add company
if data.get('profile'):
name_to_url[company_name] = data['profile']
# Add people - normalize to Title Case (DB stores UPPERCASE)
for person in data.get('people', []):
if person.get('name') and person.get('profile'):
# Convert "MICHAŁ BOGDAN ROSZMAN" to "Michał Bogdan Roszman"
normalized_name = person['name'].title()
name_to_url[normalized_name] = person['profile']
# Also extract from companies list (context['companies'] has profile URLs)
# Companies format: list of dicts with 'name' and 'profile'
# This is populated by _company_to_compact_dict
# Sort by name length (longest first) to avoid partial replacements
sorted_names = sorted(name_to_url.keys(), key=len, reverse=True)
for name in sorted_names:
url = name_to_url[name]
if not name or not url:
continue
# Skip if already a markdown link
# Pattern: [Name](url) - already linked
already_linked = re.search(r'\[' + re.escape(name) + r'\]\([^)]+\)', text)
if already_linked:
continue
# Replace **Name** (bold) with [Name](url)
bold_pattern = r'\*\*' + re.escape(name) + r'\*\*'
if re.search(bold_pattern, text):
text = re.sub(bold_pattern, f'[{name}]({url})', text, count=1)
continue
# Replace plain "Name" at word boundaries (but not if already in link)
# Be careful not to replace inside existing markdown
plain_pattern = r'(?<!\[)(?<!\()' + re.escape(name) + r'(?!\])(?!\))'
if re.search(plain_pattern, text):
# Only replace first occurrence to avoid over-linking
text = re.sub(plain_pattern, f'[{name}]({url})', text, count=1)
return text
def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""
Calculate cost in USD
Args:
input_tokens: Number of input tokens
output_tokens: Number of output tokens
Returns:
Total cost in USD
"""
# Gemini 2.5 Flash pricing (per 1M tokens)
input_cost = (input_tokens / 1_000_000) * 0.075
output_cost = (output_tokens / 1_000_000) * 0.30
return input_cost + output_cost