nordabiz/zopk_news_service.py
Maciej Pienczyn e399022223 feat: Add 1-5 star rating to ZOPK news AI evaluation
- Add ai_relevance_score column (1-5) to zopk_news table
- Update AI prompt to return score with detailed criteria:
  * 1 star = very weak (loose connection to region/industry)
  * 2 stars = weak (general industry news)
  * 3 stars = medium (relates to ZOPK industry but not directly)
  * 4 stars = strong (directly about ZOPK investments/companies)
  * 5 stars = perfect (main topic is ZOPK, Kongsberg, offshore Baltic)
- Display star ratings in admin dashboard with color-coded badges
- Score >= 3 marks news as relevant, < 3 as not relevant

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 07:34:36 +01:00

747 lines
26 KiB
Python

"""
ZOPK News Service
================
Multi-source news search and cross-verification for
Zielony Okręg Przemysłowy Kaszubia (ZOPK) knowledge base.
Sources:
- Brave Search API (web news)
- Google News RSS (aggregated news)
- Local media RSS feeds (trojmiasto.pl, dziennikbaltycki.pl)
Cross-verification:
- 1 source → pending (manual moderation required)
- 2 sources → pending with higher confidence
- 3+ sources → auto_approved (verified automatically)
Author: NordaBiz Development Team
Created: 2026-01-11
"""
import os
import re
import hashlib
import logging
import unicodedata
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from urllib.parse import urlparse
import requests
import feedparser
logger = logging.getLogger(__name__)
# ============================================================
# RSS FEED SOURCES
# ============================================================
RSS_SOURCES = {
# Local media
'trojmiasto': {
'url': 'https://www.trojmiasto.pl/rss/wiadomosci.xml',
'name': 'trojmiasto.pl',
'type': 'local_media',
'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'pomorze', 'offshore', 'energia', 'przemysł', 'samsonowicz', 'kongsberg']
},
'dziennik_baltycki': {
'url': 'https://dziennikbaltycki.pl/rss/najnowsze.xml',
'name': 'Dziennik Bałtycki',
'type': 'local_media',
'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'elektrownia', 'offshore', 'samsonowicz', 'kongsberg', 'lubiatowo']
},
# Government sources
'gov_mon': {
'url': 'https://www.gov.pl/web/obrona-narodowa/rss',
'name': 'Ministerstwo Obrony Narodowej',
'type': 'government',
'keywords': ['kongsberg', 'przemysł obronny', 'kaszubia', 'rumia', 'samsonowicz', 'inwestycje']
},
'gov_przemysl': {
'url': 'https://www.gov.pl/web/rozwoj-technologia/rss',
'name': 'Ministerstwo Rozwoju i Technologii',
'type': 'government',
'keywords': ['offshore', 'elektrownia jądrowa', 'centrum danych', 'wodór', 'transformacja']
},
# Google News aggregated searches
'google_news_zopk': {
'url': 'https://news.google.com/rss/search?q=Zielony+Okr%C4%99g+Przemys%C5%82owy+Kaszubia&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': [] # No filtering, query-based
},
'google_news_offshore': {
'url': 'https://news.google.com/rss/search?q=offshore+Polska+Baltyk&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
'google_news_nuclear': {
'url': 'https://news.google.com/rss/search?q=elektrownia+jadrowa+Polska+Lubiatowo&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
'google_news_samsonowicz': {
'url': 'https://news.google.com/rss/search?q=Maciej+Samsonowicz+MON&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
'google_news_kongsberg': {
'url': 'https://news.google.com/rss/search?q=Kongsberg+Polska+Rumia&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
# Business/local organizations (via Google News)
'google_news_norda': {
'url': 'https://news.google.com/rss/search?q=Norda+Biznes+Wejherowo&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
'google_news_spoko': {
'url': 'https://news.google.com/rss/search?q=Spoko+Gospodarcze+Pomorze&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News',
'type': 'aggregator',
'keywords': []
},
# Regional media (via Google News - site-specific searches)
'google_news_norda_fm': {
'url': 'https://news.google.com/rss/search?q=site:nordafm.pl+OR+%22Norda+FM%22&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Norda FM',
'type': 'local_media',
'keywords': []
},
'google_news_ttm': {
'url': 'https://news.google.com/rss/search?q=site:ttm24.pl+OR+%22Twoja+Telewizja+Morska%22&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Twoja Telewizja Morska',
'type': 'local_media',
'keywords': []
},
'google_news_nadmorski24': {
'url': 'https://news.google.com/rss/search?q=site:nadmorski24.pl&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Nadmorski24.pl',
'type': 'local_media',
'keywords': []
},
# Facebook - Maciej Samsonowicz (via Google search - FB doesn't have RSS)
'google_news_samsonowicz_fb': {
'url': 'https://news.google.com/rss/search?q=%22Maciej+Samsonowicz%22+facebook&hl=pl&gl=PL&ceid=PL:pl',
'name': 'Google News (Facebook Samsonowicz)',
'type': 'aggregator',
'keywords': []
}
}
# ZOPK-related keywords for filtering
ZOPK_KEYWORDS = [
# Project names
'zielony okręg przemysłowy',
'zopk',
'kaszubia przemysłowa',
# Energy projects
'offshore wind polska',
'offshore bałtyk',
'farma wiatrowa bałtyk',
'elektrownia jądrowa lubiatowo',
'elektrownia jądrowa kopalino',
'pej lubiatowo', # Polskie Elektrownie Jądrowe
# Defense industry
'kongsberg rumia',
'kongsberg polska',
'kongsberg defence',
'przemysł obronny pomorze',
'przemysł zbrojeniowy pomorze',
# Technology
'centrum danych gdynia',
'centrum danych pomorze',
'data center pomorze',
'wodór pomorze',
'hydrogen pomorze',
'laboratoria wodorowe',
# Key people
'samsonowicz mon',
'maciej samsonowicz',
'kosiniak-kamysz przemysł',
# Locations
'transformacja energetyczna pomorze',
'inwestycje wejherowo',
'inwestycje rumia',
'strefa ekonomiczna rumia',
'rumia invest park',
# Organizations
'norda biznes',
'spoko gospodarcze',
'izba gospodarcza pomorze'
]
@dataclass
class NewsItem:
"""Represents a news item from any source"""
title: str
url: str
description: str
source_name: str
source_type: str # brave, google_news, rss_local
source_id: str # specific source identifier
published_at: Optional[datetime]
image_url: Optional[str] = None
@property
def url_hash(self) -> str:
"""SHA256 hash of URL for exact deduplication"""
return hashlib.sha256(self.url.encode()).hexdigest()
@property
def title_hash(self) -> str:
"""Normalized title hash for fuzzy matching"""
return normalize_title_hash(self.title)
@property
def domain(self) -> str:
"""Extract domain from URL"""
parsed = urlparse(self.url)
return parsed.netloc.replace('www.', '')
def normalize_title_hash(title: str) -> str:
"""
Create a normalized hash from title for fuzzy matching.
Normalization:
- Lowercase
- Remove diacritics (ą→a, ę→e, etc.)
- Remove punctuation
- Remove common words (i, w, z, na, do, etc.)
- Sort words alphabetically
- Hash the result
"""
if not title:
return ''
# Lowercase
text = title.lower()
# Remove diacritics
text = unicodedata.normalize('NFKD', text)
text = ''.join(c for c in text if not unicodedata.combining(c))
# Remove punctuation
text = re.sub(r'[^\w\s]', '', text)
# Remove common Polish stop words
stop_words = {'i', 'w', 'z', 'na', 'do', 'o', 'od', 'za', 'po', 'przy', 'dla', 'oraz', 'sie', 'to', 'jest', 'ze', 'nie', 'jak', 'czy', 'ale', 'a'}
words = [w for w in text.split() if w not in stop_words and len(w) > 2]
# Sort and join
text = ' '.join(sorted(words))
# Hash
return hashlib.sha256(text.encode()).hexdigest()[:32]
def is_zopk_relevant(title: str, description: str = '') -> bool:
"""Check if content is relevant to ZOPK topics"""
text = f"{title} {description}".lower()
for keyword in ZOPK_KEYWORDS:
if keyword.lower() in text:
return True
return False
class ZOPKNewsService:
"""
Multi-source news search service with cross-verification.
"""
def __init__(self, db_session, brave_api_key: Optional[str] = None):
self.db = db_session
self.brave_api_key = brave_api_key or os.getenv('BRAVE_API_KEY')
def search_all_sources(self, query: str = 'Zielony Okręg Przemysłowy Kaszubia') -> Dict:
"""
Search all sources and return aggregated results with cross-verification.
Returns:
Dict with search results and statistics
"""
all_items: List[NewsItem] = []
source_stats = {}
# 1. Brave Search API
if self.brave_api_key:
brave_items = self._search_brave(query)
all_items.extend(brave_items)
source_stats['brave'] = len(brave_items)
logger.info(f"Brave Search: found {len(brave_items)} items")
# 2. RSS Feeds
for source_id, source_config in RSS_SOURCES.items():
rss_items = self._fetch_rss(source_id, source_config)
all_items.extend(rss_items)
source_stats[source_id] = len(rss_items)
logger.info(f"RSS {source_id}: found {len(rss_items)} items")
# 3. Cross-verify and deduplicate
verified_items = self._cross_verify(all_items)
# 4. Save to database
saved_count, updated_count = self._save_to_database(verified_items)
return {
'total_found': len(all_items),
'unique_items': len(verified_items),
'saved_new': saved_count,
'updated_existing': updated_count,
'source_stats': source_stats,
'auto_approved': sum(1 for item in verified_items if item.get('auto_approve', False))
}
def _search_brave(self, query: str) -> List[NewsItem]:
"""Search Brave API for news"""
if not self.brave_api_key:
return []
items = []
try:
headers = {
'Accept': 'application/json',
'X-Subscription-Token': self.brave_api_key
}
params = {
'q': query,
'count': 20,
'freshness': 'pm', # past month
'country': 'pl',
'search_lang': 'pl'
}
response = requests.get(
'https://api.search.brave.com/res/v1/news/search',
headers=headers,
params=params,
timeout=30
)
if response.status_code == 200:
results = response.json().get('results', [])
for item in results:
if item.get('url'):
items.append(NewsItem(
title=item.get('title', 'Bez tytułu'),
url=item['url'],
description=item.get('description', ''),
source_name=item.get('source', ''),
source_type='brave',
source_id='brave_search',
published_at=datetime.now(), # Brave doesn't provide exact date
image_url=item.get('thumbnail', {}).get('src')
))
else:
logger.error(f"Brave API error: {response.status_code}")
except Exception as e:
logger.error(f"Brave search error: {e}")
return items
def _fetch_rss(self, source_id: str, config: Dict) -> List[NewsItem]:
"""Fetch and parse RSS feed"""
items = []
try:
feed = feedparser.parse(config['url'])
for entry in feed.entries[:30]: # Limit to 30 per feed
title = entry.get('title', '')
description = entry.get('summary', entry.get('description', ''))
# Filter by keywords if specified
keywords = config.get('keywords', [])
if keywords and not any(kw in f"{title} {description}".lower() for kw in keywords):
continue
# Check ZOPK relevance for local media
if config['type'] == 'local_media' and not is_zopk_relevant(title, description):
continue
# Parse date
published_at = None
if hasattr(entry, 'published_parsed') and entry.published_parsed:
published_at = datetime(*entry.published_parsed[:6])
items.append(NewsItem(
title=title,
url=entry.get('link', ''),
description=description[:500],
source_name=config['name'],
source_type='rss_' + config['type'],
source_id=source_id,
published_at=published_at,
image_url=self._extract_image_from_entry(entry)
))
except Exception as e:
logger.error(f"RSS fetch error for {source_id}: {e}")
return items
def _extract_image_from_entry(self, entry) -> Optional[str]:
"""Extract image URL from RSS entry"""
# Try media:thumbnail
if hasattr(entry, 'media_thumbnail') and entry.media_thumbnail:
return entry.media_thumbnail[0].get('url')
# Try media:content
if hasattr(entry, 'media_content') and entry.media_content:
for media in entry.media_content:
if media.get('type', '').startswith('image/'):
return media.get('url')
# Try enclosure
if hasattr(entry, 'enclosures') and entry.enclosures:
for enc in entry.enclosures:
if enc.get('type', '').startswith('image/'):
return enc.get('href')
return None
def _cross_verify(self, items: List[NewsItem]) -> List[Dict]:
"""
Cross-verify items from multiple sources.
Groups items by title_hash to find the same story from different sources.
Increases confidence_score based on number of sources.
"""
# Group by title_hash (fuzzy match)
title_groups: Dict[str, List[NewsItem]] = {}
for item in items:
title_hash = item.title_hash
if title_hash not in title_groups:
title_groups[title_hash] = []
title_groups[title_hash].append(item)
# Also track URL hashes to avoid exact duplicates
seen_urls = set()
verified_items = []
for title_hash, group in title_groups.items():
# Get unique sources
unique_sources = list(set(item.source_id for item in group))
source_count = len(unique_sources)
# Use the first item as base (prefer Brave for better metadata)
base_item = sorted(group, key=lambda x: x.source_type != 'brave')[0]
if base_item.url_hash in seen_urls:
continue
seen_urls.add(base_item.url_hash)
# Calculate confidence
confidence_score = min(5, source_count + 1) # 1-5 scale
auto_approve = source_count >= 3
verified_items.append({
'title': base_item.title,
'url': base_item.url,
'url_hash': base_item.url_hash,
'title_hash': title_hash,
'description': base_item.description,
'source_name': base_item.source_name,
'source_domain': base_item.domain,
'source_type': base_item.source_type,
'published_at': base_item.published_at,
'image_url': base_item.image_url,
'confidence_score': confidence_score,
'source_count': source_count,
'sources_list': unique_sources,
'auto_approve': auto_approve
})
return verified_items
def _save_to_database(self, items: List[Dict]) -> Tuple[int, int]:
"""
Save verified items to database.
Returns:
Tuple of (new_count, updated_count)
"""
from database import ZOPKNews
new_count = 0
updated_count = 0
for item in items:
# Check if URL already exists
existing = self.db.query(ZOPKNews).filter(
ZOPKNews.url_hash == item['url_hash']
).first()
if existing:
# Update source count and confidence if new sources found
existing_sources = existing.sources_list or []
new_sources = [s for s in item['sources_list'] if s not in existing_sources]
if new_sources:
existing.sources_list = existing_sources + new_sources
existing.source_count = len(existing.sources_list)
existing.confidence_score = min(5, existing.source_count + 1)
# Auto-approve if threshold reached
if existing.source_count >= 3 and existing.status == 'pending':
existing.status = 'auto_approved'
existing.is_auto_verified = True
updated_count += 1
else:
# Create new entry
status = 'auto_approved' if item['auto_approve'] else 'pending'
news = ZOPKNews(
title=item['title'],
url=item['url'],
url_hash=item['url_hash'],
title_hash=item['title_hash'],
description=item['description'],
source_name=item['source_name'],
source_domain=item['source_domain'],
source_type=item['source_type'],
published_at=item['published_at'],
image_url=item['image_url'],
confidence_score=item['confidence_score'],
source_count=item['source_count'],
sources_list=item['sources_list'],
is_auto_verified=item['auto_approve'],
status=status
)
self.db.add(news)
new_count += 1
self.db.commit()
return new_count, updated_count
def search_zopk_news(db_session, query: str = None) -> Dict:
"""
Convenience function to search ZOPK news from all sources.
Usage:
from zopk_news_service import search_zopk_news
results = search_zopk_news(db)
"""
service = ZOPKNewsService(db_session)
return service.search_all_sources(query or 'Zielony Okręg Przemysłowy Kaszubia')
# ============================================================
# AI RELEVANCE EVALUATION (GEMINI)
# ============================================================
ZOPK_AI_EVALUATION_PROMPT = """Jesteś ekspertem ds. analizy wiadomości. Oceń, czy poniższy artykuł/news dotyczy projektu **Zielony Okręg Przemysłowy Kaszubia (ZOPK)** lub związanych z nim tematów.
**ZOPK obejmuje:**
1. Morską energetykę wiatrową na Bałtyku (offshore wind)
2. Elektrownię jądrową w Lubiatowie-Kopalino (Choczewo)
3. Inwestycję Kongsberg w Rumi (przemysł obronny)
4. Centra danych i laboratoria wodorowe
5. Rozwój przemysłowy Kaszub (Wejherowo, Rumia, Gdynia)
6. Kluczowe osoby: Maciej Samsonowicz (koordynator ZOPK), minister Kosiniak-Kamysz
**Artykuł do oceny:**
Tytuł: {title}
Opis: {description}
Źródło: {source}
Data: {date}
**Twoje zadanie:**
1. Oceń czy artykuł dotyczy ZOPK lub powiązanych tematów
2. Przyznaj ocenę od 1 do 5 gwiazdek:
- ⭐ 1 = Bardzo słabo powiązany (luźna styczność z regionem/przemysłem)
- ⭐⭐ 2 = Słabo powiązany (ogólne wiadomości branżowe)
- ⭐⭐⭐ 3 = Średnio powiązany (dotyczy branży ZOPK, ale nie bezpośrednio projektu)
- ⭐⭐⭐⭐ 4 = Mocno powiązany (bezpośrednio dotyczy inwestycji lub kluczowych firm ZOPK)
- ⭐⭐⭐⭐⭐ 5 = Doskonale pasuje (główny temat to ZOPK, Kongsberg, offshore Baltic, elektrownia Choczewo)
3. Odpowiedz TYLKO w formacie JSON (bez żadnego innego tekstu):
{{"relevant": true/false, "score": 1-5, "reason": "krótkie uzasadnienie po polsku (max 100 znaków)"}}
Zasady:
- relevant=true gdy score >= 3
- relevant=false gdy score < 3
Przykłady odpowiedzi:
{{"relevant": true, "score": 5, "reason": "Bezpośrednio o inwestycji Kongsberg w Rumi"}}
{{"relevant": true, "score": 4, "reason": "Dotyczy farm wiatrowych Baltic Power"}}
{{"relevant": true, "score": 3, "reason": "Ogólne informacje o offshore wind w Polsce"}}
{{"relevant": false, "score": 2, "reason": "Artykuł o energetyce, ale nie dotyczy Bałtyku"}}
{{"relevant": false, "score": 1, "reason": "News sportowy bez związku z przemysłem"}}"""
def evaluate_news_relevance(news_item, gemini_service=None) -> Dict:
"""
Evaluate a single news item for ZOPK relevance using Gemini AI.
Args:
news_item: ZOPKNews object or dict with title, description, source_name, published_at
gemini_service: Optional GeminiService instance (uses global if not provided)
Returns:
Dict with keys: relevant (bool), reason (str), evaluated (bool)
"""
import json
# Get Gemini service
if gemini_service is None:
try:
from gemini_service import get_gemini_service
gemini_service = get_gemini_service()
except Exception as e:
logger.error(f"Failed to get Gemini service: {e}")
return {'relevant': None, 'reason': 'Gemini service unavailable', 'evaluated': False}
if gemini_service is None:
return {'relevant': None, 'reason': 'Gemini service not initialized', 'evaluated': False}
# Extract fields from news_item
if hasattr(news_item, 'title'):
title = news_item.title or ''
description = news_item.description or ''
source = news_item.source_name or news_item.source_domain or ''
date = news_item.published_at.strftime('%Y-%m-%d') if news_item.published_at else ''
else:
title = news_item.get('title', '')
description = news_item.get('description', '')
source = news_item.get('source_name', '')
date = news_item.get('published_at', '')
# Build prompt
prompt = ZOPK_AI_EVALUATION_PROMPT.format(
title=title[:500], # Limit length
description=description[:1000] if description else 'Brak opisu',
source=source[:100],
date=date
)
try:
# Call Gemini with low temperature for consistent results
response = gemini_service.generate_text(
prompt,
temperature=0.1,
feature='zopk_news_evaluation'
)
# Parse JSON response
# Try to extract JSON from response (handle markdown code blocks)
json_match = re.search(r'\{[^{}]*\}', response)
if json_match:
result = json.loads(json_match.group())
# Extract score (1-5), default to 3 if not present
score = int(result.get('score', 3))
score = max(1, min(5, score)) # Clamp to 1-5 range
return {
'relevant': bool(result.get('relevant', score >= 3)),
'score': score,
'reason': str(result.get('reason', ''))[:255],
'evaluated': True
}
else:
logger.warning(f"Could not parse Gemini response: {response[:200]}")
return {'relevant': None, 'score': None, 'reason': 'Invalid AI response format', 'evaluated': False}
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
return {'relevant': None, 'score': None, 'reason': f'JSON parse error: {str(e)[:50]}', 'evaluated': False}
except Exception as e:
logger.error(f"Gemini evaluation error: {e}")
return {'relevant': None, 'score': None, 'reason': f'AI error: {str(e)[:50]}', 'evaluated': False}
def evaluate_pending_news(db_session, limit: int = 50, user_id: int = None) -> Dict:
"""
Evaluate multiple pending news items for ZOPK relevance.
Args:
db_session: SQLAlchemy session
limit: Max number of items to evaluate (to avoid API limits)
user_id: User triggering the evaluation (for logging)
Returns:
Dict with stats: total_evaluated, relevant_count, not_relevant_count, errors
"""
from database import ZOPKNews
from datetime import datetime
# Get pending news that haven't been AI-evaluated yet
pending_news = db_session.query(ZOPKNews).filter(
ZOPKNews.status == 'pending',
ZOPKNews.ai_relevant.is_(None) # Not yet evaluated
).order_by(ZOPKNews.created_at.desc()).limit(limit).all()
if not pending_news:
return {
'total_evaluated': 0,
'relevant_count': 0,
'not_relevant_count': 0,
'errors': 0,
'message': 'Brak newsów do oceny'
}
# Get Gemini service once
try:
from gemini_service import get_gemini_service
gemini = get_gemini_service()
except Exception as e:
return {
'total_evaluated': 0,
'relevant_count': 0,
'not_relevant_count': 0,
'errors': 1,
'message': f'Gemini service error: {str(e)}'
}
stats = {
'total_evaluated': 0,
'relevant_count': 0,
'not_relevant_count': 0,
'errors': 0
}
for news in pending_news:
result = evaluate_news_relevance(news, gemini)
if result['evaluated']:
news.ai_relevant = result['relevant']
news.ai_relevance_score = result.get('score') # 1-5 stars
news.ai_evaluation_reason = result['reason']
news.ai_evaluated_at = datetime.now()
news.ai_model = 'gemini-2.0-flash'
stats['total_evaluated'] += 1
if result['relevant']:
stats['relevant_count'] += 1
else:
stats['not_relevant_count'] += 1
else:
stats['errors'] += 1
logger.warning(f"Failed to evaluate news {news.id}: {result['reason']}")
# Commit all changes
try:
db_session.commit()
stats['message'] = f"Oceniono {stats['total_evaluated']} newsów: {stats['relevant_count']} pasuje, {stats['not_relevant_count']} nie pasuje"
except Exception as e:
db_session.rollback()
stats['errors'] += 1
stats['message'] = f'Database error: {str(e)}'
return stats