nordabiz/norda_knowledge_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

723 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Norda Knowledge Base Service
============================
Builds and maintains a knowledge base for Norda GPT from:
1. Forum topics and replies (public discussions)
2. AI chat responses (assistant messages with positive feedback)
PRIVACY: User questions (role='user') are NEVER added to the knowledge base.
Only AI responses (role='assistant') containing public company facts are included.
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-01-28
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Enum as SQLEnum
from sqlalchemy.orm import relationship
from database import (
Base,
SessionLocal,
ForumTopic,
ForumReply,
AIChatMessage,
AIChatConversation,
Company
)
logger = logging.getLogger(__name__)
class KnowledgeSourceType(Enum):
    """Where a knowledge chunk originally came from.

    Stored as the string value in NordaKnowledgeChunk.source_type.
    """
    FORUM_TOPIC = "forum_topic"      # opening post of a public forum topic
    FORUM_REPLY = "forum_reply"      # reply within a public forum topic
    CHAT_RESPONSE = "chat_response"  # AI assistant message (never user messages)
    CHAT_QUESTION = "chat_question"  # anonymized user-question PATTERN (trend analysis only)
    MANUAL = "manual"                # manually added by an admin
class InsightCategory(Enum):
    """Classification of development insights mined from user activity.

    Stored as the string value in NordaKnowledgeChunk.insight_category.
    """
    FEATURE_REQUEST = "feature_request"      # user wants a new feature
    BUG_REPORT = "bug_report"                # something doesn't work
    IMPROVEMENT = "improvement"              # enhancement to an existing feature
    QUESTION = "question"                    # common question (indicates missing docs/feature)
    PAIN_POINT = "pain_point"                # user frustration
    POSITIVE_FEEDBACK = "positive_feedback"  # what works well
    COMPANY_SEARCH = "company_search"        # what companies users look for
    OTHER = "other"
class NordaKnowledgeChunk(Base):
    """
    A single unit of knowledge extracted from forum posts, AI chat
    responses, anonymized question patterns, or manual admin entries.

    Used for:
    1. The Norda GPT knowledge base (retrieval in search_knowledge)
    2. Development insights for roadmap planning (insight_* columns)
    3. User feedback analysis
    """
    __tablename__ = 'norda_knowledge_chunks'

    id = Column(Integer, primary_key=True)

    # --- Content ---
    content = Column(Text, nullable=False)   # the chunk text itself (capped by the sync service)
    summary = Column(String(500))            # short summary for quick reference / dedup of insights

    # --- Source tracking ---
    # One of the KnowledgeSourceType values: forum_topic, forum_reply,
    # chat_response, chat_question, manual.
    source_type = Column(String(50), nullable=False)
    source_id = Column(Integer)              # row id in the source table (None for anonymized insights)
    source_url = Column(String(500))         # URL to the original source, if it has a public one

    # --- Metadata ---
    category = Column(String(100))           # topic category or detected theme
    keywords = Column(Text)                  # comma-separated keywords for simple keyword search

    # --- Development insights (for roadmap) ---
    # One of the InsightCategory values; None for plain knowledge chunks.
    insight_category = Column(String(50))
    insight_priority = Column(Integer, default=0)          # higher = more important (driven by frequency)
    insight_status = Column(String(50), default='new')     # new, reviewed, planned, implemented, rejected

    # --- Quality indicators ---
    is_verified = Column(Boolean, default=False)           # admin verified (boosts search relevance)
    confidence_score = Column(Integer, default=50)         # 0-100, set by the sync heuristics

    # --- Companies mentioned ---
    mentioned_company_ids = Column(Text)     # comma-separated company IDs

    # --- Timestamps ---
    # NOTE(review): naive local datetimes, consistent with the rest of
    # this module — confirm the project standardizes on local time.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    source_created_at = Column(DateTime)     # when the source item itself was created

    # --- Status ---
    is_active = Column(Boolean, default=True)  # soft-delete flag; search only sees active chunks

    def __repr__(self):
        return f"<NordaKnowledgeChunk {self.id}: {self.source_type}>"
@dataclass
class KnowledgeChunkResult:
    """A single scored hit returned by knowledge-base search."""
    chunk_id: int                 # NordaKnowledgeChunk.id of the matching row
    content: str                  # full chunk text
    summary: Optional[str]        # short summary, if one was stored
    source_type: str              # KnowledgeSourceType value of the origin
    source_url: Optional[str]     # link to the original source (None for chat responses)
    relevance_score: float        # keyword-match score; higher = more relevant
    created_at: datetime          # source creation time, falling back to chunk creation time
class NordaKnowledgeService:
"""
Service for building and querying the Norda knowledge base.
"""
def __init__(self):
self.min_content_length = 50 # Minimum characters for a chunk
self.max_content_length = 2000 # Maximum characters per chunk
def sync_forum_knowledge(self, days_back: int = 30) -> Dict[str, int]:
"""
Sync knowledge from forum topics and replies.
Args:
days_back: How many days back to sync
Returns:
Dict with counts of added/updated/skipped items
"""
db = SessionLocal()
stats = {'topics_added': 0, 'replies_added': 0, 'skipped': 0, 'errors': 0}
try:
cutoff_date = datetime.now() - timedelta(days=days_back)
# Get forum topics (exclude test category)
topics = db.query(ForumTopic).filter(
ForumTopic.created_at >= cutoff_date,
ForumTopic.category != 'test'
).all()
for topic in topics:
try:
# Check if already exists
existing = db.query(NordaKnowledgeChunk).filter_by(
source_type=KnowledgeSourceType.FORUM_TOPIC.value,
source_id=topic.id
).first()
if existing:
stats['skipped'] += 1
continue
# Create knowledge chunk from topic
if len(topic.content or '') >= self.min_content_length:
chunk = NordaKnowledgeChunk(
content=self._truncate_content(topic.content),
summary=topic.title[:500] if topic.title else None,
source_type=KnowledgeSourceType.FORUM_TOPIC.value,
source_id=topic.id,
source_url=f"/forum/topic/{topic.id}",
category=topic.category,
keywords=self._extract_keywords(topic.title + ' ' + topic.content),
confidence_score=70, # Forum topics are moderately reliable
source_created_at=topic.created_at,
is_active=True
)
db.add(chunk)
stats['topics_added'] += 1
except Exception as e:
logger.error(f"Error processing topic {topic.id}: {e}")
stats['errors'] += 1
# Get forum replies
replies = db.query(ForumReply).filter(
ForumReply.created_at >= cutoff_date
).all()
for reply in replies:
try:
# Skip if topic is test category
if reply.topic and reply.topic.category == 'test':
continue
# Check if already exists
existing = db.query(NordaKnowledgeChunk).filter_by(
source_type=KnowledgeSourceType.FORUM_REPLY.value,
source_id=reply.id
).first()
if existing:
stats['skipped'] += 1
continue
# Create knowledge chunk from reply
if len(reply.content or '') >= self.min_content_length:
chunk = NordaKnowledgeChunk(
content=self._truncate_content(reply.content),
summary=f"Odpowiedź w: {reply.topic.title[:200]}" if reply.topic else None,
source_type=KnowledgeSourceType.FORUM_REPLY.value,
source_id=reply.id,
source_url=f"/forum/topic/{reply.topic_id}#reply-{reply.id}" if reply.topic_id else None,
category=reply.topic.category if reply.topic else None,
keywords=self._extract_keywords(reply.content),
confidence_score=60, # Replies are slightly less reliable
source_created_at=reply.created_at,
is_active=True
)
db.add(chunk)
stats['replies_added'] += 1
except Exception as e:
logger.error(f"Error processing reply {reply.id}: {e}")
stats['errors'] += 1
db.commit()
logger.info(f"Forum knowledge sync complete: {stats}")
except Exception as e:
logger.error(f"Error syncing forum knowledge: {e}")
db.rollback()
raise
finally:
db.close()
return stats
def sync_chat_knowledge(self, days_back: int = 30) -> Dict[str, int]:
"""
Sync knowledge from AI chat responses.
PRIVACY: Only syncs assistant messages (role='assistant').
User messages (role='user') are NEVER added.
Prioritizes responses with positive feedback.
Args:
days_back: How many days back to sync
Returns:
Dict with counts of added/skipped items
"""
db = SessionLocal()
stats = {'responses_added': 0, 'skipped': 0, 'errors': 0}
try:
cutoff_date = datetime.now() - timedelta(days=days_back)
# PRIVACY: Only get assistant messages (NEVER user messages)
# Prioritize messages with positive feedback
messages = db.query(AIChatMessage).filter(
AIChatMessage.role == 'assistant', # CRITICAL: Only assistant messages
AIChatMessage.created_at >= cutoff_date,
# Only include messages with positive or no feedback (exclude negative)
(AIChatMessage.feedback_rating == 2) | (AIChatMessage.feedback_rating.is_(None))
).all()
for msg in messages:
try:
# Check if already exists
existing = db.query(NordaKnowledgeChunk).filter_by(
source_type=KnowledgeSourceType.CHAT_RESPONSE.value,
source_id=msg.id
).first()
if existing:
stats['skipped'] += 1
continue
# Skip short responses or greetings
if len(msg.content or '') < self.min_content_length:
stats['skipped'] += 1
continue
# Skip generic responses without company information
if not self._contains_company_info(msg.content):
stats['skipped'] += 1
continue
# Calculate confidence based on feedback
confidence = 50
if msg.feedback_rating == 2: # Positive feedback
confidence = 85
if msg.companies_mentioned and msg.companies_mentioned > 0:
confidence += 10
# Create knowledge chunk
chunk = NordaKnowledgeChunk(
content=self._truncate_content(msg.content),
summary=self._generate_summary(msg.content),
source_type=KnowledgeSourceType.CHAT_RESPONSE.value,
source_id=msg.id,
source_url=None, # Chat responses don't have public URLs
category='chat_knowledge',
keywords=self._extract_keywords(msg.content),
confidence_score=min(confidence, 100),
source_created_at=msg.created_at,
is_active=True
)
db.add(chunk)
stats['responses_added'] += 1
except Exception as e:
logger.error(f"Error processing chat message {msg.id}: {e}")
stats['errors'] += 1
db.commit()
logger.info(f"Chat knowledge sync complete: {stats}")
except Exception as e:
logger.error(f"Error syncing chat knowledge: {e}")
db.rollback()
raise
finally:
db.close()
return stats
def analyze_user_questions(self, days_back: int = 30) -> Dict[str, int]:
"""
Analyze user questions for development insights.
PRIVACY: Content is anonymized - we only extract PATTERNS/TOPICS, not actual questions.
This helps identify:
- Missing features (what users ask for that doesn't exist)
- Common questions (need better docs/UI)
- Popular company searches (demand analysis)
Args:
days_back: How many days back to analyze
Returns:
Dict with counts of insights by category
"""
db = SessionLocal()
stats = {'insights_added': 0, 'patterns_found': 0}
try:
cutoff_date = datetime.now() - timedelta(days=days_back)
# Get user messages (for pattern analysis only)
messages = db.query(AIChatMessage).filter(
AIChatMessage.role == 'user',
AIChatMessage.created_at >= cutoff_date
).all()
# Analyze patterns (anonymized)
pattern_counts = {}
for msg in messages:
patterns = self._extract_insight_patterns(msg.content)
for pattern, category in patterns:
key = (pattern, category)
pattern_counts[key] = pattern_counts.get(key, 0) + 1
# Create insight chunks for frequently occurring patterns
for (pattern, category), count in pattern_counts.items():
if count >= 3: # Only if pattern appears 3+ times
stats['patterns_found'] += 1
# Check if insight already exists
existing = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.source_type == KnowledgeSourceType.CHAT_QUESTION.value,
NordaKnowledgeChunk.summary == pattern
).first()
if existing:
# Update priority based on frequency
existing.insight_priority = max(existing.insight_priority, count)
existing.updated_at = datetime.now()
else:
# Create new insight
chunk = NordaKnowledgeChunk(
content=f"Użytkownicy często pytają o: {pattern} ({count} razy w ostatnich {days_back} dniach)",
summary=pattern,
source_type=KnowledgeSourceType.CHAT_QUESTION.value,
source_id=None, # Anonymized, no specific source
insight_category=category,
insight_priority=count,
insight_status='new',
confidence_score=min(50 + count * 5, 95),
is_active=True
)
db.add(chunk)
stats['insights_added'] += 1
db.commit()
logger.info(f"User question analysis complete: {stats}")
except Exception as e:
logger.error(f"Error analyzing user questions: {e}")
db.rollback()
raise
finally:
db.close()
return stats
def _extract_insight_patterns(self, question: str) -> List[tuple]:
"""
Extract anonymized patterns from user questions.
Returns list of (pattern, category) tuples.
"""
patterns = []
q_lower = question.lower()
# Feature requests
feature_keywords = ['czy można', 'czy da się', 'chciałbym', 'przydałoby się', 'brakuje', 'potrzebuję']
for kw in feature_keywords:
if kw in q_lower:
patterns.append(('Prośba o nową funkcję', InsightCategory.FEATURE_REQUEST.value))
break
# Bug reports
bug_keywords = ['nie działa', 'błąd', 'problem', 'nie mogę', 'nie wyświetla', 'crash']
for kw in bug_keywords:
if kw in q_lower:
patterns.append(('Zgłoszenie problemu', InsightCategory.BUG_REPORT.value))
break
# Company searches
company_keywords = ['firma', 'firmy', 'szukam', 'potrzebuję', 'kto robi', 'kto oferuje']
for kw in company_keywords:
if kw in q_lower:
patterns.append(('Wyszukiwanie firm', InsightCategory.COMPANY_SEARCH.value))
break
# Specific service searches (extract service type)
service_patterns = [
('budownictwo', 'Szukanie: usługi budowlane'),
('transport', 'Szukanie: transport/logistyka'),
('it', 'Szukanie: usługi IT'),
('marketing', 'Szukanie: marketing/reklama'),
('księgowość', 'Szukanie: księgowość'),
('prawo', 'Szukanie: usługi prawne'),
]
for keyword, pattern in service_patterns:
if keyword in q_lower:
patterns.append((pattern, InsightCategory.COMPANY_SEARCH.value))
# Questions about portal
portal_keywords = ['jak', 'gdzie', 'co to', 'po co', 'dlaczego']
for kw in portal_keywords:
if kw in q_lower and ('portal' in q_lower or 'strona' in q_lower or 'norda' in q_lower):
patterns.append(('Pytanie o portal', InsightCategory.QUESTION.value))
break
return patterns
def get_development_insights(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Get development insights for roadmap planning.
Args:
status: Filter by status (new, reviewed, planned, implemented, rejected)
Returns:
List of insight dicts ordered by priority
"""
db = SessionLocal()
try:
query = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.is_active == True,
NordaKnowledgeChunk.insight_category.isnot(None)
)
if status:
query = query.filter(NordaKnowledgeChunk.insight_status == status)
insights = query.order_by(
NordaKnowledgeChunk.insight_priority.desc(),
NordaKnowledgeChunk.created_at.desc()
).limit(100).all()
return [
{
'id': i.id,
'summary': i.summary,
'content': i.content,
'category': i.insight_category,
'priority': i.insight_priority,
'status': i.insight_status,
'source_type': i.source_type,
'created_at': i.created_at.isoformat() if i.created_at else None
}
for i in insights
]
finally:
db.close()
def update_insight_status(self, insight_id: int, status: str, note: Optional[str] = None) -> bool:
"""Update the status of a development insight"""
db = SessionLocal()
try:
insight = db.query(NordaKnowledgeChunk).filter_by(id=insight_id).first()
if not insight:
return False
insight.insight_status = status
insight.updated_at = datetime.now()
if note:
insight.content = f"{insight.content}\n\n[Admin note: {note}]"
db.commit()
return True
except Exception as e:
logger.error(f"Error updating insight status: {e}")
db.rollback()
return False
finally:
db.close()
def search_knowledge(
self,
query: str,
limit: int = 5,
source_types: Optional[List[str]] = None
) -> List[KnowledgeChunkResult]:
"""
Search knowledge base for relevant chunks.
Args:
query: Search query
limit: Maximum results to return
source_types: Filter by source types (None = all)
Returns:
List of KnowledgeChunkResult ordered by relevance
"""
db = SessionLocal()
results = []
try:
# Build base query
base_query = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.is_active == True
)
# Filter by source types
if source_types:
base_query = base_query.filter(
NordaKnowledgeChunk.source_type.in_(source_types)
)
# Simple keyword search (can be enhanced with FTS later)
query_words = query.lower().split()
chunks = base_query.all()
# Score each chunk
scored_chunks = []
for chunk in chunks:
score = self._calculate_relevance(query_words, chunk)
if score > 0:
scored_chunks.append((chunk, score))
# Sort by score and limit
scored_chunks.sort(key=lambda x: x[1], reverse=True)
top_chunks = scored_chunks[:limit]
# Convert to results
for chunk, score in top_chunks:
results.append(KnowledgeChunkResult(
chunk_id=chunk.id,
content=chunk.content,
summary=chunk.summary,
source_type=chunk.source_type,
source_url=chunk.source_url,
relevance_score=score,
created_at=chunk.source_created_at or chunk.created_at
))
finally:
db.close()
return results
def get_knowledge_stats(self) -> Dict[str, Any]:
"""Get statistics about the knowledge base"""
db = SessionLocal()
try:
total = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.is_active == True
).count()
by_source = {}
for source_type in KnowledgeSourceType:
count = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.is_active == True,
NordaKnowledgeChunk.source_type == source_type.value
).count()
by_source[source_type.value] = count
verified = db.query(NordaKnowledgeChunk).filter(
NordaKnowledgeChunk.is_active == True,
NordaKnowledgeChunk.is_verified == True
).count()
return {
'total_chunks': total,
'by_source': by_source,
'verified_chunks': verified,
'last_sync': datetime.now().isoformat()
}
finally:
db.close()
def _truncate_content(self, content: str) -> str:
"""Truncate content to max length"""
if len(content) <= self.max_content_length:
return content
return content[:self.max_content_length - 3] + "..."
def _extract_keywords(self, text: str) -> str:
"""Extract keywords from text (simple implementation)"""
# Remove common words and extract significant terms
stopwords = {
'i', 'a', 'the', 'to', 'w', 'z', 'na', 'do', 'jest', '', 'być',
'że', 'o', 'nie', 'się', 'jak', 'co', 'dla', 'po', 'od', 'za',
'ale', 'lub', 'oraz', 'czy', 'tak', 'już', 'tylko', 'też', 'jeszcze'
}
words = text.lower().split()
keywords = [w for w in words if len(w) > 3 and w not in stopwords]
# Return unique keywords, limited
unique_keywords = list(dict.fromkeys(keywords))[:20]
return ','.join(unique_keywords)
def _generate_summary(self, content: str) -> Optional[str]:
"""Generate a short summary (first sentence or 100 chars)"""
if not content:
return None
# Try to get first sentence
sentences = content.split('.')
if sentences:
first = sentences[0].strip()
if len(first) > 10:
return first[:200] + ('...' if len(first) > 200 else '')
return content[:100] + '...'
def _contains_company_info(self, content: str) -> bool:
"""Check if content contains company-related information"""
company_keywords = [
'firma', 'firmy', 'firmę', 'spółka', 'przedsiębiorstwo',
'usługi', 'produkty', 'kontakt', 'telefon', 'email',
'norda', 'członek', 'biznes', 'współpraca'
]
content_lower = content.lower()
return any(kw in content_lower for kw in company_keywords)
def _calculate_relevance(self, query_words: List[str], chunk: NordaKnowledgeChunk) -> float:
"""Calculate relevance score for a chunk"""
score = 0.0
content_lower = (chunk.content or '').lower()
keywords = (chunk.keywords or '').lower()
summary = (chunk.summary or '').lower()
for word in query_words:
# Content matches
if word in content_lower:
score += 1.0
# Keyword matches (higher weight)
if word in keywords:
score += 2.0
# Summary matches
if word in summary:
score += 1.5
# Boost verified chunks
if chunk.is_verified:
score *= 1.2
# Boost by confidence
score *= (chunk.confidence_score or 50) / 100
return score
# Process-wide singleton, created lazily on first access.
_service_instance: Optional[NordaKnowledgeService] = None


def get_knowledge_service() -> NordaKnowledgeService:
    """Return the shared NordaKnowledgeService, constructing it on first use."""
    global _service_instance
    if _service_instance is None:
        # First call: build the singleton; subsequent calls reuse it.
        _service_instance = NordaKnowledgeService()
    return _service_instance
# Convenience functions
def sync_all_knowledge(days_back: int = 30) -> Dict[str, Any]:
    """Run a full knowledge sync (forum first, then chat) and return per-source stats."""
    svc = get_knowledge_service()
    forum_stats = svc.sync_forum_knowledge(days_back)
    chat_stats = svc.sync_chat_knowledge(days_back)
    return {'forum': forum_stats, 'chat': chat_stats}
def search_knowledge(query: str, limit: int = 5) -> List[KnowledgeChunkResult]:
    """Module-level convenience wrapper around NordaKnowledgeService.search_knowledge."""
    service = get_knowledge_service()
    return service.search_knowledge(query, limit)