Some checks are pending
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- Fix broken news thumbnails by adding og:image extraction during content scraping (replaces Brave proxy URLs that block hotlinking) - Add image onerror fallback in templates showing domain favicon when original image fails to load - Decode Brave proxy image URLs to original source URLs before saving - Enforce English-only entity types in AI extraction prompt to prevent mixed Polish/English type names - Add migration 083 to normalize 14 existing Polish entity types and clean up 5 stale fetch jobs stuck in 'running' status - Add backfill script for existing articles with broken image URLs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2859 lines
98 KiB
Python
2859 lines
98 KiB
Python
"""
|
|
ZOPK Knowledge Service - Ekstrakcja wiedzy z artykułów dla bazy wiedzy.
|
|
|
|
Pipeline:
|
|
1. Chunking - dzielenie tekstu na fragmenty 500-1000 tokenów
|
|
2. AI Extraction - ekstrakcja faktów i encji przez Gemini
|
|
3. Entity Linking - identyfikacja i deduplikacja encji
|
|
4. Relation Extraction - wykrywanie relacji między encjami
|
|
5. Embedding Generation - wektory dla semantic search (FAZA 2)
|
|
|
|
Usage:
|
|
from zopk_knowledge_service import ZOPKKnowledgeService
|
|
|
|
service = ZOPKKnowledgeService(db_session)
|
|
result = service.extract_from_news(news_id=123)
|
|
# lub batch:
|
|
result = service.batch_extract(limit=50)
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import logging
|
|
import hashlib
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Any, Callable
|
|
from dataclasses import dataclass, field
|
|
|
|
# Import progress tracking from scraper
|
|
from zopk_content_scraper import ProgressUpdate, ProgressCallback
|
|
|
|
from database import (
|
|
ZOPKNews,
|
|
ZOPKKnowledgeChunk,
|
|
ZOPKKnowledgeEntity,
|
|
ZOPKKnowledgeFact,
|
|
ZOPKKnowledgeEntityMention,
|
|
ZOPKKnowledgeRelation,
|
|
)
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ============================================================
# CONFIGURATION
# ============================================================

# Chunk size settings
# NOTE: Reduced from 1000 to 500 tokens due to Gemini safety filter issues
# Long texts (~4000 chars) trigger safety blocks, ~2000 chars work reliably
MIN_CHUNK_SIZE = 200  # tokens; trailing chunks below this are discarded
MAX_CHUNK_SIZE = 500  # tokens (~2000 chars for Polish text)
CHUNK_OVERLAP = 50  # tokens overlap between chunks (reduced proportionally)
APPROX_CHARS_PER_TOKEN = 4  # Polish text approximation used by estimate_tokens()

# AI extraction settings: hard caps applied when persisting AI output
MAX_FACTS_PER_CHUNK = 10
MAX_ENTITIES_PER_CHUNK = 15

# Entity types (English identifiers stored in the DB)
ENTITY_TYPES = [
    'company',       # company, business organization
    'person',        # person
    'place',         # place, location
    'organization',  # government institution, NGO
    'project',       # project, initiative
    'technology',    # technology, product
    'event',         # event, conference
]

# Fact types (English identifiers stored in the DB)
FACT_TYPES = [
    'statistic',     # numbers, quantitative data
    'event',         # dated occurrence
    'statement',     # statement, declaration
    'decision',      # decision, resolution
    'milestone',     # project milestone
    'partnership',   # partnership, cooperation
    'investment',    # investment
]

# Relation types (directed: A <relation> B)
RELATION_TYPES = [
    'investor_in',      # A invests in B
    'partner_of',       # A is a partner of B
    'located_in',       # A is located in B
    'manages',          # A manages B
    'works_for',        # A works for B
    'part_of',          # A is part of B
    'cooperates_with',  # A cooperates with B
    'produces',         # A produces B
    'supplies_to',      # A supplies to B
]

# ============================================================
# AI PROMPTS
# ============================================================

# Ultra-simplified prompt to avoid Gemini safety filter issues
# Note: Complex JSON schemas with pipe characters were triggering filters
# Note: max_tokens parameter also triggers filters - don't use it!
# Only {chunk_text} is interpolated; doubled braces are literal JSON braces.
EXTRACTION_USER_PROMPT = """Przeanalizuj artykuł i wyodrębnij informacje w formacie JSON.

ARTYKUŁ:
{chunk_text}

Zwróć JSON z następującą strukturą:
{{
"facts": [
{{"text": "pełny fakt", "type": "investment"}}
],
"entities": [
{{"name": "Nazwa", "type": "company"}}
],
"summary": "krótkie podsumowanie"
}}

Typy faktów (TYLKO te angielskie nazwy): investment, decision, event, statistic, partnership, milestone
Typy encji (TYLKO te angielskie nazwy): company, person, place, organization, project
WAŻNE: Nigdy nie używaj polskich nazw typów (np. Organizacja, Lokalizacja, Osoba). Zawsze angielskie."""

# System prompt is now empty - the user prompt contains all necessary instructions
EXTRACTION_SYSTEM_PROMPT = ""
|
|
|
|
|
|
# ============================================================
|
|
# DATA CLASSES
|
|
# ============================================================
|
|
|
|
@dataclass
class ChunkData:
    """In-memory representation of a single text chunk prior to persistence."""
    content: str      # chunk text (joined paragraphs or sentences)
    index: int        # 0-based position of the chunk within the article
    token_count: int  # estimated token count (see estimate_tokens)
|
|
|
|
|
|
@dataclass
class ExtractionResult:
    """Result of knowledge extraction from a news article."""
    success: bool                # True when extraction finished (also True for already-processed articles)
    news_id: int                 # ID of the processed ZOPKNews record
    chunks_created: int = 0      # chunks persisted for this article
    facts_created: int = 0       # facts persisted across all chunks
    entities_created: int = 0    # entity mentions persisted across all chunks
    relations_created: int = 0   # relations persisted across all chunks
    error: Optional[str] = None  # user-facing (Polish) error message, if any
    processing_time: float = 0.0 # wall-clock seconds for the whole article
|
|
|
|
|
|
# ============================================================
|
|
# CHUNKING FUNCTIONS
|
|
# ============================================================
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Return a rough token estimate for *text*.

    Uses the module-wide heuristic of ~APPROX_CHARS_PER_TOKEN characters
    per token for Polish text; empty or falsy input yields 0.
    """
    return len(text) // APPROX_CHARS_PER_TOKEN if text else 0
|
|
|
|
|
|
def split_into_sentences(text: str) -> List[str]:
    """Split *text* into sentences at Polish sentence boundaries.

    A boundary is whitespace preceded by `.`, `!` or `?` and followed by an
    uppercase letter (including Polish diacritics). Empty fragments are
    dropped and each sentence is returned stripped.
    """
    boundary = r'(?<=[.!?])\s+(?=[A-ZĄĆĘŁŃÓŚŹŻ])'
    stripped = (fragment.strip() for fragment in re.split(boundary, text))
    return [fragment for fragment in stripped if fragment]
|
|
|
|
|
|
def create_chunks(text: str) -> List[ChunkData]:
    """
    Split text into chunks of appropriate size.

    Strategy:
    - Split by paragraphs first
    - Combine small paragraphs
    - Split large paragraphs by sentences
    - Maintain overlap between chunks

    Returns a list of ChunkData with sequential `index` values. A trailing
    chunk smaller than MIN_CHUNK_SIZE tokens is deliberately discarded.
    """
    if not text:
        return []

    chunks = []
    current_chunk = []   # pieces (paragraphs or sentences) of the chunk being built
    current_tokens = 0   # running token estimate for current_chunk

    # Split by paragraphs
    paragraphs = text.split('\n\n')

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        para_tokens = estimate_tokens(para)

        # If paragraph is too large, split by sentences
        if para_tokens > MAX_CHUNK_SIZE:
            sentences = split_into_sentences(para)
            for sentence in sentences:
                sent_tokens = estimate_tokens(sentence)

                if current_tokens + sent_tokens > MAX_CHUNK_SIZE:
                    # Save current chunk
                    if current_chunk:
                        chunk_text = ' '.join(current_chunk)
                        chunks.append(ChunkData(
                            content=chunk_text,
                            index=len(chunks),
                            token_count=current_tokens
                        ))
                    # Overlap: carry the last sentence into the next chunk so
                    # adjacent chunks share context.
                    current_chunk = [current_chunk[-1]] if current_chunk else []
                    current_tokens = estimate_tokens(current_chunk[0]) if current_chunk else 0

                current_chunk.append(sentence)
                current_tokens += sent_tokens
        else:
            # Add paragraph to current chunk
            if current_tokens + para_tokens > MAX_CHUNK_SIZE:
                # Save current chunk
                if current_chunk:
                    chunk_text = '\n\n'.join(current_chunk)
                    chunks.append(ChunkData(
                        content=chunk_text,
                        index=len(chunks),
                        token_count=current_tokens
                    ))
                current_chunk = []
                current_tokens = 0

            current_chunk.append(para)
            current_tokens += para_tokens

    # Save last chunk.
    if current_chunk:
        # Heuristic: a long first piece means current_chunk holds paragraphs
        # (join with blank lines); otherwise sentence fragments (join with spaces).
        chunk_text = '\n\n'.join(current_chunk) if len(current_chunk[0]) > 100 else ' '.join(current_chunk)
        # NOTE: a trailing chunk below MIN_CHUNK_SIZE tokens is dropped on purpose.
        if estimate_tokens(chunk_text) >= MIN_CHUNK_SIZE:
            chunks.append(ChunkData(
                content=chunk_text,
                index=len(chunks),
                token_count=current_tokens
            ))

    return chunks
|
|
|
|
|
|
# ============================================================
|
|
# KNOWLEDGE SERVICE CLASS
|
|
# ============================================================
|
|
|
|
class ZOPKKnowledgeService:
    """
    Service for extracting knowledge from ZOPK news articles.

    Features:
    - Text chunking with overlap
    - AI-powered fact extraction (Gemini)
    - Named entity recognition
    - Relation extraction
    - Entity deduplication
    """

    def __init__(self, db_session, user_id: Optional[int] = None):
        """
        Initialize service.

        Args:
            db_session: SQLAlchemy database session
            user_id: Optional user ID for cost tracking
        """
        self.db = db_session
        self.user_id = user_id
        # Gemini client is created lazily on first use (see the `gemini` property).
        self._gemini_service = None
|
|
|
|
    @property
    def gemini(self):
        """Lazy-load Gemini service.

        The deferred import avoids a hard dependency on `gemini_service`
        at module import time; the instance is cached after first access.
        """
        if self._gemini_service is None:
            from gemini_service import GeminiService
            self._gemini_service = GeminiService()
        return self._gemini_service
|
|
|
|
def _normalize_entity_name(self, name: str) -> str:
|
|
"""Normalize entity name for deduplication."""
|
|
if not name:
|
|
return ''
|
|
# Lowercase, remove extra spaces
|
|
normalized = name.lower().strip()
|
|
normalized = re.sub(r'\s+', ' ', normalized)
|
|
# Remove Polish diacritics for matching
|
|
trans = str.maketrans('ąćęłńóśźżĄĆĘŁŃÓŚŹŻ', 'acelnoszzACELNOSZZ')
|
|
normalized = normalized.translate(trans)
|
|
return normalized
|
|
|
|
    def _find_or_create_entity(
        self,
        name: str,
        entity_type: str,
        description: Optional[str] = None
    ) -> ZOPKKnowledgeEntity:
        """
        Find existing entity or create new one.
        Uses normalized name for deduplication.

        On a hit, bumps `mentions_count`, refreshes `last_mentioned_at`, and
        keeps the longer description. On a miss, creates and flushes a new
        entity so its `id` is available to the caller.

        NOTE(review): the dedup key is the normalized name only — entity_type
        is ignored on lookup, so two same-named entities of different types
        would merge. Confirm this is intended.
        """
        normalized = self._normalize_entity_name(name)

        # Try to find existing
        existing = self.db.query(ZOPKKnowledgeEntity).filter(
            ZOPKKnowledgeEntity.normalized_name == normalized
        ).first()

        if existing:
            # Update mention count
            existing.mentions_count = (existing.mentions_count or 0) + 1
            existing.last_mentioned_at = datetime.now()
            # Keep the longer description, treating longer as more informative.
            if description and (not existing.description or len(description) > len(existing.description)):
                existing.description = description
            return existing

        # Create new entity
        entity = ZOPKKnowledgeEntity(
            entity_type=entity_type,
            name=name,
            normalized_name=normalized,
            description=description,
            short_description=description[:500] if description else None,
            mentions_count=1,
            first_mentioned_at=datetime.now(),
            last_mentioned_at=datetime.now()
        )
        self.db.add(entity)
        self.db.flush()  # Get ID so callers can reference entity.id immediately

        return entity
|
|
|
|
    def _extract_with_ai(
        self,
        chunk: ChunkData,
        source_name: str,
        published_date: str
    ) -> Optional[Dict]:
        """
        Extract facts, entities, and relations using Gemini AI.

        Args:
            chunk: chunk to analyze (content is truncated before prompting)
            source_name: source name passed as a format kwarg
            published_date: 'YYYY-MM-DD' or 'unknown', passed as a format kwarg

        Returns parsed JSON dict, or None on any error (errors are logged,
        never raised, so one bad chunk does not abort the article).

        NOTE(review): the current EXTRACTION_USER_PROMPT only interpolates
        {chunk_text}; the source_name/published_date kwargs are accepted by
        str.format but unused by the template.
        """
        try:
            # Truncate chunk to avoid Gemini safety filter issues with long texts
            # Testing showed ~4000 chars triggers safety blocks, ~2000 chars works
            MAX_PROMPT_CHARS = 2000
            chunk_text = chunk.content[:MAX_PROMPT_CHARS]
            if len(chunk.content) > MAX_PROMPT_CHARS:
                logger.debug(f"Truncated chunk from {len(chunk.content)} to {MAX_PROMPT_CHARS} chars")

            # Simplified single prompt (system prompt removed to avoid safety filter issues)
            prompt = EXTRACTION_USER_PROMPT.format(
                chunk_text=chunk_text,
                source_name=source_name,
                published_date=published_date
            )

            # NOTE: max_tokens removed - testing showed it triggers safety filters!
            response = self.gemini.generate_text(
                prompt=prompt,
                temperature=0.1,  # Low temperature for consistency
                user_id=self.user_id,
                feature='zopk_knowledge_extraction'
            )

            if not response:
                logger.warning("Empty response from Gemini")
                return None

            # Parse JSON from response.
            # The model may wrap its JSON in a markdown code fence - unwrap it.
            json_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = response

            # Clean and parse
            json_str = json_str.strip()
            data = json.loads(json_str)

            return data

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}")
            return None
        except Exception as e:
            logger.error(f"AI extraction error: {e}")
            return None
|
|
|
|
    def _save_chunk(
        self,
        news: ZOPKNews,
        chunk: ChunkData,
        extraction_data: Optional[Dict]
    ) -> ZOPKKnowledgeChunk:
        """Save chunk to database.

        Persists the chunk even when AI extraction failed (extraction_data
        is None); summary/keywords are then left NULL. Flushes so the new
        row's id is available for facts and mentions.

        NOTE(review): the simplified prompt returns no 'keywords' key, so
        `keywords` is typically None with the current prompt.
        """
        db_chunk = ZOPKKnowledgeChunk(
            source_news_id=news.id,
            content=chunk.content,
            chunk_index=chunk.index,
            token_count=chunk.token_count,
            chunk_type='narrative',  # Default
            summary=extraction_data.get('summary') if extraction_data else None,
            keywords=extraction_data.get('keywords') if extraction_data else None,
            language='pl',
            extraction_model='gemini-3-flash-preview',
            extracted_at=datetime.now()
        )
        self.db.add(db_chunk)
        self.db.flush()  # obtain db_chunk.id for dependent rows

        return db_chunk
|
|
|
|
def _save_facts(
|
|
self,
|
|
chunk: ZOPKKnowledgeChunk,
|
|
news: ZOPKNews,
|
|
facts_data: List[Dict]
|
|
) -> int:
|
|
"""Save extracted facts to database."""
|
|
count = 0
|
|
|
|
for fact in facts_data[:MAX_FACTS_PER_CHUNK]:
|
|
try:
|
|
# Parse numeric value
|
|
numeric_value = fact.get('numeric_value')
|
|
if numeric_value is not None:
|
|
try:
|
|
numeric_value = float(numeric_value)
|
|
except (ValueError, TypeError):
|
|
numeric_value = None
|
|
|
|
# Parse date
|
|
date_value = fact.get('date_value')
|
|
if date_value:
|
|
try:
|
|
from datetime import datetime as dt
|
|
date_value = dt.strptime(date_value, '%Y-%m-%d').date()
|
|
except (ValueError, TypeError):
|
|
date_value = None
|
|
|
|
# Support both old format (full_text) and new simplified format (text)
|
|
fact_text = fact.get('text') or fact.get('full_text', '')
|
|
|
|
db_fact = ZOPKKnowledgeFact(
|
|
source_chunk_id=chunk.id,
|
|
source_news_id=news.id,
|
|
fact_type=fact.get('type', 'statement'),
|
|
subject=fact.get('subject'),
|
|
predicate=fact.get('predicate'),
|
|
object=fact.get('object'),
|
|
full_text=fact_text,
|
|
numeric_value=numeric_value,
|
|
numeric_unit=fact.get('numeric_unit'),
|
|
date_value=date_value,
|
|
confidence_score=fact.get('confidence', 0.5)
|
|
)
|
|
self.db.add(db_fact)
|
|
count += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving fact: {e}")
|
|
continue
|
|
|
|
return count
|
|
|
|
    def _save_entities_and_relations(
        self,
        chunk: ZOPKKnowledgeChunk,
        entities_data: List[Dict],
        relations_data: List[Dict]
    ) -> Tuple[int, int]:
        """Save entities and relations to database.

        Entities are deduplicated via _find_or_create_entity and linked to
        the chunk through mention rows. Relations are only created between
        entities mentioned in THIS chunk (both endpoints must be present in
        entities_data), and duplicates of an existing (a, b, type) triple
        are skipped.

        Returns:
            (entities_saved, relations_saved)
        """
        entity_count = 0
        relation_count = 0

        # Lowercased name -> entity, used to resolve relation endpoints below.
        entity_map = {}

        # Save entities (capped at MAX_ENTITIES_PER_CHUNK)
        for ent in entities_data[:MAX_ENTITIES_PER_CHUNK]:
            try:
                entity = self._find_or_create_entity(
                    name=ent.get('name', ''),
                    entity_type=ent.get('type', 'organization'),
                    description=ent.get('description')
                )
                entity_map[ent.get('name', '').lower()] = entity

                # Create mention link
                mention = ZOPKKnowledgeEntityMention(
                    chunk_id=chunk.id,
                    entity_id=entity.id,
                    mention_text=ent.get('name'),
                    mention_type='direct',
                    role_in_context=ent.get('role'),
                    confidence=0.9
                )
                self.db.add(mention)
                entity_count += 1

            except Exception as e:
                logger.error(f"Error saving entity: {e}")
                continue

        # Flush to get entity IDs
        self.db.flush()

        # Save relations
        for rel in relations_data:
            try:
                entity_a_name = rel.get('entity_a', '').lower()
                entity_b_name = rel.get('entity_b', '').lower()

                entity_a = entity_map.get(entity_a_name)
                entity_b = entity_map.get(entity_b_name)

                # Skip relations whose endpoints were not extracted as entities.
                if not entity_a or not entity_b:
                    continue

                # Check if relation already exists
                existing = self.db.query(ZOPKKnowledgeRelation).filter(
                    ZOPKKnowledgeRelation.entity_a_id == entity_a.id,
                    ZOPKKnowledgeRelation.entity_b_id == entity_b.id,
                    ZOPKKnowledgeRelation.relation_type == rel.get('relation')
                ).first()

                if not existing:
                    db_relation = ZOPKKnowledgeRelation(
                        entity_a_id=entity_a.id,
                        entity_b_id=entity_b.id,
                        relation_type=rel.get('relation', 'cooperates_with'),
                        evidence_text=rel.get('description'),
                        source_chunk_id=chunk.id,
                        confidence=0.8,
                        strength=3
                    )
                    self.db.add(db_relation)
                    relation_count += 1

            except Exception as e:
                logger.error(f"Error saving relation: {e}")
                continue

        return entity_count, relation_count
|
|
|
|
    def extract_from_news(self, news_id: int) -> ExtractionResult:
        """
        Extract knowledge from a single news article.

        Pipeline per article: chunk the scraped content, run AI extraction
        per chunk, persist chunks/facts/entities/relations, then mark the
        article as processed and commit once at the end.

        Args:
            news_id: ID of ZOPKNews record

        Returns:
            ExtractionResult with statistics. An already-processed article
            returns success=True with error "Już wyekstrahowano".
        """
        import time
        start_time = time.time()

        # Get news record
        news = self.db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first()

        if not news:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error=f"News record {news_id} not found"
            )

        # Idempotency guard: skip articles already processed.
        if news.knowledge_extracted:
            return ExtractionResult(
                success=True,
                news_id=news_id,
                error="Już wyekstrahowano"
            )

        # Content must have been scraped first
        if not news.full_content:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="Brak zescrapowanej treści"
            )

        logger.info(f"Extracting knowledge from news {news_id}: {news.title[:50]}...")

        # Create chunks
        chunks = create_chunks(news.full_content)

        if not chunks:
            return ExtractionResult(
                success=False,
                news_id=news_id,
                error="Treść za krótka do ekstrakcji"
            )

        # Statistics
        chunks_created = 0
        facts_created = 0
        entities_created = 0
        relations_created = 0

        # Process each chunk; a failing chunk is logged and skipped so one
        # bad chunk does not abort the whole article.
        for chunk in chunks:
            try:
                # Extract with AI
                extraction_data = self._extract_with_ai(
                    chunk=chunk,
                    source_name=news.source_name or 'unknown',
                    published_date=news.published_at.strftime('%Y-%m-%d') if news.published_at else 'unknown'
                )

                # Chunk is saved even when AI extraction failed (extraction_data None)
                db_chunk = self._save_chunk(news, chunk, extraction_data)
                chunks_created += 1

                if extraction_data:
                    # Save facts
                    facts_created += self._save_facts(
                        db_chunk,
                        news,
                        extraction_data.get('facts', [])
                    )

                    # Save entities and relations
                    ent_count, rel_count = self._save_entities_and_relations(
                        db_chunk,
                        extraction_data.get('entities', []),
                        extraction_data.get('relations', [])
                    )
                    entities_created += ent_count
                    relations_created += rel_count

            except Exception as e:
                logger.error(f"Error processing chunk {chunk.index}: {e}")
                continue

        # Mark as extracted; single commit covers all rows created above.
        news.knowledge_extracted = True
        news.knowledge_extracted_at = datetime.now()
        self.db.commit()

        processing_time = time.time() - start_time

        logger.info(
            f"Extracted from news {news_id}: "
            f"{chunks_created} chunks, {facts_created} facts, "
            f"{entities_created} entities, {relations_created} relations "
            f"in {processing_time:.2f}s"
        )

        return ExtractionResult(
            success=True,
            news_id=news_id,
            chunks_created=chunks_created,
            facts_created=facts_created,
            entities_created=entities_created,
            relations_created=relations_created,
            processing_time=processing_time
        )
|
|
|
|
    def batch_extract(self, limit: int = 50, progress_callback: ProgressCallback = None) -> Dict:
        """
        Batch extract knowledge from scraped articles.

        Selects newest approved/auto-approved, scraped, not-yet-extracted
        articles and runs extract_from_news on each, reporting progress
        through the optional callback (messages are in Polish for the UI).

        Args:
            limit: Maximum number of articles to process
            progress_callback: Optional callback for progress updates

        Returns:
            Dict with statistics: total/success/failed counters, created-row
            counts, per-article error entries and total processing_time.
        """
        import time

        logger.info(f"Starting batch extraction: limit={limit}")

        # Find articles ready for extraction
        articles = self.db.query(ZOPKNews).filter(
            ZOPKNews.status.in_(['approved', 'auto_approved']),
            ZOPKNews.scrape_status == 'scraped',
            ZOPKNews.knowledge_extracted == False
        ).order_by(
            ZOPKNews.created_at.desc()
        ).limit(limit).all()

        total = len(articles)
        stats = {
            'total': total,
            'success': 0,
            'failed': 0,
            'chunks_created': 0,
            'facts_created': 0,
            'entities_created': 0,
            'relations_created': 0,
            'errors': [],
            'processing_time': 0
        }

        # Send initial progress
        if progress_callback and total > 0:
            progress_callback(ProgressUpdate(
                current=0,
                total=total,
                percent=0.0,
                stage='extracting',
                status='processing',
                message=f'Rozpoczynam ekstrakcję wiedzy z {total} artykułów...',
                details={'success': 0, 'failed': 0, 'chunks': 0, 'facts': 0, 'entities': 0}
            ))

        start_time = time.time()

        for idx, article in enumerate(articles, 1):
            # Send progress update before processing
            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round((idx - 1) / total * 100, 1),
                    stage='extracting',
                    status='processing',
                    message=f'Analizuję przez AI: {article.title[:50]}...',
                    article_id=article.id,
                    article_title=article.title[:80],
                    details={
                        'success': stats['success'],
                        'failed': stats['failed'],
                        'chunks': stats['chunks_created'],
                        'facts': stats['facts_created'],
                        'entities': stats['entities_created']
                    }
                ))

            result = self.extract_from_news(article.id)

            if result.success:
                stats['success'] += 1
                stats['chunks_created'] += result.chunks_created
                stats['facts_created'] += result.facts_created
                stats['entities_created'] += result.entities_created
                stats['relations_created'] += result.relations_created

                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='extracting',
                        status='success',
                        message=f'✓ Wyekstrahowano: {result.chunks_created} chunks, {result.facts_created} faktów, {result.entities_created} encji',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'chunks': stats['chunks_created'],
                            'facts': stats['facts_created'],
                            'entities': stats['entities_created'],
                            'new_chunks': result.chunks_created,
                            'new_facts': result.facts_created,
                            'new_entities': result.entities_created
                        }
                    ))
            else:
                stats['failed'] += 1
                if result.error:
                    # Keep per-article error info for the caller / UI.
                    stats['errors'].append({
                        'id': article.id,
                        'title': article.title[:100],
                        'error': result.error
                    })

                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='extracting',
                        status='failed',
                        message=f'✗ Błąd ekstrakcji: {result.error[:50]}...' if result.error else '✗ Błąd',
                        article_id=article.id,
                        article_title=article.title[:80],
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'error': result.error
                        }
                    ))

        stats['processing_time'] = round(time.time() - start_time, 2)

        # Send completion progress
        if progress_callback:
            progress_callback(ProgressUpdate(
                current=total,
                total=total,
                percent=100.0,
                stage='extracting',
                status='complete',
                message=f'Zakończono: {stats["success"]}/{total} artykułów. '
                        f'Utworzono: {stats["chunks_created"]} chunks, {stats["facts_created"]} faktów, '
                        f'{stats["entities_created"]} encji',
                details={
                    'success': stats['success'],
                    'failed': stats['failed'],
                    'chunks': stats['chunks_created'],
                    'facts': stats['facts_created'],
                    'entities': stats['entities_created'],
                    'relations': stats['relations_created'],
                    'processing_time': stats['processing_time']
                }
            ))

        logger.info(
            f"Batch extraction complete: {stats['success']}/{stats['total']} success "
            f"in {stats['processing_time']}s"
        )

        return stats
|
|
|
|
def get_extraction_statistics(self) -> Dict:
|
|
"""Get knowledge extraction statistics."""
|
|
from sqlalchemy import func
|
|
|
|
# Articles stats
|
|
total_approved = self.db.query(func.count(ZOPKNews.id)).filter(
|
|
ZOPKNews.status.in_(['approved', 'auto_approved'])
|
|
).scalar()
|
|
|
|
scraped = self.db.query(func.count(ZOPKNews.id)).filter(
|
|
ZOPKNews.scrape_status == 'scraped'
|
|
).scalar()
|
|
|
|
# Pending scrape: approved but not yet scraped
|
|
pending_scrape = self.db.query(func.count(ZOPKNews.id)).filter(
|
|
ZOPKNews.status.in_(['approved', 'auto_approved']),
|
|
ZOPKNews.scrape_status.in_(['pending', None])
|
|
).scalar()
|
|
|
|
extracted = self.db.query(func.count(ZOPKNews.id)).filter(
|
|
ZOPKNews.knowledge_extracted == True
|
|
).scalar()
|
|
|
|
pending_extract = self.db.query(func.count(ZOPKNews.id)).filter(
|
|
ZOPKNews.scrape_status == 'scraped',
|
|
ZOPKNews.knowledge_extracted == False
|
|
).scalar()
|
|
|
|
# Knowledge base stats
|
|
total_chunks = self.db.query(func.count(ZOPKKnowledgeChunk.id)).scalar()
|
|
total_facts = self.db.query(func.count(ZOPKKnowledgeFact.id)).scalar()
|
|
total_entities = self.db.query(func.count(ZOPKKnowledgeEntity.id)).scalar()
|
|
total_relations = self.db.query(func.count(ZOPKKnowledgeRelation.id)).scalar()
|
|
|
|
# Embeddings stats
|
|
chunks_with_embeddings = self.db.query(func.count(ZOPKKnowledgeChunk.id)).filter(
|
|
ZOPKKnowledgeChunk.embedding.isnot(None)
|
|
).scalar()
|
|
|
|
chunks_without_embeddings = (total_chunks or 0) - (chunks_with_embeddings or 0)
|
|
|
|
# Top entities by mentions
|
|
top_entities = self.db.query(
|
|
ZOPKKnowledgeEntity.name,
|
|
ZOPKKnowledgeEntity.entity_type,
|
|
ZOPKKnowledgeEntity.mentions_count
|
|
).order_by(
|
|
ZOPKKnowledgeEntity.mentions_count.desc()
|
|
).limit(10).all()
|
|
|
|
return {
|
|
'articles': {
|
|
'total_approved': total_approved or 0,
|
|
'scraped': scraped or 0,
|
|
'pending_scrape': pending_scrape or 0,
|
|
'extracted': extracted or 0,
|
|
'pending_extract': pending_extract or 0
|
|
},
|
|
'knowledge_base': {
|
|
'total_chunks': total_chunks or 0,
|
|
'total_facts': total_facts or 0,
|
|
'total_entities': total_entities or 0,
|
|
'total_relations': total_relations or 0,
|
|
'chunks_with_embeddings': chunks_with_embeddings or 0,
|
|
'chunks_without_embeddings': chunks_without_embeddings or 0
|
|
},
|
|
'top_entities': [
|
|
{'name': e[0], 'type': e[1], 'mentions': e[2]}
|
|
for e in top_entities
|
|
]
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# SEMANTIC SEARCH (FAZA 2)
|
|
# ============================================================
|
|
|
|
def search_knowledge(
    db_session,
    query: str,
    limit: int = 5,
    min_similarity: float = 0.3,
    user_id: Optional[int] = None
) -> List[Dict]:
    """
    Semantic search in ZOPK knowledge base.

    Args:
        db_session: SQLAlchemy session
        query: Search query
        limit: Max results to return
        min_similarity: Minimum cosine similarity (0-1)
        user_id: User ID for cost tracking

    Returns:
        List of matching chunks with similarity scores, sorted by
        descending similarity. Empty list if the query embedding fails.
    """
    from gemini_service import GeminiService
    import json

    # Generate query embedding
    gemini = GeminiService()
    query_embedding = gemini.generate_embedding(
        text=query,
        task_type='retrieval_query',
        user_id=user_id,
        feature='zopk_knowledge_search'
    )

    if not query_embedding:
        logger.warning("Failed to generate query embedding")
        return []

    # Search in database
    # Note: This uses PostgreSQL pgvector for efficient similarity search
    # For now, we'll do a simpler approach with JSON embeddings
    # (loads ALL embedded chunks into memory and scores them in Python).
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.isnot(None)
    ).all()

    results = []

    for chunk in chunks:
        try:
            # Parse stored embedding (persisted as a JSON string by
            # generate_chunk_embeddings; tolerate already-parsed lists).
            if isinstance(chunk.embedding, str):
                chunk_embedding = json.loads(chunk.embedding)
            else:
                chunk_embedding = chunk.embedding

            if not chunk_embedding:
                continue

            # Calculate cosine similarity
            similarity = _cosine_similarity(query_embedding, chunk_embedding)

            if similarity >= min_similarity:
                results.append({
                    'chunk_id': chunk.id,
                    'content': chunk.content[:500],  # preview only
                    'summary': chunk.summary,
                    'keywords': chunk.keywords,
                    'similarity': round(similarity, 4),
                    'source_news_id': chunk.source_news_id,
                    'importance': chunk.importance_score
                })
        except Exception as e:
            logger.error(f"Error processing chunk {chunk.id}: {e}")
            continue

    # Sort by similarity
    results.sort(key=lambda x: x['similarity'], reverse=True)

    return results[:limit]
|
|
|
|
|
|
def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
|
|
"""Calculate cosine similarity between two vectors."""
|
|
import math
|
|
|
|
if len(vec1) != len(vec2):
|
|
return 0.0
|
|
|
|
dot_product = sum(a * b for a, b in zip(vec1, vec2))
|
|
norm1 = math.sqrt(sum(a * a for a in vec1))
|
|
norm2 = math.sqrt(sum(b * b for b in vec2))
|
|
|
|
if norm1 == 0 or norm2 == 0:
|
|
return 0.0
|
|
|
|
return dot_product / (norm1 * norm2)
|
|
|
|
|
|
def get_relevant_facts(
    db_session,
    query: str,
    limit: int = 10
) -> List[Dict]:
    """
    Get facts relevant to a query.

    Uses keyword matching for now, can be enhanced with embeddings.

    Args:
        db_session: SQLAlchemy session
        query: free-text query; relevance is the count of query words
            contained in the fact text
        limit: max number of facts returned

    Returns:
        Fact dicts sorted by descending relevance_score.
    """
    # Simple keyword search
    keywords = query.lower().split()

    # NOTE: Removed is_verified filter - auto-extracted facts are usable
    # Future: add manual verification workflow and re-enable filter
    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.confidence_score >= 0.3  # Minimum confidence threshold
    ).all()

    results = []

    for fact in facts:
        fact_text = (fact.full_text or '').lower()
        # Relevance = number of query keywords present in the fact text.
        score = sum(1 for keyword in keywords if keyword in fact_text)

        if score > 0:
            # Get source URL from related news
            source_url = ''
            source_name = ''
            source_date = ''
            if fact.source_news:
                source_url = fact.source_news.url or ''
                source_name = fact.source_news.source_name or fact.source_news.source_domain or ''
                if fact.source_news.published_at:
                    source_date = fact.source_news.published_at.strftime('%Y-%m-%d')

            results.append({
                'fact_id': fact.id,
                'fact_type': fact.fact_type,
                'full_text': fact.full_text,
                'subject': fact.subject,
                # BUGFIX: compare with `is not None` so a legitimate value of
                # 0 is not collapsed to None by a truthiness check.
                'numeric_value': float(fact.numeric_value) if fact.numeric_value is not None else None,
                'numeric_unit': fact.numeric_unit,
                'confidence': float(fact.confidence_score) if fact.confidence_score is not None else None,
                'relevance_score': score,
                'source_url': source_url,
                'source_name': source_name,
                'source_date': source_date
            })

    # Sort by relevance
    results.sort(key=lambda x: x['relevance_score'], reverse=True)

    return results[:limit]
|
|
|
|
|
|
def generate_chunk_embeddings(
    db_session,
    limit: int = 100,
    user_id: Optional[int] = None,
    progress_callback: ProgressCallback = None
) -> Dict:
    """
    Generate embeddings for chunks that don't have them.

    Selects up to ``limit`` chunks whose ``embedding`` column is NULL,
    calls Gemini for each one, stores the vector as a JSON string, and
    commits once at the end. Progress is reported through the optional
    callback (before each chunk, after success/failure, and on completion).

    Args:
        db_session: SQLAlchemy session
        limit: Max chunks to process
        user_id: User ID for cost tracking
        progress_callback: Optional callback for progress updates

    Returns:
        Dict with statistics: {'total', 'success', 'failed', 'processing_time'}
    """
    import json
    import time
    from gemini_service import GeminiService

    gemini = GeminiService()

    # Find chunks without embeddings
    chunks = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.embedding.is_(None)
    ).limit(limit).all()

    total = len(chunks)
    stats = {
        'total': total,
        'success': 0,
        'failed': 0,
        'processing_time': 0
    }

    # Send initial progress (only when there is actual work to report)
    if progress_callback and total > 0:
        progress_callback(ProgressUpdate(
            current=0,
            total=total,
            percent=0.0,
            stage='embedding',
            status='processing',
            message=f'Rozpoczynam generowanie embeddingów dla {total} chunks...',
            details={'success': 0, 'failed': 0}
        ))

    start_time = time.time()

    for idx, chunk in enumerate(chunks, 1):
        # Send progress update before processing
        if progress_callback:
            # Get article title from chunk's source news
            article_title = None
            if chunk.source_news:
                article_title = chunk.source_news.title[:80]

            progress_callback(ProgressUpdate(
                current=idx,
                total=total,
                # percent reflects work *completed*, hence (idx - 1)
                percent=round((idx - 1) / total * 100, 1),
                stage='embedding',
                status='processing',
                message=f'Generuję embedding {idx}/{total}: {chunk.summary[:40] if chunk.summary else "chunk"}...',
                article_id=chunk.source_news_id,
                article_title=article_title,
                details={
                    'success': stats['success'],
                    'failed': stats['failed'],
                    'chunk_id': chunk.id
                }
            ))

        try:
            embedding = gemini.generate_embedding(
                text=chunk.content,
                task_type='retrieval_document',
                title=chunk.summary,
                user_id=user_id,
                feature='zopk_chunk_embedding'
            )

            if embedding:
                # Store as JSON string (column is text, not a vector type)
                chunk.embedding = json.dumps(embedding)
                stats['success'] += 1

                if progress_callback:
                    # NOTE(review): "768 dim" assumes the Gemini embedding
                    # dimension — confirm against the actual model config.
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='embedding',
                        status='success',
                        message=f'✓ Wygenerowano embedding (768 dim)',
                        article_id=chunk.source_news_id,
                        details={
                            'success': stats['success'],
                            'failed': stats['failed'],
                            'chunk_id': chunk.id
                        }
                    ))
            else:
                # Empty/None embedding from the service counts as a failure
                stats['failed'] += 1
                if progress_callback:
                    progress_callback(ProgressUpdate(
                        current=idx,
                        total=total,
                        percent=round(idx / total * 100, 1),
                        stage='embedding',
                        status='failed',
                        message='✗ Nie udało się wygenerować embeddingu',
                        article_id=chunk.source_news_id,
                        details={'success': stats['success'], 'failed': stats['failed']}
                    ))

        except Exception as e:
            # Per-chunk failures are logged and counted but do not abort the batch
            logger.error(f"Error generating embedding for chunk {chunk.id}: {e}")
            stats['failed'] += 1

            if progress_callback:
                progress_callback(ProgressUpdate(
                    current=idx,
                    total=total,
                    percent=round(idx / total * 100, 1),
                    stage='embedding',
                    status='failed',
                    message=f'✗ Błąd: {str(e)[:50]}...',
                    article_id=chunk.source_news_id,
                    details={'success': stats['success'], 'failed': stats['failed'], 'error': str(e)}
                ))

    # Single commit persists all successfully embedded chunks at once
    db_session.commit()
    stats['processing_time'] = round(time.time() - start_time, 2)

    # Send completion progress
    if progress_callback:
        progress_callback(ProgressUpdate(
            current=total,
            total=total,
            percent=100.0,
            stage='embedding',
            status='complete',
            message=f'Zakończono: {stats["success"]}/{total} embeddingów wygenerowanych',
            details={
                'success': stats['success'],
                'failed': stats['failed'],
                'processing_time': stats['processing_time']
            }
        ))

    logger.info(f"Generated embeddings: {stats['success']}/{stats['total']} success")

    return stats
|
|
|
|
|
|
# ============================================================
|
|
# STANDALONE FUNCTIONS FOR CRON/CLI
|
|
# ============================================================
|
|
|
|
def extract_pending_articles(db_session, limit: int = 50) -> Dict:
    """Run batch knowledge extraction over pending articles.

    Thin convenience wrapper around ``ZOPKKnowledgeService.batch_extract``
    intended for cron jobs.

    Usage:
        from zopk_knowledge_service import extract_pending_articles
        result = extract_pending_articles(db_session, limit=50)
    """
    return ZOPKKnowledgeService(db_session).batch_extract(limit=limit)
|
|
|
|
|
|
def get_knowledge_stats(db_session) -> Dict:
    """Return knowledge-extraction statistics for monitoring dashboards."""
    return ZOPKKnowledgeService(db_session).get_extraction_statistics()
|
|
|
|
|
|
# ============================================================
|
|
# ADMIN PANEL - LIST FUNCTIONS
|
|
# ============================================================
|
|
|
|
def list_chunks(
    db_session,
    page: int = 1,
    per_page: int = 20,
    source_news_id: Optional[int] = None,
    has_embedding: Optional[bool] = None,
    is_verified: Optional[bool] = None
) -> Dict:
    """Paginated listing of knowledge chunks with optional filters.

    Args:
        db_session: Database session
        page: 1-based page number
        per_page: Items per page
        source_news_id: Restrict to chunks from one source article
        has_embedding: Restrict by embedding presence
        is_verified: Restrict by verification status

    Returns:
        Dict with keys 'chunks', 'total', 'page', 'per_page', 'pages'.
    """
    from sqlalchemy import func

    q = db_session.query(ZOPKKnowledgeChunk)

    # Narrow the query according to the supplied filters.
    if source_news_id:
        q = q.filter(ZOPKKnowledgeChunk.source_news_id == source_news_id)
    if has_embedding is not None:
        embedding_col = ZOPKKnowledgeChunk.embedding
        q = q.filter(embedding_col.isnot(None) if has_embedding else embedding_col.is_(None))
    if is_verified is not None:
        q = q.filter(ZOPKKnowledgeChunk.is_verified == is_verified)

    total = q.count()

    # Ceiling division for total page count.
    pages = (total + per_page - 1) // per_page

    rows = (
        q.order_by(ZOPKKnowledgeChunk.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )

    def _serialize(c):
        # 'content' is a 300-char preview; 'full_content' keeps the original text.
        preview = c.content[:300] + '...' if len(c.content) > 300 else c.content
        return {
            'id': c.id,
            'content': preview,
            'full_content': c.content,
            'summary': c.summary,
            'chunk_type': c.chunk_type,
            'chunk_index': c.chunk_index,
            'token_count': c.token_count,
            'importance_score': c.importance_score,
            'confidence_score': float(c.confidence_score) if c.confidence_score else None,
            'has_embedding': c.embedding is not None,
            'is_verified': c.is_verified,
            'source_news_id': c.source_news_id,
            'source_title': c.source_news.title if c.source_news else None,
            'source_url': c.source_news.url if c.source_news else None,
            'created_at': c.created_at.isoformat() if c.created_at else None,
            'keywords': c.keywords if isinstance(c.keywords, list) else []
        }

    return {
        'chunks': [_serialize(c) for c in rows],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages
    }
|
|
|
|
|
|
def list_facts(
    db_session,
    page: int = 1,
    per_page: int = 20,
    fact_type: Optional[str] = None,
    is_verified: Optional[bool] = None,
    source_news_id: Optional[int] = None
) -> Dict:
    """Paginated listing of knowledge facts with optional filters.

    Args:
        db_session: Database session
        page: 1-based page number
        per_page: Items per page
        fact_type: Restrict to one fact type (statistic, event, statement,
            decision, milestone)
        is_verified: Restrict by verification status
        source_news_id: Restrict to facts from one source article

    Returns:
        Dict with keys 'facts', 'total', 'page', 'per_page', 'pages' and
        'fact_types' (distinct types available for filter dropdowns).
    """
    from sqlalchemy import func, distinct

    q = db_session.query(ZOPKKnowledgeFact)

    # Narrow the query according to the supplied filters.
    if fact_type:
        q = q.filter(ZOPKKnowledgeFact.fact_type == fact_type)
    if is_verified is not None:
        q = q.filter(ZOPKKnowledgeFact.is_verified == is_verified)
    if source_news_id:
        q = q.filter(ZOPKKnowledgeFact.source_news_id == source_news_id)

    total = q.count()

    # Distinct non-null fact types power the admin filter dropdown.
    type_rows = db_session.query(
        distinct(ZOPKKnowledgeFact.fact_type)
    ).filter(
        ZOPKKnowledgeFact.fact_type.isnot(None)
    ).all()
    fact_types = [row[0] for row in type_rows if row[0]]

    # Ceiling division for total page count.
    pages = (total + per_page - 1) // per_page

    rows = (
        q.order_by(ZOPKKnowledgeFact.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )

    def _serialize(f):
        return {
            'id': f.id,
            'fact_type': f.fact_type,
            'subject': f.subject,
            'predicate': f.predicate,
            'object': f.object,
            'full_text': f.full_text,
            'numeric_value': float(f.numeric_value) if f.numeric_value else None,
            'numeric_unit': f.numeric_unit,
            'date_value': f.date_value.isoformat() if f.date_value else None,
            'confidence_score': float(f.confidence_score) if f.confidence_score else None,
            'is_verified': f.is_verified,
            'source_news_id': f.source_news_id,
            'source_chunk_id': f.source_chunk_id,
            'source_title': f.source_news.title if f.source_news else None,
            'source_url': f.source_news.url if f.source_news else None,
            'entities_involved': f.entities_involved if isinstance(f.entities_involved, list) else [],
            'created_at': f.created_at.isoformat() if f.created_at else None
        }

    return {
        'facts': [_serialize(f) for f in rows],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages,
        'fact_types': fact_types
    }
|
|
|
|
|
|
def list_entities(
    db_session,
    page: int = 1,
    per_page: int = 20,
    entity_type: Optional[str] = None,
    is_verified: Optional[bool] = None,
    min_mentions: Optional[int] = None
) -> Dict:
    """Paginated listing of knowledge entities, sorted by mention count.

    Entities that have been merged into another entity
    (``merged_into_id`` set) are excluded.

    Args:
        db_session: Database session
        page: 1-based page number
        per_page: Items per page
        entity_type: Restrict to one type (company, person, place,
            organization, project, technology)
        is_verified: Restrict by verification status
        min_mentions: Restrict to entities with at least this many mentions

    Returns:
        Dict with keys 'entities', 'total', 'page', 'per_page', 'pages' and
        'entity_types' (distinct types available for filter dropdowns).
    """
    from sqlalchemy import func, distinct

    # Base query excludes entities already merged into another one.
    q = db_session.query(ZOPKKnowledgeEntity).filter(
        ZOPKKnowledgeEntity.merged_into_id.is_(None)
    )

    # Narrow the query according to the supplied filters.
    if entity_type:
        q = q.filter(ZOPKKnowledgeEntity.entity_type == entity_type)
    if is_verified is not None:
        q = q.filter(ZOPKKnowledgeEntity.is_verified == is_verified)
    if min_mentions:
        q = q.filter(ZOPKKnowledgeEntity.mentions_count >= min_mentions)

    total = q.count()

    # Distinct non-null entity types power the admin filter dropdown.
    type_rows = db_session.query(
        distinct(ZOPKKnowledgeEntity.entity_type)
    ).filter(
        ZOPKKnowledgeEntity.entity_type.isnot(None)
    ).all()
    entity_types = [row[0] for row in type_rows if row[0]]

    # Ceiling division for total page count.
    pages = (total + per_page - 1) // per_page

    rows = (
        q.order_by(ZOPKKnowledgeEntity.mentions_count.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )

    def _serialize(e):
        return {
            'id': e.id,
            'name': e.name,
            'normalized_name': e.normalized_name,
            'entity_type': e.entity_type,
            'description': e.description,
            'short_description': e.short_description,
            'aliases': e.aliases if isinstance(e.aliases, list) else [],
            'mentions_count': e.mentions_count or 0,
            'is_verified': e.is_verified,
            'company_id': e.company_id,
            'external_url': e.external_url,
            'first_mentioned_at': e.first_mentioned_at.isoformat() if e.first_mentioned_at else None,
            'last_mentioned_at': e.last_mentioned_at.isoformat() if e.last_mentioned_at else None,
            'created_at': e.created_at.isoformat() if e.created_at else None
        }

    return {
        'entities': [_serialize(e) for e in rows],
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': pages,
        'entity_types': entity_types
    }
|
|
|
|
|
|
def get_chunk_detail(db_session, chunk_id: int) -> Optional[Dict]:
    """Get detailed information about a single chunk.

    Returns a full serialization of the chunk, its source article, the
    facts extracted from it, and its entity mentions — or None when no
    chunk with ``chunk_id`` exists.
    """
    chunk = db_session.query(ZOPKKnowledgeChunk).filter(
        ZOPKKnowledgeChunk.id == chunk_id
    ).first()

    if not chunk:
        return None

    # Get facts from this chunk
    facts = db_session.query(ZOPKKnowledgeFact).filter(
        ZOPKKnowledgeFact.source_chunk_id == chunk_id
    ).all()

    # Get entity mentions
    mentions = db_session.query(ZOPKKnowledgeEntityMention).filter(
        ZOPKKnowledgeEntityMention.chunk_id == chunk_id
    ).all()

    return {
        'id': chunk.id,
        'content': chunk.content,
        'content_clean': chunk.content_clean,
        'summary': chunk.summary,
        'chunk_type': chunk.chunk_type,
        'chunk_index': chunk.chunk_index,
        'token_count': chunk.token_count,
        'importance_score': chunk.importance_score,
        'confidence_score': float(chunk.confidence_score) if chunk.confidence_score else None,
        'has_embedding': chunk.embedding is not None,
        'is_verified': chunk.is_verified,
        # keywords defensively coerced: non-list JSON payloads become []
        'keywords': chunk.keywords if isinstance(chunk.keywords, list) else [],
        'context_date': chunk.context_date.isoformat() if chunk.context_date else None,
        'context_location': chunk.context_location,
        'extraction_model': chunk.extraction_model,
        'extracted_at': chunk.extracted_at.isoformat() if chunk.extracted_at else None,
        'created_at': chunk.created_at.isoformat() if chunk.created_at else None,
        # Source article summary (None when the relationship is unset)
        'source_news': {
            'id': chunk.source_news.id,
            'title': chunk.source_news.title,
            'url': chunk.source_news.url,
            'source_name': chunk.source_news.source_name
        } if chunk.source_news else None,
        'facts': [
            {
                'id': f.id,
                'fact_type': f.fact_type,
                'full_text': f.full_text,
                'is_verified': f.is_verified
            }
            for f in facts
        ],
        'entity_mentions': [
            {
                'id': m.id,
                'entity_id': m.entity_id,
                # mention may reference a deleted entity, hence the None guards
                'entity_name': m.entity.name if m.entity else None,
                'entity_type': m.entity.entity_type if m.entity else None,
                'mention_text': m.mention_text
            }
            for m in mentions
        ]
    }
|
|
|
|
|
|
def update_chunk_verification(db_session, chunk_id: int, is_verified: bool, user_id: int) -> bool:
    """Set a chunk's verification flag, recording who verified it and when.

    Returns False when no chunk with ``chunk_id`` exists; commits on success.
    """
    target = db_session.query(ZOPKKnowledgeChunk).filter_by(id=chunk_id).first()
    if target is None:
        return False

    target.is_verified = is_verified
    target.verified_by = user_id
    target.verified_at = datetime.now()
    db_session.commit()
    return True
|
|
|
|
|
|
def update_fact_verification(db_session, fact_id: int, is_verified: bool) -> bool:
    """Set a fact's verification flag.

    Returns False when no fact with ``fact_id`` exists; commits on success.
    """
    target = db_session.query(ZOPKKnowledgeFact).filter_by(id=fact_id).first()
    if target is None:
        return False

    target.is_verified = is_verified
    db_session.commit()
    return True
|
|
|
|
|
|
def update_entity_verification(db_session, entity_id: int, is_verified: bool) -> bool:
    """Set an entity's verification flag.

    Returns False when no entity with ``entity_id`` exists; commits on success.
    """
    target = db_session.query(ZOPKKnowledgeEntity).filter_by(id=entity_id).first()
    if target is None:
        return False

    target.is_verified = is_verified
    db_session.commit()
    return True
|
|
|
|
|
|
def delete_chunk(db_session, chunk_id: int) -> bool:
    """Delete a chunk together with its dependent facts and entity mentions.

    Returns False when no chunk with ``chunk_id`` exists; commits on success.
    """
    target = db_session.query(ZOPKKnowledgeChunk).filter_by(id=chunk_id).first()
    if target is None:
        return False

    # Remove dependent rows first so nothing references the chunk afterwards.
    for model, column in (
        (ZOPKKnowledgeFact, ZOPKKnowledgeFact.source_chunk_id),
        (ZOPKKnowledgeEntityMention, ZOPKKnowledgeEntityMention.chunk_id),
    ):
        db_session.query(model).filter(column == chunk_id).delete()

    db_session.delete(target)
    db_session.commit()
    return True
|
|
|
|
|
|
# ============================================================
|
|
# DUPLICATE ENTITY DETECTION AND MERGING
|
|
# ============================================================
|
|
|
|
def find_duplicate_entities(
    db_session,
    entity_type: Optional[str] = None,
    min_similarity: float = 0.5,
    limit: int = 100
) -> List[Dict]:
    """
    Find potential duplicate entities using fuzzy matching.

    Uses PostgreSQL pg_trgm extension for similarity matching.
    Returns pairs of entities that might be duplicates. Only entities of
    the same ``entity_type`` are compared, and ``e1.id < e2.id`` ensures
    each pair is reported once.

    Args:
        db_session: SQLAlchemy session
        entity_type: Filter by entity type (company, person, etc.)
        min_similarity: Minimum similarity threshold (0.0-1.0)
        limit: Maximum number of pairs to return

    Returns:
        List of dicts with duplicate pairs:
        [
            {
                'entity1': {...},
                'entity2': {...},
                'similarity': 0.85,
                'match_type': 'fuzzy'  # or 'substring'
            }
        ]
    """
    from sqlalchemy import text

    # Build query with pg_trgm similarity
    # Use conditional SQL with COALESCE to avoid f-string interpolation
    # NOTE(review): requires the pg_trgm extension to be installed —
    # confirm it is created in the target database.
    query = text("""
        SELECT
            e1.id as id1, e1.name as name1, e1.entity_type as type1,
            e1.mentions_count as mentions1, e1.is_verified as verified1,
            e2.id as id2, e2.name as name2, e2.entity_type as type2,
            e2.mentions_count as mentions2, e2.is_verified as verified2,
            similarity(LOWER(e1.name), LOWER(e2.name)) as sim,
            CASE
                WHEN LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%'
                     OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%'
                THEN 'substring'
                ELSE 'fuzzy'
            END as match_type
        FROM zopk_knowledge_entities e1
        JOIN zopk_knowledge_entities e2
            ON e1.id < e2.id
            AND e1.entity_type = e2.entity_type
        WHERE (
            similarity(LOWER(e1.name), LOWER(e2.name)) > :min_sim
            OR LOWER(e1.name) LIKE '%' || LOWER(e2.name) || '%'
            OR LOWER(e2.name) LIKE '%' || LOWER(e1.name) || '%'
        )
        AND (:entity_type IS NULL OR e1.entity_type = :entity_type)
        ORDER BY
            sim DESC,
            e1.entity_type,
            GREATEST(e1.mentions_count, e2.mentions_count) DESC
        LIMIT :limit
    """)

    params = {'min_sim': min_similarity, 'limit': limit, 'entity_type': entity_type}
    result = db_session.execute(query, params)

    # Map each SQL row into the paired-entity payload documented above.
    duplicates = []
    for row in result:
        duplicates.append({
            'entity1': {
                'id': row.id1,
                'name': row.name1,
                'entity_type': row.type1,
                'mentions_count': row.mentions1,
                'is_verified': row.verified1
            },
            'entity2': {
                'id': row.id2,
                'name': row.name2,
                'entity_type': row.type2,
                'mentions_count': row.mentions2,
                'is_verified': row.verified2
            },
            'similarity': float(row.sim) if row.sim else 0.0,
            'match_type': row.match_type
        })

    return duplicates
|
|
|
|
|
|
def merge_entities(
    db_session,
    primary_id: int,
    duplicate_id: int,
    new_name: Optional[str] = None
) -> Dict:
    """
    Merge two entities - keep primary, delete duplicate.

    Transfers all relationships from duplicate to primary:
    - Entity mentions
    - Facts (via the entities_involved JSONB array)
    - Relations (entity_a / entity_b)
    - Updates mentions_count and merges aliases

    Both entities must have the same entity_type; otherwise the merge is
    refused. On any error the transaction is rolled back.

    Args:
        db_session: SQLAlchemy session
        primary_id: ID of entity to keep
        duplicate_id: ID of entity to merge and delete
        new_name: Optional new canonical name for primary

    Returns:
        Dict with merge results:
        {
            'success': True,
            'primary_id': 123,
            'deleted_id': 456,
            'new_mentions_count': 20,
            'transfers': {
                'mentions': 15,
                'facts': 5,
                'relations_source': 1,
                'relations_target': 0
            }
        }
        or {'success': False, 'error': '...'} on failure.
    """
    from sqlalchemy import text

    # Get both entities
    primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id)
    duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id)

    if not primary:
        return {'success': False, 'error': f'Primary entity {primary_id} not found'}
    if not duplicate:
        return {'success': False, 'error': f'Duplicate entity {duplicate_id} not found'}
    if primary.entity_type != duplicate.entity_type:
        return {'success': False, 'error': 'Cannot merge entities of different types'}

    transfers = {
        'mentions': 0,
        'facts': 0,
        'relations_source': 0,
        'relations_target': 0
    }

    try:
        # 1. Transfer mentions
        result = db_session.execute(text("""
            UPDATE zopk_knowledge_entity_mentions
            SET entity_id = :primary_id
            WHERE entity_id = :duplicate_id
        """), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
        transfers['mentions'] = result.rowcount

        # 2. Transfer facts - update entities_involved JSONB
        # Replace duplicate entity ID with primary ID in the JSONB array
        result = db_session.execute(text("""
            UPDATE zopk_knowledge_facts
            SET entities_involved = (
                SELECT jsonb_agg(
                    CASE
                        WHEN (elem->>'id')::int = :duplicate_id
                        THEN jsonb_set(elem, '{id}', to_jsonb(:primary_id))
                        ELSE elem
                    END
                )
                FROM jsonb_array_elements(entities_involved::jsonb) AS elem
            )
            WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb)
        """), {
            'primary_id': primary_id,
            'duplicate_id': duplicate_id,
            'entity_json': f'[{{"id": {duplicate_id}}}]'
        })
        transfers['facts'] = result.rowcount

        # 4. Transfer relations (entity_a)
        result = db_session.execute(text("""
            UPDATE zopk_knowledge_relations
            SET entity_a_id = :primary_id
            WHERE entity_a_id = :duplicate_id
        """), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
        transfers['relations_source'] = result.rowcount

        # 5. Transfer relations (entity_b)
        result = db_session.execute(text("""
            UPDATE zopk_knowledge_relations
            SET entity_b_id = :primary_id
            WHERE entity_b_id = :duplicate_id
        """), {'primary_id': primary_id, 'duplicate_id': duplicate_id})
        transfers['relations_target'] = result.rowcount

        # 6. Update primary entity
        primary.mentions_count += duplicate.mentions_count
        if new_name:
            primary.canonical_name = new_name

        # Merge aliases
        # NOTE(review): existing_aliases may be the same list object as
        # primary.aliases mutated in place; if the column is a plain JSON
        # type (not MutableList), confirm SQLAlchemy flags this change dirty.
        if duplicate.aliases:
            existing_aliases = primary.aliases or []
            new_aliases = duplicate.aliases
            # Add duplicate name as alias
            if duplicate.name not in existing_aliases:
                existing_aliases.append(duplicate.name)
            # Add duplicate's aliases
            for alias in new_aliases:
                if alias not in existing_aliases:
                    existing_aliases.append(alias)
            primary.aliases = existing_aliases

        # 7. Delete duplicate
        db_session.delete(duplicate)

        db_session.commit()

        return {
            'success': True,
            'primary_id': primary_id,
            'deleted_id': duplicate_id,
            'new_mentions_count': primary.mentions_count,
            'transfers': transfers
        }

    except Exception as e:
        db_session.rollback()
        logger.error(f"Error merging entities: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
def get_entity_merge_preview(
    db_session,
    primary_id: int,
    duplicate_id: int
) -> Dict:
    """Dry-run report for merging two entities.

    Counts the mentions, facts and relations that ``merge_entities`` would
    transfer from the duplicate onto the primary, without modifying anything.
    Returns {'error': 'Entity not found'} if either entity is missing.
    """
    from sqlalchemy import text, func

    primary = db_session.query(ZOPKKnowledgeEntity).get(primary_id)
    duplicate = db_session.query(ZOPKKnowledgeEntity).get(duplicate_id)
    if primary is None or duplicate is None:
        return {'error': 'Entity not found'}

    def _count_rows(pk_column, ref_column) -> int:
        # Count rows of one table referencing the duplicate entity.
        return db_session.query(func.count(pk_column)).filter(
            ref_column == duplicate_id
        ).scalar() or 0

    mentions = _count_rows(
        ZOPKKnowledgeEntityMention.id, ZOPKKnowledgeEntityMention.entity_id
    )
    relations_source = _count_rows(
        ZOPKKnowledgeRelation.id, ZOPKKnowledgeRelation.entity_a_id
    )
    relations_target = _count_rows(
        ZOPKKnowledgeRelation.id, ZOPKKnowledgeRelation.entity_b_id
    )

    # Facts reference entities through the entities_involved JSONB array
    # rather than an FK column, so counting needs a containment query.
    facts_with_entity = db_session.execute(text("""
        SELECT COUNT(*) FROM zopk_knowledge_facts
        WHERE entities_involved::jsonb @> CAST(:entity_json AS jsonb)
    """), {'entity_json': f'[{{"id": {duplicate_id}}}]'}).scalar() or 0

    def _entity_payload(entity):
        return {
            'id': entity.id,
            'name': entity.name,
            'entity_type': entity.entity_type,
            'mentions_count': entity.mentions_count,
            'aliases': entity.aliases or []
        }

    return {
        'primary': _entity_payload(primary),
        'duplicate': _entity_payload(duplicate),
        'transfers': {
            'mentions': mentions,
            'facts': facts_with_entity,
            'relations_source': relations_source,
            'relations_target': relations_target,
            'total': mentions + facts_with_entity + relations_source + relations_target
        },
        'result': {
            'new_mentions_count': primary.mentions_count + duplicate.mentions_count
        }
    }
|
|
|
|
# ============================================================
|
|
# FACT DEDUPLICATION
|
|
# ============================================================
|
|
|
|
def find_duplicate_facts(
    db_session,
    min_similarity: float = 0.7,
    limit: int = 100,
    fact_type: Optional[str] = None
) -> List[Dict]:
    """Find potential duplicate facts using text similarity.

    Uses pg_trgm % operator with GiST index for fast similarity search.
    The self-join condition ``f1.id < f2.id`` reports each pair once.

    Args:
        db_session: SQLAlchemy session
        min_similarity: pg_trgm similarity threshold (0.0-1.0)
        limit: Maximum number of pairs to return
        fact_type: Optionally restrict to one fact type

    Returns:
        List of {'fact1': {...}, 'fact2': {...}, 'similarity': float} dicts.
    """
    from sqlalchemy import text

    # Set similarity threshold and use % operator (uses GiST index)
    # NOTE(review): SET is session-scoped — the threshold persists for later
    # queries on this connection; confirm that is acceptable.
    db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"), {'threshold': min_similarity})

    query = text("""
        SELECT
            f1.id as id1, f1.full_text as text1, f1.fact_type as type1,
            f1.is_verified as verified1, f1.confidence_score as score1,
            f2.id as id2, f2.full_text as text2, f2.fact_type as type2,
            f2.is_verified as verified2, f2.confidence_score as score2,
            similarity(f1.full_text, f2.full_text) as sim
        FROM zopk_knowledge_facts f1
        JOIN zopk_knowledge_facts f2 ON f1.id < f2.id
        WHERE f1.full_text % f2.full_text
        AND (:fact_type IS NULL OR f1.fact_type = :fact_type)
        ORDER BY sim DESC, COALESCE(GREATEST(f1.confidence_score, f2.confidence_score), 0) DESC
        LIMIT :limit
    """)

    params = {'limit': limit, 'fact_type': fact_type}
    result = db_session.execute(query, params)

    # Map each SQL row into the paired-fact payload documented above.
    duplicates = []
    for row in result:
        duplicates.append({
            'fact1': {
                'id': row.id1, 'text': row.text1, 'fact_type': row.type1,
                'is_verified': row.verified1,
                'confidence_score': float(row.score1) if row.score1 else 0
            },
            'fact2': {
                'id': row.id2, 'text': row.text2, 'fact_type': row.type2,
                'is_verified': row.verified2,
                'confidence_score': float(row.score2) if row.score2 else 0
            },
            'similarity': float(row.sim)
        })
    return duplicates
|
|
|
|
|
|
def merge_facts(db_session, primary_id: int, duplicate_id: int, new_text: Optional[str] = None) -> Dict:
    """Merge duplicate fact into primary and delete the duplicate.

    Keeps the higher score of the two, makes verification sticky (if either
    fact was verified, the merged one is), and optionally replaces the
    primary's full_text with ``new_text``.

    Args:
        db_session: SQLAlchemy session
        primary_id: ID of the fact to keep
        duplicate_id: ID of the fact to delete after merging
        new_text: Optional replacement canonical text for the primary

    Returns:
        {'success': True, 'primary_id': ..., 'deleted_id': ...} on success,
        or {'success': False, 'error': ...} on failure (session rolled back).
    """
    primary = db_session.query(ZOPKKnowledgeFact).get(primary_id)
    duplicate = db_session.query(ZOPKKnowledgeFact).get(duplicate_id)

    if not primary:
        return {'success': False, 'error': f'Primary fact {primary_id} not found'}
    if not duplicate:
        return {'success': False, 'error': f'Duplicate fact {duplicate_id} not found'}

    try:
        if new_text:
            primary.full_text = new_text

        # BUGFIX: the facts table stores confidence_score; per the note in
        # auto_verify_top_facts, importance_score may not exist on this model.
        # Direct attribute access would raise AttributeError and roll back
        # every merge, so guard with getattr/hasattr while keeping the
        # original truthiness semantics.
        dup_importance = getattr(duplicate, 'importance_score', None)
        if dup_importance and hasattr(primary, 'importance_score') and (
            not primary.importance_score or dup_importance > primary.importance_score
        ):
            primary.importance_score = dup_importance

        # Keep the higher confidence of the two facts.
        if duplicate.confidence_score and (
            not primary.confidence_score
            or duplicate.confidence_score > primary.confidence_score
        ):
            primary.confidence_score = duplicate.confidence_score

        # Verification is sticky: merging a verified fact verifies the result.
        if duplicate.is_verified:
            primary.is_verified = True

        db_session.delete(duplicate)
        db_session.commit()
        return {'success': True, 'primary_id': primary_id, 'deleted_id': duplicate_id}
    except Exception as e:
        db_session.rollback()
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
# ============================================================
|
|
# AUTO-VERIFICATION
|
|
# ============================================================
|
|
|
|
def auto_verify_top_entities(db_session, min_mentions: int = 5, limit: int = 100) -> Dict:
    """Mark frequently-mentioned, still-unverified entities as verified.

    Picks up to ``limit`` unverified entities with at least ``min_mentions``
    mentions (highest first) and flags them verified in one commit.
    """
    candidates = (
        db_session.query(ZOPKKnowledgeEntity)
        .filter(
            ZOPKKnowledgeEntity.is_verified == False,
            ZOPKKnowledgeEntity.mentions_count >= min_mentions
        )
        .order_by(ZOPKKnowledgeEntity.mentions_count.desc())
        .limit(limit)
        .all()
    )

    for candidate in candidates:
        candidate.is_verified = True
    db_session.commit()

    return {'success': True, 'verified_count': len(candidates), 'min_mentions': min_mentions}
|
|
|
|
|
|
def auto_verify_top_facts(db_session, min_importance: float = 0.7, limit: int = 200) -> Dict:
    """Mark high-confidence, still-unverified facts as verified.

    Despite the parameter name, the threshold is applied to the
    confidence_score column (the table has no importance_score).
    """
    candidates = (
        db_session.query(ZOPKKnowledgeFact)
        .filter(
            ZOPKKnowledgeFact.is_verified == False,
            ZOPKKnowledgeFact.confidence_score >= min_importance
        )
        .order_by(ZOPKKnowledgeFact.confidence_score.desc())
        .limit(limit)
        .all()
    )

    for candidate in candidates:
        candidate.is_verified = True
    db_session.commit()

    return {'success': True, 'verified_count': len(candidates), 'min_confidence': min_importance}
|
|
|
|
|
|
def find_similar_to_verified_facts(
    db_session,
    min_similarity: float = 0.8,
    limit: int = 100
) -> List[Dict]:
    """
    Find unverified facts that are similar to already verified facts.
    Uses pg_trgm similarity search to "learn" from verified examples.

    DISTINCT ON keeps only the best-matching verified fact per unverified
    fact. Returns an empty list when no verified facts exist yet.

    Args:
        db_session: SQLAlchemy session
        min_similarity: pg_trgm similarity threshold (0.0-1.0)
        limit: Maximum number of suggestions to return

    Returns:
        List of suggestions with similarity scores.
    """
    from sqlalchemy import text, func

    # Check if we have any verified facts to learn from
    verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(
        ZOPKKnowledgeFact.is_verified == True
    ).scalar() or 0

    if verified_count == 0:
        return []

    # Set similarity threshold
    # NOTE(review): SET is session-scoped and persists for later queries
    # on this connection; confirm that is acceptable.
    db_session.execute(text("SET pg_trgm.similarity_threshold = :threshold"),
                       {'threshold': min_similarity})

    # Find unverified facts similar to verified ones
    query = text("""
        SELECT DISTINCT ON (unverified.id)
            unverified.id as fact_id,
            unverified.full_text as fact_text,
            unverified.fact_type,
            unverified.confidence_score,
            verified.id as similar_to_id,
            verified.full_text as similar_to_text,
            similarity(unverified.full_text, verified.full_text) as sim_score
        FROM zopk_knowledge_facts unverified
        JOIN zopk_knowledge_facts verified
            ON verified.is_verified = TRUE
            AND unverified.full_text % verified.full_text
        WHERE unverified.is_verified = FALSE
        ORDER BY unverified.id, sim_score DESC
        LIMIT :limit
    """)

    result = db_session.execute(query, {'limit': limit})

    # Map each SQL row into the suggestion payload.
    suggestions = []
    for row in result:
        suggestions.append({
            'fact_id': row.fact_id,
            'fact_text': row.fact_text,
            'fact_type': row.fact_type,
            # Missing confidence defaults to a neutral 0.5
            'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
            'similar_to_id': row.similar_to_id,
            'similar_to_text': row.similar_to_text,
            'similarity': float(row.sim_score)
        })

    return suggestions
|
|
|
|
|
|
def auto_verify_similar_to_verified(
    db_session,
    min_similarity: float = 0.8,
    limit: int = 100
) -> Dict:
    """
    Auto-verify facts that are similar to already verified facts.
    This enables "learning" from manual verifications.

    Args:
        db_session: SQLAlchemy session bound to PostgreSQL with pg_trgm.
        min_similarity: Minimum similarity threshold (0.8 = 80% similar)
        limit: Maximum number of facts to verify at once

    Returns:
        Dict with success status, count and details of verified facts
    """
    from sqlalchemy import text, func

    # Check if we have any verified facts to learn from
    verified_count = db_session.query(func.count(ZOPKKnowledgeFact.id)).filter(
        ZOPKKnowledgeFact.is_verified == True
    ).scalar() or 0

    if verified_count == 0:
        return {
            'success': False,
            'error': 'Brak zweryfikowanych faktów do nauki. Najpierw zweryfikuj kilka faktów ręcznie.',
            'verified_count': 0
        }

    # Set the session-level threshold used by the `%` operator.
    # set_config() instead of `SET ... = :param`: SET cannot take bound
    # parameters with server-side binding drivers (e.g. psycopg3), while
    # set_config() is portable across drivers.
    db_session.execute(
        text("SELECT set_config('pg_trgm.similarity_threshold', :threshold, false)"),
        {'threshold': str(min_similarity)}
    )

    # First, find matching facts with their details and similarity scores.
    # DISTINCT ON keeps only the best-matching verified "pattern" per fact.
    find_query = text("""
        SELECT DISTINCT ON (unverified.id)
            unverified.id as fact_id,
            unverified.full_text as fact_text,
            unverified.fact_type,
            unverified.confidence_score,
            verified.id as pattern_id,
            verified.full_text as pattern_text,
            similarity(unverified.full_text, verified.full_text) as sim_score
        FROM zopk_knowledge_facts unverified
        JOIN zopk_knowledge_facts verified
            ON verified.is_verified = TRUE
            AND unverified.full_text % verified.full_text
        WHERE unverified.is_verified = FALSE
        ORDER BY unverified.id, sim_score DESC
        LIMIT :limit
    """)

    result = db_session.execute(find_query, {'limit': limit})
    facts_to_verify = []
    for row in result:
        facts_to_verify.append({
            'fact_id': row.fact_id,
            'fact_text': row.fact_text,
            'fact_type': row.fact_type,
            'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
            'pattern_id': row.pattern_id,
            'pattern_text': row.pattern_text,
            'similarity': round(float(row.sim_score) * 100)  # integer percent for UI
        })

    if not facts_to_verify:
        return {
            'success': True,
            'verified_count': 0,
            'verified_facts': [],
            'min_similarity': min_similarity,
            'learned_from': verified_count
        }

    # Now verify the found facts in a single bulk UPDATE
    fact_ids = [f['fact_id'] for f in facts_to_verify]
    update_query = text("""
        UPDATE zopk_knowledge_facts
        SET is_verified = TRUE
        WHERE id = ANY(:ids)
    """)
    db_session.execute(update_query, {'ids': fact_ids})
    db_session.commit()

    return {
        'success': True,
        'verified_count': len(facts_to_verify),
        'verified_facts': facts_to_verify,
        'min_similarity': min_similarity,
        'learned_from': verified_count
    }
|
|
|
|
|
|
# ============================================================
|
|
# DASHBOARD STATS
|
|
# ============================================================
|
|
|
|
def get_knowledge_dashboard_stats(db_session) -> Dict:
    """Collect aggregate statistics for the knowledge dashboard view."""
    from sqlalchemy import func, text

    def _count(model, *conds):
        # COUNT(model.id) with optional filter conditions; None-safe.
        query = db_session.query(func.count(model.id))
        for cond in conds:
            query = query.filter(cond)
        return query.scalar() or 0

    def _pct(part, whole):
        # Percentage rounded to one decimal; 0 when the denominator is empty.
        return round(100 * part / whole, 1) if whole else 0

    chunks_total = _count(ZOPKKnowledgeChunk)
    chunks_verified = _count(ZOPKKnowledgeChunk, ZOPKKnowledgeChunk.is_verified == True)
    chunks_with_embedding = _count(ZOPKKnowledgeChunk, ZOPKKnowledgeChunk.embedding.isnot(None))

    entities_total = _count(ZOPKKnowledgeEntity)
    entities_verified = _count(ZOPKKnowledgeEntity, ZOPKKnowledgeEntity.is_verified == True)

    facts_total = _count(ZOPKKnowledgeFact)
    facts_verified = _count(ZOPKKnowledgeFact, ZOPKKnowledgeFact.is_verified == True)

    news_total = db_session.execute(text("SELECT COUNT(*) FROM zopk_news WHERE status IN ('approved', 'auto_approved')")).scalar() or 0
    news_with_extraction = db_session.execute(text('''
        SELECT COUNT(DISTINCT n.id) FROM zopk_news n
        JOIN zopk_knowledge_chunks c ON c.source_news_id = n.id
        WHERE n.status IN ('approved', 'auto_approved')
    ''')).scalar() or 0

    entity_types = db_session.execute(text('SELECT entity_type, COUNT(*) FROM zopk_knowledge_entities GROUP BY entity_type ORDER BY 2 DESC')).fetchall()
    fact_types = db_session.execute(text('SELECT fact_type, COUNT(*) FROM zopk_knowledge_facts GROUP BY fact_type ORDER BY 2 DESC')).fetchall()

    top_entities = db_session.query(ZOPKKnowledgeEntity).order_by(ZOPKKnowledgeEntity.mentions_count.desc()).limit(10).all()

    return {
        'chunks': {
            'total': chunks_total,
            'verified': chunks_verified,
            'with_embedding': chunks_with_embedding,
            'verified_pct': _pct(chunks_verified, chunks_total),
        },
        'entities': {
            'total': entities_total,
            'verified': entities_verified,
            'verified_pct': _pct(entities_verified, entities_total),
            'by_type': [{'type': row[0], 'count': row[1]} for row in entity_types],
        },
        'facts': {
            'total': facts_total,
            'verified': facts_verified,
            'verified_pct': _pct(facts_verified, facts_total),
            'by_type': [{'type': row[0] or 'unknown', 'count': row[1]} for row in fact_types],
        },
        'news': {
            'total': news_total,
            'with_extraction': news_with_extraction,
            'pending': news_total - news_with_extraction,
        },
        'top_entities': [
            {'id': e.id, 'name': e.name, 'type': e.entity_type, 'mentions': e.mentions_count}
            for e in top_entities
        ],
    }
|
|
|
|
|
|
# ============================================================
|
|
# TIMELINE SUGGESTIONS (Auto-populate from Knowledge Base)
|
|
# ============================================================
|
|
|
|
def get_timeline_suggestions(
    db_session,
    limit: int = 50,
    only_verified: bool = True
) -> Dict:
    """
    Get milestone facts from knowledge base that could become timeline milestones.

    Finds milestone facts that are NOT yet linked to any timeline milestone
    and ranks them by confidence score. Each suggestion is enriched with
    auto-suggested title/category/date/status values for the timeline form.

    Args:
        db_session: Database session
        limit: Max suggestions to return
        only_verified: Only include verified facts

    Returns:
        {
            'success': True,
            'suggestions': [...],
            'total_milestone_facts': int,
            'already_in_timeline': int
        }
        or {'success': False, 'error': str, 'suggestions': []} on failure.
    """
    from sqlalchemy import text, func
    from database import ZOPKMilestone

    try:
        # Count total milestone facts
        total_query = text("""
            SELECT COUNT(*) FROM zopk_knowledge_facts
            WHERE fact_type = 'milestone'
        """)
        total_milestone_facts = db_session.execute(total_query).scalar() or 0

        # Count facts already linked to timeline (linked via shared source news id)
        linked_query = text("""
            SELECT COUNT(DISTINCT f.id)
            FROM zopk_knowledge_facts f
            JOIN zopk_milestones m ON m.source_news_id = f.source_news_id
            WHERE f.fact_type = 'milestone'
        """)
        already_linked = db_session.execute(linked_query).scalar() or 0

        # Get milestone facts not yet in timeline
        # Prioritize: verified, high confidence, has numeric value (dates/amounts)
        # The NOT EXISTS clause drops facts whose source news already has a
        # milestone with a sufficiently similar title (pg_trgm similarity > 0.5).
        suggestions_query = text("""
            SELECT DISTINCT ON (f.id)
                f.id as fact_id,
                f.full_text,
                f.subject,
                f.predicate,
                f.object,
                f.confidence_score,
                f.numeric_value,
                f.numeric_unit,
                f.is_verified,
                f.source_news_id,
                n.title as news_title,
                n.published_at as news_date,
                n.url as news_url,
                n.source_name
            FROM zopk_knowledge_facts f
            LEFT JOIN zopk_news n ON n.id = f.source_news_id
            WHERE f.fact_type = 'milestone'
            AND (:only_verified = FALSE OR f.is_verified = TRUE)
            AND NOT EXISTS (
                SELECT 1 FROM zopk_milestones m
                WHERE m.source_news_id = f.source_news_id
                AND similarity(m.title, f.full_text) > 0.5
            )
            ORDER BY f.id, f.confidence_score DESC NULLS LAST, f.is_verified DESC
            LIMIT :limit
        """)

        params = {'limit': limit, 'only_verified': bool(only_verified)}
        results = db_session.execute(suggestions_query, params).fetchall()

        suggestions = []
        for row in results:
            # Auto-detect category based on keywords
            category = _detect_milestone_category(row.full_text, row.subject)

            # Try to extract date from text; fall back to the news publish date
            target_date = _extract_date_from_text(row.full_text)
            if not target_date and row.news_date:
                target_date = row.news_date.strftime('%Y-%m-%d') if hasattr(row.news_date, 'strftime') else str(row.news_date)

            suggestions.append({
                'fact_id': row.fact_id,
                'full_text': row.full_text,
                'subject': row.subject,
                'predicate': row.predicate,
                'object': row.object,
                'confidence_score': float(row.confidence_score) if row.confidence_score else 0.5,
                'is_verified': row.is_verified,
                'source_news_id': row.source_news_id,
                'news_title': row.news_title,
                'news_date': row.news_date.isoformat() if row.news_date else None,
                'news_url': row.news_url,
                'source_name': row.source_name,
                # Auto-suggested values for timeline
                'suggested_title': _generate_milestone_title(row.full_text, row.subject),
                'suggested_category': category,
                'suggested_date': target_date,
                'suggested_status': 'completed' if _is_past_event(row.full_text) else 'planned'
            })

        return {
            'success': True,
            'suggestions': suggestions,
            'total_milestone_facts': total_milestone_facts,
            'already_in_timeline': already_linked,
            'suggestions_count': len(suggestions)
        }

    except Exception as e:
        logger.error(f"Error getting timeline suggestions: {e}")
        return {
            'success': False,
            'error': str(e),
            'suggestions': []
        }
|
|
|
|
|
|
def _detect_milestone_category(text: str, subject: str = None) -> str:
|
|
"""
|
|
Auto-detect milestone category based on keywords.
|
|
|
|
Categories: nuclear, offshore, infrastructure, defense, other
|
|
"""
|
|
text_lower = (text or '').lower()
|
|
subject_lower = (subject or '').lower()
|
|
combined = f"{text_lower} {subject_lower}"
|
|
|
|
# Nuclear energy keywords
|
|
nuclear_keywords = [
|
|
'jądrowa', 'jądrowy', 'atomowa', 'atomowy', 'nuclear',
|
|
'lubiatowo', 'kopalino', 'pej', 'polskie elektrownie',
|
|
'reaktor', 'uran', 'westinghouse', 'ap1000'
|
|
]
|
|
if any(kw in combined for kw in nuclear_keywords):
|
|
return 'nuclear'
|
|
|
|
# Offshore wind keywords
|
|
offshore_keywords = [
|
|
'offshore', 'wiatrowa', 'wiatrowy', 'morska farma', 'farma wiatrowa',
|
|
'baltic power', 'baltica', 'orlen', 'northland', 'bałtyk',
|
|
'turbina', 'mw wiatr', 'gw wiatr'
|
|
]
|
|
if any(kw in combined for kw in offshore_keywords):
|
|
return 'offshore'
|
|
|
|
# Defense/military keywords
|
|
defense_keywords = [
|
|
'kongsberg', 'obronność', 'obronny', 'wojsko', 'wojskowy',
|
|
'mon ', 'ministerstwo obrony', 'zbrojeniowy', 'dron', 'amunicja',
|
|
'nsm', 'rakieta', 'samolot bojowy', 'okręt', 'bezpieczeństwo',
|
|
'nato', 'sojusz'
|
|
]
|
|
if any(kw in combined for kw in defense_keywords):
|
|
return 'defense'
|
|
|
|
# Infrastructure keywords
|
|
infra_keywords = [
|
|
's6', 's7', 'via pomerania', 'droga', 'autostrada', 'ekspresowa',
|
|
'kolej', 'pkp', 'port', 'terminal', 'lotnisko', 'most',
|
|
'infrastruktura', 'budowa', 'remont', 'przebudowa',
|
|
'wodociąg', 'kanalizacja', 'oczyszczalnia'
|
|
]
|
|
if any(kw in combined for kw in infra_keywords):
|
|
return 'infrastructure'
|
|
|
|
return 'other'
|
|
|
|
|
|
def _generate_milestone_title(full_text: str, subject: str = None) -> str:
|
|
"""
|
|
Generate a concise title for milestone from fact text.
|
|
Truncates to ~100 chars and tries to keep meaningful content.
|
|
"""
|
|
if not full_text:
|
|
return subject or "Kamień milowy"
|
|
|
|
# If text is short enough, use as is
|
|
if len(full_text) <= 100:
|
|
return full_text
|
|
|
|
# Try to find a natural break point
|
|
text = full_text[:150]
|
|
|
|
# Look for sentence end
|
|
for sep in ['. ', ', ', ' - ', ': ']:
|
|
if sep in text:
|
|
parts = text.split(sep)
|
|
if len(parts[0]) >= 30:
|
|
return parts[0] + ('.' if not parts[0].endswith('.') else '')
|
|
|
|
# Just truncate with ellipsis
|
|
return text[:97] + '...'
|
|
|
|
|
|
def _extract_date_from_text(text: str) -> str:
|
|
"""
|
|
Try to extract date from milestone text.
|
|
Returns ISO format date string or None.
|
|
"""
|
|
import re
|
|
from datetime import datetime
|
|
|
|
if not text:
|
|
return None
|
|
|
|
text_lower = text.lower()
|
|
|
|
# Patterns to match
|
|
patterns = [
|
|
# "w 2025 roku", "2025 r.", "rok 2025"
|
|
(r'\b(20[2-3]\d)\s*(rok|r\.?)\b', lambda m: f"{m.group(1)}-01-01"),
|
|
(r'\brok\s*(20[2-3]\d)\b', lambda m: f"{m.group(1)}-01-01"),
|
|
|
|
# "w marcu 2025", "marzec 2025"
|
|
(r'\b(stycz\w*|luty|lut\w*|marz\w*|kwie\w*|maj\w*|czerw\w*|lip\w*|sierp\w*|wrze\w*|paźdz\w*|listop\w*|grud\w*)\s*(20[2-3]\d)',
|
|
lambda m: _month_to_date(m.group(1), m.group(2))),
|
|
|
|
# "Q1 2025", "Q3 2026"
|
|
(r'\bQ([1-4])\s*(20[2-3]\d)', lambda m: f"{m.group(2)}-{int(m.group(1))*3-2:02d}-01"),
|
|
|
|
# "I kwartał 2025"
|
|
(r'\b(I|II|III|IV)\s*kwarta\w*\s*(20[2-3]\d)',
|
|
lambda m: _quarter_to_date(m.group(1), m.group(2))),
|
|
]
|
|
|
|
for pattern, formatter in patterns:
|
|
match = re.search(pattern, text_lower)
|
|
if match:
|
|
try:
|
|
return formatter(match)
|
|
except:
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
def _month_to_date(month_name: str, year: str) -> str:
|
|
"""Convert Polish month name to date string."""
|
|
months = {
|
|
'stycz': '01', 'luty': '02', 'lut': '02', 'marz': '03',
|
|
'kwie': '04', 'maj': '05', 'czerw': '06', 'lip': '07',
|
|
'sierp': '08', 'wrze': '09', 'paźdz': '10', 'listop': '11',
|
|
'grud': '12'
|
|
}
|
|
for prefix, num in months.items():
|
|
if month_name.startswith(prefix):
|
|
return f"{year}-{num}-01"
|
|
return f"{year}-01-01"
|
|
|
|
|
|
def _quarter_to_date(quarter: str, year: str) -> str:
|
|
"""Convert Roman numeral quarter to date string."""
|
|
quarters = {'I': '01', 'II': '04', 'III': '07', 'IV': '10'}
|
|
month = quarters.get(quarter, '01')
|
|
return f"{year}-{month}-01"
|
|
|
|
|
|
def _is_past_event(text: str) -> bool:
|
|
"""
|
|
Detect if milestone text describes a past event (completed)
|
|
or future event (planned).
|
|
"""
|
|
if not text:
|
|
return False
|
|
|
|
text_lower = text.lower()
|
|
|
|
# Past tense indicators (Polish)
|
|
past_indicators = [
|
|
'podpisano', 'podpisał', 'zakończono', 'oddano', 'otwarto',
|
|
'uruchomiono', 'rozpoczęto', 'ogłoszono', 'przyznano',
|
|
'uzyskano', 'otrzymał', 'zdobył', 'wygrał', 'został',
|
|
'odbył się', 'odbyła się', 'miało miejsce'
|
|
]
|
|
|
|
# Future tense indicators
|
|
future_indicators = [
|
|
'planowany', 'planowane', 'planowana', 'ma zostać',
|
|
'będzie', 'zostanie', 'ma być', 'powstanie',
|
|
'w przyszłości', 'do końca', 'w ciągu'
|
|
]
|
|
|
|
past_count = sum(1 for ind in past_indicators if ind in text_lower)
|
|
future_count = sum(1 for ind in future_indicators if ind in text_lower)
|
|
|
|
return past_count > future_count
|
|
|
|
|
|
def create_milestone_from_suggestion(
    db_session,
    fact_id: int,
    title: str,
    description: str = None,
    category: str = 'other',
    target_date: str = None,
    status: str = 'planned',
    source_url: str = None
) -> Dict:
    """
    Create a timeline milestone from a knowledge fact suggestion.

    Args:
        db_session: Database session
        fact_id: Source fact ID
        title: Milestone title
        description: Optional description (defaults to the fact's full_text)
        category: nuclear, offshore, infrastructure, defense, other
        target_date: Target date (YYYY-MM-DD format); silently dropped if malformed
        status: planned, in_progress, completed, delayed
        source_url: Source article URL

    Returns:
        {'success': True, 'milestone_id': int} or {'success': False, 'error': str}
    """
    from database import ZOPKMilestone, ZOPKKnowledgeFact
    from datetime import datetime

    try:
        # Get the source fact (Query.get returns None when the id is unknown)
        fact = db_session.query(ZOPKKnowledgeFact).get(fact_id)
        if not fact:
            return {'success': False, 'error': f'Fact {fact_id} not found'}

        # Parse target date; an unparseable string is ignored rather than
        # treated as an error so the milestone is still created
        parsed_date = None
        if target_date:
            try:
                parsed_date = datetime.strptime(target_date, '%Y-%m-%d').date()
            except ValueError:
                pass

        # Create milestone; for already-completed milestones the target date
        # doubles as the actual date
        milestone = ZOPKMilestone(
            title=title,
            description=description or fact.full_text,
            category=category,
            target_date=parsed_date,
            actual_date=parsed_date if status == 'completed' else None,
            status=status,
            source_url=source_url,
            source_news_id=fact.source_news_id,
            is_featured=False
        )

        db_session.add(milestone)
        db_session.commit()

        logger.info(f"Created milestone #{milestone.id} from fact #{fact_id}: {title}")

        return {
            'success': True,
            'milestone_id': milestone.id,
            'title': title,
            'category': category
        }

    except Exception as e:
        # Roll back the failed transaction so the session stays usable
        db_session.rollback()
        logger.error(f"Error creating milestone from fact {fact_id}: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
def categorize_milestones_with_ai(
    db_session,
    suggestions: List[Dict],
    model_name: str = "gemini-3-flash-preview"
) -> List[Dict]:
    """
    Enhance milestone suggestions with Gemini-assigned metadata.

    Sends up to 20 suggestion texts to the model and merges its answers
    back into the suggestion dicts under ``ai_category``, ``ai_title``,
    ``ai_date`` and ``ai_status``. On any failure the input list is
    returned unchanged.
    """
    from gemini_service import GeminiService
    import json

    if not suggestions:
        return suggestions

    # Number the facts 1..N so the model can reference them by id;
    # cap at 20 entries to keep the prompt within API limits.
    batch = suggestions[:20]
    facts_text = "\n".join(
        f"{idx + 1}. {item['full_text'][:300]}" for idx, item in enumerate(batch)
    )

    prompt = f"""Przeanalizuj poniższe fakty o projekcie ZOPK (Zielony Okręg Przemysłowy Kaszubia) i dla każdego zwróć:
- category: jedna z [nuclear, offshore, infrastructure, defense, other]
- short_title: zwięzły tytuł (max 80 znaków)
- target_date: data w formacie YYYY-MM-DD (jeśli można wywnioskować)
- status: jeden z [completed, in_progress, planned]

Kategorie:
- nuclear: elektrownia jądrowa, atom, Lubiatowo-Kopalino
- offshore: farmy wiatrowe, offshore wind, Baltic Power, Baltica
- infrastructure: drogi S6, Via Pomerania, porty, koleje
- defense: Kongsberg, przemysł zbrojeniowy, obronność, MON

Fakty:
{facts_text}

Odpowiedz TYLKO jako JSON array:
[{{"id": 1, "category": "...", "short_title": "...", "target_date": "YYYY-MM-DD lub null", "status": "..."}}]"""

    try:
        gemini = GeminiService(model=model_name)
        raw = gemini.generate_text(prompt)

        # The model sometimes wraps the JSON in a markdown code fence —
        # strip the fence and any leading "json" language tag.
        raw = raw.strip()
        if raw.startswith('```'):
            raw = raw.split('```')[1]
        if raw.startswith('json'):
            raw = raw[4:]

        ai_results = json.loads(raw)

        # Merge the AI answers back into the suggestions by 1-based id.
        for entry in ai_results:
            pos = entry.get('id', 0) - 1
            if not (0 <= pos < len(suggestions)):
                continue
            target = suggestions[pos]
            target['ai_category'] = entry.get('category', target['suggested_category'])
            target['ai_title'] = entry.get('short_title', target['suggested_title'])
            target['ai_date'] = entry.get('target_date')
            target['ai_status'] = entry.get('status', target['suggested_status'])

        return suggestions

    except Exception as e:
        logger.warning(f"AI categorization failed: {e}")
        return suggestions  # Return original suggestions without AI enhancement
|
|
|
|
|
|
def analyze_roadmap_with_ai(db_session) -> Dict:
    """
    AI-powered roadmap analysis: new milestones, status updates, and gap detection.

    Uses Gemini to compare the existing timeline milestones against the 50
    most recent knowledge facts (confidence >= 0.5) and returns structured
    suggestions, each enriched with its source fact/milestone details.

    Returns:
        {
            'success': True,
            'new_milestones': [...],
            'status_updates': [...],
            'gaps': [...]
        }
        or {'success': False, 'error': str} on parse/AI failure.
    """
    from gemini_service import GeminiService
    import json
    from database import ZOPKMilestone

    try:
        # 1. Get existing milestones (ordered by target date for the prompt)
        milestones = db_session.query(ZOPKMilestone).order_by(ZOPKMilestone.target_date).all()
        milestones_text = "\n".join([
            f"ID:{m.id} | {m.title} | kategoria:{m.category} | status:{m.status} | data:{m.target_date}"
            for m in milestones
        ]) or "(brak kamieni milowych)"

        # 2. Get recent verified facts from knowledge base
        from sqlalchemy import text
        facts_query = text("""
            SELECT f.id, f.full_text, f.fact_type, f.date_value, f.confidence_score,
                f.source_news_id, n.title as news_title, n.url as news_url
            FROM zopk_knowledge_facts f
            LEFT JOIN zopk_news n ON n.id = f.source_news_id
            WHERE f.confidence_score >= 0.5
            ORDER BY f.created_at DESC
            LIMIT 50
        """)
        facts = db_session.execute(facts_query).fetchall()

        if not facts:
            # Nothing to analyze — still a successful (empty) result
            return {
                'success': True,
                'new_milestones': [],
                'status_updates': [],
                'gaps': [],
                'message': 'Brak faktów w bazie wiedzy do analizy'
            }

        # Compact one-line-per-fact representation for the prompt
        facts_text = "\n".join([
            f"ID:{f.id} | typ:{f.fact_type} | data:{f.date_value} | {f.full_text[:250]}"
            for f in facts
        ])

        # 3. Send to Gemini (doubled braces below are literal JSON braces
        # inside the f-string template)
        prompt = f"""Jesteś ekspertem ds. projektu ZOPK (Zielony Okręg Przemysłowy Kaszubia - Pomorze).
Projekty w regionie: elektrownia jądrowa (Lubiatowo-Kopalino), farmy wiatrowe offshore (Baltic Power, Baltica), infrastruktura (S6, porty), obronność (Kongsberg).

ISTNIEJĄCE KAMIENIE MILOWE ROADMAPY:
{milestones_text}

OSTATNIE FAKTY Z BAZY WIEDZY:
{facts_text}

Przeanalizuj i zwróć TYLKO JSON (bez markdown):
{{
"new_milestones": [
{{"fact_id": N, "title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "target_date": "YYYY-MM-DD lub null", "status": "planned|in_progress|completed", "reason": "dlaczego to kamień milowy"}}
],
"status_updates": [
{{"milestone_id": N, "current_status": "...", "suggested_status": "...", "reason": "krótkie uzasadnienie", "supporting_fact_ids": [N]}}
],
"gaps": [
{{"description": "co brakuje w roadmapie", "suggested_title": "max 80 znaków", "category": "nuclear|offshore|infrastructure|defense|other", "reason": "dlaczego to ważne"}}
]
}}

Zasady:
- new_milestones: fakty które powinny być kamieniami milowymi, a NIE ma ich jeszcze w roadmapie
- status_updates: istniejące kamienie milowe, których status powinien się zmienić na podstawie nowych faktów
- gaps: ważne tematy dla regionu Kaszubia/Pomorze bez kamienia milowego, które warto dodać
- Jeśli nie ma sugestii w danej kategorii, zwróć pustą listę
- Tytuły pisz po polsku"""

        gemini = GeminiService()
        response_text = gemini.generate_text(
            prompt=prompt,
            temperature=0.2,
            feature='zopk_roadmap_analysis'
        )

        if not response_text:
            return {'success': False, 'error': 'Brak odpowiedzi od AI'}

        response_text = response_text.strip()
        # Strip markdown code blocks if present
        if response_text.startswith('```'):
            response_text = response_text.split('```')[1]
        if response_text.startswith('json'):
            response_text = response_text[4:]
        if response_text.endswith('```'):
            response_text = response_text[:-3]

        ai_result = json.loads(response_text.strip())

        # Enrich new_milestones with source info (fact text + news metadata)
        facts_map = {f.id: f for f in facts}
        for nm in ai_result.get('new_milestones', []):
            fact = facts_map.get(nm.get('fact_id'))
            if fact:
                nm['full_text'] = fact.full_text
                nm['news_url'] = fact.news_url
                nm['news_title'] = fact.news_title
                nm['source_news_id'] = fact.source_news_id

        # Enrich status_updates with milestone info (title + category)
        milestones_map = {m.id: m for m in milestones}
        for su in ai_result.get('status_updates', []):
            ms = milestones_map.get(su.get('milestone_id'))
            if ms:
                su['milestone_title'] = ms.title
                su['milestone_category'] = ms.category

        logger.info(
            f"AI roadmap analysis: {len(ai_result.get('new_milestones', []))} new, "
            f"{len(ai_result.get('status_updates', []))} updates, "
            f"{len(ai_result.get('gaps', []))} gaps"
        )

        return {
            'success': True,
            'new_milestones': ai_result.get('new_milestones', []),
            'status_updates': ai_result.get('status_updates', []),
            'gaps': ai_result.get('gaps', []),
            'total_milestones': len(milestones),
            'total_facts_analyzed': len(facts)
        }

    except json.JSONDecodeError as e:
        # Model returned malformed JSON — report parse failure separately
        logger.error(f"AI roadmap analysis JSON parse error: {e}")
        return {'success': False, 'error': f'Błąd parsowania odpowiedzi AI: {e}'}
    except Exception as e:
        logger.error(f"AI roadmap analysis error: {e}")
        return {'success': False, 'error': str(e)}
|