nordabiz/context_builder.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

587 lines
20 KiB
Python

#!/usr/bin/env python3
"""
NordaGPT Context Builder
========================
Selective data loader for the Smart Router.
Instead of loading ALL data for every query, this module loads
only the categories requested by the Smart Router.
Usage:
    from context_builder import build_selective_context
    context = build_selective_context(
        data_needed=["companies_all", "events"],
        conversation_id=42,
        current_message="Szukam firmy budowlanej",
        user_context={"user_id": 5, "company_id": 12}
    )
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-03-28
"""
import logging
from datetime import datetime, date, timedelta
from typing import Dict, List, Any, Optional
from sqlalchemy.orm import joinedload
from database import (
SessionLocal,
Company,
Category,
AIChatMessage,
ZOPKNews,
NordaEvent,
Classified,
ForumTopic,
ForumReply,
Person,
CompanyPerson,
CompanySocialMedia,
GBPAudit,
CompanyWebsiteAnalysis,
User,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_selective_context(
    data_needed: List[str],
    conversation_id: int,
    current_message: str,
    user_context: Optional[Dict] = None,
) -> Dict[str, Any]:
    """
    Build context dict for _query_ai() by loading only the requested data
    categories.

    Always included (regardless of data_needed):
    - basic stats: total_companies, categories
    - conversation history: last 10 messages for conversation_id

    Selective loading is best-effort: a category whose loader raises is
    logged and skipped, and the session is rolled back so that the failure
    does not poison the loaders that run after it. A category requested more
    than once is loaded only once.

    Args:
        data_needed: List of category strings such as:
            "companies_all", "companies_filtered:IT",
            "companies_single:pixlab-sp-z-o-o", "events", "news",
            "classifieds", "forum", "company_people",
            "registered_users", "social_media", "audits".
            ``None`` or an empty list loads only the always-included data.
        conversation_id: AIChatMessage conversation ID for history loading.
        current_message: The user's current message (passed through to context).
        user_context: Optional dict with extra user info (user_id, company_id, …).

    Returns:
        Context dict compatible with nordabiz_chat.py's _query_ai().
    """
    db = SessionLocal()
    try:
        context: Dict[str, Any] = {}

        # ALWAYS: basic stats and conversation history. Failures here are
        # not swallowed; they propagate to the caller.
        _load_basic_stats(db, context)
        _load_conversation_history(db, conversation_id, context)

        # SELECTIVE: load only what the router asked for.
        already_loaded = set()
        for category in data_needed or []:
            try:
                # The membership test lives inside the try so that an
                # unhashable entry is reported like any other bad category.
                if category in already_loaded:
                    continue
                already_loaded.add(category)
                _load_category(db, category, context)
            except Exception as exc:
                logger.warning("context_builder: failed to load '%s': %s", category, exc)
                # A failed statement can leave the transaction aborted; reset
                # it so the remaining categories still get a usable session.
                # Loaders only store plain dicts in `context`, so expiring
                # ORM state here loses nothing.
                try:
                    db.rollback()
                except Exception as rollback_exc:
                    logger.warning(
                        "context_builder: rollback after '%s' failed: %s",
                        category,
                        rollback_exc,
                    )

        # Pass-through extras
        context['current_message'] = current_message
        if user_context:
            context['user_context'] = user_context
        return context
    finally:
        db.close()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _load_basic_stats(db, context: Dict) -> None:
    """
    Always-loaded: total active companies and category breakdown.

    Writes two keys into ``context``:
    - ``total_companies``: number of companies with status 'active'
    - ``categories``: one dict per Category with ``name``, ``slug`` and
      ``company_count`` (active companies assigned to that category)

    The per-category counts come from a single GROUP BY query instead of
    one COUNT query per category.
    """
    from sqlalchemy import func

    context['total_companies'] = (
        db.query(Company).filter_by(status='active').count()
    )

    # category_id -> number of active companies, fetched in one round trip.
    active_counts = {
        category_id: company_count
        for category_id, company_count in (
            db.query(Company.category_id, func.count(Company.category_id))
            .filter(Company.status == 'active')
            .group_by(Company.category_id)
            .all()
        )
    }

    context['categories'] = [
        {
            'name': cat.name,
            'slug': cat.slug,
            # Categories without any active company are absent from the
            # grouped result, hence the default of 0.
            'company_count': active_counts.get(cat.id, 0),
        }
        for cat in db.query(Category).all()
    ]
def _load_conversation_history(db, conversation_id: int, context: Dict) -> None:
    """
    Always-loaded: the 10 most recent messages of the conversation.

    The query fetches newest-first so LIMIT keeps the latest messages; the
    result is then flipped so ``context['recent_messages']`` reads oldest to
    newest. Each entry is a dict with ``role`` and ``content``.
    """
    history_query = db.query(AIChatMessage).filter_by(conversation_id=conversation_id)
    newest_first = (
        history_query
        .order_by(AIChatMessage.created_at.desc())
        .limit(10)
        .all()
    )

    chronological = []
    for message in newest_first[::-1]:
        chronological.append({'role': message.role, 'content': message.content})
    context['recent_messages'] = chronological
def _load_category(db, category: str, context: Dict) -> None:
    """
    Route one category string to the loader that handles it.

    Plain categories (e.g. ``'events'``) map directly to a loader taking
    ``(db, context)``. Parameterised categories use the form
    ``'<prefix>:<argument>'``; everything after the first colon is passed to
    the loader as its argument. Unrecognised categories are logged at debug
    level and otherwise ignored.
    """
    plain_loaders = {
        'companies_all': _load_companies_all,
        'events': _load_events,
        'news': _load_news,
        'classifieds': _load_classifieds,
        'forum': _load_forum,
        'company_people': _load_company_people,
        'registered_users': _load_registered_users,
        'social_media': _load_social_media,
        'audits': _load_audits,
    }
    loader = plain_loaders.get(category)
    if loader is not None:
        loader(db, context)
        return

    # Split only on the first colon so the argument may itself contain ':'.
    prefix, colon, argument = category.partition(':')
    if colon and prefix == 'companies_filtered':
        _load_companies_filtered(db, argument, context)
        return
    if colon and prefix == 'companies_single':
        _load_company_single(db, argument, context)
        return

    logger.debug("context_builder: unknown category '%s' — skipped", category)
# ---------------------------------------------------------------------------
# Category loaders
# ---------------------------------------------------------------------------
def _load_companies_all(db, context: Dict) -> None:
    """
    Store every company with status 'active' under ``context['all_companies']``,
    each converted to its compact dict form.
    """
    active_companies = db.query(Company).filter_by(status='active').all()
    context['all_companies'] = list(map(_company_to_compact_dict, active_companies))
def _load_companies_filtered(db, cat_name: str, context: Dict) -> None:
    """
    Load active companies belonging to the category identified by ``cat_name``.

    The category is looked up case-insensitively by name first and, if that
    finds nothing, by slug. Matching companies (compact dict form) are
    appended to ``context['filtered_companies']``; when no category matches,
    nothing is appended. ``context['filter_category']`` is set to
    ``cat_name`` in every case.
    """
    matched_category = None
    for lookup_column in (Category.name, Category.slug):
        matched_category = (
            db.query(Category).filter(lookup_column.ilike(cat_name)).first()
        )
        if matched_category:
            break

    if matched_category:
        members = (
            db.query(Company)
            .filter_by(category_id=matched_category.id, status='active')
            .all()
        )
    else:
        members = []

    bucket = context.setdefault('filtered_companies', [])
    # Convert everything first so a conversion error leaves the bucket untouched.
    compact_members = [_company_to_compact_dict(member) for member in members]
    bucket.extend(compact_members)
    context['filter_category'] = cat_name
def _load_company_single(db, identifier: str, context: Dict) -> None:
    """
    Load a single active company by slug or partial name match.

    Lookup order:
    1. exact slug match
    2. case-insensitive "name contains identifier" match

    A match is appended (compact dict form) to
    ``context['single_companies']``. If nothing matches, a debug message is
    logged and ``context`` is left unchanged.

    Args:
        db: Database session used for the lookups.
        identifier: Company slug or a fragment of the company name.
            Surrounding whitespace is ignored; a blank or ``None`` identifier
            loads nothing.
        context: Context dict to update in place.
    """
    needle = (identifier or '').strip()
    if not needle:
        # A blank identifier would turn the name pattern into '%%', which
        # matches every company and would load an arbitrary one.
        return

    # Try slug first (exact match)
    company = db.query(Company).filter_by(slug=needle, status='active').first()
    if not company:
        # Escape LIKE metacharacters so '%' and '_' in the identifier are
        # matched literally instead of acting as wildcards.
        escaped = (
            needle.replace('/', '//').replace('%', '/%').replace('_', '/_')
        )
        company = (
            db.query(Company)
            .filter(
                Company.name.ilike(f'%{escaped}%', escape='/'),
                Company.status == 'active',
            )
            .first()
        )

    if company:
        context.setdefault('single_companies', []).append(
            _company_to_compact_dict(company)
        )
    else:
        logger.debug("context_builder: company '%s' not found", identifier)
def _load_events(db, context: Dict) -> None:
    """
    Load events dated from today through the next 60 days.

    At most 15 events are returned, ordered by date. The result is stored
    in ``context['upcoming_events']`` with text fields truncated
    (title 80, location 50, speaker 30 characters).
    """
    window_start = date.today()
    window_end = window_start + timedelta(days=60)

    events_in_window = (
        db.query(NordaEvent)
        .filter(
            NordaEvent.event_date >= window_start,
            NordaEvent.event_date <= window_end,
        )
        .order_by(NordaEvent.event_date)
        .limit(15)
        .all()
    )

    summaries = []
    for ev in events_in_window:
        when = ev.event_date.strftime('%Y-%m-%d') if ev.event_date else ''
        summaries.append({
            'title': (ev.title or '')[:80],
            'date': when,
            'type': ev.event_type or 'meeting',
            'location': (ev.location or '')[:50],
            'speaker': (ev.speaker_name or '')[:30],
        })
    context['upcoming_events'] = summaries
def _load_news(db, context: Dict) -> None:
    """
    Load ZOPK news published in the last 30 days.

    Only items whose status is 'approved' or 'auto_approved' are included,
    newest first, capped at 10. The result goes to
    ``context['recent_news']``; description is cut to 400 characters,
    summary to 300, and keywords to the first 5.
    """
    oldest_allowed = datetime.now() - timedelta(days=30)
    visible_statuses = ['approved', 'auto_approved']

    recent_rows = (
        db.query(ZOPKNews)
        .filter(
            ZOPKNews.status.in_(visible_statuses),
            ZOPKNews.published_at >= oldest_allowed,
        )
        .order_by(ZOPKNews.published_at.desc())
        .limit(10)
        .all()
    )

    digest = []
    for item in recent_rows:
        published = item.published_at.strftime('%Y-%m-%d') if item.published_at else ''
        digest.append({
            'title': item.title,
            'description': (item.description or '')[:400],
            'summary': (item.ai_summary or '')[:300],
            'source': item.source_name or '',
            'url': item.url or '',
            'date': published,
            'type': item.news_type or 'news',
            'keywords': (item.keywords or [])[:5],
        })
    context['recent_news'] = digest
def _load_classifieds(db, context: Dict) -> None:
    """
    Load the 20 newest classifieds that are active and not flagged as test.

    Each entry stored in ``context['classifieds']`` carries the listing
    metadata, a description cut to 400 characters, the names of the related
    company and author (empty string when absent) and a relative URL built
    from the classified's id.
    """
    listings = (
        db.query(Classified)
        .filter(
            # SQLAlchemy column comparisons: these must stay as `==`.
            Classified.is_active == True,  # noqa: E712
            Classified.is_test == False,  # noqa: E712
        )
        .order_by(Classified.created_at.desc())
        .limit(20)
        .all()
    )

    entries = []
    for listing in listings:
        posted_on = listing.created_at.strftime('%Y-%m-%d') if listing.created_at else ''
        entries.append({
            'type': listing.listing_type,
            'category': listing.category,
            'title': listing.title,
            'description': (listing.description or '')[:400],
            'company': listing.company.name if listing.company else '',
            'author': listing.author.name if listing.author else '',
            'budget': listing.budget_info or '',
            'location': listing.location_info or '',
            'date': posted_on,
            'views': listing.views_count or 0,
            'url': f'/classifieds/{listing.id}',
        })
    context['classifieds'] = entries
def _load_forum(db, context: Dict) -> None:
    """
    Load recent forum topics with replies (non-test, max 15).

    Topics whose category is 'test' are excluded; the newest 15 are stored
    in ``context['forum_topics']``. Authors and replies are eager-loaded so
    building the result does not issue one query per topic. Each topic
    includes up to 5 of its newest replies under ``'replies'`` (the key is
    omitted when the topic has none). Topic content is truncated to 500
    characters and reply content to 300.
    """
    topics = (
        db.query(ForumTopic)
        .options(
            joinedload(ForumTopic.author),
            joinedload(ForumTopic.replies).joinedload(ForumReply.author),
        )
        .filter(ForumTopic.category != 'test')
        .order_by(ForumTopic.created_at.desc())
        .limit(15)
        .all()
    )

    def _newest_first_key(reply):
        # Replies without a timestamp would make a plain datetime sort raise
        # TypeError. The leading flag ranks them after every dated reply
        # (with reverse=True) and keeps None out of datetime comparisons.
        return (reply.created_at is not None, reply.created_at or datetime.min)

    forum_data = []
    for topic in topics:
        topic_data = {
            'title': topic.title,
            'content': (topic.content or '')[:500],
            'author': topic.author.name if topic.author else 'Anonim',
            # Prefer the human-readable label attributes when the model has them.
            'category': topic.category_label if hasattr(topic, 'category_label') else topic.category,
            'status': topic.status_label if hasattr(topic, 'status_label') else topic.status,
            'date': topic.created_at.strftime('%Y-%m-%d') if topic.created_at else '',
            'url': f'/forum/{topic.id}',
            'views': topic.views_count or 0,
            'pinned': topic.is_pinned,
            'replies_count': topic.reply_count if hasattr(topic, 'reply_count') else 0,
            'has_attachments': bool(topic.attachments),
        }
        if topic.replies:
            sorted_replies = sorted(topic.replies, key=_newest_first_key, reverse=True)
            topic_data['replies'] = [
                {
                    'author': reply.author.name if reply.author else 'Anonim',
                    'content': (reply.content or '')[:300],
                    'date': reply.created_at.strftime('%Y-%m-%d') if reply.created_at else '',
                }
                for reply in sorted_replies[:5]
            ]
        forum_data.append(topic_data)

    context['forum_topics'] = forum_data
def _load_company_people(db, context: Dict) -> None:
    """
    Load every company-person link and group the people by company name.

    ``context['company_people']`` maps a company name ('Nieznana' when the
    link has no company) to ``{'profile': <company URL or None>,
    'people': [...]}``. Each person entry has ``name``, ``profile`` and
    ``role`` (cut to 30 characters), plus ``shares`` when the link records a
    non-zero share percentage.
    """
    links = (
        db.query(CompanyPerson)
        .options(
            joinedload(CompanyPerson.person),
            joinedload(CompanyPerson.company),
        )
        .order_by(CompanyPerson.company_id)
        .all()
    )

    grouped: Dict[str, Any] = {}
    for link in links:
        company = link.company
        person = link.person

        if company:
            label = company.name
        else:
            label = 'Nieznana'
        if company and company.slug:
            company_url = f"https://nordabiznes.pl/company/{company.slug}"
        else:
            company_url = None

        # The first link seen for a company fixes that group's profile URL.
        group = grouped.setdefault(label, {'profile': company_url, 'people': []})

        member: Dict[str, Any] = {
            'name': person.full_name() if person else '',
            'profile': f"https://nordabiznes.pl/osoba/{person.id}" if person else None,
            'role': (link.role or '')[:30],
        }
        if link.shares_percent:
            member['shares'] = f"{link.shares_percent}%"
        group['people'].append(member)

    context['company_people'] = grouped
def _load_registered_users(db, context: Dict) -> None:
    """
    Load active users that are assigned to a company, grouped by company name.

    ``context['registered_users']`` maps a company name to
    ``{'profile': <company URL or None>, 'users': [...]}``. Each user entry
    holds ``name``, ``email``, a Polish label for the company role (empty
    string for roles without a label), and the ``member`` / ``verified``
    flags.
    """
    role_labels = {
        'MANAGER': 'administrator profilu',
        'EMPLOYEE': 'pracownik',
        'VIEWER': 'obserwator',
    }

    assigned_users = (
        db.query(User)
        .filter(
            # SQLAlchemy column comparison: this must stay as `==`.
            User.is_active == True,  # noqa: E712
            User.company_id.isnot(None),
        )
        .options(joinedload(User.company))
        .all()
    )

    grouped: Dict[str, Any] = {}
    for account in assigned_users:
        employer = account.company

        if employer:
            label = employer.name
        else:
            label = 'Nieznana'
        if employer and employer.slug:
            employer_url = f"https://nordabiznes.pl/company/{employer.slug}"
        else:
            employer_url = None

        group = grouped.setdefault(label, {'profile': employer_url, 'users': []})
        group['users'].append({
            'name': account.name,
            'email': account.email,
            'portal_role': role_labels.get(account.company_role, ''),
            'member': account.is_norda_member,
            'verified': account.is_verified,
        })

    context['registered_users'] = grouped
def _load_social_media(db, context: Dict) -> None:
    """
    Load social media profiles flagged as valid, grouped by company name.

    ``context['company_social_media']`` maps a company name ('Nieznana'
    when the profile has no company) to a list of
    ``{'platform', 'url', 'followers'}`` dicts.
    """
    valid_profiles = (
        db.query(CompanySocialMedia)
        # SQLAlchemy column comparison: this must stay as `==`.
        .filter(CompanySocialMedia.is_valid == True)  # noqa: E712
        .options(joinedload(CompanySocialMedia.company))
        .all()
    )

    grouped: Dict[str, List] = {}
    for profile in valid_profiles:
        owner_name = profile.company.name if profile.company else 'Nieznana'
        grouped.setdefault(owner_name, []).append({
            'platform': profile.platform,
            'url': profile.url or '',
            'followers': profile.followers_count or 0,
        })

    context['company_social_media'] = grouped
def _load_audits(db, context: Dict) -> None:
    """
    Load GBP audit results and website PageSpeed/SEO scores.

    ``context['gbp_audits']`` receives, per company, the GBP audit row(s)
    carrying that company's most recent ``audit_date``.
    ``context['seo_audits']`` receives every website analysis row that has a
    PageSpeed SEO score. Missing numeric values are reported as 0 and
    missing strings as ''.
    """
    from sqlalchemy import func

    def _profile_link(company) -> str:
        # Portal profile URL, or '' when the audit row has no company.
        if company:
            return f'https://nordabiznes.pl/company/{company.slug}'
        return ''

    # --- GBP audits: keep only rows matching each company's latest date ---
    newest_per_company = (
        db.query(
            GBPAudit.company_id,
            func.max(GBPAudit.audit_date).label('max_date'),
        )
        .group_by(GBPAudit.company_id)
        .subquery()
    )
    same_company = GBPAudit.company_id == newest_per_company.c.company_id
    same_date = GBPAudit.audit_date == newest_per_company.c.max_date
    gbp_rows = (
        db.query(GBPAudit)
        .join(newest_per_company, same_company & same_date)
        .options(joinedload(GBPAudit.company))
        .all()
    )

    gbp_summaries = []
    for row in gbp_rows:
        owner = row.company
        gbp_summaries.append({
            'company': owner.name if owner else '',
            'score': row.completeness_score or 0,
            'reviews': row.review_count or 0,
            'rating': float(row.average_rating) if row.average_rating else 0,
            'maps_url': row.google_maps_url or '',
            'profile_url': _profile_link(owner),
        })
    context['gbp_audits'] = gbp_summaries

    # --- SEO / PageSpeed: every analysis that has an SEO score ---
    seo_rows = (
        db.query(CompanyWebsiteAnalysis)
        .filter(CompanyWebsiteAnalysis.pagespeed_seo_score.isnot(None))
        .options(joinedload(CompanyWebsiteAnalysis.company))
        .all()
    )

    seo_summaries = []
    for row in seo_rows:
        owner = row.company
        seo_summaries.append({
            'company': owner.name if owner else '',
            'seo': row.pagespeed_seo_score or 0,
            'performance': row.pagespeed_performance_score or 0,
            'accessibility': row.pagespeed_accessibility_score or 0,
            'best_practices': row.pagespeed_best_practices_score or 0,
            'overall': row.seo_overall_score or 0,
            'url': owner.website if owner else '',
            'profile_url': _profile_link(owner),
        })
    context['seo_audits'] = seo_summaries
# ---------------------------------------------------------------------------
# Company compact format (mirrors nordabiz_chat._company_to_compact_dict)
# ---------------------------------------------------------------------------
def _company_to_compact_dict(c: Company) -> Dict[str, Any]:
    """
    Serialise a company into a compact dict with short keys.

    ``name``, ``cat`` and ``profile`` are always present. Every other key
    is added only when the corresponding value is truthy, to keep the
    result small. Service and competency names skip links whose target is
    missing; certifications list active ones only, capped at 3.

    Per the file's own note, the format mirrors
    nordabiz_chat._company_to_compact_dict.
    """
    compact: Dict[str, Any] = {
        'name': c.name,
        'cat': c.category.name if c.category else None,
        'profile': f'https://nordabiznes.pl/company/{c.slug}',
    }

    def _copy_truthy(field_map) -> None:
        # Copy each attribute under its short key, skipping empty values.
        for short_key, attribute in field_map:
            value = getattr(c, attribute)
            if value:
                compact[short_key] = value

    _copy_truthy((
        ('desc', 'description_short'),
        ('about', 'description_full'),
        ('history', 'founding_history'),
        ('values', 'core_values'),
        ('offerings', 'services_offered'),
        ('tech', 'technologies_used'),
    ))

    if c.services:
        service_names = [link.service.name for link in c.services if link.service]
        if service_names:
            compact['svc'] = service_names

    if c.competencies:
        competency_names = [
            link.competency.name for link in c.competencies if link.competency
        ]
        if competency_names:
            compact['comp'] = competency_names

    _copy_truthy((
        ('web', 'website'),
        ('tel', 'phone'),
        ('mail', 'email'),
        ('city', 'address_city'),
        ('year', 'year_established'),
    ))

    if c.certifications:
        active_certs = [cert.name for cert in c.certifications if cert.is_active]
        if active_certs:
            compact['cert'] = active_certs[:3]

    return compact