Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- HIGH: Fix SQL injection in ZOPK knowledge service (3 functions) — replace f-strings with parameterized queries - MEDIUM: Sanitize tsquery/LIKE input in SearchService to prevent injection - MEDIUM: Add @login_required + @role_required(ADMIN) to /health/full endpoint - MEDIUM: Add @role_required(ADMIN) to ZOPK knowledge search API - MEDIUM: Add bleach HTML sanitization on write for announcements, events, board proceedings (stored XSS via |safe) - MEDIUM: Remove partial API key from Gemini service logs - MEDIUM: Remove @csrf.exempt from chat endpoints, add X-CSRFToken headers in JS - MEDIUM: Add missing CSRF tokens to 3 POST forms (data_request, benefits_form, benefits_list) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
341 lines
12 KiB
Python
341 lines
12 KiB
Python
"""
|
|
Admin Announcements Routes
|
|
===========================
|
|
|
|
Announcements management (ogłoszenia) for admin panel.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from flask import render_template, request, redirect, url_for, flash, jsonify
|
|
from flask_login import login_required, current_user
|
|
|
|
from . import bp
|
|
from database import SessionLocal, Announcement, SystemRole
|
|
from utils.decorators import role_required
|
|
from utils.helpers import sanitize_html
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_slug(title):
|
|
"""
|
|
Generate URL-friendly slug from title.
|
|
Uses unidecode for proper Polish character handling.
|
|
"""
|
|
try:
|
|
from unidecode import unidecode
|
|
text = unidecode(title.lower())
|
|
except ImportError:
|
|
# Fallback without unidecode
|
|
text = title.lower()
|
|
replacements = {
|
|
'ą': 'a', 'ć': 'c', 'ę': 'e', 'ł': 'l', 'ń': 'n',
|
|
'ó': 'o', 'ś': 's', 'ź': 'z', 'ż': 'z'
|
|
}
|
|
for pl, en in replacements.items():
|
|
text = text.replace(pl, en)
|
|
|
|
# Remove special characters, replace spaces with hyphens
|
|
text = re.sub(r'[^\w\s-]', '', text)
|
|
text = re.sub(r'[-\s]+', '-', text).strip('-')
|
|
return text[:200] # Limit slug length
|
|
|
|
|
|
# ============================================================
|
|
# ANNOUNCEMENTS MANAGEMENT
|
|
# ============================================================
|
|
|
|
@bp.route('/announcements')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements():
|
|
"""Admin panel - lista ogłoszeń"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Filters
|
|
status_filter = request.args.get('status', 'all')
|
|
category_filter = request.args.get('category', 'all')
|
|
|
|
query = db.query(Announcement)
|
|
|
|
if status_filter != 'all':
|
|
query = query.filter(Announcement.status == status_filter)
|
|
if category_filter != 'all':
|
|
from sqlalchemy.dialects.postgresql import array as pg_array
|
|
query = query.filter(Announcement.categories.op('@>')(pg_array([category_filter])))
|
|
|
|
# Sort: pinned first, then by created_at desc
|
|
query = query.order_by(
|
|
Announcement.is_pinned.desc(),
|
|
Announcement.created_at.desc()
|
|
)
|
|
|
|
announcements = query.all()
|
|
|
|
return render_template('admin/announcements.html',
|
|
announcements=announcements,
|
|
now=datetime.now(),
|
|
status_filter=status_filter,
|
|
category_filter=category_filter,
|
|
categories=Announcement.CATEGORIES,
|
|
category_labels=Announcement.CATEGORY_LABELS,
|
|
statuses=Announcement.STATUSES,
|
|
status_labels=Announcement.STATUS_LABELS)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/announcements/new', methods=['GET', 'POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements_new():
|
|
"""Admin panel - nowe ogłoszenie"""
|
|
if request.method == 'POST':
|
|
db = SessionLocal()
|
|
try:
|
|
title = request.form.get('title', '').strip()
|
|
excerpt = request.form.get('excerpt', '').strip()
|
|
content = sanitize_html(request.form.get('content', '').strip())
|
|
categories = request.form.getlist('categories')
|
|
if not categories:
|
|
categories = ['internal'] # Default category
|
|
category = categories[0] # Backwards compatibility
|
|
image_url = request.form.get('image_url', '').strip() or None
|
|
external_link = request.form.get('external_link', '').strip() or None
|
|
is_featured = 'is_featured' in request.form
|
|
is_pinned = 'is_pinned' in request.form
|
|
|
|
# Handle expires_at
|
|
expires_at_str = request.form.get('expires_at', '').strip()
|
|
expires_at = None
|
|
if expires_at_str:
|
|
try:
|
|
expires_at = datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
pass
|
|
|
|
# Generate unique slug
|
|
base_slug = generate_slug(title)
|
|
slug = base_slug
|
|
counter = 1
|
|
while db.query(Announcement).filter(Announcement.slug == slug).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
# Determine status based on button clicked
|
|
action = request.form.get('action', 'draft')
|
|
status = 'published' if action == 'publish' else 'draft'
|
|
published_at = datetime.now() if status == 'published' else None
|
|
|
|
announcement = Announcement(
|
|
title=title,
|
|
slug=slug,
|
|
excerpt=excerpt or None,
|
|
content=content,
|
|
category=category,
|
|
categories=categories,
|
|
image_url=image_url,
|
|
external_link=external_link,
|
|
status=status,
|
|
published_at=published_at,
|
|
expires_at=expires_at,
|
|
is_featured=is_featured,
|
|
is_pinned=is_pinned,
|
|
created_by=current_user.id
|
|
)
|
|
|
|
db.add(announcement)
|
|
db.commit()
|
|
|
|
flash(f'Ogłoszenie zostało {"opublikowane" if status == "published" else "zapisane jako szkic"}.', 'success')
|
|
return redirect(url_for('admin.admin_announcements'))
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error creating announcement: {e}")
|
|
flash(f'Błąd podczas tworzenia ogłoszenia: {e}', 'error')
|
|
finally:
|
|
db.close()
|
|
|
|
# GET request - show form
|
|
return render_template('admin/announcements_form.html',
|
|
announcement=None,
|
|
categories=Announcement.CATEGORIES,
|
|
category_labels=Announcement.CATEGORY_LABELS)
|
|
|
|
|
|
@bp.route('/announcements/<int:id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements_edit(id):
|
|
"""Admin panel - edycja ogłoszenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
announcement = db.query(Announcement).filter(Announcement.id == id).first()
|
|
if not announcement:
|
|
flash('Nie znaleziono ogłoszenia.', 'error')
|
|
return redirect(url_for('admin.admin_announcements'))
|
|
|
|
if request.method == 'POST':
|
|
announcement.title = request.form.get('title', '').strip()
|
|
announcement.excerpt = request.form.get('excerpt', '').strip() or None
|
|
announcement.content = sanitize_html(request.form.get('content', '').strip())
|
|
categories = request.form.getlist('categories')
|
|
if not categories:
|
|
categories = ['internal'] # Default category
|
|
announcement.categories = categories
|
|
announcement.category = categories[0] # Backwards compatibility
|
|
announcement.image_url = request.form.get('image_url', '').strip() or None
|
|
announcement.external_link = request.form.get('external_link', '').strip() or None
|
|
announcement.is_featured = 'is_featured' in request.form
|
|
announcement.is_pinned = 'is_pinned' in request.form
|
|
|
|
# Handle expires_at
|
|
expires_at_str = request.form.get('expires_at', '').strip()
|
|
if expires_at_str:
|
|
try:
|
|
announcement.expires_at = datetime.strptime(expires_at_str, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
announcement.expires_at = None
|
|
|
|
# Regenerate slug if title changed significantly
|
|
new_slug = generate_slug(announcement.title)
|
|
if new_slug != announcement.slug.split('-')[0]: # Check if base changed
|
|
base_slug = new_slug
|
|
slug = base_slug
|
|
counter = 1
|
|
while db.query(Announcement).filter(
|
|
Announcement.slug == slug,
|
|
Announcement.id != id
|
|
).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
announcement.slug = slug
|
|
|
|
# Handle status change
|
|
action = request.form.get('action', 'save')
|
|
if action == 'publish' and announcement.status != 'published':
|
|
announcement.status = 'published'
|
|
announcement.published_at = datetime.now()
|
|
elif action == 'archive':
|
|
announcement.status = 'archived'
|
|
elif action == 'draft':
|
|
announcement.status = 'draft'
|
|
|
|
announcement.updated_at = datetime.now()
|
|
db.commit()
|
|
|
|
flash('Zmiany zostały zapisane.', 'success')
|
|
return redirect(url_for('admin.admin_announcements'))
|
|
|
|
# GET request - show form
|
|
return render_template('admin/announcements_form.html',
|
|
announcement=announcement,
|
|
categories=Announcement.CATEGORIES,
|
|
category_labels=Announcement.CATEGORY_LABELS)
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error editing announcement {id}: {e}")
|
|
flash(f'Błąd: {e}', 'error')
|
|
return redirect(url_for('admin.admin_announcements'))
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/announcements/<int:id>/publish', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements_publish(id):
|
|
"""Publikacja ogłoszenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
announcement = db.query(Announcement).filter(Announcement.id == id).first()
|
|
if not announcement:
|
|
return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404
|
|
|
|
announcement.status = 'published'
|
|
if not announcement.published_at:
|
|
announcement.published_at = datetime.now()
|
|
announcement.updated_at = datetime.now()
|
|
db.commit()
|
|
|
|
# Notify all users about new announcement
|
|
try:
|
|
from utils.notifications import notify_all_users_announcement
|
|
notify_count = notify_all_users_announcement(
|
|
announcement_id=announcement.id,
|
|
title=announcement.title,
|
|
category=announcement.category
|
|
)
|
|
logger.info(f"Sent {notify_count} notifications for announcement: {announcement.title}")
|
|
return jsonify({'success': True, 'message': f'Ogłoszenie zostało opublikowane. Wysłano {notify_count} powiadomień.'})
|
|
except ImportError:
|
|
return jsonify({'success': True, 'message': 'Ogłoszenie zostało opublikowane.'})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error publishing announcement {id}: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/announcements/<int:id>/archive', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements_archive(id):
|
|
"""Archiwizacja ogłoszenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
announcement = db.query(Announcement).filter(Announcement.id == id).first()
|
|
if not announcement:
|
|
return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404
|
|
|
|
announcement.status = 'archived'
|
|
announcement.updated_at = datetime.now()
|
|
db.commit()
|
|
|
|
return jsonify({'success': True, 'message': 'Ogłoszenie zostało zarchiwizowane'})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error archiving announcement {id}: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/announcements/<int:id>/delete', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_announcements_delete(id):
|
|
"""Usunięcie ogłoszenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
announcement = db.query(Announcement).filter(Announcement.id == id).first()
|
|
if not announcement:
|
|
return jsonify({'success': False, 'error': 'Nie znaleziono ogłoszenia'}), 404
|
|
|
|
db.delete(announcement)
|
|
db.commit()
|
|
|
|
return jsonify({'success': True, 'message': 'Ogłoszenie zostało usunięte'})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error deleting announcement {id}: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
finally:
|
|
db.close()
|