Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
316 lines
11 KiB
Python
316 lines
11 KiB
Python
"""
|
|
NordaGPT Smart Router
|
|
=====================
|
|
Classifies query complexity and decides which data categories to load
|
|
and which AI model to use. Provides fast keyword-based routing with
|
|
AI-powered fallback when keyword matching is uncertain.
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-03-28
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Model selection mapping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MODEL_MAP = {
|
|
'simple': {'model': '3.1-flash-lite', 'thinking': 'minimal'},
|
|
'medium': {'model': '3-flash', 'thinking': 'low'},
|
|
'complex': {'model': '3-flash', 'thinking': 'high'},
|
|
}
|
|
|
|
# All data categories that can be loaded
|
|
ALL_CATEGORIES = [
|
|
'companies_all',
|
|
'events',
|
|
'news',
|
|
'classifieds',
|
|
'forum',
|
|
'company_people',
|
|
'registered_users',
|
|
'social_media',
|
|
'audits',
|
|
]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Keyword maps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Keywords that signal "no external data needed" — quick personal/greeting queries
|
|
_SIMPLE_KEYWORDS = [
|
|
'kim jestem', 'co wiesz o mnie', 'co o mnie wiesz',
|
|
'moje dane', 'mój profil', 'kim ty jesteś', 'jak masz na imię',
|
|
'co potrafisz', 'pomoc', 'help',
|
|
]
|
|
|
|
_GREETING_PATTERNS = [
|
|
'cześć', 'hej', 'helo', 'witaj', 'dzień dobry', 'dobry wieczór',
|
|
'siema', 'yo', 'hoi', 'hello', 'hi',
|
|
]
|
|
|
|
# Keyword → category mapping (order matters — first match wins for single-category)
|
|
_CATEGORY_KEYWORDS: dict[str, list[str]] = {
|
|
'companies_all': [
|
|
'wszystkie firmy', 'ile firm', 'lista firm', 'katalog firm',
|
|
'porównaj firmy', 'katalog',
|
|
],
|
|
'events': [
|
|
'wydarzen', 'spotkanie', 'spotkania', 'kalendarz', 'konferencja',
|
|
'szkolenie', 'kiedy',
|
|
],
|
|
'news': [
|
|
'aktualności', 'nowości', 'wiadomości', 'pej', 'atom',
|
|
'elektrownia', 'zopk', 'projekt jądrowy',
|
|
],
|
|
'classifieds': [
|
|
'ogłoszenie', 'b2b', 'zlecenie', 'oferta', 'szukam', 'oferuję',
|
|
'ogłoszenia', 'oferty',
|
|
],
|
|
'forum': [
|
|
'forum', 'dyskusja', 'temat', 'wątek', 'post', 'dyskusje',
|
|
],
|
|
'company_people': [
|
|
'zarząd', 'krs', 'właściciel', 'prezes', 'udziały', 'wspólnik',
|
|
'reprezentacja', 'zarządzający',
|
|
],
|
|
'registered_users': [
|
|
'użytkownik', 'kto jest', 'profil', 'zarejestrowany', 'członek',
|
|
'konta', 'zarejestrowani',
|
|
],
|
|
'social_media': [
|
|
'facebook', 'instagram', 'linkedin', 'social media',
|
|
'media społeczn', 'tiktok', 'twitter', 'youtube',
|
|
],
|
|
'audits': [
|
|
'seo', 'google', 'gbp', 'opinie', 'ocena', 'pozycjonowanie',
|
|
'wyniki google',
|
|
],
|
|
}
|
|
|
|
|
|
def _normalize(text: str) -> str:
|
|
"""Lowercase and strip extra whitespace."""
|
|
return re.sub(r'\s+', ' ', text.lower().strip())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fast keyword-based routing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def route_query_fast(message: str, user_context: dict) -> Optional[dict]:
|
|
"""
|
|
Attempt fast keyword-based routing without any API call.
|
|
|
|
Returns a routing decision dict if confident, or None if uncertain
|
|
(caller should fall back to AI routing).
|
|
"""
|
|
msg = _normalize(message)
|
|
|
|
# Very short messages or greetings → simple, no data needed
|
|
if len(message.strip()) < 30:
|
|
for greeting in _GREETING_PATTERNS:
|
|
if greeting in msg:
|
|
logger.debug("fast_router: greeting detected")
|
|
return _build_result('simple', [], 'fast')
|
|
|
|
# Personal / meta questions → simple, no data needed
|
|
for kw in _SIMPLE_KEYWORDS:
|
|
if kw in msg:
|
|
logger.debug("fast_router: personal/meta query detected")
|
|
return _build_result('simple', [], 'fast')
|
|
|
|
# Scan for category keywords
|
|
matched_categories: list[str] = []
|
|
for category, keywords in _CATEGORY_KEYWORDS.items():
|
|
for kw in keywords:
|
|
if kw in msg:
|
|
if category not in matched_categories:
|
|
matched_categories.append(category)
|
|
break # no need to check more keywords for this category
|
|
|
|
if not matched_categories:
|
|
# No confident match — signal caller to use AI routing
|
|
logger.debug("fast_router: no confident match, deferring to AI router")
|
|
return None
|
|
|
|
# Determine complexity by number of matched categories + message length + intent
|
|
is_long = len(message) > 150
|
|
multi_question = any(w in msg for w in ['jak ', 'jakie ', 'w jaki sposób', 'kto mógł'])
|
|
is_strategic = any(w in msg for w in [
|
|
'partner', 'współprac', 'inwestow', 'konsorcj', 'strategi',
|
|
'local content', 'projekt pej', 'elektrowni', 'serwerowni',
|
|
'porównaj', 'rekomend', 'analiz', 'doradzić', 'zaplanow'
|
|
])
|
|
|
|
if is_strategic or len(matched_categories) >= 3 or (len(matched_categories) >= 2 and is_long):
|
|
complexity = 'complex'
|
|
elif len(matched_categories) >= 2 or is_long or multi_question:
|
|
complexity = 'medium'
|
|
else:
|
|
complexity = 'simple' if len(message) < 80 else 'medium'
|
|
|
|
logger.debug(
|
|
"fast_router: matched categories=%s complexity=%s",
|
|
matched_categories, complexity,
|
|
)
|
|
return _build_result(complexity, matched_categories, 'fast')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AI-powered routing (fallback)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_AI_ROUTER_PROMPT = """Jesteś klasyfikatorem zapytań dla systemu NordaGPT — asystenta AI polskiej Izby Biznesu Norda Biznes.
|
|
|
|
Twoim zadaniem jest przeanalizowanie zapytania użytkownika i zdecydowanie:
|
|
1. Jak złożone jest zapytanie (complexity)
|
|
2. Jakie kategorie danych są potrzebne do odpowiedzi (data_needed)
|
|
|
|
Dostępne kategorie danych:
|
|
- companies_all — lista i profile firm w Izbie
|
|
- events — wydarzenia, spotkania, konferencje, szkolenia, kalendarz
|
|
- news — aktualności, nowości, PEJ, atom, elektrownia jądrowa, ZOPK
|
|
- classifieds — ogłoszenia B2B, zlecenia, oferty, szukam/oferuję
|
|
- forum — dyskusje, tematy, wątki, posty
|
|
- company_people — zarząd, właściciel, prezes, udziały, KRS
|
|
- registered_users — użytkownicy portalu, profile, zarejestrowani członkowie
|
|
- social_media — Facebook, Instagram, LinkedIn, YouTube, TikTok
|
|
- audits — SEO, Google Business Profile, opinie, oceny Google
|
|
|
|
Poziomy złożoności:
|
|
- simple — pytanie o użytkownika, powitanie, ogólne pytanie, nie potrzeba danych kontekstowych
|
|
- medium — pytanie dotyczące jednej lub dwóch kategorii
|
|
- complex — pytanie przekrojowe, wymaga wielu kategorii lub głębokiej analizy
|
|
|
|
Zwróć TYLKO JSON (bez markdown, bez objaśnień):
|
|
{{
|
|
"complexity": "simple|medium|complex",
|
|
"data_needed": ["category1", "category2"]
|
|
}}
|
|
|
|
Zapytanie użytkownika: {message}
|
|
"""
|
|
|
|
|
|
def route_query_ai(message: str, user_context: dict, gemini_service) -> dict:
|
|
"""
|
|
Use Gemini Flash-Lite to classify the query and determine data needs.
|
|
Falls back to _fallback_route() on any error.
|
|
"""
|
|
prompt = _AI_ROUTER_PROMPT.format(message=message)
|
|
t0 = time.monotonic()
|
|
try:
|
|
response_text = gemini_service.generate_text(
|
|
prompt=prompt,
|
|
temperature=0.0,
|
|
max_tokens=256,
|
|
model='3.1-flash-lite',
|
|
thinking_level='minimal',
|
|
feature='smart_router',
|
|
)
|
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
|
|
|
# Strip markdown code fences if present
|
|
cleaned = re.sub(r'^```[a-z]*\s*', '', response_text.strip(), flags=re.IGNORECASE)
|
|
cleaned = re.sub(r'\s*```$', '', cleaned)
|
|
|
|
parsed = json.loads(cleaned)
|
|
|
|
complexity = parsed.get('complexity', 'medium')
|
|
if complexity not in MODEL_MAP:
|
|
complexity = 'medium'
|
|
|
|
data_needed = parsed.get('data_needed', [])
|
|
# Sanitize — only allow known categories
|
|
data_needed = [c for c in data_needed if c in ALL_CATEGORIES]
|
|
|
|
logger.debug(
|
|
"ai_router: complexity=%s data_needed=%s latency=%dms",
|
|
complexity, data_needed, latency_ms,
|
|
)
|
|
result = _build_result(complexity, data_needed, 'ai')
|
|
result['router_latency_ms'] = latency_ms
|
|
return result
|
|
|
|
except json.JSONDecodeError as exc:
|
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
|
logger.warning("ai_router: JSON parse error (%s), falling back", exc)
|
|
result = _fallback_route()
|
|
result['router_latency_ms'] = latency_ms
|
|
return result
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
|
logger.warning("ai_router: unexpected error (%s), falling back", exc)
|
|
result = _fallback_route()
|
|
result['router_latency_ms'] = latency_ms
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def route_query(message: str, user_context: dict, gemini_service=None) -> dict:
|
|
"""
|
|
Classify query complexity and determine data loading strategy.
|
|
|
|
Tries fast keyword routing first. Falls back to AI routing if uncertain.
|
|
Falls back to full-context safe default if AI routing is unavailable or fails.
|
|
|
|
Args:
|
|
message: The user's query string.
|
|
user_context: Dict with user metadata (e.g. user_id, roles).
|
|
gemini_service: Optional GeminiService instance for AI fallback.
|
|
|
|
Returns:
|
|
Routing decision dict with keys:
|
|
complexity, data_needed, model, thinking, routed_by,
|
|
router_latency_ms (AI routing only).
|
|
"""
|
|
# 1. Try fast keyword routing
|
|
fast_result = route_query_fast(message, user_context)
|
|
if fast_result is not None:
|
|
return fast_result
|
|
|
|
# 2. Try AI routing if service is available
|
|
if gemini_service is not None:
|
|
return route_query_ai(message, user_context, gemini_service)
|
|
|
|
# 3. Safe fallback — load everything
|
|
logger.debug("route_query: no gemini_service available, using fallback")
|
|
return _fallback_route()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_result(complexity: str, data_needed: list, routed_by: str) -> dict:
|
|
"""Build a standardised routing result dict."""
|
|
model_cfg = MODEL_MAP[complexity]
|
|
return {
|
|
'complexity': complexity,
|
|
'data_needed': data_needed,
|
|
'model': model_cfg['model'],
|
|
'thinking': model_cfg['thinking'],
|
|
'routed_by': routed_by,
|
|
}
|
|
|
|
|
|
def _fallback_route() -> dict:
|
|
"""
|
|
Safe default routing — loads all data categories with medium complexity.
|
|
Replicates the current nordabiz_chat.py behaviour of loading everything.
|
|
"""
|
|
logger.debug("smart_router: using fallback route (all categories)")
|
|
result = _build_result('medium', ALL_CATEGORIES[:], 'fallback')
|
|
return result
|