"""
Memory Service
==============

Manages persistent user memory for NordaGPT conversations.

Features:
- Extract key facts from conversations using Flash-Lite
- Generate and update conversation summaries
- Format memory for injection into AI prompts
- CRUD operations for user memory facts

Author: Maciej Pienczyn, InPi sp. z o.o.
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
from database import SessionLocal, AIUserMemory, AIConversationSummary, AIChatMessage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
FLASH_LITE_MODEL = 'gemini-3.1-flash-lite-preview'
|
|
|
|
|
|
def get_user_memory(user_id: int, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Return active, non-expired memory facts for a user.

    Facts are sorted by confidence desc, then by created_at desc (recency).

    Args:
        user_id: User ID.
        limit: Maximum number of facts to return.

    Returns:
        List of dicts with fact data (id, fact, category, confidence,
        created_at/expires_at as ISO strings or None, source_conversation_id).
    """
    db = SessionLocal()
    try:
        now = datetime.now()
        # NOTE(review): naive datetime — assumes DB timestamps are naive local
        # time as well; confirm against the column definitions in database.py.
        query = db.query(AIUserMemory).filter(
            AIUserMemory.user_id == user_id,
            AIUserMemory.is_active == True,  # noqa: E712 — SQLAlchemy expression, not a bool compare
            (AIUserMemory.expires_at == None) | (AIUserMemory.expires_at > now)  # noqa: E711
        ).order_by(
            AIUserMemory.confidence.desc(),
            AIUserMemory.created_at.desc()
        ).limit(limit)

        facts = query.all()
        return [
            {
                'id': f.id,
                'fact': f.fact,
                'category': f.category or 'general',
                # BUGFIX: explicit None check — the old truthiness test turned
                # a stored confidence of 0.0 into the default 1.0.
                'confidence': float(f.confidence) if f.confidence is not None else 1.0,
                'created_at': f.created_at.isoformat() if f.created_at else None,
                'expires_at': f.expires_at.isoformat() if f.expires_at else None,
                'source_conversation_id': f.source_conversation_id
            }
            for f in facts
        ]
    finally:
        db.close()
|
|
|
|
|
|
def get_conversation_summaries(user_id: int, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Fetch the most recently updated conversation summaries for a user.

    Args:
        user_id: User ID.
        limit: Maximum number of summaries to return.

    Returns:
        List of dicts with summary data, newest (by updated_at) first.
    """
    session = SessionLocal()
    try:
        rows = (
            session.query(AIConversationSummary)
            .filter(AIConversationSummary.user_id == user_id)
            .order_by(AIConversationSummary.updated_at.desc())
            .limit(limit)
            .all()
        )

        results: List[Dict[str, Any]] = []
        for row in rows:
            results.append({
                'id': row.id,
                'conversation_id': row.conversation_id,
                'summary': row.summary,
                'key_topics': row.key_topics or [],
                'created_at': row.created_at.isoformat() if row.created_at else None,
                'updated_at': row.updated_at.isoformat() if row.updated_at else None
            })
        return results
    finally:
        session.close()
|
|
|
|
|
|
def format_memory_for_prompt(user_id: int) -> str:
    """
    Build the user-memory section for injection into an AI prompt.

    Combines stored facts and recent conversation summaries into a
    single formatted string. Returns an empty string when the user has
    no memory at all.

    Args:
        user_id: User ID.

    Returns:
        Formatted memory string, or "" when there is nothing to inject.
    """
    facts = get_user_memory(user_id, limit=10)
    summaries = get_conversation_summaries(user_id, limit=5)

    if not (facts or summaries):
        return ""

    parts = ["\n# PAMIĘĆ O UŻYTKOWNIKU"]

    if facts:
        parts.append("Znane fakty:")
        parts.extend(
            f"- [{fact.get('category', 'general')}] {fact.get('fact', '')}"
            for fact in facts
        )

    if summaries:
        parts.append("\nOstatnie rozmowy:")
        for entry in summaries:
            # Render the updated_at timestamp as YYYY-MM-DD; fall back to a
            # raw 10-char prefix when it is not valid ISO format.
            stamp = entry.get('updated_at')
            date_str = ''
            if stamp:
                try:
                    date_str = datetime.fromisoformat(stamp).strftime('%Y-%m-%d')
                except Exception:
                    date_str = stamp[:10]

            summary_text = entry.get('summary', '')
            topics = entry.get('key_topics', [])
            topics_str = ', '.join(topics) if topics else ''

            if topics_str:
                parts.append(f"- {date_str}: {summary_text} (tematy: {topics_str})")
            else:
                parts.append(f"- {date_str}: {summary_text}")

    parts.append("\nWykorzystuj tę wiedzę do personalizacji odpowiedzi.")

    return "\n".join(parts) + "\n"
|
|
|
|
|
|
def extract_facts_async(
    conversation_id: int,
    user_id: int,
    user_context: Optional[Dict[str, Any]],
    gemini_service
) -> None:
    """
    Extract key user facts from a conversation via Flash-Lite and save them.

    Loads the last 10 messages, asks the model for up to 3 new facts,
    deduplicates against existing memory (and within the batch itself),
    and persists the rest. All errors are logged and swallowed — this is
    a background task and must never propagate to the caller.

    Args:
        conversation_id: Conversation ID to extract facts from.
        user_id: User ID.
        user_context: Optional user context dict (currently unused here).
        gemini_service: GeminiService instance for API calls.
    """
    db = SessionLocal()
    try:
        # Last 10 messages, newest first; reversed below for chronology.
        messages = db.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.desc()).limit(10).all()

        if not messages:
            return

        # Build conversation text (chronological order).
        conversation_text = ""
        for msg in reversed(messages):
            role = "Użytkownik" if msg.role == 'user' else "Asystent"
            conversation_text += f"{role}: {msg.content}\n"

        # Load existing facts for deduplication.
        existing_facts = get_user_memory(user_id, limit=20)
        existing_texts = [f['fact'] for f in existing_facts]
        existing_str = "\n".join(f"- {t}" for t in existing_texts) if existing_texts else "(brak)"

        # Build extraction prompt.
        prompt = f"""Przeanalizuj poniższy fragment rozmowy i wyodrębnij maksymalnie 3 kluczowe fakty o UŻYTKOWNIKU (nie asystencie).

Fakty powinny dotyczyć: preferencji użytkownika, jego firmy, branży, zainteresowań biznesowych, planów, potrzeb.

ISTNIEJĄCE FAKTY (nie duplikuj):
{existing_str}

ROZMOWA:
{conversation_text}

Odpowiedz TYLKO w formacie JSON (bez markdown, bez komentarzy):
[
{{"fact": "treść faktu po polsku", "category": "kategoria"}},
...
]

Kategorie: business, interest, preference, contact, plan, general
Jeśli nie ma nowych faktów wartych zapamiętania, zwróć pustą tablicę: []"""

        # Call Flash-Lite.
        response_text = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_extraction',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )

        if not response_text:
            return

        # Strip markdown code fences if present.
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            fence_free = [l for l in cleaned.split("\n") if not l.strip().startswith("```")]
            cleaned = "\n".join(fence_free).strip()

        # Parse JSON.
        try:
            extracted = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Memory extraction: JSON parse failed for user {user_id}: {cleaned[:200]}")
            return

        if not isinstance(extracted, list):
            return

        # Save new facts (max 3, deduplicated).
        saved_count = 0
        for item in extracted[:3]:
            if not isinstance(item, dict):
                continue

            fact_text = item.get('fact', '')
            # BUGFIX: the model may return a non-string "fact"; skip that item
            # instead of raising and aborting the whole extraction.
            if not isinstance(fact_text, str):
                continue
            fact_text = fact_text.strip()
            if not fact_text:
                continue

            # Sanitize category the same way: anything non-string or blank
            # falls back to 'general'.
            category = item.get('category', 'general')
            if not isinstance(category, str) or not category.strip():
                category = 'general'

            # Simple deduplication: skip if very similar to a known fact.
            if any(_similarity_check(fact_text, existing) for existing in existing_texts):
                continue

            db.add(AIUserMemory(
                user_id=user_id,
                fact=fact_text,
                category=category,
                source_conversation_id=conversation_id,
                confidence=0.8,
                is_active=True
            ))
            # BUGFIX: add the just-accepted fact to the dedup pool so two
            # near-identical facts from the SAME extraction aren't both saved.
            existing_texts.append(fact_text)
            saved_count += 1

        if saved_count > 0:
            db.commit()
            logger.info(f"Memory: saved {saved_count} facts for user {user_id}")

    except Exception as e:
        logger.warning(f"Memory extraction failed for user {user_id}, conv {conversation_id}: {e}")
    finally:
        db.close()
|
|
|
|
|
|
def summarize_conversation_async(
    conversation_id: int,
    user_id: int,
    gemini_service
) -> None:
    """
    Generate or update a conversation summary using Flash-Lite.

    Loads the whole conversation, asks the model for a two-sentence
    summary plus key topics, validates the response shape, and upserts
    an AIConversationSummary row (unique per conversation_id). All
    errors are logged and swallowed — this is a background task.

    Args:
        conversation_id: Conversation ID to summarize.
        user_id: User ID.
        gemini_service: GeminiService instance for API calls.
    """
    db = SessionLocal()
    try:
        # Load messages from the conversation in chronological order.
        messages = db.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.asc()).all()

        if not messages:
            return

        # Build conversation text; each message capped at 300 chars.
        conversation_text = ""
        for msg in messages:
            role = "Użytkownik" if msg.role == 'user' else "Asystent"
            conversation_text += f"{role}: {msg.content[:300]}\n"

        # Trim if too long to keep the prompt bounded.
        if len(conversation_text) > 4000:
            conversation_text = conversation_text[:4000] + "..."

        prompt = f"""Napisz krótkie podsumowanie (max 2 zdania) poniższej rozmowy, skupiając się na głównym temacie i potrzebach użytkownika. Podaj też listę 3-5 słów kluczowych opisujących tematykę.

ROZMOWA:
{conversation_text}

Odpowiedz TYLKO w formacie JSON (bez markdown):
{{"summary": "podsumowanie rozmowy", "key_topics": ["temat1", "temat2", "temat3"]}}"""

        response_text = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_summary',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )

        if not response_text:
            return

        # Strip markdown code fences if present.
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            fence_free = [l for l in cleaned.split("\n") if not l.strip().startswith("```")]
            cleaned = "\n".join(fence_free).strip()

        # Parse JSON.
        try:
            result = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Summary: JSON parse failed for conv {conversation_id}: {cleaned[:200]}")
            return

        if not isinstance(result, dict):
            return

        # BUGFIX: validate model output before touching the DB — a non-string
        # "summary" used to raise on .strip(), and a non-list "key_topics"
        # was persisted as whatever JSON type arrived.
        summary_raw = result.get('summary', '')
        if not isinstance(summary_raw, str):
            return
        summary_text = summary_raw.strip()
        if not summary_text:
            return

        key_topics = result.get('key_topics', [])
        if isinstance(key_topics, list):
            key_topics = [t for t in key_topics if isinstance(t, str)]
        else:
            key_topics = []

        # Upsert summary (unique on conversation_id).
        existing = db.query(AIConversationSummary).filter(
            AIConversationSummary.conversation_id == conversation_id
        ).first()

        if existing:
            existing.summary = summary_text
            existing.key_topics = key_topics
            existing.updated_at = datetime.now()
        else:
            db.add(AIConversationSummary(
                conversation_id=conversation_id,
                user_id=user_id,
                summary=summary_text,
                key_topics=key_topics
            ))

        db.commit()
        logger.info(f"Memory: updated summary for conv {conversation_id}")

    except Exception as e:
        logger.warning(f"Summary generation failed for conv {conversation_id}: {e}")
    finally:
        db.close()
|
|
|
|
|
|
def delete_user_fact(user_id: int, fact_id: int) -> bool:
    """
    Soft-delete a memory fact by flipping is_active to False.

    The row must belong to the given user and still be active;
    otherwise nothing is changed.

    Args:
        user_id: User ID (ownership check).
        fact_id: Fact ID to delete.

    Returns:
        True on success, False when the fact is missing, already
        inactive, owned by someone else, or the update fails.
    """
    session = SessionLocal()
    try:
        target = session.query(AIUserMemory).filter(
            AIUserMemory.id == fact_id,
            AIUserMemory.user_id == user_id,
            AIUserMemory.is_active == True
        ).first()

        if target is None:
            return False

        target.is_active = False
        session.commit()
        logger.info(f"Memory: soft-deleted fact {fact_id} for user {user_id}")
        return True

    except Exception as e:
        logger.warning(f"Memory delete failed for fact {fact_id}: {e}")
        return False
    finally:
        session.close()
|
|
|
|
|
|
def _similarity_check(new_fact: str, existing_fact: str, threshold: float = 0.6) -> bool:
|
|
"""
|
|
Simple word-overlap similarity check for deduplication.
|
|
|
|
Args:
|
|
new_fact: New fact text
|
|
existing_fact: Existing fact text
|
|
threshold: Overlap ratio threshold (0-1)
|
|
|
|
Returns:
|
|
True if facts are considered duplicates
|
|
"""
|
|
new_words = set(new_fact.lower().split())
|
|
existing_words = set(existing_fact.lower().split())
|
|
|
|
if not new_words or not existing_words:
|
|
return False
|
|
|
|
# Remove short/common words
|
|
stop_words = {'w', 'z', 'i', 'a', 'to', 'na', 'do', 'jest', 'się', 'że', 'nie', 'jak', 'co'}
|
|
new_words -= stop_words
|
|
existing_words -= stop_words
|
|
|
|
if not new_words or not existing_words:
|
|
return False
|
|
|
|
intersection = new_words & existing_words
|
|
overlap = len(intersection) / min(len(new_words), len(existing_words))
|
|
return overlap >= threshold
|