refactor: Migrate announcements routes to public blueprint

- Created blueprints/public/routes_announcements.py with 2 routes:
  - /ogloszenia (announcements_list)
  - /ogloszenia/<slug> (announcement_detail)
- Added endpoint aliases for backward compatibility
- Removed ~130 lines from app.py (7506 -> 7378)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-01-31 17:37:08 +01:00
parent 0337d1a0bb
commit 3c5795ee4a
4 changed files with 152 additions and 131 deletions

134
app.py
View File

@ -7020,139 +7020,11 @@ def _old_admin_announcements_delete(id):
db.close() db.close()
# ============================================================ # ============================================================
# PUBLIC ANNOUNCEMENTS PAGE # PUBLIC ANNOUNCEMENTS - MOVED TO blueprints/public/routes_announcements.py
# ============================================================ # ============================================================
# Routes: /ogloszenia, /ogloszenia/<slug>
@app.route('/ogloszenia')
@login_required
@limiter.limit("60 per minute")
def announcements_list():
"""Strona z listą ogłoszeń dla zalogowanych członków"""
from database import Announcement
from sqlalchemy import or_, desc
db = SessionLocal()
try:
page = request.args.get('page', 1, type=int)
category = request.args.get('category', '')
per_page = 12
# Base query: published and not expired
query = db.query(Announcement).filter(
Announcement.status == 'published',
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
)
# Filter by category (supports both single category and categories array)
# Use PostgreSQL @> operator for array contains
if category and category in Announcement.CATEGORIES:
from sqlalchemy.dialects.postgresql import array as pg_array
query = query.filter(Announcement.categories.op('@>')(pg_array([category])))
# Sort: pinned first, then by published_at desc
query = query.order_by(
desc(Announcement.is_pinned),
desc(Announcement.published_at)
)
# Pagination
total = query.count()
total_pages = (total + per_page - 1) // per_page
announcements = query.offset((page - 1) * per_page).limit(per_page).all()
return render_template('announcements/list.html',
announcements=announcements,
current_category=category,
categories=Announcement.CATEGORIES,
category_labels=Announcement.CATEGORY_LABELS,
page=page,
total_pages=total_pages,
total=total)
finally:
db.close()
@app.route('/ogloszenia/<slug>')
@login_required
@limiter.limit("60 per minute")
def announcement_detail(slug):
"""Szczegóły ogłoszenia dla zalogowanych członków"""
from database import Announcement, AnnouncementRead, User
from sqlalchemy import or_, desc, func
db = SessionLocal()
try:
announcement = db.query(Announcement).filter(
Announcement.slug == slug,
Announcement.status == 'published',
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
).first()
if not announcement:
flash('Nie znaleziono ogłoszenia lub zostało usunięte.', 'error')
return redirect(url_for('announcements_list'))
# Increment views counter
announcement.views_count = (announcement.views_count or 0) + 1
# Record read by current user (if not already recorded)
existing_read = db.query(AnnouncementRead).filter(
AnnouncementRead.announcement_id == announcement.id,
AnnouncementRead.user_id == current_user.id
).first()
if not existing_read:
new_read = AnnouncementRead(
announcement_id=announcement.id,
user_id=current_user.id
)
db.add(new_read)
db.commit()
# Get readers (users who read this announcement)
readers = db.query(AnnouncementRead).filter(
AnnouncementRead.announcement_id == announcement.id
).order_by(desc(AnnouncementRead.read_at)).all()
# Get total registered users count for percentage calculation
total_users = db.query(func.count(User.id)).filter(
User.is_active == True,
User.is_verified == True
).scalar() or 1
readers_count = len(readers)
read_percentage = round((readers_count / total_users) * 100, 1) if total_users > 0 else 0
# Get other recent announcements for sidebar
other_announcements = db.query(Announcement).filter(
Announcement.status == 'published',
Announcement.id != announcement.id,
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
).order_by(desc(Announcement.published_at)).limit(5).all()
return render_template('announcements/detail.html',
announcement=announcement,
other_announcements=other_announcements,
category_labels=Announcement.CATEGORY_LABELS,
readers=readers,
readers_count=readers_count,
total_users=total_users,
read_percentage=read_percentage)
finally:
db.close()
# ============================================================ # ============================================================

View File

@ -149,6 +149,9 @@ def register_blueprints(app):
'zopk_index': 'public.zopk_index', 'zopk_index': 'public.zopk_index',
'zopk_project_detail': 'public.zopk_project_detail', 'zopk_project_detail': 'public.zopk_project_detail',
'zopk_news_list': 'public.zopk_news_list', 'zopk_news_list': 'public.zopk_news_list',
# Announcements
'announcements_list': 'public.announcements_list',
'announcement_detail': 'public.announcement_detail',
}) })
logger.info("Created public endpoint aliases") logger.info("Created public endpoint aliases")
except ImportError as e: except ImportError as e:

View File

@ -11,3 +11,4 @@ bp = Blueprint('public', __name__)
from . import routes # noqa: E402, F401 from . import routes # noqa: E402, F401
from . import routes_zopk # noqa: E402, F401 from . import routes_zopk # noqa: E402, F401
from . import routes_announcements # noqa: E402, F401

View File

@ -0,0 +1,145 @@
"""
Announcements Routes - Public blueprint
Migrated from app.py as part of the blueprint refactoring.
Contains public-facing announcement routes for logged-in members.
"""
import logging
from datetime import datetime
from flask import flash, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from sqlalchemy import desc, func, or_
from sqlalchemy.dialects.postgresql import array as pg_array
from database import SessionLocal, Announcement, AnnouncementRead, User
from . import bp
logger = logging.getLogger(__name__)
# ============================================================
# PUBLIC ANNOUNCEMENTS PAGE
# ============================================================
@bp.route('/ogloszenia')
@login_required
def announcements_list():
"""Strona z listą ogłoszeń dla zalogowanych członków"""
db = SessionLocal()
try:
page = request.args.get('page', 1, type=int)
category = request.args.get('category', '')
per_page = 12
# Base query: published and not expired
query = db.query(Announcement).filter(
Announcement.status == 'published',
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
)
# Filter by category (supports both single category and categories array)
# Use PostgreSQL @> operator for array contains
if category and category in Announcement.CATEGORIES:
query = query.filter(Announcement.categories.op('@>')(pg_array([category])))
# Sort: pinned first, then by published_at desc
query = query.order_by(
desc(Announcement.is_pinned),
desc(Announcement.published_at)
)
# Pagination
total = query.count()
total_pages = (total + per_page - 1) // per_page
announcements = query.offset((page - 1) * per_page).limit(per_page).all()
return render_template('announcements/list.html',
announcements=announcements,
current_category=category,
categories=Announcement.CATEGORIES,
category_labels=Announcement.CATEGORY_LABELS,
page=page,
total_pages=total_pages,
total=total)
finally:
db.close()
@bp.route('/ogloszenia/<slug>')
@login_required
def announcement_detail(slug):
"""Szczegóły ogłoszenia dla zalogowanych członków"""
db = SessionLocal()
try:
announcement = db.query(Announcement).filter(
Announcement.slug == slug,
Announcement.status == 'published',
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
).first()
if not announcement:
flash('Nie znaleziono ogłoszenia lub zostało usunięte.', 'error')
return redirect(url_for('announcements_list'))
# Increment views counter
announcement.views_count = (announcement.views_count or 0) + 1
# Record read by current user (if not already recorded)
existing_read = db.query(AnnouncementRead).filter(
AnnouncementRead.announcement_id == announcement.id,
AnnouncementRead.user_id == current_user.id
).first()
if not existing_read:
new_read = AnnouncementRead(
announcement_id=announcement.id,
user_id=current_user.id
)
db.add(new_read)
db.commit()
# Get readers (users who read this announcement)
readers = db.query(AnnouncementRead).filter(
AnnouncementRead.announcement_id == announcement.id
).order_by(desc(AnnouncementRead.read_at)).all()
# Get total registered users count for percentage calculation
total_users = db.query(func.count(User.id)).filter(
User.is_active == True,
User.is_verified == True
).scalar() or 1
readers_count = len(readers)
read_percentage = round((readers_count / total_users) * 100, 1) if total_users > 0 else 0
# Get other recent announcements for sidebar
other_announcements = db.query(Announcement).filter(
Announcement.status == 'published',
Announcement.id != announcement.id,
or_(
Announcement.expires_at.is_(None),
Announcement.expires_at > datetime.now()
)
).order_by(desc(Announcement.published_at)).limit(5).all()
return render_template('announcements/detail.html',
announcement=announcement,
other_announcements=other_announcements,
category_labels=Announcement.CATEGORY_LABELS,
readers=readers,
readers_count=readers_count,
total_users=total_users,
read_percentage=read_percentage)
finally:
db.close()