nordabiz/zopk_news_service.py
Maciej Pienczyn 46ef40950f feat: Add multi-source news search with cross-verification for ZOPK
- Add ZOPKNewsService with multiple RSS sources (12 feeds)
- Implement cross-verification logic (3+ sources = auto_approved)
- Add title hash normalization for fuzzy deduplication
- Include government sources: MON, Ministerstwo Rozwoju
- Include Google News searches for key topics and people
- Add confidence scoring (1-5 based on source count)
- Update SQL migration with cross-verification columns

Sources: Brave API, trojmiasto.pl, Dziennik Bałtycki, Google News,
gov.pl/obrona-narodowa, gov.pl/rozwoj-technologia

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 06:41:31 +01:00

516 lines
17 KiB
Python

"""
ZOPK News Service
================
Multi-source news search and cross-verification for
Zielony Okręg Przemysłowy Kaszubia (ZOPK) knowledge base.
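Sources:
- Brave Search API (web news)
- Google News RSS (aggregated news, one feed per search query)
- Local media RSS feeds (trojmiasto.pl, dziennikbaltycki.pl)
- Government RSS feeds (gov.pl: Ministerstwo Obrony Narodowej, Ministerstwo Rozwoju i Technologii)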
Cross-verification:
- 1 source → pending (manual moderation required)
- 2 sources → pending with higher confidence
- 3+ sources → auto_approved (verified automatically)
Author: NordaBiz Development Team
Created: 2026-01-11
"""
import os
import re
import hashlib
import logging
import unicodedata
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from urllib.parse import urlparse
import requests
import feedparser
logger = logging.getLogger(__name__)
# ============================================================
# RSS FEED SOURCES
# ============================================================
# Registry of RSS feeds polled by ZOPKNewsService, keyed by source identifier
# (the key becomes the `source_id` of every item read from that feed).
#
# Each entry holds:
#   url      - address of the feed that is fetched and parsed
#   name     - human-readable source name copied onto each news item
#   type     - source category; 'rss_' + type becomes the item's source_type.
#              Entries of type 'local_media' must additionally pass
#              is_zopk_relevant() before they are kept.
#   keywords - lowercase substrings; a feed entry is kept only when at least
#              one of them occurs in its lowercased "title description" text.
#              An empty list disables this filter.
RSS_SOURCES = {
    # Local media: general-purpose regional feeds, so they are keyword-filtered
    'trojmiasto': {
        'url': 'https://www.trojmiasto.pl/rss/wiadomosci.xml',
        'name': 'trojmiasto.pl',
        'type': 'local_media',
        'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'pomorze', 'offshore', 'energia', 'przemysł', 'samsonowicz', 'kongsberg']
    },
    'dziennik_baltycki': {
        'url': 'https://dziennikbaltycki.pl/rss/najnowsze.xml',
        'name': 'Dziennik Bałtycki',
        'type': 'local_media',
        'keywords': ['kaszubia', 'wejherowo', 'rumia', 'gdynia', 'elektrownia', 'offshore', 'samsonowicz', 'kongsberg', 'lubiatowo']
    },
    # Government sources: ministry-wide feeds, also keyword-filtered
    'gov_mon': {
        'url': 'https://www.gov.pl/web/obrona-narodowa/rss',
        'name': 'Ministerstwo Obrony Narodowej',
        'type': 'government',
        'keywords': ['kongsberg', 'przemysł obronny', 'kaszubia', 'rumia', 'samsonowicz', 'inwestycje']
    },
    'gov_przemysl': {
        'url': 'https://www.gov.pl/web/rozwoj-technologia/rss',
        'name': 'Ministerstwo Rozwoju i Technologii',
        'type': 'government',
        'keywords': ['offshore', 'elektrownia jądrowa', 'centrum danych', 'wodór', 'transformacja']
    },
    # Google News aggregated searches: the search query is part of the URL,
    # so no keyword list is configured for these feeds
    'google_news_zopk': {
        'url': 'https://news.google.com/rss/search?q=Zielony+Okr%C4%99g+Przemys%C5%82owy+Kaszubia&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []  # No filtering, query-based
    },
    'google_news_offshore': {
        'url': 'https://news.google.com/rss/search?q=offshore+Polska+Baltyk&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_nuclear': {
        'url': 'https://news.google.com/rss/search?q=elektrownia+jadrowa+Polska+Lubiatowo&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_samsonowicz': {
        'url': 'https://news.google.com/rss/search?q=Maciej+Samsonowicz+MON&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_kongsberg': {
        'url': 'https://news.google.com/rss/search?q=Kongsberg+Polska+Rumia&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    # Business/local organizations (via Google News)
    'google_news_norda': {
        'url': 'https://news.google.com/rss/search?q=Norda+Biznes+Wejherowo&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    },
    'google_news_spoko': {
        'url': 'https://news.google.com/rss/search?q=Spoko+Gospodarcze+Pomorze&hl=pl&gl=PL&ceid=PL:pl',
        'name': 'Google News',
        'type': 'aggregator',
        'keywords': []
    }
}
# ZOPK-related keywords for filtering
# Phrases used by is_zopk_relevant(): content counts as relevant when any one
# of them occurs as a substring of the lowercased "title description" text.
# Multi-word phrases must therefore appear verbatim (same word order, same
# diacritics) to match.
ZOPK_KEYWORDS = [
    # Project names
    'zielony okręg przemysłowy',
    'zopk',
    'kaszubia przemysłowa',
    # Energy projects
    'offshore wind polska',
    'offshore bałtyk',
    'farma wiatrowa bałtyk',
    'elektrownia jądrowa lubiatowo',
    'elektrownia jądrowa kopalino',
    'pej lubiatowo',  # PEJ = Polskie Elektrownie Jądrowe
    # Defense industry
    'kongsberg rumia',
    'kongsberg polska',
    'kongsberg defence',
    'przemysł obronny pomorze',
    'przemysł zbrojeniowy pomorze',
    # Technology
    'centrum danych gdynia',
    'centrum danych pomorze',
    'data center pomorze',
    'wodór pomorze',
    'hydrogen pomorze',
    'laboratoria wodorowe',
    # Key people
    'samsonowicz mon',
    'maciej samsonowicz',
    'kosiniak-kamysz przemysł',
    # Locations
    'transformacja energetyczna pomorze',
    'inwestycje wejherowo',
    'inwestycje rumia',
    'strefa ekonomiczna rumia',
    'rumia invest park',
    # Organizations
    'norda biznes',
    'spoko gospodarcze',
    'izba gospodarcza pomorze'
]
@dataclass
class NewsItem:
    """A single news article as reported by one source.

    The same story may be represented by several ``NewsItem`` objects, one per
    source that reported it; ``url_hash`` and ``title_hash`` are the keys used
    to recognise such duplicates.
    """

    title: str
    url: str
    description: str
    source_name: str
    # 'brave' for Brave Search results, or 'rss_' + the feed's configured type
    # (e.g. 'rss_local_media', 'rss_government', 'rss_aggregator')
    source_type: str
    # Identifier of the concrete source: 'brave_search' or an RSS_SOURCES key
    source_id: str
    published_at: Optional[datetime]
    image_url: Optional[str] = None

    @property
    def url_hash(self) -> str:
        """Return the SHA-256 hex digest of the URL (exact deduplication key)."""
        return hashlib.sha256(self.url.encode()).hexdigest()

    @property
    def title_hash(self) -> str:
        """Return the normalized title hash (fuzzy deduplication key)."""
        return normalize_title_hash(self.title)

    @property
    def domain(self) -> str:
        """Return the URL's network location without a leading ``www.``.

        Only a *leading* ``www.`` is stripped; removing every occurrence would
        corrupt hosts that merely contain that text (``mywww.example.com``).
        Returns an empty string when the URL has no network location.
        """
        host = urlparse(self.url).netloc
        if host.startswith('www.'):
            host = host[len('www.'):]
        return host
def normalize_title_hash(title: str) -> str:
    """Create a normalized hash of a title for fuzzy matching.

    Two titles get the same hash when they contain the same significant words,
    regardless of case, diacritics, punctuation or word order.

    Normalization steps:
    - lowercase
    - remove diacritics (ą→a, ę→e, ł→l, etc.)
    - remove punctuation
    - drop common Polish stop words and every word of one or two characters
    - sort the remaining words alphabetically
    - hash the result

    Args:
        title: Raw article title.

    Returns:
        The first 32 hex characters of the SHA-256 digest of the normalized
        text, or an empty string when the title is empty or contains no
        significant words (there is nothing meaningful to match on).
    """
    if not title:
        return ''

    text = title.lower()

    # NFKD splits accented letters into base letter + combining mark, which
    # lets us drop the marks and keep the plain letter.
    text = unicodedata.normalize('NFKD', text)
    text = ''.join(c for c in text if not unicodedata.combining(c))
    # 'ł' has no Unicode decomposition, so NFKD leaves it untouched; fold it
    # explicitly, otherwise "Bałtyk" and "Baltyk" would hash differently.
    text = text.replace('ł', 'l')

    # Remove punctuation (anything that is neither a word char nor whitespace)
    text = re.sub(r'[^\w\s]', '', text)

    # Common Polish stop words (already stripped of diacritics, e.g. "sie")
    stop_words = {'i', 'w', 'z', 'na', 'do', 'o', 'od', 'za', 'po', 'przy', 'dla', 'oraz', 'sie', 'to', 'jest', 'ze', 'nie', 'jak', 'czy', 'ale', 'a'}
    words = [w for w in text.split() if w not in stop_words and len(w) > 2]
    if not words:
        return ''

    # Sorting makes the hash independent of word order
    normalized = ' '.join(sorted(words))
    return hashlib.sha256(normalized.encode()).hexdigest()[:32]
def is_zopk_relevant(title: str, description: str = '') -> bool:
    """Tell whether a title/description pair mentions any ZOPK keyword.

    The title and description are joined with a space and lowercased; the
    content is relevant when at least one phrase from ``ZOPK_KEYWORDS``
    occurs in that text as a substring.
    """
    haystack = f"{title} {description}".lower()
    return any(phrase.lower() in haystack for phrase in ZOPK_KEYWORDS)
class ZOPKNewsService:
    """Multi-source news search service with cross-verification.

    Collects news from the Brave Search API (when an API key is available) and
    from every feed in ``RSS_SOURCES``, groups reports of the same story,
    scores each story by the number of distinct sources that reported it, and
    stores the result through the supplied database session.
    """

    # Seconds to wait for any single HTTP request (Brave API or RSS feed)
    REQUEST_TIMEOUT = 30
    # Maximum number of entries read from each RSS feed
    MAX_ENTRIES_PER_FEED = 30
    # Number of distinct sources required for automatic approval
    AUTO_APPROVE_MIN_SOURCES = 3

    def __init__(self, db_session, brave_api_key: Optional[str] = None):
        """Store the DB session and resolve the Brave API key.

        Args:
            db_session: Database session used to query and persist news rows.
            brave_api_key: Brave Search API key; falls back to the
                ``BRAVE_API_KEY`` environment variable when omitted.
        """
        self.db = db_session
        self.brave_api_key = brave_api_key or os.getenv('BRAVE_API_KEY')

    def search_all_sources(self, query: str = 'Zielony Okręg Przemysłowy Kaszubia') -> Dict:
        """Search all sources, cross-verify the results and persist them.

        Args:
            query: Search phrase sent to the Brave Search API. RSS feeds are
                not affected by it (their queries/filters are configured in
                ``RSS_SOURCES``).

        Returns:
            Dict with the keys ``total_found``, ``unique_items``,
            ``saved_new``, ``updated_existing``, ``source_stats`` (items per
            source) and ``auto_approved``.
        """
        all_items: List[NewsItem] = []
        source_stats = {}

        # 1. Brave Search API (skipped entirely when no key is configured)
        if self.brave_api_key:
            brave_items = self._search_brave(query)
            all_items.extend(brave_items)
            source_stats['brave'] = len(brave_items)
            logger.info(f"Brave Search: found {len(brave_items)} items")

        # 2. RSS feeds
        for source_id, source_config in RSS_SOURCES.items():
            rss_items = self._fetch_rss(source_id, source_config)
            all_items.extend(rss_items)
            source_stats[source_id] = len(rss_items)
            logger.info(f"RSS {source_id}: found {len(rss_items)} items")

        # 3. Cross-verify and deduplicate
        verified_items = self._cross_verify(all_items)

        # 4. Save to database
        saved_count, updated_count = self._save_to_database(verified_items)

        return {
            'total_found': len(all_items),
            'unique_items': len(verified_items),
            'saved_new': saved_count,
            'updated_existing': updated_count,
            'source_stats': source_stats,
            'auto_approved': sum(1 for item in verified_items if item.get('auto_approve', False))
        }

    def _search_brave(self, query: str) -> List["NewsItem"]:
        """Query the Brave news search endpoint.

        Best-effort: any failure (network error, non-200 status, malformed
        payload) is logged and the items collected so far are returned.

        Args:
            query: Search phrase.

        Returns:
            News items for every result that carries a URL; empty when no API
            key is configured.
        """
        if not self.brave_api_key:
            return []

        items = []
        try:
            headers = {
                'Accept': 'application/json',
                'X-Subscription-Token': self.brave_api_key
            }
            params = {
                'q': query,
                'count': 20,
                'freshness': 'pm',  # past month
                'country': 'pl',
                'search_lang': 'pl'
            }
            response = requests.get(
                'https://api.search.brave.com/res/v1/news/search',
                headers=headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                logger.error(f"Brave API error: {response.status_code}")
                return items

            for result in response.json().get('results', []):
                if not result.get('url'):
                    continue
                # 'thumbnail' may be present but null; `or {}` keeps one such
                # result from raising and discarding the whole batch.
                thumbnail = result.get('thumbnail') or {}
                items.append(NewsItem(
                    title=result.get('title', 'Bez tytułu'),
                    url=result['url'],
                    description=result.get('description', ''),
                    source_name=result.get('source', ''),
                    source_type='brave',
                    source_id='brave_search',
                    # No publication date is read from the response, so the
                    # fetch time is recorded instead.
                    published_at=datetime.now(),
                    image_url=thumbnail.get('src')
                ))
        except Exception as e:
            logger.error(f"Brave search error: {e}")
        return items

    def _fetch_rss(self, source_id: str, config: Dict) -> List["NewsItem"]:
        """Fetch one RSS feed and convert its relevant entries to news items.

        Best-effort: any failure is logged and the items collected so far are
        returned.

        Args:
            source_id: Key of the feed in ``RSS_SOURCES``.
            config: Feed configuration with ``url``, ``name``, ``type`` and
                optionally ``keywords``.

        Returns:
            News items for entries that have a link and pass the feed's
            keyword filter (plus the ZOPK relevance check for local media).
        """
        items = []
        try:
            # Download the feed ourselves so the request is bounded by a
            # timeout; a stalled server must not block the whole search run.
            response = requests.get(config['url'], timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            feed = feedparser.parse(response.content)

            keywords = config.get('keywords', [])
            for entry in feed.entries[:self.MAX_ENTRIES_PER_FEED]:
                # An entry without a link cannot be deduplicated or opened
                link = entry.get('link', '')
                if not link:
                    continue

                title = entry.get('title', '') or ''
                description = entry.get('summary') or entry.get('description') or ''

                # Filter by keywords if specified
                text = f"{title} {description}".lower()
                if keywords and not any(kw in text for kw in keywords):
                    continue

                # Local media feeds are general-purpose: require ZOPK relevance
                if config['type'] == 'local_media' and not is_zopk_relevant(title, description):
                    continue

                # Parse date (left as None when the feed gives no usable date)
                published_at = None
                published_parsed = getattr(entry, 'published_parsed', None)
                if published_parsed:
                    published_at = datetime(*published_parsed[:6])

                items.append(NewsItem(
                    title=title,
                    url=link,
                    description=description[:500],
                    source_name=config['name'],
                    source_type='rss_' + config['type'],
                    source_id=source_id,
                    published_at=published_at,
                    image_url=self._extract_image_from_entry(entry)
                ))
        except Exception as e:
            logger.error(f"RSS fetch error for {source_id}: {e}")
        return items

    def _extract_image_from_entry(self, entry) -> Optional[str]:
        """Return the first image URL found on an RSS entry, or None.

        Checks, in order: ``media_thumbnail``, image-typed ``media_content``
        items, image-typed ``enclosures``.
        """
        # Try media:thumbnail
        thumbnails = getattr(entry, 'media_thumbnail', None)
        if thumbnails:
            return thumbnails[0].get('url')

        # Try media:content
        for media in getattr(entry, 'media_content', None) or []:
            if (media.get('type') or '').startswith('image/'):
                return media.get('url')

        # Try enclosure
        for enclosure in getattr(entry, 'enclosures', None) or []:
            if (enclosure.get('type') or '').startswith('image/'):
                return enclosure.get('href')

        return None

    def _cross_verify(self, items: List["NewsItem"]) -> List[Dict]:
        """Group reports of the same story and score each story.

        Items are grouped by ``title_hash`` (fuzzy match). Items without a
        title hash are grouped by URL instead, so unrelated untitled entries
        are never mistaken for one widely-reported story.

        For each group:
        - ``source_count`` is the number of distinct ``source_id`` values,
        - ``confidence_score`` is ``min(5, source_count + 1)`` (so a
          single-source story scores 2),
        - ``auto_approve`` is set once ``AUTO_APPROVE_MIN_SOURCES`` distinct
          sources reported it.

        Args:
            items: Items collected from all sources.

        Returns:
            One dict per story, in first-seen order, ready for
            ``_save_to_database``.
        """
        groups: Dict[str, list] = {}
        for item in items:
            # Fall back to the URL hash when there is no title to compare
            key = item.title_hash or f"url:{item.url_hash}"
            groups.setdefault(key, []).append(item)

        # Also track URL hashes to avoid emitting the same URL twice
        seen_urls = set()
        verified_items = []

        for group in groups.values():
            # NOTE(review): sources are counted by source_id, so one article
            # returned by several Google News query feeds counts as several
            # sources. Confirm that this is intended.
            unique_sources = sorted({item.source_id for item in group})
            source_count = len(unique_sources)

            # Prefer a Brave item as the representative; otherwise the first
            # item of the group (min() returns the first minimal element).
            base_item = min(group, key=lambda x: x.source_type != 'brave')

            if base_item.url_hash in seen_urls:
                continue
            seen_urls.add(base_item.url_hash)

            confidence_score = min(5, source_count + 1)
            auto_approve = source_count >= self.AUTO_APPROVE_MIN_SOURCES

            verified_items.append({
                'title': base_item.title,
                'url': base_item.url,
                'url_hash': base_item.url_hash,
                'title_hash': base_item.title_hash,
                'description': base_item.description,
                'source_name': base_item.source_name,
                'source_domain': base_item.domain,
                'source_type': base_item.source_type,
                'published_at': base_item.published_at,
                'image_url': base_item.image_url,
                'confidence_score': confidence_score,
                'source_count': source_count,
                'sources_list': unique_sources,
                'auto_approve': auto_approve
            })
        return verified_items

    def _save_to_database(self, items: List[Dict]) -> Tuple[int, int]:
        """Insert new stories and update the source data of known ones.

        A story is "known" when a row with the same ``url_hash`` exists. Known
        rows are only touched when the item brings sources not yet recorded;
        a pending row is promoted to ``auto_approved`` once it reaches
        ``AUTO_APPROVE_MIN_SOURCES`` sources.

        Args:
            items: Story dicts produced by ``_cross_verify``.

        Returns:
            Tuple of (new_count, updated_count).

        Raises:
            Exception: Any database error is re-raised after the session has
                been rolled back.
        """
        # Imported lazily, as in the original module layout
        from database import ZOPKNews

        new_count = 0
        updated_count = 0
        try:
            for item in items:
                # Check if URL already exists
                existing = self.db.query(ZOPKNews).filter(
                    ZOPKNews.url_hash == item['url_hash']
                ).first()

                if existing:
                    # Update source count and confidence if new sources found
                    existing_sources = list(existing.sources_list or [])
                    new_sources = [s for s in item['sources_list'] if s not in existing_sources]
                    if not new_sources:
                        continue
                    # Assign a new list object rather than mutating in place
                    existing.sources_list = existing_sources + new_sources
                    existing.source_count = len(existing.sources_list)
                    existing.confidence_score = min(5, existing.source_count + 1)
                    # Auto-approve if threshold reached; never override a
                    # status that a moderator has already changed.
                    if (existing.source_count >= self.AUTO_APPROVE_MIN_SOURCES
                            and existing.status == 'pending'):
                        existing.status = 'auto_approved'
                        existing.is_auto_verified = True
                    updated_count += 1
                else:
                    # Create new entry
                    status = 'auto_approved' if item['auto_approve'] else 'pending'
                    news = ZOPKNews(
                        title=item['title'],
                        url=item['url'],
                        url_hash=item['url_hash'],
                        title_hash=item['title_hash'],
                        description=item['description'],
                        source_name=item['source_name'],
                        source_domain=item['source_domain'],
                        source_type=item['source_type'],
                        published_at=item['published_at'],
                        image_url=item['image_url'],
                        confidence_score=item['confidence_score'],
                        source_count=item['source_count'],
                        sources_list=item['sources_list'],
                        is_auto_verified=item['auto_approve'],
                        status=status
                    )
                    self.db.add(news)
                    new_count += 1
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller instead of stuck in a
            # failed transaction, then let the error propagate.
            self.db.rollback()
            raise
        return new_count, updated_count
def search_zopk_news(db_session, query: Optional[str] = None) -> Dict:
    """Search ZOPK news from all sources using a throwaway service instance.

    Usage:
        from zopk_news_service import search_zopk_news
        results = search_zopk_news(db)

    Args:
        db_session: Database session handed to ``ZOPKNewsService``.
        query: Search phrase; ``None`` or an empty string selects the default
            phrase 'Zielony Okręg Przemysłowy Kaszubia'.

    Returns:
        The statistics dict returned by ``ZOPKNewsService.search_all_sources``.
    """
    service = ZOPKNewsService(db_session)
    return service.search_all_sources(query or 'Zielony Okręg Przemysłowy Kaszubia')