nordabiz/memory_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

453 lines
14 KiB
Python

"""
Memory Service
==============
Manages persistent user memory for NordaGPT conversations.
Features:
- Extract key facts from conversations using Flash-Lite
- Generate and update conversation summaries
- Format memory for injection into AI prompts
- CRUD operations for user memory facts
Author: Maciej Pienczyn, InPi sp. z o.o.
"""
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from database import SessionLocal, AIUserMemory, AIConversationSummary, AIChatMessage
logger = logging.getLogger(__name__)
FLASH_LITE_MODEL = 'gemini-3.1-flash-lite-preview'
def get_user_memory(user_id: int, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Return active, non-expired memory facts for a user.

    Facts are sorted by confidence (desc), then by created_at (desc, recency).

    Args:
        user_id: User ID.
        limit: Maximum number of facts to return.

    Returns:
        List of dicts with fact data.
    """
    db = SessionLocal()
    try:
        now = datetime.now()
        query = db.query(AIUserMemory).filter(
            AIUserMemory.user_id == user_id,
            AIUserMemory.is_active == True,
            # NULL expires_at means "never expires"
            (AIUserMemory.expires_at == None) | (AIUserMemory.expires_at > now)
        ).order_by(
            AIUserMemory.confidence.desc(),
            AIUserMemory.created_at.desc()
        ).limit(limit)
        facts = query.all()
        return [
            {
                'id': f.id,
                'fact': f.fact,
                'category': f.category or 'general',
                # Compare against None explicitly so a stored confidence of 0.0
                # is preserved instead of being silently replaced by 1.0.
                'confidence': float(f.confidence) if f.confidence is not None else 1.0,
                'created_at': f.created_at.isoformat() if f.created_at else None,
                'expires_at': f.expires_at.isoformat() if f.expires_at else None,
                'source_conversation_id': f.source_conversation_id
            }
            for f in facts
        ]
    finally:
        db.close()
def get_conversation_summaries(user_id: int, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Fetch the most recently updated conversation summaries for a user.

    Args:
        user_id: User ID.
        limit: Maximum number of summaries to return.

    Returns:
        List of dicts with summary data, newest first by updated_at.
    """
    db = SessionLocal()
    try:
        rows = (
            db.query(AIConversationSummary)
            .filter(AIConversationSummary.user_id == user_id)
            .order_by(AIConversationSummary.updated_at.desc())
            .limit(limit)
            .all()
        )
        result = []
        for row in rows:
            result.append({
                'id': row.id,
                'conversation_id': row.conversation_id,
                'summary': row.summary,
                'key_topics': row.key_topics or [],
                'created_at': row.created_at.isoformat() if row.created_at else None,
                'updated_at': row.updated_at.isoformat() if row.updated_at else None,
            })
        return result
    finally:
        db.close()
def format_memory_for_prompt(user_id: int) -> str:
    """
    Build a memory section for AI prompt injection from facts and summaries.

    Returns an empty string when the user has no stored memory at all.

    Args:
        user_id: User ID.

    Returns:
        Formatted memory string, or empty string.
    """
    facts = get_user_memory(user_id, limit=10)
    summaries = get_conversation_summaries(user_id, limit=5)
    if not (facts or summaries):
        return ""
    parts = ["\n# PAMIĘĆ O UŻYTKOWNIKU"]
    if facts:
        parts.append("Znane fakty:")
        parts.extend(
            f"- [{fact.get('category', 'general')}] {fact.get('fact', '')}"
            for fact in facts
        )
    if summaries:
        parts.append("\nOstatnie rozmowy:")
        for entry in summaries:
            # Render updated_at as YYYY-MM-DD; fall back to a raw prefix slice
            # when the stored timestamp is not valid ISO format.
            updated = entry.get('updated_at')
            when = ''
            if updated:
                try:
                    when = datetime.fromisoformat(updated).strftime('%Y-%m-%d')
                except Exception:
                    when = updated[:10]
            body = entry.get('summary', '')
            topic_list = entry.get('key_topics', [])
            topics_text = ', '.join(topic_list) if topic_list else ''
            if topics_text:
                parts.append(f"- {when}: {body} (tematy: {topics_text})")
            else:
                parts.append(f"- {when}: {body}")
    parts.append("\nWykorzystuj tę wiedzę do personalizacji odpowiedzi.")
    return "\n".join(parts) + "\n"
def extract_facts_async(
    conversation_id: int,
    user_id: int,
    user_context: Optional[Dict[str, Any]],
    gemini_service
) -> None:
    """
    Extract key user facts from a conversation via Flash-Lite and persist them.

    Loads the last 10 messages, asks the model for up to 3 new facts about
    the user, deduplicates against existing memory, and saves the rest.
    All errors are logged and swallowed so background callers never crash.

    Args:
        conversation_id: Conversation ID to extract facts from.
        user_id: User ID.
        user_context: Optional user context dict (reserved; not used here).
        gemini_service: GeminiService instance for API calls.
    """
    db = SessionLocal()
    try:
        # Last 10 messages, newest first (re-ordered chronologically below)
        messages = db.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.desc()).limit(10).all()
        if not messages:
            return
        # Build conversation text in chronological order
        conversation_text = ""
        for msg in reversed(messages):
            role = "Użytkownik" if msg.role == 'user' else "Asystent"
            conversation_text += f"{role}: {msg.content}\n"
        # Load existing facts for deduplication
        existing_facts = get_user_memory(user_id, limit=20)
        existing_texts = [f['fact'] for f in existing_facts]
        existing_str = "\n".join(f"- {t}" for t in existing_texts) if existing_texts else "(brak)"
        # Build extraction prompt (Polish: model output is in Polish)
        prompt = f"""Przeanalizuj poniższy fragment rozmowy i wyodrębnij maksymalnie 3 kluczowe fakty o UŻYTKOWNIKU (nie asystencie).
Fakty powinny dotyczyć: preferencji użytkownika, jego firmy, branży, zainteresowań biznesowych, planów, potrzeb.
ISTNIEJĄCE FAKTY (nie duplikuj):
{existing_str}
ROZMOWA:
{conversation_text}
Odpowiedz TYLKO w formacie JSON (bez markdown, bez komentarzy):
[
{{"fact": "treść faktu po polsku", "category": "kategoria"}},
...
]
Kategorie: business, interest, preference, contact, plan, general
Jeśli nie ma nowych faktów wartych zapamiętania, zwróć pustą tablicę: []"""
        # Call Flash-Lite
        response_text = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_extraction',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )
        if not response_text:
            return
        # Strip markdown code fences if present (model sometimes wraps JSON)
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = [l for l in lines if not l.strip().startswith("```")]
            cleaned = "\n".join(lines).strip()
        # Parse JSON
        try:
            extracted = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Memory extraction: JSON parse failed for user {user_id}: {cleaned[:200]}")
            return
        if not isinstance(extracted, list):
            return
        # Save new facts (max 3, deduplicated against existing memory)
        saved_count = 0
        for item in extracted[:3]:
            if not isinstance(item, dict):
                continue
            raw_fact = item.get('fact', '')
            # Guard against non-string JSON values (null, numbers): previously
            # .strip() on them raised and aborted the whole batch via the
            # outer except, losing valid facts extracted in the same pass.
            if not isinstance(raw_fact, str):
                continue
            fact_text = raw_fact.strip()
            if not fact_text:
                continue
            category = item.get('category', 'general')
            if not isinstance(category, str) or not category.strip():
                category = 'general'
            # Simple deduplication: skip if very similar to an existing fact
            is_duplicate = any(
                _similarity_check(fact_text, existing)
                for existing in existing_texts
            )
            if is_duplicate:
                continue
            memory = AIUserMemory(
                user_id=user_id,
                fact=fact_text,
                category=category,
                source_conversation_id=conversation_id,
                confidence=0.8,  # extracted facts start below manual/full confidence
                is_active=True
            )
            db.add(memory)
            saved_count += 1
        if saved_count > 0:
            db.commit()
            logger.info(f"Memory: saved {saved_count} facts for user {user_id}")
    except Exception as e:
        logger.warning(f"Memory extraction failed for user {user_id}, conv {conversation_id}: {e}")
    finally:
        db.close()
def summarize_conversation_async(
    conversation_id: int,
    user_id: int,
    gemini_service
) -> None:
    """
    Generate or update a conversation summary using Flash-Lite.

    Upserts one AIConversationSummary row per conversation. All errors are
    logged and swallowed so background callers never crash.

    Args:
        conversation_id: Conversation ID to summarize.
        user_id: User ID.
        gemini_service: GeminiService instance for API calls.
    """
    db = SessionLocal()
    try:
        # Load all messages from the conversation, oldest first
        messages = db.query(AIChatMessage).filter(
            AIChatMessage.conversation_id == conversation_id
        ).order_by(AIChatMessage.created_at.asc()).all()
        if not messages:
            return
        # Build conversation text; cap each message at 300 chars
        conversation_text = ""
        for msg in messages:
            role = "Użytkownik" if msg.role == 'user' else "Asystent"
            conversation_text += f"{role}: {msg.content[:300]}\n"
        # Trim overall transcript to keep the prompt small
        if len(conversation_text) > 4000:
            conversation_text = conversation_text[:4000] + "..."
        prompt = f"""Napisz krótkie podsumowanie (max 2 zdania) poniższej rozmowy, skupiając się na głównym temacie i potrzebach użytkownika. Podaj też listę 3-5 słów kluczowych opisujących tematykę.
ROZMOWA:
{conversation_text}
Odpowiedz TYLKO w formacie JSON (bez markdown):
{{"summary": "podsumowanie rozmowy", "key_topics": ["temat1", "temat2", "temat3"]}}"""
        response_text = gemini_service.generate_text(
            prompt=prompt,
            feature='memory_summary',
            user_id=user_id,
            temperature=0.3,
            model=FLASH_LITE_MODEL
        )
        if not response_text:
            return
        # Strip markdown code fences if the model wrapped the JSON
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = [l for l in lines if not l.strip().startswith("```")]
            cleaned = "\n".join(lines).strip()
        # Parse JSON
        try:
            result = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning(f"Summary: JSON parse failed for conv {conversation_id}: {cleaned[:200]}")
            return
        if not isinstance(result, dict):
            return
        raw_summary = result.get('summary', '')
        # Guard against non-string JSON values: .strip() on null/number would
        # raise and be swallowed by the outer except, skipping the upsert.
        if not isinstance(raw_summary, str):
            return
        summary_text = raw_summary.strip()
        if not summary_text:
            return
        key_topics = result.get('key_topics', [])
        if not isinstance(key_topics, list):
            key_topics = []
        # Upsert summary (unique on conversation_id)
        existing = db.query(AIConversationSummary).filter(
            AIConversationSummary.conversation_id == conversation_id
        ).first()
        if existing:
            existing.summary = summary_text
            existing.key_topics = key_topics
            existing.updated_at = datetime.now()
        else:
            summary = AIConversationSummary(
                conversation_id=conversation_id,
                user_id=user_id,
                summary=summary_text,
                key_topics=key_topics
            )
            db.add(summary)
        db.commit()
        logger.info(f"Memory: updated summary for conv {conversation_id}")
    except Exception as e:
        logger.warning(f"Summary generation failed for conv {conversation_id}: {e}")
    finally:
        db.close()
def delete_user_fact(user_id: int, fact_id: int) -> bool:
    """
    Soft-delete a memory fact by flipping is_active to False.

    Args:
        user_id: Owning user ID (ownership enforced in the query).
        fact_id: ID of the fact to delete.

    Returns:
        True on success; False when the fact is missing, already
        inactive, owned by another user, or the commit fails.
    """
    db = SessionLocal()
    try:
        record = (
            db.query(AIUserMemory)
            .filter(
                AIUserMemory.id == fact_id,
                AIUserMemory.user_id == user_id,
                AIUserMemory.is_active == True,
            )
            .first()
        )
        if record is None:
            return False
        record.is_active = False
        db.commit()
        logger.info(f"Memory: soft-deleted fact {fact_id} for user {user_id}")
        return True
    except Exception as e:
        logger.warning(f"Memory delete failed for fact {fact_id}: {e}")
        return False
    finally:
        db.close()
def _similarity_check(new_fact: str, existing_fact: str, threshold: float = 0.6) -> bool:
"""
Simple word-overlap similarity check for deduplication.
Args:
new_fact: New fact text
existing_fact: Existing fact text
threshold: Overlap ratio threshold (0-1)
Returns:
True if facts are considered duplicates
"""
new_words = set(new_fact.lower().split())
existing_words = set(existing_fact.lower().split())
if not new_words or not existing_words:
return False
# Remove short/common words
stop_words = {'w', 'z', 'i', 'a', 'to', 'na', 'do', 'jest', 'się', 'że', 'nie', 'jak', 'co'}
new_words -= stop_words
existing_words -= stop_words
if not new_words or not existing_words:
return False
intersection = new_words & existing_words
overlap = len(intersection) / min(len(new_words), len(existing_words))
return overlap >= threshold