""" ZOPK News Routes - Admin blueprint Migrated from app.py as part of the blueprint refactoring. Contains routes for ZOPK news management, scraping, and AI evaluation. """ import hashlib import json import logging import queue import threading import uuid from datetime import datetime from urllib.parse import urlparse from flask import flash, jsonify, redirect, render_template, request, url_for, Response, stream_with_context from flask_login import current_user, login_required from sqlalchemy import desc, asc, func, or_ from sqlalchemy.sql import nullslast from database import ( SessionLocal, SystemRole, ZOPKProject, ZOPKNews, ZOPKNewsFetchJob ) from utils.decorators import role_required from . import bp logger = logging.getLogger(__name__) @bp.route('/zopk/news') @login_required @role_required(SystemRole.ADMIN) def admin_zopk_news(): """Admin news management for ZOPK""" db = SessionLocal() try: page = request.args.get('page', 1, type=int) status = request.args.get('status', 'all') stars = request.args.get('stars', 'all') # 'all', '1'-'5', 'none' sort_by = request.args.get('sort', 'date') # 'date', 'score', 'title' sort_dir = request.args.get('dir', 'desc') # 'asc', 'desc' per_page = 50 query = db.query(ZOPKNews) if status != 'all': query = query.filter(ZOPKNews.status == status) # Filter by star rating if stars == 'none': query = query.filter(ZOPKNews.ai_relevance_score.is_(None)) elif stars in ['1', '2', '3', '4', '5']: query = query.filter(ZOPKNews.ai_relevance_score == int(stars)) # 'all' - no filter # Apply sorting sort_func = desc if sort_dir == 'desc' else asc if sort_by == 'score': # Sort by AI score (nulls last so evaluated items come first) query = query.order_by(nullslast(sort_func(ZOPKNews.ai_relevance_score))) elif sort_by == 'title': query = query.order_by(sort_func(ZOPKNews.title)) else: # default: date query = query.order_by(sort_func(ZOPKNews.published_at)) total = query.count() news_items = query.offset((page - 1) * 
per_page).limit(per_page).all() total_pages = (total + per_page - 1) // per_page projects = db.query(ZOPKProject).order_by(ZOPKProject.sort_order).all() return render_template('admin/zopk_news.html', news_items=news_items, projects=projects, page=page, total_pages=total_pages, total=total, current_status=status, current_stars=stars, current_sort=sort_by, current_dir=sort_dir ) finally: db.close() @bp.route('/zopk/news//approve', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def admin_zopk_news_approve(news_id): """Approve a ZOPK news item""" db = SessionLocal() try: news = db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first() if not news: return jsonify({'success': False, 'error': 'Nie znaleziono newsa'}), 404 news.status = 'approved' news.moderated_by = current_user.id news.moderated_at = datetime.now() db.commit() return jsonify({'success': True, 'message': 'News został zatwierdzony'}) except Exception as e: db.rollback() logger.error(f"Error approving ZOPK news {news_id}: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas zatwierdzania'}), 500 finally: db.close() @bp.route('/zopk/news//reject', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def admin_zopk_news_reject(news_id): """Reject a ZOPK news item""" db = SessionLocal() try: data = request.get_json() or {} reason = data.get('reason', '') news = db.query(ZOPKNews).filter(ZOPKNews.id == news_id).first() if not news: return jsonify({'success': False, 'error': 'Nie znaleziono newsa'}), 404 news.status = 'rejected' news.moderated_by = current_user.id news.moderated_at = datetime.now() news.rejection_reason = reason db.commit() return jsonify({'success': True, 'message': 'News został odrzucony'}) except Exception as e: db.rollback() logger.error(f"Error rejecting ZOPK news {news_id}: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas odrzucania'}), 500 finally: db.close() @bp.route('/zopk/news/add', methods=['POST']) 
@login_required @role_required(SystemRole.ADMIN) def admin_zopk_news_add(): """Manually add a ZOPK news item""" db = SessionLocal() try: data = request.get_json() or {} title = data.get('title', '').strip() url = data.get('url', '').strip() description = data.get('description', '').strip() source_name = data.get('source_name', '').strip() project_id = data.get('project_id') if not title or not url: return jsonify({'success': False, 'error': 'Tytuł i URL są wymagane'}), 400 # SECURITY: Validate URL protocol (block javascript:, data:, etc.) parsed = urlparse(url) allowed_protocols = ('http', 'https') if parsed.scheme.lower() not in allowed_protocols: return jsonify({'success': False, 'error': 'Nieprawidłowy protokół URL. Dozwolone: http, https'}), 400 # SECURITY: Validate project_id if provided if project_id: try: project_id = int(project_id) project = db.query(ZOPKProject).filter(ZOPKProject.id == project_id).first() if not project: return jsonify({'success': False, 'error': 'Nieprawidłowy ID projektu'}), 400 except (ValueError, TypeError): return jsonify({'success': False, 'error': 'ID projektu musi być liczbą'}), 400 else: project_id = None # Generate URL hash for deduplication url_hash = hashlib.sha256(url.encode()).hexdigest() # Check if URL already exists existing = db.query(ZOPKNews).filter(ZOPKNews.url_hash == url_hash).first() if existing: return jsonify({'success': False, 'error': 'Ten artykuł już istnieje w bazie'}), 400 # Extract domain from URL source_domain = parsed.netloc.replace('www.', '') news = ZOPKNews( title=title, url=url, url_hash=url_hash, description=description, source_name=source_name or source_domain, source_domain=source_domain, source_type='manual', status='approved', # Manual entries are auto-approved moderated_by=current_user.id, moderated_at=datetime.now(), published_at=datetime.now(), project_id=project_id ) db.add(news) db.commit() return jsonify({ 'success': True, 'message': 'News został dodany', 'news_id': news.id }) except 
Exception as e: db.rollback() logger.error(f"Error adding ZOPK news: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas dodawania newsa'}), 500 finally: db.close() @bp.route('/zopk/news/reject-old', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def admin_zopk_reject_old_news(): """Reject all news from before a certain year (ZOPK didn't exist then)""" db = SessionLocal() try: data = request.get_json() or {} min_year = data.get('min_year', 2024) # Find all pending news from before min_year min_date = datetime(min_year, 1, 1) old_news = db.query(ZOPKNews).filter( ZOPKNews.status == 'pending', ZOPKNews.published_at < min_date ).all() count = len(old_news) # Reject them all for news in old_news: news.status = 'rejected' news.moderated_by = current_user.id news.moderated_at = datetime.now() news.rejection_reason = f'Automatycznie odrzucony - artykuł sprzed {min_year} roku (ZOP Kaszubia powstał w 2024)' db.commit() return jsonify({ 'success': True, 'message': f'Odrzucono {count} newsów sprzed {min_year} roku', 'count': count }) except Exception as e: db.rollback() logger.error(f"Error rejecting old ZOPK news: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas odrzucania starych newsów'}), 500 finally: db.close() @bp.route('/zopk/news/star-counts') @login_required @role_required(SystemRole.ADMIN) def admin_zopk_news_star_counts(): """Get counts of pending news items grouped by star rating""" db = SessionLocal() try: # Count pending news for each star rating (1-5 and NULL) counts = {} # Count for each star 1-5 for star in range(1, 6): count = db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status == 'pending', ZOPKNews.ai_relevance_score == star ).scalar() counts[star] = count # Count for NULL (no AI evaluation) count_null = db.query(func.count(ZOPKNews.id)).filter( ZOPKNews.status == 'pending', ZOPKNews.ai_relevance_score.is_(None) ).scalar() counts[0] = count_null return jsonify({ 'success': True, 'counts': 
counts }) except Exception as e: logger.error(f"Error getting ZOPK news star counts: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd'}), 500 finally: db.close() @bp.route('/zopk/news/reject-by-stars', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def admin_zopk_reject_by_stars(): """Reject all pending news items with specified star ratings""" db = SessionLocal() try: data = request.get_json() or {} stars = data.get('stars', []) # List of star ratings to reject (0 = no rating) reason = data.get('reason', '') if not stars: return jsonify({'success': False, 'error': 'Nie wybrano ocen do odrzucenia'}), 400 # Validate stars input valid_stars = [s for s in stars if s in [0, 1, 2, 3, 4, 5]] if not valid_stars: return jsonify({'success': False, 'error': 'Nieprawidłowe oceny gwiazdkowe'}), 400 # Build query for pending news with specified stars conditions = [] for star in valid_stars: if star == 0: conditions.append(ZOPKNews.ai_relevance_score.is_(None)) else: conditions.append(ZOPKNews.ai_relevance_score == star) news_to_reject = db.query(ZOPKNews).filter( ZOPKNews.status == 'pending', or_(*conditions) ).all() count = len(news_to_reject) # Reject them all default_reason = f"Masowo odrzucone - oceny: {', '.join(str(s) + '★' if s > 0 else 'brak oceny' for s in valid_stars)}" final_reason = reason if reason else default_reason for news in news_to_reject: news.status = 'rejected' news.moderated_by = current_user.id news.moderated_at = datetime.now() news.rejection_reason = final_reason db.commit() logger.info(f"Admin {current_user.email} rejected {count} ZOPK news with stars {valid_stars}") return jsonify({ 'success': True, 'message': f'Odrzucono {count} artykułów', 'count': count }) except Exception as e: db.rollback() logger.error(f"Error rejecting ZOPK news by stars: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas odrzucania'}), 500 finally: db.close() @bp.route('/zopk/news/evaluate-ai', methods=['POST']) 
@login_required @role_required(SystemRole.ADMIN) def admin_zopk_evaluate_ai(): """Evaluate pending news for ZOPK relevance using Gemini AI""" from zopk_news_service import evaluate_pending_news db = SessionLocal() try: data = request.get_json() or {} limit = data.get('limit', 50) # Max 50 to avoid API limits # Run AI evaluation result = evaluate_pending_news(db, limit=limit, user_id=current_user.id) return jsonify({ 'success': True, 'total_evaluated': result.get('total_evaluated', 0), 'relevant_count': result.get('relevant_count', 0), 'not_relevant_count': result.get('not_relevant_count', 0), 'errors': result.get('errors', 0), 'message': result.get('message', '') }) except Exception as e: db.rollback() logger.error(f"Error evaluating ZOPK news with AI: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas oceny AI'}), 500 finally: db.close() @bp.route('/zopk/news/reevaluate-scores', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def admin_zopk_reevaluate_scores(): """Re-evaluate news items that have ai_relevant but no ai_relevance_score (1-5 stars)""" from zopk_news_service import reevaluate_news_without_score db = SessionLocal() try: data = request.get_json() or {} limit = data.get('limit', 50) # Max 50 to avoid API limits # Run AI re-evaluation for items missing scores result = reevaluate_news_without_score(db, limit=limit, user_id=current_user.id) return jsonify({ 'success': True, 'total_evaluated': result.get('total_evaluated', 0), 'relevant_count': result.get('relevant_count', 0), 'not_relevant_count': result.get('not_relevant_count', 0), 'errors': result.get('errors', 0), 'message': result.get('message', '') }) except Exception as e: db.rollback() logger.error(f"Error reevaluating ZOPK news scores: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas ponownej oceny'}), 500 finally: db.close() @bp.route('/zopk/news/reevaluate-low-scores', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) 
def admin_zopk_reevaluate_low_scores(): """ Re-evaluate news with low AI scores (1-2★) that contain key ZOPK topics. Useful after updating AI prompt to include new topics like Via Pomerania, S6, NORDA. Old articles scored low before these topics were recognized will be re-evaluated and potentially upgraded. """ from zopk_news_service import reevaluate_low_score_news db = SessionLocal() try: data = request.get_json() or {} limit = data.get('limit', 50) # Max 50 to avoid API limits # Run AI re-evaluation for low-score items with key topics result = reevaluate_low_score_news(db, limit=limit, user_id=current_user.id) return jsonify({ 'success': True, 'total_evaluated': result.get('total_evaluated', 0), 'upgraded': result.get('upgraded', 0), 'downgraded': result.get('downgraded', 0), 'unchanged': result.get('unchanged', 0), 'errors': result.get('errors', 0), 'message': result.get('message', ''), 'details': result.get('details', []) }) except Exception as e: db.rollback() logger.error(f"Error reevaluating low-score ZOPK news: {e}") return jsonify({'success': False, 'error': 'Wystąpił błąd podczas ponownej oceny'}), 500 finally: db.close() @bp.route('/zopk-api/search-news', methods=['POST']) @login_required @role_required(SystemRole.ADMIN) def api_zopk_search_news(): """ Search for ZOPK news using multiple sources with cross-verification. 
Sources: - Brave Search API - Google News RSS - Local media RSS (trojmiasto.pl, dziennikbaltycki.pl) Cross-verification: - 1 source → pending (manual review) - 3+ sources → auto_approved """ from zopk_news_service import ZOPKNewsService db = SessionLocal() try: data = request.get_json() or {} query = data.get('query', 'Zielony Okręg Przemysłowy Kaszubia') # Create fetch job record job_id = str(uuid.uuid4())[:8] fetch_job = ZOPKNewsFetchJob( job_id=job_id, search_query=query, search_api='multi_source', # Brave + RSS triggered_by='admin', triggered_by_user=current_user.id, status='running', started_at=datetime.now() ) db.add(fetch_job) db.commit() # Use multi-source service service = ZOPKNewsService(db) results = service.search_all_sources(query) # Update fetch job fetch_job.results_found = results['total_found'] fetch_job.results_new = results['saved_new'] fetch_job.results_approved = results['auto_approved'] fetch_job.status = 'completed' fetch_job.completed_at = datetime.now() db.commit() # Build detailed message source_info = ', '.join(f"{k}: {v}" for k, v in results['source_stats'].items() if v > 0) return jsonify({ 'success': True, 'message': f"Znaleziono {results['total_found']} wyników z {len(results['source_stats'])} źródeł. " f"Dodano {results['saved_new']} nowych, zaktualizowano {results['updated_existing']}. 
" f"Auto-zatwierdzono: {results['auto_approved']}", 'job_id': job_id, 'total_found': results['total_found'], 'unique_items': results['unique_items'], 'saved_new': results['saved_new'], 'updated_existing': results['updated_existing'], 'auto_approved': results['auto_approved'], 'ai_approved': results.get('ai_approved', 0), 'ai_rejected': results.get('ai_rejected', 0), 'blacklisted': results.get('blacklisted', 0), 'keyword_filtered': results.get('keyword_filtered', 0), 'sent_to_ai': results.get('sent_to_ai', 0), 'duplicates': results.get('duplicates', 0), 'processing_time': results.get('processing_time', 0), 'knowledge_entities_created': results.get('knowledge_entities_created', 0), 'source_stats': results['source_stats'], 'process_log': results.get('process_log', []), 'auto_approved_articles': results.get('auto_approved_articles', []), 'ai_rejected_articles': results.get('ai_rejected_articles', []) }) except Exception as e: db.rollback() logger.error(f"ZOPK news search error: {e}") # Update job status on error try: fetch_job.status = 'failed' fetch_job.error_message = str(e) # Keep internal log fetch_job.completed_at = datetime.now() db.commit() except: pass return jsonify({'success': False, 'error': 'Wystąpił błąd podczas wyszukiwania newsów'}), 500 finally: db.close() @bp.route('/zopk-api/search-news-stream') @login_required @role_required(SystemRole.ADMIN) def api_zopk_search_news_stream(): """SSE endpoint for streaming ZOPK news search progress in real-time.""" from zopk_news_service import ZOPKNewsService query = request.args.get('query', 'Zielony Okręg Przemysłowy Kaszubia') user_id = current_user.id # Capture before generator (proxy loses context inside) def generate(): db = SessionLocal() try: # Create fetch job record job_id = str(uuid.uuid4())[:8] fetch_job = ZOPKNewsFetchJob( job_id=job_id, search_query=query, search_api='multi_source_sse', triggered_by='admin', triggered_by_user=user_id, status='running', started_at=datetime.now() ) db.add(fetch_job) 
db.commit() progress_queue = queue.Queue() result_holder = [None] error_holder = [None] def on_progress(phase, message, current, total): progress_queue.put((phase, message, current, total)) def run_search(): search_db = SessionLocal() try: service = ZOPKNewsService(search_db) result_holder[0] = service.search_all_sources( query, user_id=user_id, progress_callback=on_progress ) except Exception as e: error_holder[0] = e finally: search_db.close() progress_queue.put(None) # sentinel thread = threading.Thread(target=run_search, daemon=True) thread.start() # Stream progress events while True: try: item = progress_queue.get(timeout=1) except queue.Empty: yield f"data: {json.dumps({'type': 'heartbeat'})}\n\n" continue if item is None: break phase, message, current, total = item yield f"data: {json.dumps({'type': 'progress', 'phase': phase, 'message': message, 'current': current, 'total': total})}\n\n" thread.join(timeout=10) if error_holder[0]: # Update job on error try: fetch_job.status = 'failed' fetch_job.error_message = str(error_holder[0]) fetch_job.completed_at = datetime.now() db.commit() except Exception: pass yield f"data: {json.dumps({'type': 'error', 'message': str(error_holder[0])})}\n\n" else: results = result_holder[0] # Update fetch job fetch_job.results_found = results['total_found'] fetch_job.results_new = results['saved_new'] fetch_job.results_approved = results['auto_approved'] fetch_job.status = 'completed' fetch_job.completed_at = datetime.now() db.commit() yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" except Exception as e: logger.error(f"SSE stream error: {e}") yield f"data: {json.dumps({'type': 'error', 'message': 'Wystąpił błąd podczas wyszukiwania'})}\n\n" finally: db.close() return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', 'Connection': 'keep-alive' } ) @bp.route('/zopk/news/scrape-stats') @login_required 
@role_required(SystemRole.ADMIN)
def admin_zopk_scrape_stats():
    """
    Get content scraping statistics.

    Returns JSON with:
    - total_approved: Total approved/auto_approved articles
    - scraped: Successfully scraped articles
    - pending: Articles waiting to be scraped
    - failed: Failed scraping attempts
    - skipped: Skipped (social media, paywalls)
    - ready_for_extraction: Scraped but not yet processed for knowledge
    """
    from zopk_content_scraper import get_scrape_stats

    db = SessionLocal()
    try:
        stats = get_scrape_stats(db)
        return jsonify({
            'success': True,
            **stats
        })
    except Exception as e:
        logger.error(f"Error getting scrape stats: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()


@bp.route('/zopk/news/scrape-content', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_zopk_scrape_content():
    """
    Batch scrape article content from source URLs.

    Request JSON:
    - limit: int (default 50) - max articles to scrape
    - force: bool (default false) - re-scrape already scraped

    Response:
    - scraped: number of successfully scraped
    - failed: number of failures
    - skipped: number of skipped (social media, etc.)
    - errors: list of error details
    - scraped_articles: list of scraped article info
    """
    from zopk_content_scraper import ZOPKContentScraper

    db = SessionLocal()
    try:
        data = request.get_json() or {}
        limit = min(data.get('limit', 50), 100)  # Max 100 at once
        force = data.get('force', False)

        scraper = ZOPKContentScraper(db, user_id=current_user.id)
        result = scraper.batch_scrape(limit=limit, force=force)

        return jsonify({
            'success': True,
            'message': f"Scraping zakończony: {result['scraped']} pobrano, "
                       f"{result['failed']} błędów, {result['skipped']} pominięto",
            **result
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error in batch scrape: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()


# BUGFIX: route was '/zopk/news//scrape' — the <int:news_id> URL converter
# had been stripped, so the rule never matched and news_id was never bound.
@bp.route('/zopk/news/<int:news_id>/scrape', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_zopk_scrape_single(news_id):
    """
    Scrape content for a single article.

    Returns 200 with word count on success, 400 with the scraper's error
    message on failure.
    """
    from zopk_content_scraper import ZOPKContentScraper

    db = SessionLocal()
    try:
        scraper = ZOPKContentScraper(db, user_id=current_user.id)
        result = scraper.scrape_article(news_id)

        if result.success:
            return jsonify({
                'success': True,
                'message': f"Pobrano treść: {result.word_count} słów",
                'word_count': result.word_count,
                'status': result.status
            })
        else:
            return jsonify({
                'success': False,
                'error': result.error,
                'status': result.status
            }), 400
    except Exception as e:
        db.rollback()
        logger.error(f"Error scraping article {news_id}: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()


@bp.route('/zopk/news/scrape-content/stream', methods=['GET'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_zopk_news_scrape_stream():
    """
    Stream scraping progress using Server-Sent Events.

    Query params:
    - limit: int (default 50)
    - force: bool (default false)
    """
    from zopk_content_scraper import ZOPKContentScraper, MAX_RETRY_ATTEMPTS

    limit = request.args.get('limit', 50, type=int)
    force = request.args.get('force', 'false').lower() == 'true'
    user_id = current_user.id  # Capture before generator (proxy loses context inside)

    def generate():
        import time as _time  # only needed inside the streaming generator
        db = SessionLocal()
        try:
            scraper = ZOPKContentScraper(db, user_id=user_id)

            # Get articles to scrape
            query = db.query(ZOPKNews).filter(
                ZOPKNews.status.in_(['approved', 'auto_approved'])
            )
            if not force:
                # Without force, only retry pending/failed under the attempt cap
                query = query.filter(
                    ZOPKNews.scrape_status.in_(['pending', 'failed']),
                    ZOPKNews.scrape_attempts < MAX_RETRY_ATTEMPTS
                )
            articles = query.order_by(ZOPKNews.published_at.desc()).limit(limit).all()
            total = len(articles)

            if total == 0:
                yield f"data: {json.dumps({'status': 'complete', 'message': 'Brak artykułów do scrapowania', 'total': 0, 'details': {'success': 0, 'failed': 0, 'skipped': 0}}, ensure_ascii=False)}\n\n"
                return

            yield f"data: {json.dumps({'current': 0, 'total': total, 'percent': 0, 'status': 'processing', 'message': f'Rozpoczynam scraping {total} artykułów...'}, ensure_ascii=False)}\n\n"

            start_time = _time.time()
            scraped = 0
            # NOTE(review): 'failed' is reported but never incremented — hard
            # failures below are counted as skipped; confirm this is intended.
            failed = 0
            skipped = 0

            for i, article in enumerate(articles):
                result = scraper.scrape_article(article.id)

                if result.success:
                    scraped += 1
                    status = 'success'
                    msg = f'✓ {result.word_count} słów: {article.title[:50]}'
                elif result.status == 'skipped':
                    skipped += 1
                    status = 'skipped'
                    msg = f'⊘ Pominięto: {article.title[:50]}'
                else:
                    skipped += 1
                    status = 'skipped'
                    msg = f'⊘ {(result.error or "Niedostępny")}: {article.title[:40]}'

                pct = round((i + 1) / total * 100, 1)
                yield f"data: {json.dumps({'current': i + 1, 'total': total, 'percent': pct, 'status': status, 'message': msg, 'details': {'success': scraped, 'scraped': scraped, 'failed': failed, 'skipped': skipped}}, ensure_ascii=False)}\n\n"

            processing_time = round(_time.time() - start_time, 2)
            yield f"data: {json.dumps({'current': total, 'total': total, 'percent': 100, 'status': 'complete', 'message': f'Zakończono: {scraped} pobrano, {failed} błędów, {skipped} pominięto', 'details': {'success': scraped, 'scraped': scraped, 'failed': failed, 'skipped': skipped, 'processing_time': processing_time}}, ensure_ascii=False)}\n\n"
        except Exception as e:
            logger.error(f"Error in scrape stream: {e}")
            yield f"data: {json.dumps({'status': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
        finally:
            db.close()

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )