nordabiz/zopk_knowledge_service.py
Maciej Pienczyn 55088f0ccb
fix: ZOPK knowledge base image display and data quality issues
- Fix broken news thumbnails by adding og:image extraction during content
  scraping (replaces Brave proxy URLs that block hotlinking)
- Add image onerror fallback in templates showing domain favicon when
  original image fails to load
- Decode Brave proxy image URLs to original source URLs before saving
- Enforce English-only entity types in AI extraction prompt to prevent
  mixed Polish/English type names
- Add migration 083 to normalize 14 existing Polish entity types and
  clean up 5 stale fetch jobs stuck in 'running' status
- Add backfill script for existing articles with broken image URLs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 08:57:41 +01:00


"""
ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy.
Pipeline:
1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów
2. AI Extraction - ekstrakcja faktów i encji przez Gemini
3. Entity Linking - identyfikacja i deduplikacja encji
4. Relation Extraction - wykrywanie relacji między encjami
5. Embedding Generation - wektory dla semantic search (FAZA 2)
Usage:
from zopk_knowledge_service import ZOPKKnowledgeService
service = ZOPKKnowledgeService(db_session)
result = service.extract_from_news(news_id=123)
# lub batch:
result = service.batch_extract(limit=50)
"""
import re
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
# Import progress tracking from scraper
from zopk_content_scraper import ProgressUpdate, ProgressCallback
from database import (
ZOPKNews,
ZOPKKnowledgeChunk,
ZOPKKnowledgeEntity,
ZOPKKnowledgeFact,
ZOPKKnowledgeEntityMention,
ZOPKKnowledgeRelation,
)
# Configure logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURATION
# ============================================================
# Chunk size settings
# NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues
# Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably
MIN_CHUNK_SIZE = 200 # tokens
MAX_CHUNK_SIZE = 500 # tokens (~2000 chars for Polish text)
CHUNK_OVERLAP = 50 # tokens overlap between chunks (reduced proportionally)
APPROX_CHARS_PER_TOKEN = 4 # Polish text approximation
# AI extraction settings
MAX_FACTS_PER_CHUNK = 10
MAX_ENTITIES_PER_CHUNK = 15
# Entity types
ENTITY_TYPES = [
'company', # Company, business organization
'person', # Person
'place', # Place, location
'organization', # Government institution, NGO
'project', # Project, initiative
'technology', # Technology, product
'event', # Event, conference
]
# Fact types
FACT_TYPES = [
'statistic', # Numbers, quantitative data
'event', # Event with a date
'statement', # Statement, declaration
'decision', # Decision, resolution
'milestone', # Project milestone
'partnership', # Partnership, cooperation
'investment', # Investment
]
# Relation types
RELATION_TYPES = [
'investor_in', # A invests in B
'partner_of', # A is a partner of B
'located_in', # A is located in B
'manages', # A manages B
'works_for', # A works for B
'part_of', # A is part of B
'cooperates_with', # A cooperates with B
'produces', # A produces B
'supplies_to', # A supplies to B
]
# ============================================================
# AI PROMPTS
# ============================================================
# Ultra-simplified prompt to avoid Gemini safety filter issues
# Note: Complex JSON schemas with pipe characters were triggering filters
# Note: max_tokens parameter also triggers filters - don't use it!
EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON.
ARTYKUŁ:
{chunk_text}
Zwróć JSON z następującą strukturą:
{{
"facts": [
{{"text": "pełny fakt", "type": "investment"}}
],
"entities": [
{{"name": "Nazwa", "type": "company"}}
],
"summary": "krótkie podsumowanie"
}}
Typy faktów (TYLKO te angielskie nazwy): investment, decision, event, statistic, partnership, milestone
Typy encji (TYLKO te angielskie nazwy): company, person, place, organization, project
WAŻNE: Nigdy nie używaj polskich nazw typów (np. Organizacja, Lokalizacja, Osoba). Zawsze angielskie."""
# System prompt is now empty - the user prompt contains all necessary instructions
EXTRACTION_SYSTEM_PROMPT = ""
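# --- Illustrative sketch ---
# The shape of the payload the prompt above is expected to yield after JSON
# parsing. Values are invented examples; only the keys mirror the prompt's
# JSON template.
_EXAMPLE_EXTRACTION_PAYLOAD = {
    "facts": [
        {"text": "Firma X zainwestowała 5 mln zł w nową halę produkcyjną.", "type": "investment"}
    ],
    "entities": [
        {"name": "Firma X", "type": "company"}
    ],
    "summary": "Inwestycja Firmy X w rozbudowę zakładu produkcyjnego."
}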
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class ChunkData:
"""Data for a single chunk."""
content: str
index: int
token_count: int
@dataclass
class ExtractionResult:
"""Result of knowledge extraction from a news article."""
success: bool
news_id: int
chunks_created: int = 0
facts_created: int = 0
entities_created: int = 0
relations_created: int = 0
error: Optional[str] = None
processing_time: float = 0.0
# ============================================================
# CHUNKING FUNCTIONS
# ============================================================
def estimate_tokens(text: str) -> int:
"""Estimate token count for text."""
if not text:
return 0
# Polish text: ~4 chars per token on average
return len(text) // APPROX_CHARS_PER_TOKEN
def split_into_sentences(text: str) -> List[str]:
"""Split text into sentences."""
# Polish sentence boundaries
sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
sentences = re.split(sentence_pattern, text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks(text: str) -> List[ChunkData]:
"""
Split text into chunks of appropriate size.
Strategy:
- Split by paragraphs first
- Combine small paragraphs
- Split large paragraphs by sentences
- Maintain overlap between chunks
"""
if not text:
return []
chunks = []
current_chunk = []
current_tokens = 0
# Split by paragraphs
paragraphs = text.split('\n\n')
for para in paragraphs:
para = para.strip()
if not para:
continue
para_tokens = estimate_tokens(para)
# If paragraph is too large, split by sentences
if para_tokens > MAX_CHUNK_SIZE:
sentences = split_into_sentences(para)
for sentence in sentences:
sent_tokens = estimate_tokens(sentence)
if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
# Save current chunk
if current_chunk:
chunk_text = ' '.join(current_chunk)
chunks.append(ChunkData(
content=chunk_text,
index=len(chunks),
token_count=current_tokens
))
# Overlap: keep last sentence
current_chunk = [current_chunk[-1]] if current_chunk else []
current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0
current_chunk.append(sentence)
current_tokens += sent_tokens
else:
# Add paragraph to current chunk
if current_tokens + para_tokens > MAX_CHUNK_SIZE:
# Save current chunk
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append(ChunkData(
content=chunk_text,
index=len(chunks),
token_count=current_tokens
))
current_chunk = []
current_tokens = 0
current_chunk.append(para)
current_tokens += para_tokens
# Save last chunk
if current_chunk:
chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
chunks.append(ChunkData(
content=chunk_text,
index=len(chunks),
token_count=current_tokens
))
return chunks
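# --- Illustrative sketch (not part of the service API) ---
# A minimal demo of the chunking helpers above. The sample text is synthetic;
# real input is scraped Polish article content of a few thousand characters.
def _example_chunking() -> None:
    """Demo: chunk a short synthetic text and print chunk sizes."""
    sample = "\n\n".join(
        f"Akapit {i}. " + "To jest zdanie testowe. " * 40
        for i in range(1, 6)
    )
    for chunk in create_chunks(sample):
        # Each chunk stays at or below MAX_CHUNK_SIZE tokens (~2000 chars of Polish text).
        print(chunk.index, chunk.token_count, len(chunk.content))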
# ============================================================
# KNOWLEDGE SERVICE CLASS
# ============================================================
class ZOPKKnowledgeService:
"""
Service for extracting knowledge from ZOPK news articles.
Features:
- Text chunking with overlap
- AI-powered fact extraction (Gemini)
- Named entity recognition
- Relation extraction
- Entity deduplication
"""
def __init__(self, db_session, user_id: Optional[int] = None):
"""
Initialize service.
Args:
db_session: SQLAlchemy database session
user_id: Optional user ID for cost tracking
"""
self.db = db_session
self.user_id = user_id
self._gemini_service = None
@property
def gemini(self):
"""Lazy-load Gemini service."""
if self._gemini_service is None:
from gemini_service import GeminiService
self._gemini_service = GeminiService()
return self._gemini_service
def _normalize_entity_name(self, name: str) -> str:
"""Normalize entity name for deduplication."""
if not name:
return ''
# Lowercase, remove extra spaces
normalized = name.lower().strip()
normalized = re.sub(r'\s+', ' ', normalized)
# Remove Polish diacritics for matching
trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
normalized = normalized.translate(trans)
return normalized
def _find_or_create_entity(
self,
name: str,
entity_type: str,
description: Optional[str] = None
) -> ZOPKKnowledgeEntity:
"""
Find existing entity or create new one.
Uses normalized name for deduplication.
"""
normalized = self._normalize_entity_name(name)
# Try to find existing
existing = self.db.query(ZOPKKnowledgeEntity).filter(
ZOPKKnowledgeEntity.normalized_name == normalized
).first()
if existing:
# Update mention count
existing.mentions_count = (existing.mentions_count or 0) + 1
existing.last_mentioned_at = datetime.now()
# Update description if better
if description and (not existing.description or len(description) > len(existing.description)):
existing.description = description
return existing
# Create new entity
entity = ZOPKKnowledgeEntity(
entity_type=entity_type,
name=name,
normalized_name=normalized,
description=description,
short_description=description[:500] if description else None,
mentions_count=1,
first_mentioned_at=datetime.now(),
last_mentioned_at=datetime.now()
)
self.db.add(entity)
self.db.flush() # Get ID
return entity
def _extract_with_ai(
self,
chunk: ChunkData,
source_name: str,
published_date: str
) -> Optional[Dict]:
"""
Extract facts, entities, and relations using Gemini AI.
Returns parsed JSON or None on error.
"""
try:
# Truncate chunk to avoid Gemini safety filter issues with long texts
# Testing showed ~4000 chars triggers safety blocks, ~2000 chars works
MAX_PROMPT_CHARS = 2000
chunk_text = chunk.content[:MAX_PROMPT_CHARS]
if len(chunk.content) > MAX_PROMPT_CHARS:
logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars")
# Simplified single prompt (system prompt removed to avoid safety filter issues)
prompt = EXTRACTION_USER_PROMPT.format(
chunk_text=chunk_text,
source_name=source_name,
published_date=published_date
)
# NOTE: max_tokens removed - testing showed it triggers safety filters!
response = self.gemini.generate_text(
prompt=prompt,
temperature=0.1, # Low temperature for consistency
user_id=self.user_id,
feature='zopk_knowledge_extraction'
)
if not response:
logger.warning("Empty response from Gemini")
return None
# Parse JSON from response
# Handle markdown code blocks
json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
if json_match:
json_str = json_match.group(1)
else:
json_str = response
# Clean and parse
json_str = json_str.strip()
data = json.loads(json_str)
return data
except json.JSONDecodeError as e:
logger.error(f"JSON parse error: {e}")
return None
except Exception as e:
logger.error(f"AI extraction error: {e}")
return None
def _save_chunk(
self,
news: ZOPKNews,
chunk: ChunkData,
extraction_data: Optional[Dict]
) -> ZOPKKnowledgeChunk:
"""Save chunk to database."""
db_chunk = ZOPKKnowledgeChunk(
source_news_id=news.id,
content=chunk.content,
chunk_index=chunk.index,
token_count=chunk.token_count,
chunk_type='narrative', # Default
summary=extraction_data.get('summary') if extraction_data else None,
keywords=extraction_data.get('keywords') if extraction_data else None,
language='pl',
extraction_model='gemini-3-flash-preview',
extracted_at=datetime.now()
)
self.db.add(db_chunk)
self.db.flush()
return db_chunk
def _save_facts(
self,
chunk: ZOPKKnowledgeChunk,
news: ZOPKNews,
facts_data: List[Dict]
) -> int:
"""Save extracted facts to database."""
count = 0
for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
try:
# Parse numeric value
numeric_value = fact.get('numeric_value')
if numeric_value is not None:
try:
numeric_value = float(numeric_value)
except (ValueError, TypeError):
numeric_value = None
# Parse date
date_value = fact.get('date_value')
if date_value:
try:
from datetime import datetime as dt
date_value = dt.strptime(date_value, '%Y-%m-%d').date()
except (ValueError, TypeError):
date_value = None
# Support both old format (full_text) and new simplified format (text)
fact_text = fact.get('text') or fact.get('full_text', '')
db_fact = ZOPKKnowledgeFact(
source_chunk_id=chunk.id,
source_news_id=news.id,
fact_type=fact.get('type', 'statement'),
subject=fact.get('subject'),
predicate=fact.get('predicate'),
object=fact.get('object'),
full_text=fact_text,
numeric_value=numeric_value,
numeric_unit=fact.get('numeric_unit'),
date_value=date_value,
confidence_score=fact.get('confidence', 0.5)
)
self.db.add(db_fact)
count += 1
except Exception as e:
logger.error(f"Error saving fact: {e}")
continue
return count
def _save_entities_and_relations(
self,
chunk: ZOPKKnowledgeChunk,
entities_data: List[Dict],
relations_data: List[Dict]
) -> Tuple[int, int]:
"""Save entities and relations to database."""
entity_count = 0
relation_count = 0
# Map of name -> entity for relations
entity_map = {}
# Save entities
for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
try:
entity = self._find_or_create_entity(
name=ent.get('name', ''),
entity_type=ent.get('type', 'organization'),
description=ent.get('description')
)
entity_map[ent.get('name', '').lower()] = entity
# Create mention link
mention = ZOPKKnowledgeEntityMention(
chunk_id=chunk.id,
entity_id=entity.id,
mention_text=ent.get('name'),
mention_type='direct',
role_in_context=ent.get('role'),
confidence=0.9
)
self.db.add(mention)
entity_count += 1
except Exception as e:
logger.error(f"Error saving entity: {e}")
continue
# Flush to get entity IDs
self.db.flush()
# Save relations
for rel in relations_data:
try:
entity_a_name = rel.get('entity_a', '').lower()
entity_b_name = rel.get('entity_b', '').lower()
entity_a = entity_map.get(entity_a_name)
entity_b = entity_map.get(entity_b_name)
if not entity_a or not entity_b:
continue
# Check if relation already exists
existing = self.db.query(ZOPKKnowledgeRelation).filter(
ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
ZOPKKnowledgeRelation.relation_type == rel.get('relation')
).first()
if not existing:
db_relation = ZOPKKnowledgeRelation(
entity_a_id=entity_a.id,
entity_b_id=entity_b.id,
relation_type=rel.get('relation', 'cooperates_with'),
evidence_text=rel.get('description'),
source_chunk_id=chunk.id,
confidence=0.8,
strength=3
)
self.db.add(db_relation)
relation_count += 1
except Exception as e:
logger.error(f"Error saving relation: {e}")
continue
return entity_count, relation_count
def extract_from_news(self, news_id: int) -> ExtractionResult:
"""
Extract knowledge from a single news article.
Args:
news_id: ID of ZOPKNews record
Returns:
ExtractionResult with statistics
"""
import time
start_time = time.time()
# Get news record
news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()
if not news:
return ExtractionResult(
success=False,
news_id=news_id,
error=f"News record {news_id} not found"
)
# Check if already extracted
if news.knowledge_extracted:
return ExtractionResult(
success=True,
news_id=news_id,
error="Już wyekstrahowano"
)
# Check if content is scraped
if not news.full_content:
return ExtractionResult(
success=False,
news_id=news_id,
error="Brak zescrapowanej treści"
)
logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")
# Create chunks
chunks = create_chunks(news.full_content)
if not chunks:
return ExtractionResult(
success=False,
news_id=news_id,
error="Treść za krótka do ekstrakcji"
)
# Statistics
chunks_created = 0
facts_created = 0
entities_created = 0
relations_created = 0
# Process each chunk
for chunk in chunks:
try:
# Extract with AI
extraction_data = self._extract_with_ai(
chunk=chunk,
source_name=news.source_name or 'unknown',
published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
)
# Save chunk
db_chunk = self._save_chunk(news, chunk, extraction_data)
chunks_created += 1
if extraction_data:
# Save facts
facts_created += self._save_facts(
db_chunk,
news,
extraction_data.get('facts', [])
)
# Save entities and relations
ent_count, rel_count = self._save_entities_and_relations(
db_chunk,
extraction_data.get('entities', []),
extraction_data.get('relations', [])
)
entities_created += ent_count
relations_created += rel_count
except Exception as e:
logger.error(f"Error processing chunk {chunk.index}: {e}")
continue
# Mark as extracted
news.knowledge_extracted = True
news.knowledge_extracted_at = datetime.now()
self.db.commit()
processing_time = time.time() - start_time
logger.info(
f"Extracted from news {news_id}: "
f"{chunks_created} chunks, {facts_created} facts, "
f"{entities_created} entities, {relations_created} relations "
f"in {processing_time:.2f}s"
)
return ExtractionResult(
success=True,
news_id=news_id,
chunks_created=chunks_created,
facts_created=facts_created,
entities_created=entities_created,
relations_created=relations_created,
processing_time=processing_time
)
def batch_extract(self, limit: int = 50, progress_callback: ProgressCallback = None) -> Dict:
"""
Batch extract knowledge from scraped articles.
Args:
limit: Maximum number of articles to process
progress_callback: Optional callback for progress updates
Returns:
Dict with statistics
"""
import time
logger.info(f"Starting batch extraction: limit={limit}")
# Find articles ready for extraction
articles = self.db.query(ZOPKNews).filter(
ZOPKNews.status.in_(['approved', 'auto_approved']),
ZOPKNews.scrape_status == 'scraped',
ZOPKNews.knowledge_extracted == False
).order_by(
ZOPKNews.created_at.desc()
).limit(limit).all()
total = len(articles)
stats = {
'total': total,
'success': 0,
'failed': 0,
'chunks_created': 0,
'facts_created': 0,
'entities_created': 0,
'relations_created': 0,
'errors': [],
'processing_time': 0
}
# Send initial progress
if progress_callback and total > 0:
progress_callback(ProgressUpdate(
current=0,
total=total,
percent=0.0,
stage='extracting',
status='processing',
message=f'Rozpoczynam ekstrakcję wiedzy z {total} artykułów...',
details={'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0}
))
start_time = time.time()
for idx, article in enumerate(articles, 1):
# Send progress update before processing
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round((idx - 1) / total * 100, 1),
stage='extracting',
status='processing',
message=f'Analizuję przez AI: {article.title[:50]}...',
article_id=article.id,
article_title=article.title[:80],
details={
'success': stats['success'],
'failed': stats['failed'],
'chunks': stats['chunks_created'],
'facts': stats['facts_created'],
'entities': stats['entities_created']
}
))
result = self.extract_from_news(article.id)
if result.success:
stats['success'] += 1
stats['chunks_created'] += result.chunks_created
stats['facts_created'] += result.facts_created
stats['entities_created'] += result.entities_created
stats['relations_created'] += result.relations_created
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round(idx / total * 100, 1),
stage='extracting',
status='success',
message=f'✓ Wyekstrahowano: {result.chunks_created} chunks, {result.facts_created} faktów, {result.entities_created} encji',
article_id=article.id,
article_title=article.title[:80],
details={
'success': stats['success'],
'failed': stats['failed'],
'chunks': stats['chunks_created'],
'facts': stats['facts_created'],
'entities': stats['entities_created'],
'new_chunks': result.chunks_created,
'new_facts': result.facts_created,
'new_entities': result.entities_created
}
))
else:
stats['failed'] += 1
if result.error:
stats['errors'].append({
'id': article.id,
'title': article.title[:100],
'error': result.error
})
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round(idx / total * 100, 1),
stage='extracting',
status='failed',
message=f'✗ Błąd ekstrakcji: {result.error[:50]}...' if result.error else '✗ Błąd',
article_id=article.id,
article_title=article.title[:80],
details={
'success': stats['success'],
'failed': stats['failed'],
'error': result.error
}
))
stats['processing_time'] = round(time.time() - start_time, 2)
# Send completion progress
if progress_callback:
progress_callback(ProgressUpdate(
current=total,
total=total,
percent=100.0,
stage='extracting',
status='complete',
message=f'Zakończono: {stats["success"]}/{total} artykułów. '
f'Utworzono: {stats["chunks_created"]} chunks, {stats["facts_created"]} faktów, '
f'{stats["entities_created"]} encji',
details={
'success': stats['success'],
'failed': stats['failed'],
'chunks': stats['chunks_created'],
'facts': stats['facts_created'],
'entities': stats['entities_created'],
'relations': stats['relations_created'],
'processing_time': stats['processing_time']
}
))
logger.info(
f"Batch extraction complete: {stats['success']}/{stats['total']} success "
f"in {stats['processing_time']}s"
)
return stats
def get_extraction_statistics(self) -> Dict:
"""Get knowledge extraction statistics."""
from sqlalchemy import func, or_
# Articles stats
total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.status.in_(['approved', 'auto_approved'])
).scalar()
scraped = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.scrape_status == 'scraped'
).scalar()
# Pending scrape: approved but not yet scraped
pending_scrape = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.status.in_(['approved', 'auto_approved']),
or_(ZOPKNews.scrape_status == 'pending', ZOPKNews.scrape_status.is_(None))
).scalar()
extracted = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.knowledge_extracted == True
).scalar()
pending_extract = self.db.query(func.count(ZOPKNews.id)).filter(
ZOPKNews.scrape_status == 'scraped',
ZOPKNews.knowledge_extracted == False
).scalar()
# Knowledge base stats
total_chunks = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
total_facts = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
total_entities = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
total_relations = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()
# Embeddings stats
chunks_with_embeddings = self.db.query(func.count(ZOPKKnowledgeChunk.id)).filter(
ZOPKKnowledgeChunk.embedding.isnot(None)
).scalar()
chunks_without_embeddings = (total_chunks or 0) - (chunks_with_embeddings or 0)
# Top entities by mentions
top_entities = self.db.query(
ZOPKKnowledgeEntity.name,
ZOPKKnowledgeEntity.entity_type,
ZOPKKnowledgeEntity.mentions_count
).order_by(
ZOPKKnowledgeEntity.mentions_count.desc()
).limit(10).all()
return {
'articles': {
'total_approved': total_approved or 0,
'scraped': scraped or 0,
'pending_scrape': pending_scrape or 0,
'extracted': extracted or 0,
'pending_extract': pending_extract or 0
},
'knowledge_base': {
'total_chunks': total_chunks or 0,
'total_facts': total_facts or 0,
'total_entities': total_entities or 0,
'total_relations': total_relations or 0,
'chunks_with_embeddings': chunks_with_embeddings or 0,
'chunks_without_embeddings': chunks_without_embeddings or 0
},
'top_entities': [
{'name': e[0], 'type': e[1], 'mentions': e[2]}
for e in top_entities
]
}
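# --- Illustrative sketch (not part of the service API) ---
# Wiring the service together: the SQLAlchemy session is assumed to come from
# the application's own factory; `SessionLocal` below is a hypothetical
# placeholder, not something defined in this file.
def _example_extract_single_and_batch() -> None:
    from database import SessionLocal  # hypothetical session factory
    db = SessionLocal()
    try:
        service = ZOPKKnowledgeService(db, user_id=1)
        single = service.extract_from_news(news_id=123)
        print(single.success, single.facts_created, single.entities_created)
        # Batch mode walks up to `limit` scraped-but-unextracted articles.
        batch = service.batch_extract(limit=10)
        print(batch['success'], batch['failed'], batch['processing_time'])
    finally:
        db.close()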
# ============================================================
# SEMANTIC SEARCH (PHASE 2)
# ============================================================
def search_knowledge(
db_session,
query: str,
limit: int = 5,
min_similarity: float = 0.3,
user_id: Optional[int] = None
) -> List[Dict]:
"""
Semantic search in ZOPK knowledge base.
Args:
db_session: SQLAlchemy session
query: Search query
limit: Max results to return
min_similarity: Minimum cosine similarity (0-1)
user_id: User ID for cost tracking
Returns:
List of matching chunks with similarity scores
"""
from gemini_service import GeminiService
import json
# Generate query embedding
gemini = GeminiService()
query_embedding = gemini.generate_embedding(
text=query,
task_type='retrieval_query',
user_id=user_id,
feature='zopk_knowledge_search'
)
if not query_embedding:
logger.warning("Failed to generate query embedding")
return []
# Search in database
# Note: ideally this would use PostgreSQL pgvector for efficient similarity search.
# For now, embeddings are stored as JSON and compared in Python (simpler approach).
chunks = db_session.query(ZOPKKnowledgeChunk).filter(
ZOPKKnowledgeChunk.embedding.isnot(None)
).all()
results = []
for chunk in chunks:
try:
# Parse stored embedding
if isinstance(chunk.embedding, str):
chunk_embedding = json.loads(chunk.embedding)
else:
chunk_embedding = chunk.embedding
if not chunk_embedding:
continue
# Calculate cosine similarity
similarity = _cosine_similarity(query_embedding, chunk_embedding)
if similarity >= min_similarity:
results.append({
'chunk_id': chunk.id,
'content': chunk.content[:500],
'summary': chunk.summary,
'keywords': chunk.keywords,
'similarity': round(similarity, 4),
'source_news_id': chunk.source_news_id,
'importance': chunk.importance_score
})
except Exception as e:
logger.error(f"Error processing chunk {chunk.id}: {e}")
continue
# Sort by similarity
results.sort(key=lambda x: x['similarity'], reverse=True)
return results[:limit]
def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors."""
import math
if len(vec1) != len(vec2):
return 0.0
dot_product = sum(a * b for a, b in zip(vec1, vec2))
norm1 = math.sqrt(sum(a * a for a in vec1))
norm2 = math.sqrt(sum(b * b for b in vec2))
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
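# --- Illustrative sketch ---
# Quick sanity check of the cosine similarity helper with tiny hand-picked
# vectors (real Gemini embeddings are 768-dimensional).
def _example_cosine_similarity() -> None:
    assert abs(_cosine_similarity([1.0, 0.0], [1.0, 0.0]) - 1.0) < 1e-9  # identical vectors
    assert abs(_cosine_similarity([1.0, 0.0], [0.0, 1.0])) < 1e-9        # orthogonal vectors
    print(_cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))          # parallel -> ~1.0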
def get_relevant_facts(
db_session,
query: str,
limit: int = 10
) -> List[Dict]:
"""
Get facts relevant to a query.
Uses keyword matching for now, can be enhanced with embeddings.
"""
from sqlalchemy import or_
# Simple keyword search
keywords = query.lower().split()
# NOTE: Removed is_verified filter - auto-extracted facts are usable
# Future: add manual verification workflow and re-enable filter
facts = db_session.query(ZOPKKnowledgeFact).filter(
ZOPKKnowledgeFact.confidence_score >= 0.3 # Minimum confidence threshold
).all()
results = []
for fact in facts:
score = 0
fact_text = (fact.full_text or '').lower()
for keyword in keywords:
if keyword in fact_text:
score += 1
if score > 0:
# Get source URL from related news
source_url = ''
source_name = ''
source_date = ''
if fact.source_news:
source_url = fact.source_news.url or ''
source_name = fact.source_news.source_name or fact.source_news.source_domain or ''
if fact.source_news.published_at:
source_date = fact.source_news.published_at.strftime('%Y-%m-%d')
results.append({
'fact_id': fact.id,
'fact_type': fact.fact_type,
'full_text': fact.full_text,
'subject': fact.subject,
'numeric_value': float(fact.numeric_value) if fact.numeric_value else None,
'numeric_unit': fact.numeric_unit,
'confidence': float(fact.confidence_score) if fact.confidence_score else None,
'relevance_score': score,
'source_url': source_url,
'source_name': source_name,
'source_date': source_date
})
# Sort by relevance
results.sort(key=lambda x: x['relevance_score'], reverse=True)
return results[:limit]
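# --- Illustrative sketch ---
# Combining semantic chunk search with the keyword-based fact lookup above.
# `SessionLocal` is a hypothetical session factory; the query strings are
# arbitrary examples.
def _example_knowledge_search() -> None:
    from database import SessionLocal  # hypothetical session factory
    db = SessionLocal()
    try:
        for c in search_knowledge(db, query="inwestycje w fotowoltaikę", limit=3):
            print(c['similarity'], c['summary'] or c['content'][:80])
        for f in get_relevant_facts(db, query="inwestycje fotowoltaika", limit=5):
            print(f['relevance_score'], (f['full_text'] or '')[:80], f['source_url'])
    finally:
        db.close()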
def generate_chunk_embeddings(
db_session,
limit: int = 100,
user_id: Optional[int] = None,
progress_callback: ProgressCallback = None
) -> Dict:
"""
Generate embeddings for chunks that don't have them.
Args:
db_session: SQLAlchemy session
limit: Max chunks to process
user_id: User ID for cost tracking
progress_callback: Optional callback for progress updates
Returns:
Dict with statistics
"""
import json
import time
from gemini_service import GeminiService
gemini = GeminiService()
# Find chunks without embeddings
chunks = db_session.query(ZOPKKnowledgeChunk).filter(
ZOPKKnowledgeChunk.embedding.is_(None)
).limit(limit).all()
total = len(chunks)
stats = {
'total': total,
'success': 0,
'failed': 0,
'processing_time': 0
}
# Send initial progress
if progress_callback and total > 0:
progress_callback(ProgressUpdate(
current=0,
total=total,
percent=0.0,
stage='embedding',
status='processing',
message=f'Rozpoczynam generowanie embeddingów dla {total} chunks...',
details={'success': 0, 'failed': 0}
))
start_time = time.time()
for idx, chunk in enumerate(chunks, 1):
# Send progress update before processing
if progress_callback:
# Get article title from chunk's source news
article_title = None
if chunk.source_news:
article_title = chunk.source_news.title[:80]
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round((idx - 1) / total * 100, 1),
stage='embedding',
status='processing',
message=f'Generuję embedding {idx}/{total}: {chunk.summary[:40] if chunk.summary else "chunk"}...',
article_id=chunk.source_news_id,
article_title=article_title,
details={
'success': stats['success'],
'failed': stats['failed'],
'chunk_id': chunk.id
}
))
try:
embedding = gemini.generate_embedding(
text=chunk.content,
task_type='retrieval_document',
title=chunk.summary,
user_id=user_id,
feature='zopk_chunk_embedding'
)
if embedding:
# Store as JSON string
chunk.embedding = json.dumps(embedding)
stats['success'] += 1
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round(idx / total * 100, 1),
stage='embedding',
status='success',
message=f'✓ Wygenerowano embedding (768 dim)',
article_id=chunk.source_news_id,
details={
'success': stats['success'],
'failed': stats['failed'],
'chunk_id': chunk.id
}
))
else:
stats['failed'] += 1
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round(idx / total * 100, 1),
stage='embedding',
status='failed',
message='✗ Nie udało się wygenerować embeddingu',
article_id=chunk.source_news_id,
details={'success': stats['success'], 'failed': stats['failed']}
))
except Exception as e:
logger.error(f"Error generating embedding for chunk {chunk.id}: {e}")
stats['failed'] += 1
if progress_callback:
progress_callback(ProgressUpdate(
current=idx,
total=total,
percent=round(idx / total * 100, 1),
stage='embedding',
status='failed',
message=f'✗ Błąd: {str(e)[:50]}...',
article_id=chunk.source_news_id,
details={'success': stats['success'], 'failed': stats['failed'], 'error': str(e)}
))
db_session.commit()
stats['processing_time'] = round(time.time() - start_time, 2)
# Send completion progress
if progress_callback:
progress_callback(ProgressUpdate(
current=total,
total=total,
percent=100.0,
stage='embedding',
status='complete',
message=f'Zakończono: {stats["success"]}/{total} embeddingów wygenerowanych',
details={
'success': stats['success'],
'failed': stats['failed'],
'processing_time': stats['processing_time']
}
))
logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success")
return stats
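# --- Illustrative sketch ---
# A cron-style run: generate missing embeddings and log progress. The
# ProgressUpdate callback type comes from zopk_content_scraper (imported at the
# top of this module); `SessionLocal` is a hypothetical session factory.
def _example_embedding_cron() -> None:
    from database import SessionLocal  # hypothetical session factory
    def log_progress(update: ProgressUpdate) -> None:
        logger.info("[%s] %s/%s %s", update.stage, update.current, update.total, update.message)
    db = SessionLocal()
    try:
        stats = generate_chunk_embeddings(db, limit=200, progress_callback=log_progress)
        logger.info("Embeddings: %s ok, %s failed", stats['success'], stats['failed'])
    finally:
        db.close()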
# ============================================================
# STANDALONE FUNCTIONS FOR CRON/CLI
# ============================================================
def extract_pending_articles(db_session, limit: int = 50) -> Dict:
"""
Convenience function for cron jobs.
Usage:
from zopk_knowledge_service import extract_pending_articles
result = extract_pending_articles(db_session, limit=50)
"""
service = ZOPKKnowledgeService(db_session)
return service.batch_extract(limit=limit)
def get_knowledge_stats(db_session) -> Dict:
"""
Get knowledge extraction statistics for monitoring.
"""
service = ZOPKKnowledgeService(db_session)
return service.get_extraction_statistics()
# ============================================================
# ADMIN PANEL - LIST FUNCTIONS
# ============================================================
def list_chunks(
db_session,
page: int = 1,
per_page: int = 20,
source_news_id: Optional[int] = None,
has_embedding: Optional[bool] = None,
is_verified: Optional[bool] = None
) -> Dict:
"""
List knowledge chunks with pagination and filtering.
Args:
db_session: Database session
page: Page number (1-based)
per_page: Items per page
source_news_id: Filter by source article
has_embedding: Filter by embedding status
is_verified: Filter by verification status
Returns:
{
'chunks': [...],
'total': int,
'page': int,
'per_page': int,
'pages': int
}
"""
from sqlalchemy import func
query = db_session.query(ZOPKKnowledgeChunk)
# Apply filters
if source_news_id:
query = query.filter(ZOPKKnowledgeChunk.source_news_id == source_news_id)
if has_embedding is not None:
if has_embedding:
query = query.filter(ZOPKKnowledgeChunk.embedding.isnot(None))
else:
query = query.filter(ZOPKKnowledgeChunk.embedding.is_(None))
if is_verified is not None:
query = query.filter(ZOPKKnowledgeChunk.is_verified == is_verified)
# Get total count
total = query.count()
# Calculate pagination
pages = (total + per_page - 1) // per_page
offset = (page - 1) * per_page
# Get chunks with source news info
chunks = query.order_by(
ZOPKKnowledgeChunk.created_at.desc()
).offset(offset).limit(per_page).all()
return {
'chunks': [
{
'id': c.id,
'content': c.content[:300] + '...' if len(c.content) > 300 else c.content,
'full_content': c.content,
'summary': c.summary,
'chunk_type': c.chunk_type,
'chunk_index': c.chunk_index,
'token_count': c.token_count,
'importance_score': c.importance_score,
'confidence_score': float(c.confidence_score) if c.confidence_score else None,
'has_embedding': c.embedding is not None,
'is_verified': c.is_verified,
'source_news_id': c.source_news_id,
'source_title': c.source_news.title if c.source_news else None,
'source_url': c.source_news.url if c.source_news else None,
'created_at': c.created_at.isoformat() if c.created_at else None,
'keywords': c.keywords if isinstance(c.keywords, list) else []
}
for c in chunks
],
'total': total,
'page': page,
'per_page': per_page,
'pages': pages
}
def list_facts(
db_session,
page: int = 1,
per_page: int = 20,
fact_type: Optional[str] = None,
is_verified: Optional[bool] = None,
source_news_id: Optional[int] = None
) -> Dict:
"""
List knowledge facts with pagination and filtering.
Args:
db_session: Database session
page: Page number (1-based)
per_page: Items per page
fact_type: Filter by fact type (statistic, event, statement, decision, milestone)
is_verified: Filter by verification status
source_news_id: Filter by source article
Returns:
{
'facts': [...],
'total': int,
'page': int,
'per_page': int,
'pages': int,
'fact_types': [...] - available types for filtering
}
"""
from sqlalchemy import func, distinct
query = db_session.query(ZOPKKnowledgeFact)
# Apply filters
if fact_type:
query = query.filter(ZOPKKnowledgeFact.fact_type == fact_type)
if is_verified is not None:
query = query.filter(ZOPKKnowledgeFact.is_verified == is_verified)
if source_news_id:
query = query.filter(ZOPKKnowledgeFact.source_news_id == source_news_id)
# Get total count
total = query.count()
# Get available fact types
fact_types = db_session.query(
distinct(ZOPKKnowledgeFact.fact_type)
).filter(
ZOPKKnowledgeFact.fact_type.isnot(None)
).all()
fact_types = [f[0] for f in fact_types if f[0]]
# Calculate pagination
pages = (total + per_page - 1) // per_page
offset = (page - 1) * per_page
# Get facts
facts = query.order_by(
ZOPKKnowledgeFact.created_at.desc()
).offset(offset).limit(per_page).all()
return {
'facts': [
{
'id': f.id,
'fact_type': f.fact_type,
'subject': f.subject,
'predicate': f.predicate,
'object': f.object,
'full_text': f.full_text,
'numeric_value': float(f.numeric_value) if f.numeric_value else None,
'numeric_unit': f.numeric_unit,
'date_value': f.date_value.isoformat() if f.date_value else None,
'confidence_score': float(f.confidence_score) if f.confidence_score else None,
'is_verified': f.is_verified,
'source_news_id': f.source_news_id,
'source_chunk_id': f.source_chunk_id,
'source_title': f.source_news.title if f.source_news else None,
'source_url': f.source_news.url if f.source_news else None,
'entities_involved': f.entities_involved if isinstance(f.entities_involved, list) else [],
'created_at': f.created_at.isoformat() if f.created_at else None
}
for f in facts
],
'total': total,
'page': page,
'per_page': per_page,
'pages': pages,
'fact_types': fact_types
}
def list_entities(
db_session,
page: int = 1,
per_page: int = 20,
entity_type: Optional[str] = None,
is_verified: Optional[bool] = None,
min_mentions: Optional[int] = None
) -> Dict:
"""
List knowledge entities with pagination and filtering.
Args:
db_session: Database session
page: Page number (1-based)
per_page: Items per page
entity_type: Filter by entity type (company, person, place, organization, project, technology)
is_verified: Filter by verification status
min_mentions: Filter by minimum mention count
Returns:
{
'entities': [...],
'total': int,
'page': int,
'per_page': int,
'pages': int,
'entity_types': [...] - available types for filtering
}
"""
from sqlalchemy import func, distinct
query = db_session.query(ZOPKKnowledgeEntity)
# Exclude merged entities
query = query.filter(ZOPKKnowledgeEntity.merged_into_id.is_(None))
# Apply filters
if entity_type:
query = query.filter(ZOPKKnowledgeEntity.entity_type == entity_type)
if is_verified is not None:
query = query.filter(ZOPKKnowledgeEntity.is_verified == is_verified)
if min_mentions:
query = query.filter(ZOPKKnowledgeEntity.mentions_count >= min_mentions)
# Get total count
total = query.count()
# Get available entity types
entity_types = db_session.query(
distinct(ZOPKKnowledgeEntity.entity_type)
).filter(
ZOPKKnowledgeEntity.entity_type.isnot(None)
).all()
entity_types = [e[0] for e in entity_types if e[0]]
# Calculate pagination
pages = (total + per_page - 1) // per_page
offset = (page - 1) * per_page
# Get entities sorted by mentions
entities = query.order_by(
ZOPKKnowledgeEntity.mentions_count.desc()
).offset(offset).limit(per_page).all()
return {
'entities': [
{
'id': e.id,
'name': e.name,
'normalized_name': e.normalized_name,
'entity_type': e.entity_type,
'description': e.description,
'short_description': e.short_description,
'aliases': e.aliases if isinstance(e.aliases, list) else [],
'mentions_count': e.mentions_count or 0,
'is_verified': e.is_verified,
'company_id': e.company_id,
'external_url': e.external_url,
'first_mentioned_at': e.first_mentioned_at.isoformat() if e.first_mentioned_at else None,
'last_mentioned_at': e.last_mentioned_at.isoformat() if e.last_mentioned_at else None,
'created_at': e.created_at.isoformat() if e.created_at else None
}
for e in entities
],
'total': total,
'page': page,
'per_page': per_page,
'pages': pages,
'entity_types': entity_types
}
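# --- Illustrative sketch ---
# Paging through the admin list helpers above. Filter values are arbitrary
# examples; `SessionLocal` is a hypothetical session factory.
def _example_admin_listing() -> None:
    from database import SessionLocal  # hypothetical session factory
    db = SessionLocal()
    try:
        chunks_page = list_chunks(db, page=1, per_page=10, has_embedding=False)
        print(chunks_page['total'], "chunks without embeddings")
        facts_page = list_facts(db, page=1, per_page=10, fact_type='investment')
        print(facts_page['total'], "investment facts", facts_page['fact_types'])
        entities_page = list_entities(db, page=1, per_page=10, entity_type='company', min_mentions=3)
        for e in entities_page['entities']:
            print(e['name'], e['mentions_count'])
    finally:
        db.close()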
def get_chunk_detail(db_session, chunk_id: int) -> Optional[Dict]:
"""Get detailed information about a single chunk."""
chunk = db_session.query(ZOPKKnowledgeChunk).filter(
ZOPKKnowledgeChunk.id == chunk_id
).first()
if not chunk:
return None
# Get facts from this chunk
facts = db_session.query(ZOPKKnowledgeFact).filter(
ZOPKKnowledgeFact.source_chunk_id == chunk_id
).all()
# Get entity mentions
mentions = db_session.query(ZOPKKnowledgeEntityMention).filter(
ZOPKKnowledgeEntityMention.chunk_id == chunk_id
).all()
return {
'id': chunk.id,
'content': chunk.content,
'content_clean': chunk.content_clean,
'summary': chunk.summary,
'chunk_type': chunk.chunk_type,
'chunk_index': chunk.chunk_index,
'token_count': chunk.token_count,
'importance_score': chunk.importance_score,
'confidence_score': float(chunk.confidence_score) if chunk.confidence_score else None,
'has_embedding': chunk.embedding is not None,
'is_verified': chunk.is_verified,
'keywords': chunk.keywords if isinstance(chunk.keywords, list) else [],
'context_date': chunk.context_date.isoformat() if chunk.context_date else None,
'context_location': chunk.context_location,
'extraction_model': chunk.extraction_model,
'extracted_at': chunk.extracted_at.isoformat() if chunk.extracted_at else None,
'created_at': chunk.created_at.isoformat() if chunk.created_at else None,
'source_news': {
'id': chunk.source_news.id,
'title': chunk.source_news.title,
'url': chunk.source_news.url,
'source_name': chunk.source_news.source_name
} if chunk.source_news else None,
'facts': [
{
'id': f.id,
'fact_type': f.fact_type,
'full_text': f.full_text,
'is_verified': f.is_verified
}
for f in facts
],
'entity_mentions': [
{
'id': m.id,
'entity_id': m.entity_id,
'entity_name': m.entity.name if m.entity else None,
'entity_type': m.entity.entity_type if m.entity else None,
'mention_text': m.mention_text
}
for m in mentions
]
}
def update_chunk_verification(db_session, chunk_id: int, is_verified: bool, user_id: int) -> bool:
"""Update chunk verification status."""
chunk = db_session.query(ZOPKKnowledgeChunk).filter(
ZOPKKnowledgeChunk.id == chunk_id
).first()
if not chunk:
return False
chunk.is_verified = is_verified
chunk.verified_by = user_id
chunk.verified_at = datetime.now()
db_session.commit()
return True
def update_fact_verification(db_session, fact_id: int, is_verified: bool) -> bool:
"""Update fact verification status."""
fact = db_session.query(ZOPKKnowledgeFact).filter(
ZOPKKnowledgeFact.id == fact_id
).first()
if not fact:
return False
fact.is_verified = is_verified
db_session.commit()
return True
def update_entity_verification(db_session, entity_id: int, is_verified: bool) -> bool:
"""Update entity verification status."""
entity = db_session.query(ZOPKKnowledgeEntity).filter(
ZOPKKnowledgeEntity.id == entity_id
).first()
if not entity:
return False
entity.is_verified = is_verified
db_session.commit()
return True
def delete_chunk(db_session, chunk_id: int) -> bool:
"""Delete a chunk and its associated facts and mentions."""
chunk = db_session.query(ZOPKKnowledgeChunk).filter(
ZOPKKnowledgeChunk.id == chunk_id
).first()
if not chunk:
return False
# Delete associated facts
db_session.query(ZOPKKnowledgeFact).filter(
ZOPKKnowledgeFact.source_chunk_id == chunk_id
).delete()
# Delete associated mentions
db_session.query(ZOPKKnowledgeEntityMention).filter(
ZOPKKnowledgeEntityMention.chunk_id == chunk_id
).delete()
# Delete chunk
db_session.delete(chunk)
db_session.commit()
return True
# ============================================================
# DUPLICATE ENTITY DETECTION AND MERGING
# ============================================================
def find_duplicate_entities(
db_session,
entity_type: Optional[str] = None,
min_similarity: float = 0.5,
limit: int = 100
) -> List[Dict]:
"""
Find potential duplicate entities using fuzzy matching.
Uses PostgreSQL pg_trgm extension for similarity matching.
Returns pairs of entities that might be duplicates.
Args:
db_session: SQLAlchemy session
entity_type: Filter by entity type (company, person, etc.)
min_similarity: Minimum similarity threshold (0.0-1.0)
limit: Maximum number of pairs to return
Returns:
List of dicts with duplicate pairs:
[
{
'entity1': {...},
'entity2': {...},
'similarity': 0.85,
'match_type': 'fuzzy' # or 'substring'
}
]
"""
from sqlalchemy import text
# Build query with pg_trgm similarity
# Use a parameterized NULL check for the optional entity_type filter instead of f-string interpolation
query = text("""
SELECT
e1.id as id1, e1.name as name1, e1.entity_type as type1,
e1.mentions_count as mentions1, e1.is_verified as verified1,
e2.id as id2, e2.name as name2, e2.entity_type as type2,
e2.mentions_count as mentions2, e2.is_verified as verified2,
similarity(LOWER(e1.name), LOWER(e2.name)) as sim,
CASE
WHEN LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%'
OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%'
THEN 'substring'
ELSE 'fuzzy'
END as match_type
FROM zopk_knowledge_entities e1
JOIN zopk_knowledge_entities e2
ON e1.id < e2.id
AND e1.entity_type = e2.entity_type
WHERE (
similarity(LOWER(e1.name), LOWER(e2.name)) > :min_sim
OR LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%'
OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%'
)
AND (:entity_type IS NULL OR e1.entity_type = :entity_type)
ORDER BY
sim DESC,
e1.entity_type,
GREATEST(e1.mentions_count, e2.mentions_count) DESC
LIMIT :limit
""")
params = {'min_sim': min_similarity, 'limit': limit, 'entity_type': entity_type}
result = db_session.execute(query, params)
duplicates = []
for row in result:
duplicates.append({
'entity1': {
'id': row.id1,
'name': row.name1,
'entity_type': row.type1,
'mentions_count': row.mentions1,
'is_verified': row.verified1
},
'entity2': {
'id': row.id2,
'name': row.name2,
'entity_type': row.type2,
'mentions_count': row.mentions2,
'is_verified': row.verified2
},
'similarity': float(row.sim) if row.sim else 0.0,
'match_type': row.match_type
})
return duplicates
def merge_entities(
db_session,
primary_id: int,
duplicate_id: int,
new_name: Optional[str] = None
) -> Dict:
"""
Merge two entities - keep primary, delete duplicate.
Transfers all relationships from duplicate to primary:
- Entity mentions
- Facts (subject/object references)
- Relations (source/target)
- Updates mentions_count
Args:
db_session: SQLAlchemy session
primary_id: ID of entity to keep
duplicate_id: ID of entity to merge and delete
new_name: Optional new canonical name for primary
Returns:
Dict with merge results:
{
'success': True,
'primary_id': 123,
'deleted_id': 456,
'transfers': {
'mentions': 15,
'facts': 3,
'relations_source': 1,
'relations_target': 0
}
}
"""
from sqlalchemy import text
# Get both entities
primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id)
duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id)
if not primary:
return {'success': False, 'error': f'Primary entity {primary_id} not found'}
if not duplicate:
return {'success': False, 'error': f'Duplicate entity {duplicate_id} not found'}
if primary.entity_type != duplicate.entity_type:
return {'success': False, 'error': 'Cannot merge entities of different types'}
transfers = {
'mentions': 0,
'facts': 0,
'relations_source': 0,
'relations_target': 0
}
try:
# 1. Transfer mentions
result = db_session.execute(text("""
UPDATE zopk_knowledge_entity_mentions
SET entity_id = :primary_id
WHERE entity_id = :duplicate_id
"""), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
transfers['mentions'] = result.rowcount
# 2. Transfer facts - update entities_involved JSONB
# Replace duplicate entity ID with primary ID in the JSONB array
result = db_session.execute(text("""
UPDATE zopk_knowledge_facts
SET entities_involved = (
SELECT jsonb_agg(
CASE
WHEN (elem->>'id')::int = :duplicate_id
THEN jsonb_set(elem, '{id}', to_jsonb(:primary_id))
ELSE elem
END
)
FROM jsonb_array_elements(entities_involved::jsonb) AS elem
)
WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb)
"""), {
'primary_id': primary_id,
'duplicate_id': duplicate_id,
'entity_json': f'[{{"id": {duplicate_id}}}]'
})
transfers['facts'] = result.rowcount
# 3. Transfer relations (entity_a)
result = db_session.execute(text("""
UPDATE zopk_knowledge_relations
SET entity_a_id = :primary_id
WHERE entity_a_id = :duplicate_id
"""), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
transfers['relations_source'] = result.rowcount
# 4. Transfer relations (entity_b)
result = db_session.execute(text("""
UPDATE zopk_knowledge_relations
SET entity_b_id = :primary_id
WHERE entity_b_id = :duplicate_id
"""), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
transfers['relations_target'] = result.rowcount
# 5. Update primary entity
primary.mentions_count += duplicate.mentions_count
if new_name:
primary.canonical_name = new_name
# Merge aliases
if duplicate.aliases:
existing_aliases = primary.aliases or []
new_aliases = duplicate.aliases
# Add duplicate name as alias
if duplicate.name not in existing_aliases:
existing_aliases.append(duplicate.name)
# Add duplicate's aliases
for alias in new_aliases:
if alias not in existing_aliases:
existing_aliases.append(alias)
primary.aliases = existing_aliases
# 6. Delete duplicate
db_session.delete(duplicate)
db_session.commit()
return {
'success': True,
'primary_id': primary_id,
'deleted_id': duplicate_id,
'new_mentions_count': primary.mentions_count,
'transfers': transfers
}
except Exception as e:
db_session.rollback()
logger.error(f"Error merging entities: {e}")
return {'success': False, 'error': str(e)}
def get_entity_merge_preview(
db_session,
primary_id: int,
duplicate_id: int
) -> Dict:
"""
Preview what would happen if two entities are merged.
Returns counts of items that would be transferred.
"""
from sqlalchemy import text, func
primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id)
duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id)
if not primary or not duplicate:
return {'error': 'Entity not found'}
# Count items that would be transferred
mentions = db_session.query(func.count(ZOPKKnowledgeEntityMention.id)).filter(
ZOPKKnowledgeEntityMention.entity_id == duplicate_id
).scalar() or 0
# Facts use entities_involved (JSONB) not FK columns, so count via JSONB query
# Count facts where duplicate entity is in entities_involved array
facts_with_entity = db_session.execute(text("""
SELECT COUNT(*) FROM zopk_knowledge_facts
WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb)
"""), {'entity_json': f'[{{"id": {duplicate_id}}}]'}).scalar() or 0
relations_source = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter(
ZOPKKnowledgeRelation.entity_a_id == duplicate_id
).scalar() or 0
relations_target = db_session.query(func.count(ZOPKKnowledgeRelation.id)).filter(
ZOPKKnowledgeRelation.entity_b_id == duplicate_id
).scalar() or 0
return {
'primary': {
'id': primary.id,
'name': primary.name,
'entity_type': primary.entity_type,
'mentions_count': primary.mentions_count,
'aliases': primary.aliases or []
},
'duplicate': {
'id': duplicate.id,
'name': duplicate.name,
'entity_type': duplicate.entity_type,
'mentions_count': duplicate.mentions_count,
'aliases': duplicate.aliases or []
},
'transfers': {
'mentions': mentions,
'facts': facts_with_entity,
'relations_source': relations_source,
'relations_target': relations_target,
'total': mentions + facts_with_entity + relations_source + relations_target
},
'result': {
'new_mentions_count': primary.mentions_count + duplicate.mentions_count
}
}
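# --- Illustrative sketch ---
# Typical entity dedup workflow: scan for likely duplicates, preview the
# transfer counts, then merge. IDs and thresholds are example values;
# `SessionLocal` is a hypothetical session factory. A real UI would ask an
# admin to confirm before merging.
def _example_entity_dedup() -> None:
    from database import SessionLocal  # hypothetical session factory
    db = SessionLocal()
    try:
        pairs = find_duplicate_entities(db, entity_type='company', min_similarity=0.6, limit=20)
        for pair in pairs:
            preview = get_entity_merge_preview(db, pair['entity1']['id'], pair['entity2']['id'])
            print(pair['similarity'], pair['match_type'], preview['transfers']['total'])
        # Merge the top candidate, keeping entity1 as the canonical record.
        if pairs:
            result = merge_entities(db, primary_id=pairs[0]['entity1']['id'],
                                    duplicate_id=pairs[0]['entity2']['id'])
            print(result['success'], result.get('transfers'))
    finally:
        db.close()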
# ============================================================
# FACT DEDUPLICATION
# ============================================================
def find_duplicate_facts(
db_session,
min_similarity: float = 0.7,
limit: int = 100,
fact_type: Optional[str] = None
) -> List[Dict]:
"""Find potential duplicate facts using text similarity.
Uses pg_trgm % operator with GiST index for fast similarity search.
"""
from sqlalchemy import text
# Set similarity threshold and use % operator (uses GiST index)
db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"), {'threshold': min_similarity})
query = text("""
SELECT
f1.id as id1, f1.full_text as text1, f1.fact_type as type1,
f1.is_verified as verified1, f1.confidence_score as score1,
f2.id as id2, f2.full_text as text2, f2.fact_type as type2,
f2.is_verified as verified2, f2.confidence_score as score2,
similarity(f1.full_text, f2.full_text) as sim
FROM zopk_knowledge_facts f1
JOIN zopk_knowledge_facts f2 ON f1.id < f2.id
WHERE f1.full_text % f2.full_text
AND (:fact_type IS NULL OR f1.fact_type = :fact_type)
ORDER BY sim DESC, COALESCE(GREATEST(f1.confidence_score, f2.confidence_score), 0) DESC
LIMIT :limit
""")
params = {'limit': limit, 'fact_type': fact_type}
result = db_session.execute(query, params)
duplicates = []
for row in result:
duplicates.append({
'fact1': {
'id': row.id1, 'text': row.text1, 'fact_type': row.type1,
'is_verified': row.verified1,
'confidence_score': float(row.score1) if row.score1 else 0
},
'fact2': {
'id': row.id2, 'text': row.text2, 'fact_type': row.type2,
'is_verified': row.verified2,
'confidence_score': float(row.score2) if row.score2 else 0
},
'similarity': float(row.sim)
})
return duplicates
def merge_facts(db_session, primary_id: int, duplicate_id: int, new_text: Optional[str] = None) -> Dict:
"""Merge duplicate fact into primary."""
primary = db_session.query(ZOPKKnowledgeFact).get(primary_id)
duplicate = db_session.query(ZOPKKnowledgeFact).get(duplicate_id)
if not primary:
return {'success': False, 'error': f'Primary fact {primary_id} not found'}
if not duplicate:
return {'success': False, 'error': f'Duplicate fact {duplicate_id} not found'}
try:
if new_text:
primary.full_text = new_text
if duplicate.importance_score and (not primary.importance_score or duplicate.importance_score > primary.importance_score):
primary.importance_score = duplicate.importance_score
if duplicate.confidence_score and (not primary.confidence_score or duplicate.confidence_score > primary.confidence_score):
primary.confidence_score = duplicate.confidence_score
if duplicate.is_verified:
primary.is_verified = True
db_session.delete(duplicate)
db_session.commit()
return {'success': True, 'primary_id': primary_id, 'deleted_id': duplicate_id}
except Exception as e:
db_session.rollback()
return {'success': False, 'error': str(e)}
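# --- Illustrative sketch ---
# Fact-level dedup mirrors the entity workflow: find near-duplicate texts with
# pg_trgm, then merge, keeping the higher-confidence record. `SessionLocal` is
# a hypothetical session factory.
def _example_fact_dedup() -> None:
    from database import SessionLocal  # hypothetical session factory
    db = SessionLocal()
    try:
        for pair in find_duplicate_facts(db, min_similarity=0.75, limit=10):
            keep, drop = pair['fact1'], pair['fact2']
            # Prefer keeping the fact with the higher confidence score.
            if drop['confidence_score'] > keep['confidence_score']:
                keep, drop = drop, keep
            print(merge_facts(db, primary_id=keep['id'], duplicate_id=drop['id']))
    finally:
        db.close()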
# ============================================================
# AUTO-VERIFICATION
# ============================================================
def auto_verify_top_entities(db_session, min_mentions: int = 5, limit: int = 100) -> Dict:
"""Auto-verify entities with high mention counts."""
entities = db_session.query(ZOPKKnowledgeEntity).filter(
ZOPKKnowledgeEntity.is_verified == False,
ZOPKKnowledgeEntity.mentions_count >= min_mentions
).order_by(ZOPKKnowledgeEntity.mentions_count.desc()).limit(limit).all()
for entity in entities:
entity.is_verified = True
db_session.commit()
return {'success': True, 'verified_count': len(entities), 'min_mentions': min_mentions}
def auto_verify_top_facts(db_session, min_importance: float = 0.7, limit: int = 200) -> Dict:
"""Auto-verify facts with high confidence scores (≥70% by default)."""
# Note: Table uses confidence_score, not importance_score
facts = db_session.query(ZOPKKnowledgeFact).filter(
ZOPKKnowledgeFact.is_verified == False,
ZOPKKnowledgeFact.confidence_score >= min_importance
).order_by(ZOPKKnowledgeFact.confidence_score.desc()).limit(limit).all()
for fact in facts:
fact.is_verified = True
db_session.commit()
return {'success': True, 'verified_count': len(facts), 'min_confidence': min_importance}
def find_similar_to_verified_facts(
db_session,
min_similarity: float = 0.8,
limit: int = 100
) -> List[Dict]:
"""
Find unverified facts that are similar to already verified facts.
Uses pg_trgm similarity search to "learn" from verified examples.
Returns list of suggestions with similarity scores.
"""
from sqlalchemy import text, func
# Check if we have any verified facts to learn from
verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(
ZOPKKnowledgeFact.is_verified == True
).scalar() or 0
if verified_count == 0:
return []
# Set similarity threshold
db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"),
{'threshold': min_similarity})
# Find unverified facts similar to verified ones
query = text("""
SELECT DISTINCT ON (unverified.id)
unverified.id as fact_id,
unverified.full_text as fact_text,
unverified.fact_type,
unverified.confidence_score,
verified.id as similar_to_id,
verified.full_text as similar_to_text,
similarity(unverified.full_text, verified.full_text) as sim_score
FROM zopk_knowledge_facts unverified
JOIN zopk_knowledge_facts verified
ON verified.is_verified = TRUE
AND unverified.full_text % verified.full_text
WHERE unverified.is_verified = FALSE
ORDER BY unverified.id, sim_score DESC
LIMIT :limit
""")
result = db_session.execute(query, {'limit': limit})
suggestions = []
for row in result:
suggestions.append({
'fact_id': row.fact_id,
'fact_text': row.fact_text,
'fact_type': row.fact_type,
'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
'similar_to_id': row.similar_to_id,
'similar_to_text': row.similar_to_text,
'similarity': float(row.sim_score)
})
return suggestions
def auto_verify_similar_to_verified(
db_session,
min_similarity: float = 0.8,
limit: int = 100
) -> Dict:
"""
Auto-verify facts that are similar to already verified facts.
This enables "learning" from manual verifications.
Args:
min_similarity: Minimum similarity threshold (0.8 = 80% similar)
limit: Maximum number of facts to verify at once
Returns:
Dict with success status and count of verified facts
"""
from sqlalchemy import text, func
# Check if we have any verified facts to learn from
verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(
ZOPKKnowledgeFact.is_verified == True
).scalar() or 0
if verified_count == 0:
return {
'success': False,
'error': 'Brak zweryfikowanych faktów do nauki. Najpierw zweryfikuj kilka faktów ręcznie.',
'verified_count': 0
}
# Set similarity threshold
db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"),
{'threshold': min_similarity})
# First, find matching facts with their details and similarity scores
find_query = text("""
SELECT DISTINCT ON (unverified.id)
unverified.id as fact_id,
unverified.full_text as fact_text,
unverified.fact_type,
unverified.confidence_score,
verified.id as pattern_id,
verified.full_text as pattern_text,
similarity(unverified.full_text, verified.full_text) as sim_score
FROM zopk_knowledge_facts unverified
JOIN zopk_knowledge_facts verified
ON verified.is_verified = TRUE
AND unverified.full_text % verified.full_text
WHERE unverified.is_verified = FALSE
ORDER BY unverified.id, sim_score DESC
LIMIT :limit
""")
result = db_session.execute(find_query, {'limit': limit})
facts_to_verify = []
for row in result:
facts_to_verify.append({
'fact_id': row.fact_id,
'fact_text': row.fact_text,
'fact_type': row.fact_type,
'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
'pattern_id': row.pattern_id,
'pattern_text': row.pattern_text,
'similarity': round(float(row.sim_score) * 100)
})
if not facts_to_verify:
return {
'success': True,
'verified_count': 0,
'verified_facts': [],
'min_similarity': min_similarity,
'learned_from': verified_count
}
# Now verify the found facts
fact_ids = [f['fact_id'] for f in facts_to_verify]
update_query = text("""
UPDATE zopk_knowledge_facts
SET is_verified = TRUE
WHERE id = ANY(:ids)
""")
db_session.execute(update_query, {'ids': fact_ids})
db_session.commit()
return {
'success': True,
'verified_count': len(facts_to_verify),
'verified_facts': facts_to_verify,
'min_similarity': min_similarity,
'learned_from': verified_count
}
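# Example: propagate manual verifications to lookalike facts (a sketch; a conservative
# threshold keeps false positives low; assumes an open SQLAlchemy session):
#
#     result = auto_verify_similar_to_verified(session, min_similarity=0.9, limit=50)
#     if result['success']:
#         print(f"verified {result['verified_count']} facts, learned from {result['learned_from']}")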
# ============================================================
# DASHBOARD STATS
# ============================================================
def get_knowledge_dashboard_stats(db_session) -> Dict:
"""Get comprehensive stats for knowledge dashboard."""
from sqlalchemy import func, text
chunks_total = db_session.query(func.count(ZOPKKnowledgeChunk.id)).scalar() or 0
chunks_verified = db_session.query(func.count(ZOPKKnowledgeChunk.id)).filter(ZOPKKnowledgeChunk.is_verified == True).scalar() or 0
chunks_with_embedding = db_session.query(func.count(ZOPKKnowledgeChunk.id)).filter(ZOPKKnowledgeChunk.embedding.isnot(None)).scalar() or 0
entities_total = db_session.query(func.count(ZOPKKnowledgeEntity.id)).scalar() or 0
entities_verified = db_session.query(func.count(ZOPKKnowledgeEntity.id)).filter(ZOPKKnowledgeEntity.is_verified == True).scalar() or 0
facts_total = db_session.query(func.count(ZOPKKnowledgeFact.id)).scalar() or 0
facts_verified = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(ZOPKKnowledgeFact.is_verified == True).scalar() or 0
news_total = db_session.execute(text("SELECT COUNT(*) FROM zopk_news WHERE status IN ('approved', 'auto_approved')")).scalar() or 0
news_with_extraction = db_session.execute(text('''
SELECT COUNT(DISTINCT n.id) FROM zopk_news n
JOIN zopk_knowledge_chunks c ON c.source_news_id = n.id
WHERE n.status IN ('approved', 'auto_approved')
''')).scalar() or 0
entity_types = db_session.execute(text('SELECT entity_type, COUNT(*) FROM zopk_knowledge_entities GROUP BY entity_type ORDER BY 2 DESC')).fetchall()
fact_types = db_session.execute(text('SELECT fact_type, COUNT(*) FROM zopk_knowledge_facts GROUP BY fact_type ORDER BY 2 DESC')).fetchall()
top_entities = db_session.query(ZOPKKnowledgeEntity).order_by(ZOPKKnowledgeEntity.mentions_count.desc()).limit(10).all()
return {
'chunks': {'total': chunks_total, 'verified': chunks_verified, 'with_embedding': chunks_with_embedding,
'verified_pct': round(100 * chunks_verified / chunks_total, 1) if chunks_total else 0},
'entities': {'total': entities_total, 'verified': entities_verified,
'verified_pct': round(100 * entities_verified / entities_total, 1) if entities_total else 0,
'by_type': [{'type': r[0], 'count': r[1]} for r in entity_types]},
'facts': {'total': facts_total, 'verified': facts_verified,
'verified_pct': round(100 * facts_verified / facts_total, 1) if facts_total else 0,
'by_type': [{'type': r[0] or 'unknown', 'count': r[1]} for r in fact_types]},
'news': {'total': news_total, 'with_extraction': news_with_extraction, 'pending': news_total - news_with_extraction},
'top_entities': [{'id': e.id, 'name': e.name, 'type': e.entity_type, 'mentions': e.mentions_count} for e in top_entities]
}
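# Example: read the dashboard stats, e.g. for an admin view (a sketch; assumes an
# open SQLAlchemy session):
#
#     stats = get_knowledge_dashboard_stats(session)
#     print(f"chunks: {stats['chunks']['verified']}/{stats['chunks']['total']} verified "
#           f"({stats['chunks']['verified_pct']}%)")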
# ============================================================
# TIMELINE SUGGESTIONS (Auto-populate from Knowledge Base)
# ============================================================
def get_timeline_suggestions(
db_session,
limit: int = 50,
only_verified: bool = True
) -> Dict:
"""
Get milestone facts from knowledge base that could become timeline milestones.
    Finds milestone facts (verified only, unless only_verified=False) that do not yet
    have a matching timeline milestone (same source news with a similar title), and
    enriches each with a suggested title, category, date and status.
Args:
db_session: Database session
limit: Max suggestions to return
only_verified: Only include verified facts
Returns:
{
'success': True,
'suggestions': [...],
'total_milestone_facts': int,
'already_in_timeline': int
}
"""
from sqlalchemy import text, func
from database import ZOPKMilestone
try:
# Count total milestone facts
total_query = text("""
SELECT COUNT(*) FROM zopk_knowledge_facts
WHERE fact_type = 'milestone'
""")
total_milestone_facts = db_session.execute(total_query).scalar() or 0
# Count facts already linked to timeline
linked_query = text("""
SELECT COUNT(DISTINCT f.id)
FROM zopk_knowledge_facts f
JOIN zopk_milestones m ON m.source_news_id = f.source_news_id
WHERE f.fact_type = 'milestone'
""")
already_linked = db_session.execute(linked_query).scalar() or 0
# Get milestone facts not yet in timeline
# Prioritize: verified, high confidence, has numeric value (dates/amounts)
suggestions_query = text("""
SELECT DISTINCT ON (f.id)
f.id as fact_id,
f.full_text,
f.subject,
f.predicate,
f.object,
f.confidence_score,
f.numeric_value,
f.numeric_unit,
f.is_verified,
f.source_news_id,
n.title as news_title,
n.published_at as news_date,
n.url as news_url,
n.source_name
FROM zopk_knowledge_facts f
LEFT JOIN zopk_news n ON n.id = f.source_news_id
WHERE f.fact_type = 'milestone'
AND (:only_verified = FALSE OR f.is_verified = TRUE)
AND NOT EXISTS (
SELECT 1 FROM zopk_milestones m
WHERE m.source_news_id = f.source_news_id
AND similarity(m.title, f.full_text) > 0.5
)
ORDER BY f.id, f.confidence_score DESC NULLS LAST, f.is_verified DESC
LIMIT :limit
""")
params = {'limit': limit, 'only_verified': bool(only_verified)}
results = db_session.execute(suggestions_query, params).fetchall()
suggestions = []
for row in results:
# Auto-detect category based on keywords
category = _detect_milestone_category(row.full_text, row.subject)
# Try to extract date from text or use news date
target_date = _extract_date_from_text(row.full_text)
if not target_date and row.news_date:
target_date = row.news_date.strftime('%Y-%m-%d') if hasattr(row.news_date, 'strftime') else str(row.news_date)
suggestions.append({
'fact_id': row.fact_id,
'full_text': row.full_text,
'subject': row.subject,
'predicate': row.predicate,
'object': row.object,
'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
'is_verified': row.is_verified,
'source_news_id': row.source_news_id,
'news_title': row.news_title,
'news_date': row.news_date.isoformat() if row.news_date else None,
'news_url': row.news_url,
'source_name': row.source_name,
# Auto-suggested values for timeline
'suggested_title': _generate_milestone_title(row.full_text, row.subject),
'suggested_category': category,
'suggested_date': target_date,
'suggested_status': 'completed' if _is_past_event(row.full_text) else 'planned'
})
return {
'success': True,
'suggestions': suggestions,
'total_milestone_facts': total_milestone_facts,
'already_in_timeline': already_linked,
'suggestions_count': len(suggestions)
}
except Exception as e:
logger.error(f"Error getting timeline suggestions: {e}")
return {
'success': False,
'error': str(e),
'suggestions': []
}
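# Example: preview timeline suggestions before creating milestones (a sketch; assumes
# an open SQLAlchemy session):
#
#     data = get_timeline_suggestions(session, limit=20, only_verified=True)
#     for s in data.get('suggestions', []):
#         print(s['suggested_category'], s['suggested_date'], s['suggested_title'])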
def _detect_milestone_category(text: str, subject: str = None) -> str:
"""
Auto-detect milestone category based on keywords.
Categories: nuclear, offshore, infrastructure, defense, other
"""
text_lower = (text or '').lower()
subject_lower = (subject or '').lower()
combined = f"{text_lower} {subject_lower}"
# Nuclear energy keywords
nuclear_keywords = [
'jądrowa', 'jądrowy', 'atomowa', 'atomowy', 'nuclear',
'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie',
'reaktor', 'uran', 'westinghouse', 'ap1000'
]
if any(kw in combined for kw in nuclear_keywords):
return 'nuclear'
# Offshore wind keywords
offshore_keywords = [
'offshore', 'wiatrowa', 'wiatrowy', 'morska farma', 'farma wiatrowa',
'baltic power', 'baltica', 'orlen', 'northland', 'bałtyk',
'turbina', 'mw wiatr', 'gw wiatr'
]
if any(kw in combined for kw in offshore_keywords):
return 'offshore'
# Defense/military keywords
defense_keywords = [
'kongsberg', 'obronność', 'obronny', 'wojsko', 'wojskowy',
'mon ', 'ministerstwo obrony', 'zbrojeniowy', 'dron', 'amunicja',
'nsm', 'rakieta', 'samolot bojowy', 'okręt', 'bezpieczeństwo',
'nato', 'sojusz'
]
if any(kw in combined for kw in defense_keywords):
return 'defense'
# Infrastructure keywords
infra_keywords = [
's6', 's7', 'via pomerania', 'droga', 'autostrada', 'ekspresowa',
'kolej', 'pkp', 'port', 'terminal', 'lotnisko', 'most',
'infrastruktura', 'budowa', 'remont', 'przebudowa',
'wodociąg', 'kanalizacja', 'oczyszczalnia'
]
if any(kw in combined for kw in infra_keywords):
return 'infrastructure'
return 'other'
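# Example keyword-based categorization (illustrative inputs only):
#
#     _detect_milestone_category("Baltic Power podpisał umowę na turbiny")       # -> 'offshore'
#     _detect_milestone_category("Elektrownia jądrowa Lubiatowo-Kopalino")       # -> 'nuclear'
#     _detect_milestone_category("Nowa piekarnia w Wejherowie")                  # -> 'other'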
def _generate_milestone_title(full_text: str, subject: str = None) -> str:
"""
Generate a concise title for milestone from fact text.
Truncates to ~100 chars and tries to keep meaningful content.
"""
if not full_text:
return subject or "Kamień milowy"
# If text is short enough, use as is
if len(full_text) <= 100:
return full_text
# Try to find a natural break point
text = full_text[:150]
# Look for sentence end
for sep in ['. ', ', ', ' - ', ': ']:
if sep in text:
parts = text.split(sep)
if len(parts[0]) >= 30:
return parts[0] + ('.' if not parts[0].endswith('.') else '')
# Just truncate with ellipsis
return text[:97] + '...'
def _extract_date_from_text(text: str) -> str:
"""
Try to extract date from milestone text.
Returns ISO format date string or None.
"""
import re
from datetime import datetime
if not text:
return None
text_lower = text.lower()
    # Patterns to match (most specific first, so e.g. "w marcu 2025 roku" resolves
    # to March instead of falling through to the bare-year pattern)
    patterns = [
        # "w marcu 2025", "marzec 2025"
        (r'\b(stycz\w*|luty|lut\w*|mar[zc]\w*|kwie\w*|maj\w*|czerw\w*|lip\w*|sierp\w*|wrze\w*|paźdz\w*|listop\w*|grud\w*)\s*(20[2-3]\d)',
         lambda m: _month_to_date(m.group(1), m.group(2))),
        # "Q1 2025", "Q3 2026"
        (r'\bQ([1-4])\s*(20[2-3]\d)', lambda m: f"{m.group(2)}-{int(m.group(1))*3-2:02d}-01"),
        # "I kwartał 2025"
        (r'\b(I|II|III|IV)\s*kwarta\w*\s*(20[2-3]\d)',
         lambda m: _quarter_to_date(m.group(1), m.group(2))),
        # "w 2025 roku", "2025 r.", "rok 2025"
        (r'\b(20[2-3]\d)\s*(rok\w*|r\.?)\b', lambda m: f"{m.group(1)}-01-01"),
        (r'\brok\w*\s*(20[2-3]\d)\b', lambda m: f"{m.group(1)}-01-01"),
    ]
    for pattern, formatter in patterns:
        # IGNORECASE so the "Q1" and Roman-numeral quarter patterns also match the
        # lowercased text
        match = re.search(pattern, text_lower, re.IGNORECASE)
        if match:
            try:
                return formatter(match)
            except Exception:
                continue
return None
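# Example date strings recognized by _extract_date_from_text (a sketch; expected
# outputs assume the patterns above):
#
#     _extract_date_from_text("Budowa ruszy w marcu 2026")        # -> '2026-03-01'
#     _extract_date_from_text("Oddanie do użytku w Q3 2027")      # -> '2027-07-01'
#     _extract_date_from_text("Zakończenie prac w 2029 roku")     # -> '2029-01-01'
#     _extract_date_from_text("Brak konkretnej daty")             # -> None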
def _month_to_date(month_name: str, year: str) -> str:
    """Convert a Polish month name (in any grammatical form) to a date string."""
    months = {
        'stycz': '01', 'luty': '02', 'lut': '02', 'marz': '03', 'marc': '03',
        'kwie': '04', 'maj': '05', 'czerw': '06', 'lip': '07',
        'sierp': '08', 'wrze': '09', 'paźdz': '10', 'listop': '11',
        'grud': '12'
    }
    for prefix, num in months.items():
        if month_name.startswith(prefix):
            return f"{year}-{num}-01"
    return f"{year}-01-01"
def _quarter_to_date(quarter: str, year: str) -> str:
    """Convert a Roman numeral quarter (I-IV, any case) to a date string."""
    quarters = {'I': '01', 'II': '04', 'III': '07', 'IV': '10'}
    month = quarters.get((quarter or '').upper(), '01')
    return f"{year}-{month}-01"
def _is_past_event(text: str) -> bool:
"""
Detect if milestone text describes a past event (completed)
or future event (planned).
"""
if not text:
return False
text_lower = text.lower()
# Past tense indicators (Polish)
past_indicators = [
'podpisano', 'podpisał', 'zakończono', 'oddano', 'otwarto',
'uruchomiono', 'rozpoczęto', 'ogłoszono', 'przyznano',
'uzyskano', 'otrzymał', 'zdobył', 'wygrał', 'został',
'odbył się', 'odbyła się', 'miało miejsce'
]
# Future tense indicators
future_indicators = [
'planowany', 'planowane', 'planowana', 'ma zostać',
'będzie', 'zostanie', 'ma być', 'powstanie',
'w przyszłości', 'do końca', 'w ciągu'
]
past_count = sum(1 for ind in past_indicators if ind in text_lower)
future_count = sum(1 for ind in future_indicators if ind in text_lower)
return past_count > future_count
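# Example past/future classification (illustrative inputs only):
#
#     _is_past_event("Podpisano umowę z wykonawcą")              # -> True  ('podpisano')
#     _is_past_event("Terminal ma zostać oddany do końca 2027")  # -> False ('ma zostać', 'do końca')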
def create_milestone_from_suggestion(
db_session,
fact_id: int,
title: str,
description: str = None,
category: str = 'other',
target_date: str = None,
status: str = 'planned',
source_url: str = None
) -> Dict:
"""
Create a timeline milestone from a knowledge fact suggestion.
Args:
db_session: Database session
fact_id: Source fact ID
title: Milestone title
description: Optional description
category: nuclear, offshore, infrastructure, defense, other
target_date: Target date (YYYY-MM-DD format)
status: planned, in_progress, completed, delayed
source_url: Source article URL
Returns:
{'success': True, 'milestone_id': int} or {'success': False, 'error': str}
"""
from database import ZOPKMilestone, ZOPKKnowledgeFact
from datetime import datetime
try:
# Get the source fact
fact = db_session.query(ZOPKKnowledgeFact).get(fact_id)
if not fact:
return {'success': False, 'error': f'Fact {fact_id} not found'}
# Parse target date
parsed_date = None
if target_date:
try:
parsed_date = datetime.strptime(target_date, '%Y-%m-%d').date()
except ValueError:
pass
# Create milestone
milestone = ZOPKMilestone(
title=title,
description=description or fact.full_text,
category=category,
target_date=parsed_date,
actual_date=parsed_date if status == 'completed' else None,
status=status,
source_url=source_url,
source_news_id=fact.source_news_id,
is_featured=False
)
db_session.add(milestone)
db_session.commit()
logger.info(f"Created milestone #{milestone.id} from fact #{fact_id}: {title}")
return {
'success': True,
'milestone_id': milestone.id,
'title': title,
'category': category
}
except Exception as e:
db_session.rollback()
logger.error(f"Error creating milestone from fact {fact_id}: {e}")
return {'success': False, 'error': str(e)}
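# Example: accept one suggestion and persist it as a roadmap milestone (a sketch;
# assumes a suggestion dict returned by get_timeline_suggestions()):
#
#     s = data['suggestions'][0]
#     created = create_milestone_from_suggestion(
#         session,
#         fact_id=s['fact_id'],
#         title=s['suggested_title'],
#         category=s['suggested_category'],
#         target_date=s['suggested_date'],
#         status=s['suggested_status'],
#         source_url=s['news_url'],
#     )
#     print(created)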
def categorize_milestones_with_ai(
db_session,
suggestions: List[Dict],
model_name: str = "gemini-3-flash-preview"
) -> List[Dict]:
"""
Use Gemini AI to categorize and enhance milestone suggestions.
Adds AI-improved titles, categories, and extracts dates more accurately.
"""
from gemini_service import GeminiService
import json
if not suggestions:
return suggestions
# Prepare batch for AI processing
facts_text = "\n".join([
f"{i+1}. {s['full_text'][:300]}"
for i, s in enumerate(suggestions[:20]) # Limit to 20 for API
])
prompt = f"""Przeanalizuj poniższe fakty o projekcie ZOPK (Zielony Okręg Przemysłowy Kaszubia) i dla każdego zwróć:
- category: jedna z [nuclear, offshore, infrastructure, defense, other]
- short_title: zwięzły tytuł (max 80 znaków)
- target_date: data w formacie YYYY-MM-DD (jeśli można wywnioskować)
- status: jeden z [completed, in_progress, planned]
Kategorie:
- nuclear: elektrownia jądrowa, atom, Lubiatowo-Kopalino
- offshore: farmy wiatrowe, offshore wind, Baltic Power, Baltica
- infrastructure: drogi S6, Via Pomerania, porty, koleje
- defense: Kongsberg, przemysł zbrojeniowy, obronność, MON
Fakty:
{facts_text}
Odpowiedz TYLKO jako JSON array:
[{{"id": 1, "category": "...", "short_title": "...", "target_date": "YYYY-MM-DD lub null", "status": "..."}}]"""
try:
service = GeminiService(model=model_name)
response_text = service.generate_text(prompt)
# Parse response
response_text = response_text.strip()
if response_text.startswith('```'):
response_text = response_text.split('```')[1]
if response_text.startswith('json'):
response_text = response_text[4:]
ai_results = json.loads(response_text)
# Merge AI results with suggestions
for result in ai_results:
idx = result.get('id', 0) - 1
if 0 <= idx < len(suggestions):
suggestions[idx]['ai_category'] = result.get('category', suggestions[idx]['suggested_category'])
suggestions[idx]['ai_title'] = result.get('short_title', suggestions[idx]['suggested_title'])
suggestions[idx]['ai_date'] = result.get('target_date')
suggestions[idx]['ai_status'] = result.get('status', suggestions[idx]['suggested_status'])
return suggestions
except Exception as e:
logger.warning(f"AI categorization failed: {e}")
return suggestions # Return original suggestions without AI enhancement
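# Example: optionally enrich suggestions with AI before showing them in the admin UI
# (a sketch; falls back to the keyword-based fields if AI enhancement fails):
#
#     data = get_timeline_suggestions(session, limit=20)
#     enriched = categorize_milestones_with_ai(session, data['suggestions'])
#     for s in enriched:
#         print(s.get('ai_category', s['suggested_category']),
#               s.get('ai_title', s['suggested_title']))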
def analyze_roadmap_with_ai(db_session) -> Dict:
"""
AI-powered roadmap analysis: new milestones, status updates, and gap detection.
Uses Gemini to analyze existing milestones against recent knowledge facts.
Returns:
{
'success': True,
'new_milestones': [...],
'status_updates': [...],
'gaps': [...]
}
"""
from gemini_service import GeminiService
import json
from database import ZOPKMilestone
try:
# 1. Get existing milestones
milestones = db_session.query(ZOPKMilestone).order_by(ZOPKMilestone.target_date).all()
milestones_text = "\n".join([
f"ID:{m.id} | {m.title} | kategoria:{m.category} | status:{m.status} | data:{m.target_date}"
for m in milestones
]) or "(brak kamieni milowych)"
# 2. Get recent verified facts from knowledge base
from sqlalchemy import text
facts_query = text("""
SELECT f.id, f.full_text, f.fact_type, f.date_value, f.confidence_score,
f.source_news_id, n.title as news_title, n.url as news_url
FROM zopk_knowledge_facts f
LEFT JOIN zopk_news n ON n.id = f.source_news_id
WHERE f.confidence_score >= 0.5
ORDER BY f.created_at DESC
LIMIT 50
""")
facts = db_session.execute(facts_query).fetchall()
if not facts:
return {
'success': True,
'new_milestones': [],
'status_updates': [],
'gaps': [],
'message': 'Brak faktów w bazie wiedzy do analizy'
}
facts_text = "\n".join([
f"ID:{f.id} | typ:{f.fact_type} | data:{f.date_value} | {f.full_text[:250]}"
for f in facts
])
# 3. Send to Gemini
prompt = f"""Jesteś ekspertem ds. projektu ZOPK (Zielony Okręg Przemysłowy Kaszubia - Pomorze).
Projekty w regionie: elektrownia jądrowa (Lubiatowo-Kopalino), farmy wiatrowe offshore (Baltic Power, Baltica), infrastruktura (S6, porty), obronność (Kongsberg).
ISTNIEJĄCE KAMIENIE MILOWE ROADMAPY:
{milestones_text}
OSTATNIE FAKTY Z BAZY WIEDZY:
{facts_text}
Przeanalizuj i zwróć TYLKO JSON (bez markdown):
{{
"new_milestones": [
{{"fact_id": N, "title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "target_date": "YYYY-MM-DD lub null", "status": "planned|in_progress|completed", "reason": "dlaczego to kamień milowy"}}
],
"status_updates": [
{{"milestone_id": N, "current_status": "...", "suggested_status": "...", "reason": "krótkie uzasadnienie", "supporting_fact_ids": [N]}}
],
"gaps": [
{{"description": "co brakuje w roadmapie", "suggested_title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "reason": "dlaczego to ważne"}}
]
}}
Zasady:
- new_milestones: fakty które powinny być kamieniami milowymi, a NIE ma ich jeszcze w roadmapie
- status_updates: istniejące kamienie milowe, których status powinien się zmienić na podstawie nowych faktów
- gaps: ważne tematy dla regionu Kaszubia/Pomorze bez kamienia milowego, które warto dodać
- Jeśli nie ma sugestii w danej kategorii, zwróć pustą listę
- Tytuły pisz po polsku"""
gemini = GeminiService()
response_text = gemini.generate_text(
prompt=prompt,
temperature=0.2,
feature='zopk_roadmap_analysis'
)
if not response_text:
return {'success': False, 'error': 'Brak odpowiedzi od AI'}
response_text = response_text.strip()
# Strip markdown code blocks if present
if response_text.startswith('```'):
response_text = response_text.split('```')[1]
if response_text.startswith('json'):
response_text = response_text[4:]
if response_text.endswith('```'):
response_text = response_text[:-3]
ai_result = json.loads(response_text.strip())
# Enrich new_milestones with source info
facts_map = {f.id: f for f in facts}
for nm in ai_result.get('new_milestones', []):
fact = facts_map.get(nm.get('fact_id'))
if fact:
nm['full_text'] = fact.full_text
nm['news_url'] = fact.news_url
nm['news_title'] = fact.news_title
nm['source_news_id'] = fact.source_news_id
# Enrich status_updates with milestone info
milestones_map = {m.id: m for m in milestones}
for su in ai_result.get('status_updates', []):
ms = milestones_map.get(su.get('milestone_id'))
if ms:
su['milestone_title'] = ms.title
su['milestone_category'] = ms.category
logger.info(
f"AI roadmap analysis: {len(ai_result.get('new_milestones', []))} new, "
f"{len(ai_result.get('status_updates', []))} updates, "
f"{len(ai_result.get('gaps', []))} gaps"
)
return {
'success': True,
'new_milestones': ai_result.get('new_milestones', []),
'status_updates': ai_result.get('status_updates', []),
'gaps': ai_result.get('gaps', []),
'total_milestones': len(milestones),
'total_facts_analyzed': len(facts)
}
except json.JSONDecodeError as e:
logger.error(f"AI roadmap analysis JSON parse error: {e}")
return {'success': False, 'error': f'Błąd parsowania odpowiedzi AI: {e}'}
except Exception as e:
logger.error(f"AI roadmap analysis error: {e}")
return {'success': False, 'error': str(e)}
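# Example: run the full AI roadmap analysis (a sketch; assumes an open SQLAlchemy
# session and a configured GeminiService):
#
#     analysis = analyze_roadmap_with_ai(session)
#     if analysis['success']:
#         print(len(analysis['new_milestones']), 'new,',
#               len(analysis['status_updates']), 'updates,',
#               len(analysis['gaps']), 'gaps')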