Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
587 lines
20 KiB
Python
587 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
NordaGPT Context Builder
|
|
========================
|
|
|
|
Selective data loader for the Smart Router.
|
|
Instead of loading ALL data for every query, this module loads
|
|
only the categories requested by the Smart Router.
|
|
|
|
Usage:
|
|
from context_builder import build_selective_context
|
|
|
|
context = build_selective_context(
|
|
data_needed=["companies_all", "events"],
|
|
conversation_id=42,
|
|
current_message="Szukam firmy budowlanej",
|
|
user_context={"user_id": 5, "company_id": 12}
|
|
)
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-03-28
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, date, timedelta
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from database import (
|
|
SessionLocal,
|
|
Company,
|
|
Category,
|
|
AIChatMessage,
|
|
ZOPKNews,
|
|
NordaEvent,
|
|
Classified,
|
|
ForumTopic,
|
|
ForumReply,
|
|
Person,
|
|
CompanyPerson,
|
|
CompanySocialMedia,
|
|
GBPAudit,
|
|
CompanyWebsiteAnalysis,
|
|
User,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_selective_context(
    data_needed: List[str],
    conversation_id: int,
    current_message: str,
    user_context: Optional[Dict] = None,
) -> Dict[str, Any]:
    """
    Build the context dict for _query_ai() by loading only the requested
    data categories.

    Always included (regardless of data_needed):
        - basic stats: total_companies, categories
        - conversation history: last 10 messages for conversation_id

    Errors raised while loading the always-included sections propagate to
    the caller. Errors raised while loading an individual requested
    category are logged as warnings and that category is skipped
    (best-effort); the session is rolled back afterwards so the remaining
    categories can still be queried.

    Args:
        data_needed: List of category strings such as:
            "companies_all", "companies_filtered:IT",
            "companies_single:pixlab-sp-z-o-o", "events", "news",
            "classifieds", "forum", "company_people",
            "registered_users", "social_media", "audits".
            ``None`` or an empty list loads only the always-included data.
        conversation_id: AIChatMessage conversation ID for history loading.
        current_message: The user's current message (passed through to context).
        user_context: Optional dict with extra user info (user_id, company_id, ...).
            Only stored in the result when it is truthy.

    Returns:
        Context dict compatible with nordabiz_chat.py's _query_ai().
    """
    db = SessionLocal()
    try:
        context: Dict[str, Any] = {}

        # ALWAYS: basic stats and conversation history.
        _load_basic_stats(db, context)
        _load_conversation_history(db, conversation_id, context)

        # SELECTIVE: load only what the router asked for.
        for category in data_needed or []:
            try:
                _load_category(db, category, context)
            except Exception as exc:
                logger.warning("context_builder: failed to load '%s': %s", category, exc)
                # A failed statement can leave the session in an aborted
                # transaction; without a rollback every later category
                # would fail as well. The loaders store plain dicts, so
                # expiring ORM objects here loses nothing already built.
                try:
                    db.rollback()
                except Exception as rollback_exc:
                    logger.warning(
                        "context_builder: rollback after '%s' failed: %s",
                        category,
                        rollback_exc,
                    )

        # Pass-through extras
        context['current_message'] = current_message
        if user_context:
            context['user_context'] = user_context

        return context
    finally:
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_basic_stats(db, context: Dict) -> None:
    """
    Always-loaded: total active companies and category breakdown.

    Writes two keys into ``context``:
        - 'total_companies': number of companies with status 'active'
        - 'categories': one dict per Category row with 'name', 'slug' and
          'company_count' (active companies in that category; 0 when none)

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    # Local import, same pattern as _load_audits().
    from sqlalchemy import func

    context['total_companies'] = db.query(Company).filter_by(status='active').count()

    # One grouped query instead of one COUNT query per category.
    grouped_rows = (
        db.query(Company.category_id, func.count())
        .filter(Company.status == 'active')
        .group_by(Company.category_id)
        .all()
    )
    active_per_category = {category_id: total for category_id, total in grouped_rows}

    context['categories'] = [
        {
            'name': cat.name,
            'slug': cat.slug,
            # Categories without any active company do not appear in the
            # grouped result, hence the explicit 0 default.
            'company_count': active_per_category.get(cat.id, 0),
        }
        for cat in db.query(Category).all()
    ]
|
|
|
|
|
|
def _load_conversation_history(db, conversation_id: int, context: Dict) -> None:
    """
    Always-loaded: the last 10 messages of the conversation.

    The query fetches the newest messages first (so LIMIT keeps the most
    recent ones); they are then flipped back into chronological order and
    stored under ``context['recent_messages']`` as role/content dicts.

    Args:
        db: Open database session.
        conversation_id: Conversation whose messages are loaded.
        context: Context dict to populate in place.
    """
    query = db.query(AIChatMessage).filter_by(conversation_id=conversation_id)
    newest_first = query.order_by(AIChatMessage.created_at.desc()).limit(10).all()

    history = []
    for message in newest_first[::-1]:
        history.append({'role': message.role, 'content': message.content})
    context['recent_messages'] = history
|
|
|
|
|
|
def _load_category(db, category: str, context: Dict) -> None:
    """
    Route one category string to the loader responsible for it.

    Exact names (e.g. 'events') map to loaders taking ``(db, context)``.
    Prefixed names ('companies_filtered:<x>', 'companies_single:<x>') pass
    the text after the first colon as an extra argument. Anything else is
    logged at debug level and ignored.

    Args:
        db: Open database session.
        category: Category string produced by the router.
        context: Context dict the selected loader populates in place.
    """
    plain_loaders = {
        'companies_all': _load_companies_all,
        'events': _load_events,
        'news': _load_news,
        'classifieds': _load_classifieds,
        'forum': _load_forum,
        'company_people': _load_company_people,
        'registered_users': _load_registered_users,
        'social_media': _load_social_media,
        'audits': _load_audits,
    }
    argument_loaders = (
        ('companies_filtered:', _load_companies_filtered),
        ('companies_single:', _load_company_single),
    )

    loader = plain_loaders.get(category)
    if loader is not None:
        loader(db, context)
        return

    for prefix, loader in argument_loaders:
        if category.startswith(prefix):
            # Everything after the first colon is the loader's argument.
            loader(db, category.partition(':')[2], context)
            return

    logger.debug("context_builder: unknown category '%s' — skipped", category)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Category loaders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_companies_all(db, context: Dict) -> None:
    """
    Store every active company, in compact form, under 'all_companies'.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    active_companies = db.query(Company).filter_by(status='active').all()
    context['all_companies'] = list(map(_company_to_compact_dict, active_companies))
|
|
|
|
|
|
def _load_companies_filtered(db, cat_name: str, context: Dict) -> None:
    """
    Append the active companies of one category to 'filtered_companies'.

    The category is looked up case-insensitively by name first and, when
    no name matches, by slug. If neither matches, nothing is appended.
    'filter_category' is always set to ``cat_name`` (a later call
    overwrites the value from an earlier one).

    Args:
        db: Open database session.
        cat_name: Category name or slug requested by the router.
        context: Context dict to populate in place.
    """
    matched = None
    # Name takes precedence; the slug is only consulted as a fallback.
    for column in (Category.name, Category.slug):
        matched = db.query(Category).filter(column.ilike(cat_name)).first()
        if matched:
            break

    companies = []
    if matched:
        companies = (
            db.query(Company)
            .filter_by(category_id=matched.id, status='active')
            .all()
        )

    compact_rows = [_company_to_compact_dict(company) for company in companies]
    context.setdefault('filtered_companies', []).extend(compact_rows)
    context['filter_category'] = cat_name
|
|
|
|
|
|
def _load_company_single(db, identifier: str, context: Dict) -> None:
    """
    Load a single active company by slug or partial name match.

    Lookup order:
        1. exact slug match
        2. case-insensitive substring match on the company name; the first
           match in name order is taken so the result is deterministic

    A found company is appended (compact format) to
    ``context['single_companies']``. A blank identifier is skipped instead
    of being turned into a match-everything pattern.

    Args:
        db: Open database session.
        identifier: Company slug or (part of) a company name.
        context: Context dict to populate in place.
    """
    identifier = (identifier or '').strip()
    if not identifier:
        # '%%' would match every company and return an arbitrary one.
        logger.debug("context_builder: empty company identifier — skipped")
        return

    # Try slug first (exact match)
    company = db.query(Company).filter_by(slug=identifier, status='active').first()

    if not company:
        # Escape LIKE metacharacters so '%' and '_' in the identifier are
        # matched literally rather than as wildcards.
        escaped = (
            identifier
            .replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )
        company = (
            db.query(Company)
            .filter(
                Company.name.ilike(f'%{escaped}%', escape='\\'),
                Company.status == 'active',
            )
            .order_by(Company.name)
            .first()
        )

    if company:
        context.setdefault('single_companies', [])
        context['single_companies'].append(_company_to_compact_dict(company))
    else:
        logger.debug("context_builder: company '%s' not found", identifier)
|
|
|
|
|
|
def _load_events(db, context: Dict) -> None:
    """
    Store events from today through the next 60 days under 'upcoming_events'.

    At most 15 events are loaded, ordered by event date. Text fields are
    truncated (title 80, location 50, speaker 30 characters) and a missing
    event type falls back to 'meeting'.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    window_start = date.today()
    window_end = window_start + timedelta(days=60)

    query = db.query(NordaEvent).filter(
        NordaEvent.event_date >= window_start,
        NordaEvent.event_date <= window_end,
    )
    events = query.order_by(NordaEvent.event_date).limit(15).all()

    summaries = []
    for ev in events:
        summaries.append({
            'title': (ev.title or '')[:80],
            'date': ev.event_date.strftime('%Y-%m-%d') if ev.event_date else '',
            'type': ev.event_type or 'meeting',
            'location': (ev.location or '')[:50],
            'speaker': (ev.speaker_name or '')[:30],
        })
    context['upcoming_events'] = summaries
|
|
|
|
|
|
def _load_news(db, context: Dict) -> None:
    """
    Store recent approved ZOPK news under 'recent_news'.

    Only items with status 'approved' or 'auto_approved' published within
    the last 30 days are considered; the 10 newest are kept. Description
    and AI summary are truncated (400 / 300 characters) and at most five
    keywords are included.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    # Naive local time, as returned by datetime.now().
    oldest_allowed = datetime.now() - timedelta(days=30)
    accepted_statuses = ['approved', 'auto_approved']

    query = db.query(ZOPKNews).filter(
        ZOPKNews.status.in_(accepted_statuses),
        ZOPKNews.published_at >= oldest_allowed,
    )
    items = query.order_by(ZOPKNews.published_at.desc()).limit(10).all()

    recent = []
    for item in items:
        recent.append({
            'title': item.title,
            'description': (item.description or '')[:400],
            'summary': (item.ai_summary or '')[:300],
            'source': item.source_name or '',
            'url': item.url or '',
            'date': item.published_at.strftime('%Y-%m-%d') if item.published_at else '',
            'type': item.news_type or 'news',
            'keywords': (item.keywords or [])[:5],
        })
    context['recent_news'] = recent
|
|
|
|
|
|
def _load_classifieds(db, context: Dict) -> None:
    """
    Store the 20 newest active, non-test B2B classifieds under 'classifieds'.

    Each entry carries listing metadata, the description truncated to 400
    characters, the company and author names (empty string when absent)
    and a relative '/classifieds/<id>' URL.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    query = db.query(Classified).filter(
        Classified.is_active == True,   # noqa: E712 - SQL expression, not a Python comparison
        Classified.is_test == False,    # noqa: E712
    )
    listings = query.order_by(Classified.created_at.desc()).limit(20).all()

    entries = []
    for listing in listings:
        entries.append({
            'type': listing.listing_type,
            'category': listing.category,
            'title': listing.title,
            'description': (listing.description or '')[:400],
            'company': listing.company.name if listing.company else '',
            'author': listing.author.name if listing.author else '',
            'budget': listing.budget_info or '',
            'location': listing.location_info or '',
            'date': listing.created_at.strftime('%Y-%m-%d') if listing.created_at else '',
            'views': listing.views_count or 0,
            'url': f'/classifieds/{listing.id}',
        })
    context['classifieds'] = entries
|
|
|
|
|
|
def _load_forum(db, context: Dict) -> None:
    """
    Load the 15 newest forum topics (category other than 'test') with
    their most recent replies, and store them under 'forum_topics'.

    Each topic dict contains title, content (first 500 characters), author
    name ('Anonim' when there is no author), category/status labels when
    the model exposes them (raw values otherwise), creation date, URL,
    view count, pinned flag, reply count and an attachment flag. Topics
    that have replies also get a 'replies' list with up to five replies,
    newest first; replies without a creation date are sorted last.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    # NOTE(review): in SQL, `category != 'test'` also filters out rows whose
    # category is NULL — confirm the column is non-nullable.
    topics = (
        db.query(ForumTopic)
        .options(
            joinedload(ForumTopic.author),
            joinedload(ForumTopic.replies).joinedload(ForumReply.author),
        )
        .filter(ForumTopic.category != 'test')
        .order_by(ForumTopic.created_at.desc())
        .limit(15)
        .all()
    )

    forum_data = []
    for topic in topics:
        topic_data = {
            'title': topic.title,
            'content': (topic.content or '')[:500],
            'author': topic.author.name if topic.author else 'Anonim',
            'category': topic.category_label if hasattr(topic, 'category_label') else topic.category,
            'status': topic.status_label if hasattr(topic, 'status_label') else topic.status,
            'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '',
            'url': f'/forum/{topic.id}',
            'views': topic.views_count or 0,
            'pinned': topic.is_pinned,
            'replies_count': topic.reply_count if hasattr(topic, 'reply_count') else 0,
            'has_attachments': bool(topic.attachments),
        }
        if topic.replies:
            # The (has_date, date) key keeps the sort from comparing None
            # with a datetime: undated replies compare equal to each other
            # and, with reverse=True, end up after all dated ones.
            sorted_replies = sorted(
                topic.replies,
                key=lambda r: (r.created_at is not None, r.created_at),
                reverse=True,
            )
            topic_data['replies'] = [
                {
                    'author': reply.author.name if reply.author else 'Anonim',
                    'content': (reply.content or '')[:300],
                    'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else '',
                }
                for reply in sorted_replies[:5]
            ]
        forum_data.append(topic_data)

    context['forum_topics'] = forum_data
|
|
|
|
|
|
def _load_company_people(db, context: Dict) -> None:
    """
    Store company-person relationships, grouped by company name, under
    'company_people'.

    The result maps each company name ('Nieznana' when the link has no
    company) to ``{'profile': <company URL or None>, 'people': [...]}``.
    Every person entry has 'name', 'profile' and 'role' (first 30
    characters); 'shares' is added only when a share percentage is set.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    links = (
        db.query(CompanyPerson)
        .options(
            joinedload(CompanyPerson.person),
            joinedload(CompanyPerson.company),
        )
        .order_by(CompanyPerson.company_id)
        .all()
    )

    grouped: Dict[str, Any] = {}
    for link in links:
        company = link.company
        company_name = company.name if company else 'Nieznana'
        if company and company.slug:
            company_profile = f"https://nordabiznes.pl/company/{company.slug}"
        else:
            company_profile = None

        # The first link seen for a company name fixes that entry's profile URL.
        entry = grouped.setdefault(
            company_name, {'profile': company_profile, 'people': []}
        )

        person = link.person
        person_info: Dict[str, Any] = {
            'name': person.full_name() if person else '',
            'profile': f"https://nordabiznes.pl/osoba/{person.id}" if person else None,
            'role': (link.role or '')[:30],
        }
        if link.shares_percent:
            person_info['shares'] = f"{link.shares_percent}%"
        entry['people'].append(person_info)

    context['company_people'] = grouped
|
|
|
|
|
|
def _load_registered_users(db, context: Dict) -> None:
    """
    Store active portal users that have a company assigned, grouped by
    company name, under 'registered_users'.

    The result maps each company name to
    ``{'profile': <company URL or None>, 'users': [...]}``. Each user entry
    holds name, email, a Polish label for the portal role (empty string
    for roles without a label), and the member / verified flags.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    role_labels = {
        'MANAGER': 'administrator profilu',
        'EMPLOYEE': 'pracownik',
        'VIEWER': 'obserwator',
    }

    query = db.query(User).filter(
        User.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
        User.company_id.isnot(None),
    )
    accounts = query.options(joinedload(User.company)).all()

    grouped: Dict[str, Any] = {}
    for account in accounts:
        company = account.company
        company_name = company.name if company else 'Nieznana'
        if company and company.slug:
            company_profile = f"https://nordabiznes.pl/company/{company.slug}"
        else:
            company_profile = None

        entry = grouped.setdefault(
            company_name, {'profile': company_profile, 'users': []}
        )
        entry['users'].append({
            'name': account.name,
            'email': account.email,
            'portal_role': role_labels.get(account.company_role, ''),
            'member': account.is_norda_member,
            'verified': account.is_verified,
        })

    context['registered_users'] = grouped
|
|
|
|
|
|
def _load_social_media(db, context: Dict) -> None:
    """
    Store valid company social media profiles, grouped by company name,
    under 'company_social_media'.

    The result maps each company name ('Nieznana' when the profile has no
    company) to a list of ``{'platform', 'url', 'followers'}`` dicts.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    profiles = (
        db.query(CompanySocialMedia)
        .filter(CompanySocialMedia.is_valid == True)  # noqa: E712 - SQL expression
        .options(joinedload(CompanySocialMedia.company))
        .all()
    )

    grouped: Dict[str, List] = {}
    for profile in profiles:
        company_name = profile.company.name if profile.company else 'Nieznana'
        grouped.setdefault(company_name, []).append({
            'platform': profile.platform,
            'url': profile.url or '',
            'followers': profile.followers_count or 0,
        })

    context['company_social_media'] = grouped
|
|
|
|
|
|
def _load_audits(db, context: Dict) -> None:
    """
    Load latest GBP audits and SEO PageSpeed scores.

    Writes two keys into ``context``:
        - 'gbp_audits': for every company, the GBPAudit row(s) carrying
          that company's most recent audit_date
        - 'seo_audits': every CompanyWebsiteAnalysis row that has a
          PageSpeed SEO score

    Missing numeric values are reported as 0 and missing text values as
    ''. 'profile_url' is '' when the company or its slug is missing, so no
    '.../company/None' link is ever produced.

    Args:
        db: Open database session.
        context: Context dict to populate in place.
    """
    from sqlalchemy import func

    def _profile_url(company) -> str:
        """Return the public profile URL, or '' without a company slug."""
        if company and company.slug:
            return f'https://nordabiznes.pl/company/{company.slug}'
        return ''

    # GBP audits — one per company, most recent
    latest_subq = (
        db.query(
            GBPAudit.company_id,
            func.max(GBPAudit.audit_date).label('max_date'),
        )
        .group_by(GBPAudit.company_id)
        .subquery()
    )
    latest_audits = (
        db.query(GBPAudit)
        .join(
            latest_subq,
            (GBPAudit.company_id == latest_subq.c.company_id)
            & (GBPAudit.audit_date == latest_subq.c.max_date),
        )
        .options(joinedload(GBPAudit.company))
        .all()
    )
    context['gbp_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'score': audit.completeness_score or 0,
            'reviews': audit.review_count or 0,
            'rating': float(audit.average_rating) if audit.average_rating else 0,
            'maps_url': audit.google_maps_url or '',
            'profile_url': _profile_url(audit.company),
        }
        for audit in latest_audits
    ]

    # SEO / PageSpeed audits
    seo_audits = (
        db.query(CompanyWebsiteAnalysis)
        .filter(CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None))
        .options(joinedload(CompanyWebsiteAnalysis.company))
        .all()
    )
    context['seo_audits'] = [
        {
            'company': audit.company.name if audit.company else '',
            'seo': audit.pagespeed_seo_score or 0,
            'performance': audit.pagespeed_performance_score or 0,
            'accessibility': audit.pagespeed_accessibility_score or 0,
            'best_practices': audit.pagespeed_best_practices_score or 0,
            'overall': audit.seo_overall_score or 0,
            # `or ''` keeps this a string even when the website is unset.
            'url': (audit.company.website or '') if audit.company else '',
            'profile_url': _profile_url(audit.company),
        }
        for audit in seo_audits
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Company compact format (mirrors nordabiz_chat._company_to_compact_dict)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _company_to_compact_dict(c: Company) -> Dict[str, Any]:
    """
    Serialize a Company ORM object into a compact, token-efficient dict.

    'name', 'cat' and 'profile' are always present; every other key is
    added only when the corresponding value is non-empty. The key order is
    the same as in the original implementation, which is documented to
    match nordabiz_chat.py's _company_to_compact_dict().

    Args:
        c: Company to serialize.

    Returns:
        Dict with short keys (desc, about, history, values, offerings,
        tech, svc, comp, web, tel, mail, city, year, cert).
    """
    compact: Dict[str, Any] = {
        'name': c.name,
        'cat': c.category.name if c.category else None,
        'profile': f'https://nordabiznes.pl/company/{c.slug}',
    }

    def copy_non_empty(field_map) -> None:
        """Copy each (key, attribute) pair whose attribute value is truthy."""
        for key, attribute in field_map:
            value = getattr(c, attribute)
            if value:
                compact[key] = value

    # Free-text fields
    copy_non_empty((
        ('desc', 'description_short'),
        ('about', 'description_full'),
        ('history', 'founding_history'),
        ('values', 'core_values'),
        ('offerings', 'services_offered'),
        ('tech', 'technologies_used'),
    ))

    # Related names; links whose target object is missing are skipped.
    if c.services:
        service_names = [link.service.name for link in c.services if link.service]
        if service_names:
            compact['svc'] = service_names
    if c.competencies:
        competency_names = [
            link.competency.name for link in c.competencies if link.competency
        ]
        if competency_names:
            compact['comp'] = competency_names

    # Contact and basic facts
    copy_non_empty((
        ('web', 'website'),
        ('tel', 'phone'),
        ('mail', 'email'),
        ('city', 'address_city'),
        ('year', 'year_established'),
    ))

    # Only active certifications, capped at three.
    if c.certifications:
        active_certs = [cert.name for cert in c.certifications if cert.is_active]
        if active_certs:
            compact['cert'] = active_certs[:3]

    return compact
|