#!/usr/bin/env python3
"""
Norda Knowledge Base Service
============================

Builds and maintains a knowledge base for Norda GPT from:
1. Forum topics and replies (public discussions)
2. AI chat responses (assistant messages with positive feedback)

PRIVACY: User questions (role='user') are NEVER added to the knowledge base.
Only AI responses (role='assistant') containing public company facts are included.

Author: Norda Biznes Development Team
Created: 2026-01-28
"""

import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Enum as SQLEnum
from sqlalchemy.orm import relationship

from database import (
    Base, SessionLocal, ForumTopic, ForumReply,
    AIChatMessage, AIChatConversation, Company
)

logger = logging.getLogger(__name__)


class KnowledgeSourceType(Enum):
    """Source types for knowledge chunks."""
    FORUM_TOPIC = "forum_topic"
    FORUM_REPLY = "forum_reply"
    CHAT_RESPONSE = "chat_response"
    CHAT_QUESTION = "chat_question"  # Anonymized user questions (for trend analysis)
    MANUAL = "manual"                # Manually added by admin


class InsightCategory(Enum):
    """Categories for development insights."""
    FEATURE_REQUEST = "feature_request"      # User wants a new feature
    BUG_REPORT = "bug_report"                # Something doesn't work
    IMPROVEMENT = "improvement"              # Enhancement to an existing feature
    QUESTION = "question"                    # Common question (indicates missing docs/feature)
    PAIN_POINT = "pain_point"                # User frustration
    POSITIVE_FEEDBACK = "positive_feedback"  # What works well
    COMPANY_SEARCH = "company_search"        # What companies users look for
    OTHER = "other"


class NordaKnowledgeChunk(Base):
    """
    Knowledge chunks extracted from various sources.

    Used for:
    1. Norda GPT knowledge base
    2. Development insights for roadmap
    3. User feedback analysis
    """
    __tablename__ = 'norda_knowledge_chunks'

    id = Column(Integer, primary_key=True)

    # Content
    content = Column(Text, nullable=False)
    summary = Column(String(500))  # Short summary for quick reference

    # Source tracking
    source_type = Column(String(50), nullable=False)  # forum_topic, forum_reply, chat_response, chat_question, manual
    source_id = Column(Integer)      # ID in the source table
    source_url = Column(String(500))  # URL to the original source

    # Metadata
    category = Column(String(100))  # Topic category or detected theme
    keywords = Column(Text)         # Comma-separated keywords for search

    # Development insights (for roadmap)
    insight_category = Column(String(50))               # feature_request, bug_report, improvement, question, pain_point, etc.
    insight_priority = Column(Integer, default=0)       # Higher = more important (based on frequency)
    insight_status = Column(String(50), default='new')  # new, reviewed, planned, implemented, rejected

    # Quality indicators
    is_verified = Column(Boolean, default=False)    # Admin verified
    confidence_score = Column(Integer, default=50)  # 0-100

    # Companies mentioned
    mentioned_company_ids = Column(Text)  # Comma-separated company IDs

    # Timestamps
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    source_created_at = Column(DateTime)  # When the source item was created

    # Status
    is_active = Column(Boolean, default=True)

    def __repr__(self):
        # FIX: the original returned an empty f-string (f""), making the repr
        # useless in logs and debuggers.
        return f"<NordaKnowledgeChunk id={self.id} source={self.source_type}:{self.source_id}>"


@dataclass
class KnowledgeChunkResult:
    """Result row returned from a knowledge-base search."""
    chunk_id: int
    content: str
    summary: Optional[str]
    source_type: str
    source_url: Optional[str]
    relevance_score: float
    created_at: datetime


class NordaKnowledgeService:
    """
    Service for building and querying the Norda knowledge base.
    """

    def __init__(self):
        self.min_content_length = 50    # Minimum characters for a chunk
        self.max_content_length = 2000  # Maximum characters per chunk

    def sync_forum_knowledge(self, days_back: int = 30) -> Dict[str, int]:
        """
        Sync knowledge from forum topics and replies.

        Args:
            days_back: How many days back to sync

        Returns:
            Dict with counts of added/updated/skipped items
        """
        db = SessionLocal()
        stats = {'topics_added': 0, 'replies_added': 0, 'skipped': 0, 'errors': 0}

        try:
            cutoff_date = datetime.now() - timedelta(days=days_back)

            # Get forum topics (exclude the test category)
            topics = db.query(ForumTopic).filter(
                ForumTopic.created_at >= cutoff_date,
                ForumTopic.category != 'test'
            ).all()

            for topic in topics:
                try:
                    # Skip topics already imported into the knowledge base
                    existing = db.query(NordaKnowledgeChunk).filter_by(
                        source_type=KnowledgeSourceType.FORUM_TOPIC.value,
                        source_id=topic.id
                    ).first()
                    if existing:
                        stats['skipped'] += 1
                        continue

                    # Create a knowledge chunk from the topic
                    if len(topic.content or '') >= self.min_content_length:
                        chunk = NordaKnowledgeChunk(
                            content=self._truncate_content(topic.content),
                            summary=topic.title[:500] if topic.title else None,
                            source_type=KnowledgeSourceType.FORUM_TOPIC.value,
                            source_id=topic.id,
                            source_url=f"/forum/topic/{topic.id}",
                            category=topic.category,
                            # FIX: guard against a None title — the original
                            # `topic.title + ' ' + topic.content` raised TypeError
                            # for topics without a title (title is treated as
                            # nullable in the summary assignment above).
                            keywords=self._extract_keywords((topic.title or '') + ' ' + topic.content),
                            confidence_score=70,  # Forum topics are moderately reliable
                            source_created_at=topic.created_at,
                            is_active=True
                        )
                        db.add(chunk)
                        stats['topics_added'] += 1
                    else:
                        # Consistency fix: count too-short content as skipped,
                        # matching sync_chat_knowledge's accounting.
                        stats['skipped'] += 1

                except Exception as e:
                    logger.error(f"Error processing topic {topic.id}: {e}")
                    stats['errors'] += 1

            # Get forum replies
            replies = db.query(ForumReply).filter(
                ForumReply.created_at >= cutoff_date
            ).all()

            for reply in replies:
                try:
                    # Skip replies belonging to test-category topics
                    if reply.topic and reply.topic.category == 'test':
                        continue

                    # Skip replies already imported
                    existing = db.query(NordaKnowledgeChunk).filter_by(
                        source_type=KnowledgeSourceType.FORUM_REPLY.value,
                        source_id=reply.id
                    ).first()
                    if existing:
                        stats['skipped'] += 1
                        continue

                    # Create a knowledge chunk from the reply
                    if len(reply.content or '') >= self.min_content_length:
                        chunk = NordaKnowledgeChunk(
                            content=self._truncate_content(reply.content),
                            summary=f"Odpowiedź w: {reply.topic.title[:200]}" if reply.topic else None,
                            source_type=KnowledgeSourceType.FORUM_REPLY.value,
                            source_id=reply.id,
                            source_url=f"/forum/topic/{reply.topic_id}#reply-{reply.id}" if reply.topic_id else None,
                            category=reply.topic.category if reply.topic else None,
                            keywords=self._extract_keywords(reply.content),
                            confidence_score=60,  # Replies are slightly less reliable
                            source_created_at=reply.created_at,
                            is_active=True
                        )
                        db.add(chunk)
                        stats['replies_added'] += 1
                    else:
                        # Consistency fix: see the topics loop above.
                        stats['skipped'] += 1

                except Exception as e:
                    logger.error(f"Error processing reply {reply.id}: {e}")
                    stats['errors'] += 1

            db.commit()
            logger.info(f"Forum knowledge sync complete: {stats}")

        except Exception as e:
            logger.error(f"Error syncing forum knowledge: {e}")
            db.rollback()
            raise
        finally:
            db.close()

        return stats

    def sync_chat_knowledge(self, days_back: int = 30) -> Dict[str, int]:
        """
        Sync knowledge from AI chat responses.

        PRIVACY: Only syncs assistant messages (role='assistant').
        User messages (role='user') are NEVER added.

        Prioritizes responses with positive feedback.

        Args:
            days_back: How many days back to sync

        Returns:
            Dict with counts of added/skipped items
        """
        db = SessionLocal()
        stats = {'responses_added': 0, 'skipped': 0, 'errors': 0}

        try:
            cutoff_date = datetime.now() - timedelta(days=days_back)

            # PRIVACY: Only get assistant messages (NEVER user messages).
            # Prioritize messages with positive feedback.
            messages = db.query(AIChatMessage).filter(
                AIChatMessage.role == 'assistant',  # CRITICAL: Only assistant messages
                AIChatMessage.created_at >= cutoff_date,
                # Only include messages with positive or no feedback (exclude negative)
                (AIChatMessage.feedback_rating == 2) | (AIChatMessage.feedback_rating.is_(None))
            ).all()

            for msg in messages:
                try:
                    # Skip responses already imported
                    existing = db.query(NordaKnowledgeChunk).filter_by(
                        source_type=KnowledgeSourceType.CHAT_RESPONSE.value,
                        source_id=msg.id
                    ).first()
                    if existing:
                        stats['skipped'] += 1
                        continue

                    # Skip short responses or greetings
                    if len(msg.content or '') < self.min_content_length:
                        stats['skipped'] += 1
                        continue

                    # Skip generic responses without company information
                    if not self._contains_company_info(msg.content):
                        stats['skipped'] += 1
                        continue

                    # Calculate confidence based on feedback
                    confidence = 50
                    if msg.feedback_rating == 2:  # Positive feedback
                        confidence = 85
                    if msg.companies_mentioned and msg.companies_mentioned > 0:
                        confidence += 10

                    # Create the knowledge chunk
                    chunk = NordaKnowledgeChunk(
                        content=self._truncate_content(msg.content),
                        summary=self._generate_summary(msg.content),
                        source_type=KnowledgeSourceType.CHAT_RESPONSE.value,
                        source_id=msg.id,
                        source_url=None,  # Chat responses don't have public URLs
                        category='chat_knowledge',
                        keywords=self._extract_keywords(msg.content),
                        confidence_score=min(confidence, 100),
                        source_created_at=msg.created_at,
                        is_active=True
                    )
                    db.add(chunk)
                    stats['responses_added'] += 1

                except Exception as e:
                    logger.error(f"Error processing chat message {msg.id}: {e}")
                    stats['errors'] += 1

            db.commit()
            logger.info(f"Chat knowledge sync complete: {stats}")

        except Exception as e:
            logger.error(f"Error syncing chat knowledge: {e}")
            db.rollback()
            raise
        finally:
            db.close()

        return stats

    def analyze_user_questions(self, days_back: int = 30) -> Dict[str, int]:
        """
        Analyze user questions for development insights.

        PRIVACY: Content is anonymized - we only extract PATTERNS/TOPICS,
        not actual questions.

        This helps identify:
        - Missing features (what users ask for that doesn't exist)
        - Common questions (need better docs/UI)
        - Popular company searches (demand analysis)

        Args:
            days_back: How many days back to analyze

        Returns:
            Dict with counts of insights by category
        """
        db = SessionLocal()
        stats = {'insights_added': 0, 'patterns_found': 0}

        try:
            cutoff_date = datetime.now() - timedelta(days=days_back)

            # Get user messages (for pattern analysis only)
            messages = db.query(AIChatMessage).filter(
                AIChatMessage.role == 'user',
                AIChatMessage.created_at >= cutoff_date
            ).all()

            # Count anonymized (pattern, category) occurrences
            pattern_counts = {}
            for msg in messages:
                patterns = self._extract_insight_patterns(msg.content)
                for pattern, category in patterns:
                    key = (pattern, category)
                    pattern_counts[key] = pattern_counts.get(key, 0) + 1

            # Create insight chunks for frequently occurring patterns
            for (pattern, category), count in pattern_counts.items():
                if count >= 3:  # Only if the pattern appears 3+ times
                    stats['patterns_found'] += 1

                    # Check whether the insight already exists
                    existing = db.query(NordaKnowledgeChunk).filter(
                        NordaKnowledgeChunk.source_type == KnowledgeSourceType.CHAT_QUESTION.value,
                        NordaKnowledgeChunk.summary == pattern
                    ).first()

                    if existing:
                        # Bump priority based on observed frequency
                        existing.insight_priority = max(existing.insight_priority, count)
                        existing.updated_at = datetime.now()
                    else:
                        # Create a new insight
                        chunk = NordaKnowledgeChunk(
                            content=f"Użytkownicy często pytają o: {pattern} ({count} razy w ostatnich {days_back} dniach)",
                            summary=pattern,
                            source_type=KnowledgeSourceType.CHAT_QUESTION.value,
                            source_id=None,  # Anonymized, no specific source
                            insight_category=category,
                            insight_priority=count,
                            insight_status='new',
                            confidence_score=min(50 + count * 5, 95),
                            is_active=True
                        )
                        db.add(chunk)
                        stats['insights_added'] += 1

            db.commit()
            logger.info(f"User question analysis complete: {stats}")

        except Exception as e:
            logger.error(f"Error analyzing user questions: {e}")
            db.rollback()
            raise
        finally:
            db.close()

        return stats

    def _extract_insight_patterns(self, question: str) -> List[tuple]:
        """
        Extract anonymized patterns from user questions.

        Returns list of (pattern, category) tuples.
        """
        patterns = []
        q_lower = question.lower()

        # Feature requests
        feature_keywords = ['czy można', 'czy da się', 'chciałbym', 'przydałoby się', 'brakuje', 'potrzebuję']
        for kw in feature_keywords:
            if kw in q_lower:
                patterns.append(('Prośba o nową funkcję', InsightCategory.FEATURE_REQUEST.value))
                break

        # Bug reports
        bug_keywords = ['nie działa', 'błąd', 'problem', 'nie mogę', 'nie wyświetla', 'crash']
        for kw in bug_keywords:
            if kw in q_lower:
                patterns.append(('Zgłoszenie problemu', InsightCategory.BUG_REPORT.value))
                break

        # Company searches
        company_keywords = ['firma', 'firmy', 'szukam', 'potrzebuję', 'kto robi', 'kto oferuje']
        for kw in company_keywords:
            if kw in q_lower:
                patterns.append(('Wyszukiwanie firm', InsightCategory.COMPANY_SEARCH.value))
                break

        # Specific service searches (extract service type).
        # NOTE(review): plain substring matching — the short keyword 'it'
        # matches inside unrelated Polish words (e.g. 'wizyta'); consider
        # word-boundary matching. Left as-is to preserve behavior.
        service_patterns = [
            ('budownictwo', 'Szukanie: usługi budowlane'),
            ('transport', 'Szukanie: transport/logistyka'),
            ('it', 'Szukanie: usługi IT'),
            ('marketing', 'Szukanie: marketing/reklama'),
            ('księgowość', 'Szukanie: księgowość'),
            ('prawo', 'Szukanie: usługi prawne'),
        ]
        for keyword, pattern in service_patterns:
            if keyword in q_lower:
                patterns.append((pattern, InsightCategory.COMPANY_SEARCH.value))

        # Questions about the portal itself
        portal_keywords = ['jak', 'gdzie', 'co to', 'po co', 'dlaczego']
        for kw in portal_keywords:
            if kw in q_lower and ('portal' in q_lower or 'strona' in q_lower or 'norda' in q_lower):
                patterns.append(('Pytanie o portal', InsightCategory.QUESTION.value))
                break

        return patterns

    def get_development_insights(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get development insights for roadmap planning.

        Args:
            status: Filter by status (new, reviewed, planned, implemented, rejected)

        Returns:
            List of insight dicts ordered by priority
        """
        db = SessionLocal()
        try:
            query = db.query(NordaKnowledgeChunk).filter(
                NordaKnowledgeChunk.is_active == True,
                NordaKnowledgeChunk.insight_category.isnot(None)
            )

            if status:
                query = query.filter(NordaKnowledgeChunk.insight_status == status)

            insights = query.order_by(
                NordaKnowledgeChunk.insight_priority.desc(),
                NordaKnowledgeChunk.created_at.desc()
            ).limit(100).all()

            return [
                {
                    'id': i.id,
                    'summary': i.summary,
                    'content': i.content,
                    'category': i.insight_category,
                    'priority': i.insight_priority,
                    'status': i.insight_status,
                    'source_type': i.source_type,
                    'created_at': i.created_at.isoformat() if i.created_at else None
                }
                for i in insights
            ]
        finally:
            db.close()

    def update_insight_status(self, insight_id: int, status: str, note: Optional[str] = None) -> bool:
        """Update the status of a development insight."""
        db = SessionLocal()
        try:
            insight = db.query(NordaKnowledgeChunk).filter_by(id=insight_id).first()
            if not insight:
                return False

            insight.insight_status = status
            insight.updated_at = datetime.now()
            if note:
                insight.content = f"{insight.content}\n\n[Admin note: {note}]"

            db.commit()
            return True
        except Exception as e:
            logger.error(f"Error updating insight status: {e}")
            db.rollback()
            return False
        finally:
            db.close()

    def search_knowledge(
        self,
        query: str,
        limit: int = 5,
        source_types: Optional[List[str]] = None
    ) -> List[KnowledgeChunkResult]:
        """
        Search knowledge base for relevant chunks.

        Args:
            query: Search query
            limit: Maximum results to return
            source_types: Filter by source types (None = all)

        Returns:
            List of KnowledgeChunkResult ordered by relevance
        """
        db = SessionLocal()
        results = []

        try:
            # Build base query
            base_query = db.query(NordaKnowledgeChunk).filter(
                NordaKnowledgeChunk.is_active == True
            )

            # Filter by source types
            if source_types:
                base_query = base_query.filter(
                    NordaKnowledgeChunk.source_type.in_(source_types)
                )

            # Simple keyword search (can be enhanced with FTS later).
            # NOTE(review): loads all active chunks into memory per search;
            # acceptable while the table is small.
            query_words = query.lower().split()
            chunks = base_query.all()

            # Score each chunk
            scored_chunks = []
            for chunk in chunks:
                score = self._calculate_relevance(query_words, chunk)
                if score > 0:
                    scored_chunks.append((chunk, score))

            # Sort by score and limit
            scored_chunks.sort(key=lambda x: x[1], reverse=True)
            top_chunks = scored_chunks[:limit]

            # Convert to results
            for chunk, score in top_chunks:
                results.append(KnowledgeChunkResult(
                    chunk_id=chunk.id,
                    content=chunk.content,
                    summary=chunk.summary,
                    source_type=chunk.source_type,
                    source_url=chunk.source_url,
                    relevance_score=score,
                    created_at=chunk.source_created_at or chunk.created_at
                ))
        finally:
            db.close()

        return results

    def get_knowledge_stats(self) -> Dict[str, Any]:
        """Get statistics about the knowledge base."""
        db = SessionLocal()
        try:
            total = db.query(NordaKnowledgeChunk).filter(
                NordaKnowledgeChunk.is_active == True
            ).count()

            by_source = {}
            for source_type in KnowledgeSourceType:
                count = db.query(NordaKnowledgeChunk).filter(
                    NordaKnowledgeChunk.is_active == True,
                    NordaKnowledgeChunk.source_type == source_type.value
                ).count()
                by_source[source_type.value] = count

            verified = db.query(NordaKnowledgeChunk).filter(
                NordaKnowledgeChunk.is_active == True,
                NordaKnowledgeChunk.is_verified == True
            ).count()

            return {
                'total_chunks': total,
                'by_source': by_source,
                'verified_chunks': verified,
                'last_sync': datetime.now().isoformat()
            }
        finally:
            db.close()

    def _truncate_content(self, content: str) -> str:
        """Truncate content to max length, appending an ellipsis when cut."""
        if len(content) <= self.max_content_length:
            return content
        return content[:self.max_content_length - 3] + "..."

    def _extract_keywords(self, text: str) -> str:
        """Extract keywords from text (simple implementation)."""
        # Remove common words and extract significant terms
        stopwords = {
            'i', 'a', 'the', 'to', 'w', 'z', 'na', 'do', 'jest', 'są', 'być',
            'że', 'o', 'nie', 'się', 'jak', 'co', 'dla', 'po', 'od', 'za',
            'ale', 'lub', 'oraz', 'czy', 'tak', 'już', 'tylko', 'też', 'jeszcze'
        }
        words = text.lower().split()
        keywords = [w for w in words if len(w) > 3 and w not in stopwords]
        # Return unique keywords (insertion order preserved), limited to 20
        unique_keywords = list(dict.fromkeys(keywords))[:20]
        return ','.join(unique_keywords)

    def _generate_summary(self, content: str) -> Optional[str]:
        """Generate a short summary (first sentence or 100 chars)."""
        if not content:
            return None
        # Try to use the first sentence as the summary
        sentences = content.split('.')
        if sentences:
            first = sentences[0].strip()
            if len(first) > 10:
                return first[:200] + ('...' if len(first) > 200 else '')
        return content[:100] + '...'

    def _contains_company_info(self, content: str) -> bool:
        """Check if content contains company-related information."""
        company_keywords = [
            'firma', 'firmy', 'firmę', 'spółka', 'przedsiębiorstwo',
            'usługi', 'produkty', 'kontakt', 'telefon', 'email',
            'norda', 'członek', 'biznes', 'współpraca'
        ]
        content_lower = content.lower()
        return any(kw in content_lower for kw in company_keywords)

    def _calculate_relevance(self, query_words: List[str], chunk: NordaKnowledgeChunk) -> float:
        """Calculate relevance score for a chunk."""
        score = 0.0
        content_lower = (chunk.content or '').lower()
        keywords = (chunk.keywords or '').lower()
        summary = (chunk.summary or '').lower()

        for word in query_words:
            # Content matches
            if word in content_lower:
                score += 1.0
            # Keyword matches (higher weight)
            if word in keywords:
                score += 2.0
            # Summary matches
            if word in summary:
                score += 1.5

        # Boost verified chunks
        if chunk.is_verified:
            score *= 1.2

        # Boost by confidence
        score *= (chunk.confidence_score or 50) / 100

        return score


# Global instance
_service_instance: Optional[NordaKnowledgeService] = None


def get_knowledge_service() -> NordaKnowledgeService:
    """Get or create global NordaKnowledgeService instance."""
    global _service_instance
    if _service_instance is None:
        _service_instance = NordaKnowledgeService()
    return _service_instance


# Convenience functions

def sync_all_knowledge(days_back: int = 30) -> Dict[str, Any]:
    """Sync knowledge from all sources."""
    service = get_knowledge_service()
    return {
        'forum': service.sync_forum_knowledge(days_back),
        'chat': service.sync_chat_knowledge(days_back)
    }


def search_knowledge(query: str, limit: int = 5) -> List[KnowledgeChunkResult]:
    """Search the knowledge base."""
    return get_knowledge_service().search_knowledge(query, limit)