nordabiz/blueprints/admin/routes.py
Maciej Pienczyn 327cb5a790
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat(calendar): add attachment support to events
Adds file attachment capability to NordaEvent model (attachment_filename,
attachment_path columns). Admin can upload PDF/DOCX when creating events.
Users see a download link on the event detail page.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 17:22:18 +01:00

1038 lines
38 KiB
Python

"""
Admin Routes
============
Admin panel: users, recommendations, fees, calendar management.
"""
import os
import csv
import json
import re
import logging
import secrets
import string
from io import StringIO
from datetime import datetime, timedelta
from decimal import Decimal
from flask import render_template, request, redirect, url_for, flash, jsonify, Response
from flask_login import login_required, current_user
from werkzeug.security import generate_password_hash
from . import bp
from database import (
SessionLocal, User, Company, CompanyRecommendation,
MembershipFee, MembershipFeeConfig, NordaEvent, EventAttendee,
SystemRole, UserCompany
)
from utils.decorators import role_required
from utils.helpers import sanitize_html
import gemini_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# (month number, Polish month name) pairs handed to the fees template as
# `months`. The names are intentionally left exactly as written here
# (ASCII spelling, without Polish diacritics).
MONTHS_PL = [
(1, 'Styczen'), (2, 'Luty'), (3, 'Marzec'), (4, 'Kwiecien'),
(5, 'Maj'), (6, 'Czerwiec'), (7, 'Lipiec'), (8, 'Sierpien'),
(9, 'Wrzesien'), (10, 'Pazdziernik'), (11, 'Listopad'), (12, 'Grudzien')
]
# ============================================================
# RECOMMENDATIONS ADMIN ROUTES
# ============================================================
@bp.route('/recommendations')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_recommendations():
    """Render the moderation panel for company recommendations.

    Loads every recommendation and the pending subset (both newest first),
    counts approved and rejected entries, logs the visit and renders
    ``admin/recommendations.html`` with those values.
    """
    session = SessionLocal()
    try:
        def with_status(status):
            # Fresh query restricted to a single moderation status.
            return session.query(CompanyRecommendation).filter(
                CompanyRecommendation.status == status
            )

        every_item = session.query(CompanyRecommendation).order_by(
            CompanyRecommendation.created_at.desc()
        ).all()
        awaiting = with_status('pending').order_by(
            CompanyRecommendation.created_at.desc()
        ).all()

        context = {
            'recommendations': every_item,
            'pending_recommendations': awaiting,
            'total_recommendations': len(every_item),
            'pending_count': len(awaiting),
            'approved_count': with_status('approved').count(),
            'rejected_count': with_status('rejected').count(),
        }

        pending_total = context['pending_count']
        logger.info(f"Admin {current_user.email} accessed recommendations panel - {pending_total} pending")
        return render_template('admin/recommendations.html', **context)
    finally:
        session.close()
@bp.route('/recommendations/<int:recommendation_id>/approve', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_recommendation_approve(recommendation_id):
    """Approve the recommendation with the given id.

    Sets the status to ``approved``, records the current user and the UTC
    time as moderation metadata and clears any rejection reason.
    Responds with JSON; a missing recommendation yields a 404 JSON error.
    """
    session = SessionLocal()
    try:
        target = (
            session.query(CompanyRecommendation)
            .filter(CompanyRecommendation.id == recommendation_id)
            .first()
        )
        if not target:
            not_found = {'success': False, 'error': 'Rekomendacja nie istnieje'}
            return jsonify(not_found), 404

        target.status = 'approved'
        target.moderated_by = current_user.id
        target.moderated_at = datetime.utcnow()
        target.rejection_reason = None
        session.commit()

        logger.info(f"Admin {current_user.email} approved recommendation #{recommendation_id}")
        return jsonify({'success': True, 'message': 'Rekomendacja zatwierdzona'})
    finally:
        session.close()
@bp.route('/recommendations/<int:recommendation_id>/reject', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_recommendation_reject(recommendation_id):
    """Reject the recommendation with the given id.

    The optional rejection reason is read from the ``reason`` key of a JSON
    body or, for non-JSON requests, from the ``reason`` form field. The
    reason is stripped; an empty or missing reason is stored as ``None``.

    Responds with JSON; a missing recommendation yields a 404 JSON error.
    """
    db = SessionLocal()
    try:
        recommendation = db.query(CompanyRecommendation).filter(
            CompanyRecommendation.id == recommendation_id
        ).first()
        if not recommendation:
            return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404

        if request.is_json:
            payload = request.get_json()
            # A JSON body that is not an object (null, list, number, ...)
            # carries no reason instead of crashing on ``.get``.
            raw_reason = payload.get('reason', '') if isinstance(payload, dict) else ''
        else:
            raw_reason = request.form.get('reason', '')

        if raw_reason is None:
            raw_reason = ''
        elif not isinstance(raw_reason, str):
            # Non-string JSON values (e.g. numbers) are stored as text.
            raw_reason = str(raw_reason)

        recommendation.status = 'rejected'
        recommendation.moderated_by = current_user.id
        recommendation.moderated_at = datetime.utcnow()
        # Whitespace-only reasons are treated the same as no reason.
        recommendation.rejection_reason = raw_reason.strip() or None
        db.commit()

        logger.info(f"Admin {current_user.email} rejected recommendation #{recommendation_id}")
        return jsonify({
            'success': True,
            'message': 'Rekomendacja odrzucona'
        })
    finally:
        db.close()
# ============================================================
# USER MANAGEMENT ADMIN ROUTES
# ============================================================
@bp.route('/users')
@login_required
@role_required(SystemRole.ADMIN)
def admin_users():
    """Render the user management panel.

    Loads all users (newest first) and all companies (by name), derives the
    summary counters shown on the page and renders ``admin/users.html``.
    """
    session = SessionLocal()
    try:
        all_users = session.query(User).order_by(User.created_at.desc()).all()
        all_companies = session.query(Company).order_by(Company.name).all()

        admins = 0
        verified = 0
        awaiting_company_role = 0
        for account in all_users:
            if account.has_role(SystemRole.ADMIN):
                admins += 1
            if account.is_verified:
                verified += 1
            # Linked to a company but still without a company role.
            if account.company_id and account.company_role == 'NONE':
                awaiting_company_role += 1

        user_total = len(all_users)
        logger.info(f"Admin {current_user.email} accessed users panel - {user_total} users")

        return render_template(
            'admin/users.html',
            users=all_users,
            companies=all_companies,
            total_users=user_total,
            admin_count=admins,
            verified_count=verified,
            unverified_count=user_total - verified,
            pending_company_count=awaiting_company_role,
        )
    finally:
        session.close()
@bp.route('/users/add', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_add():
    """Create a new user account from a JSON payload (admin only).

    Recognised keys: ``email`` (required), ``name``, ``company_id``,
    ``is_verified`` (defaults to True) and ``role`` (a ``SystemRole`` member
    name, falling back to ``MEMBER`` when unknown).

    A random 16-character password is generated and returned in the JSON
    response together with the new user id. Responds with 400 when the
    email is missing or already taken and 500 on unexpected errors.
    """
    db = SessionLocal()
    try:
        # A missing, malformed or non-object body is treated as an empty
        # payload so the caller gets the 400 validation error, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        def text_field(key):
            # JSON null / non-string values count as "not provided".
            value = data.get(key)
            return value.strip() if isinstance(value, str) else ''

        email = text_field('email').lower()
        if not email:
            return jsonify({'success': False, 'error': 'Email jest wymagany'}), 400

        existing_user = db.query(User).filter(User.email == email).first()
        if existing_user:
            return jsonify({'success': False, 'error': 'Użytkownik z tym adresem email już istnieje'}), 400

        password_chars = string.ascii_letters + string.digits + "!@#$%^&*"
        generated_password = ''.join(secrets.choice(password_chars) for _ in range(16))
        password_hash = generate_password_hash(generated_password, method='pbkdf2:sha256')

        new_user = User(
            email=email,
            password_hash=password_hash,
            name=text_field('name') or None,
            company_id=data.get('company_id') or None,
            is_verified=data.get('is_verified', True),
            is_active=True
        )
        db.add(new_user)

        role_name = data.get('role', 'MEMBER')
        try:
            new_user.set_role(SystemRole[role_name])
        except (KeyError, TypeError):
            # Unknown name, or a value that cannot be used as an enum key.
            new_user.set_role(SystemRole.MEMBER)

        db.commit()
        db.refresh(new_user)

        logger.info(f"Admin {current_user.email} created new user: {email} (ID: {new_user.id})")
        return jsonify({
            'success': True,
            'user_id': new_user.id,
            'generated_password': generated_password,
            'message': f'Użytkownik {email} został utworzony'
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error creating user: {e}")
        return jsonify({'success': False, 'error': 'Błąd podczas tworzenia użytkownika'}), 500
    finally:
        db.close()
@bp.route('/users/<int:user_id>/toggle-admin', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_toggle_admin(user_id):
    """Switch a user between the ADMIN and MEMBER roles.

    A user holding ADMIN is set to MEMBER; any other user is set to ADMIN.
    Admins cannot change their own role (400). Responds with JSON; an
    unknown user yields a 404 JSON error.
    """
    if current_user.id == user_id:
        return jsonify({'success': False, 'error': 'Nie możesz zmienić własnych uprawnień'}), 400

    session = SessionLocal()
    try:
        account = session.query(User).filter(User.id == user_id).first()
        if not account:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        if account.has_role(SystemRole.ADMIN):
            target_role = SystemRole.MEMBER
        else:
            target_role = SystemRole.ADMIN
        account.set_role(target_role)
        session.commit()

        is_now_admin = account.has_role(SystemRole.ADMIN)
        log_verb = 'granted' if is_now_admin else 'revoked'
        logger.info(f"Admin {current_user.email} {log_verb} admin for user {account.email}")

        response_verb = 'Nadano' if is_now_admin else 'Odebrano'
        return jsonify({
            'success': True,
            'is_admin': is_now_admin,  # kept for older frontend code
            'role': account.role,
            'message': f"{response_verb} uprawnienia admina",
        })
    finally:
        session.close()
@bp.route('/users/<int:user_id>/toggle-verified', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_toggle_verified(user_id):
    """Flip a user's verified flag.

    ``verified_at`` is set to the current UTC time when the user becomes
    verified and cleared otherwise. Responds with JSON; an unknown user
    yields a 404 JSON error.
    """
    session = SessionLocal()
    try:
        account = session.query(User).filter(User.id == user_id).first()
        if not account:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        account.is_verified = not account.is_verified
        account.verified_at = datetime.utcnow() if account.is_verified else None
        session.commit()

        log_state = 'verified' if account.is_verified else 'unverified'
        logger.info(f"Admin {current_user.email} {log_state} user {account.email}")

        state_label = 'zweryfikowany' if account.is_verified else 'niezweryfikowany'
        return jsonify({
            'success': True,
            'is_verified': account.is_verified,
            'message': f"Użytkownik {state_label}",
        })
    finally:
        session.close()
@bp.route('/users/<int:user_id>/toggle-rada-member', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_toggle_rada_member(user_id):
    """Flip a user's Rada Izby (Board Council) membership flag.

    Responds with JSON carrying the new flag value; an unknown user yields
    a 404 JSON error.
    """
    session = SessionLocal()
    try:
        account = session.query(User).filter(User.id == user_id).first()
        if not account:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        account.is_rada_member = not account.is_rada_member
        session.commit()

        if account.is_rada_member:
            log_verb, log_preposition = 'added', 'to'
        else:
            log_verb, log_preposition = 'removed', 'from'
        logger.info(f"Admin {current_user.email} {log_verb} user {account.email} {log_preposition} Rada Izby")

        outcome = 'dodany do' if account.is_rada_member else 'usunięty z'
        return jsonify({
            'success': True,
            'is_rada_member': account.is_rada_member,
            'message': f"Użytkownik {outcome} Rady Izby",
        })
    finally:
        session.close()
@bp.route('/users/<int:user_id>/update', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_update(user_id):
    """Update a user's name, email and/or phone from a JSON payload.

    Only keys present in the payload are applied. ``name`` and ``phone``
    are stripped and stored as ``None`` when empty; ``email`` is stripped
    and lower-cased, ignored when empty, and rejected with 400 when another
    user already uses it. Non-string values are treated as empty.

    Responds with JSON containing the updated fields; an unknown user
    yields 404 and unexpected errors yield 500.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Missing, malformed or non-object body: nothing to apply.
            data = {}

        def cleaned(key):
            # JSON null and non-string values would crash on ``.strip()``;
            # treat them as an empty value instead.
            value = data.get(key)
            return value.strip() if isinstance(value, str) else ''

        if 'name' in data:
            user.name = cleaned('name') or None

        if 'email' in data:
            new_email = cleaned('email').lower()
            if new_email and new_email != user.email:
                existing = db.query(User).filter(User.email == new_email, User.id != user_id).first()
                if existing:
                    return jsonify({'success': False, 'error': 'Ten email jest już używany'}), 400
                user.email = new_email

        if 'phone' in data:
            user.phone = cleaned('phone') or None

        db.commit()

        logger.info(f"Admin {current_user.email} updated user {user.email}: name={user.name}, phone={user.phone}")
        return jsonify({
            'success': True,
            'user': {
                'id': user.id,
                'name': user.name,
                'email': user.email,
                'phone': user.phone
            },
            'message': 'Dane użytkownika zaktualizowane'
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error updating user {user_id}: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/users/<int:user_id>/assign-company', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_assign_company(user_id):
    """Assign a company to a user, or detach the user from their company.

    The JSON payload's ``company_id`` selects the company; a missing or
    falsy value detaches the user. The ``user_companies`` table is kept in
    sync so that the chosen company is the user's only primary link.
    ``is_norda_member`` is set when linking to a company whose status is
    ``active`` and cleared on detach when no active company link remains.

    Responds with JSON; an unknown user or company yields a 404 JSON error.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        data = request.get_json() or {}
        company_id = data.get('company_id')

        if company_id:
            company = db.query(Company).filter(Company.id == company_id).first()
            if not company:
                return jsonify({'success': False, 'error': 'Firma nie znaleziona'}), 404

            user.company_id = company_id
            company_name = company.name

            # Demote every OTHER primary link first, regardless of whether
            # the target link already exists; otherwise re-assigning an
            # existing link would leave two primary rows for the user.
            db.query(UserCompany).filter_by(
                user_id=user.id, is_primary=True
            ).filter(
                UserCompany.company_id != company_id
            ).update({'is_primary': False})

            existing_uc = db.query(UserCompany).filter_by(user_id=user.id, company_id=company_id).first()
            if existing_uc:
                existing_uc.is_primary = True
            else:
                db.add(UserCompany(user_id=user.id, company_id=company_id, role='MANAGER', is_primary=True))

            # Linking to an active company implies Norda membership.
            if company.status == 'active':
                user.is_norda_member = True
        else:
            user.company_id = None
            company_name = None

            # No company is primary any more.
            db.query(UserCompany).filter_by(user_id=user.id, is_primary=True).update({'is_primary': False})

            # Membership is kept only while some link to an active company
            # still exists.
            remaining = db.query(UserCompany).filter(
                UserCompany.user_id == user.id,
                UserCompany.company_id.isnot(None)
            ).join(Company).filter(Company.status == 'active').first()
            if not remaining:
                user.is_norda_member = False

        db.commit()

        logger.info(f"Admin {current_user.email} assigned company '{company_name}' to user {user.email}")
        return jsonify({
            'success': True,
            'company_name': company_name,
            'message': f"Przypisano firmę: {company_name}" if company_name else "Odłączono od firmy"
        })
    finally:
        db.close()
@bp.route('/users/list-all')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_users_list_all():
    """Return every user as JSON, ordered by name then email.

    Each entry carries ``id``, ``name``, ``email`` and ``company_id``.
    """
    session = SessionLocal()
    try:
        entries = []
        for account in session.query(User).order_by(User.name, User.email).all():
            entries.append({
                'id': account.id,
                'name': account.name,
                'email': account.email,
                'company_id': account.company_id,
            })
        return jsonify({'success': True, 'users': entries})
    finally:
        session.close()
@bp.route('/users/<int:user_id>/delete', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_delete(user_id):
    """Delete a user and clean up rows that reference them.

    Nullable foreign keys pointing at the user are set to NULL, rows whose
    NOT NULL foreign key points at the user are deleted, and finally the
    user row itself is removed with raw SQL. Each cleanup statement is
    best-effort and runs inside its own SAVEPOINT, so a failing statement
    (for example a table that does not exist yet) is rolled back on its own
    and cannot abort the surrounding transaction.

    Admins cannot delete their own account (400). Responds with JSON; an
    unknown user yields 404 and unexpected errors yield 500.
    """
    from sqlalchemy import text
    from sqlalchemy.exc import SQLAlchemyError

    if user_id == current_user.id:
        return jsonify({'success': False, 'error': 'Nie możesz usunąć własnego konta'}), 400

    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        email = user.email

        # Expunge user from ORM session to prevent backref cascades
        db.expunge(user)

        # Clean up FK references with NO ACTION that would block deletion.
        # Tables with CASCADE in DB are handled automatically.

        # 1) Nullable FKs -> SET NULL
        nullable_fk_updates = [
            "UPDATE announcements SET author_id = NULL WHERE author_id = :uid",
            "UPDATE board_documents SET uploaded_by = NULL WHERE uploaded_by = :uid",
            "UPDATE board_documents SET updated_by = NULL WHERE updated_by = :uid",
            "UPDATE board_meetings SET chairperson_id = NULL WHERE chairperson_id = :uid",
            "UPDATE board_meetings SET secretary_id = NULL WHERE secretary_id = :uid",
            "UPDATE board_meetings SET created_by = NULL WHERE created_by = :uid",
            "UPDATE board_meetings SET updated_by = NULL WHERE updated_by = :uid",
            "UPDATE classified_questions SET answered_by = NULL WHERE answered_by = :uid",
            "UPDATE company_data_requests SET reviewed_by_id = NULL WHERE reviewed_by_id = :uid",
            "UPDATE company_recommendations SET moderated_by = NULL WHERE moderated_by = :uid",
            "UPDATE forum_topics SET edited_by = NULL WHERE edited_by = :uid",
            "UPDATE forum_topics SET deleted_by = NULL WHERE deleted_by = :uid",
            "UPDATE forum_topics SET status_changed_by = NULL WHERE status_changed_by = :uid",
            "UPDATE forum_replies SET edited_by = NULL WHERE edited_by = :uid",
            "UPDATE forum_replies SET deleted_by = NULL WHERE deleted_by = :uid",
            "UPDATE forum_replies SET marked_as_solution_by = NULL WHERE marked_as_solution_by = :uid",
            "UPDATE forum_reports SET reviewed_by = NULL WHERE reviewed_by = :uid",
            "UPDATE it_audits SET audited_by = NULL WHERE audited_by = :uid",
            "UPDATE maturity_assessments SET assessed_by_user_id = NULL WHERE assessed_by_user_id = :uid",
            "UPDATE ai_enrichment_proposals SET reviewed_by_id = NULL WHERE reviewed_by_id = :uid",
            "UPDATE membership_applications SET reviewed_by_id = NULL WHERE reviewed_by_id = :uid",
            "UPDATE membership_applications SET proposed_changes_by_id = NULL WHERE proposed_changes_by_id = :uid",
            "UPDATE membership_fee_config SET created_by = NULL WHERE created_by = :uid",
            "UPDATE membership_fees SET recorded_by = NULL WHERE recorded_by = :uid",
            "UPDATE norda_events SET created_by = NULL WHERE created_by = :uid",
            "UPDATE user_company_permissions SET granted_by_id = NULL WHERE granted_by_id = :uid",
            "UPDATE zopk_company_links SET created_by = NULL WHERE created_by = :uid",
            "UPDATE zopk_knowledge_chunks SET verified_by = NULL WHERE verified_by = :uid",
            "UPDATE zopk_knowledge_extraction_jobs SET triggered_by_user = NULL WHERE triggered_by_user = :uid",
            "UPDATE zopk_knowledge_relations SET verified_by = NULL WHERE verified_by = :uid",
            "UPDATE zopk_milestones SET verified_by = NULL WHERE verified_by = :uid",
            "UPDATE zopk_news SET moderated_by = NULL WHERE moderated_by = :uid",
            "UPDATE zopk_news_fetch_jobs SET triggered_by_user = NULL WHERE triggered_by_user = :uid",
            "UPDATE zopk_resources SET uploaded_by = NULL WHERE uploaded_by = :uid",
        ]

        # 2) NOT NULL FKs without CASCADE -> DELETE records
        not_null_fk_deletes = [
            "DELETE FROM ai_api_costs WHERE user_id = :uid",
            "DELETE FROM ai_chat_feedback WHERE user_id = :uid",
            "DELETE FROM ai_usage_logs WHERE user_id = :uid",
            "DELETE FROM forum_edit_history WHERE editor_id = :uid",
            "DELETE FROM forum_attachments WHERE uploaded_by = :uid",
            "DELETE FROM forum_reports WHERE reporter_id = :uid",
            "DELETE FROM forum_replies WHERE author_id = :uid",
            "DELETE FROM forum_topics WHERE author_id = :uid",
        ]

        for sql in nullable_fk_updates + not_null_fk_deletes:
            try:
                # SAVEPOINT per statement: a failure here (e.g. the table
                # does not exist yet) is rolled back in isolation instead
                # of poisoning the outer transaction.
                with db.begin_nested():
                    db.execute(text(sql), {"uid": user_id})
            except SQLAlchemyError as cleanup_error:
                logger.warning(f"Skipped FK cleanup while deleting user {user_id}: {sql} ({cleanup_error})")

        # Use raw SQL to bypass ORM relationship cascades that try SET NULL
        db.execute(text("DELETE FROM users WHERE id = :uid"), {"uid": user_id})
        db.commit()

        logger.info(f"Admin {current_user.email} deleted user {email}")
        return jsonify({
            'success': True,
            'message': f"Użytkownik {email} został usunięty"
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error deleting user {user_id}: {e}")
        return jsonify({'success': False, 'error': f'Błąd: {str(e)}'}), 500
    finally:
        db.close()
@bp.route('/users/<int:user_id>/reset-password', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_reset_password(user_id):
    """Create a password-reset token for a user and optionally email it.

    The token is stored on the user with a 24-hour expiry and the reset URL
    is built from the ``APP_URL`` environment variable. When the JSON body
    has a truthy ``send_email`` and the email service is configured, the
    link is also emailed to the user.

    Responds with JSON containing the reset URL and whether an email was
    sent; an unknown user yields a 404 JSON error.
    """
    session = SessionLocal()
    try:
        account = session.query(User).filter(User.id == user_id).first()
        if not account:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        token = secrets.token_urlsafe(32)
        account.reset_token = token
        account.reset_token_expires = datetime.now() + timedelta(hours=24)
        session.commit()

        site_root = os.getenv('APP_URL', 'https://nordabiznes.pl')
        reset_url = f"{site_root}/reset-password/{token}"
        logger.info(f"Admin {current_user.email} generated reset token for user {account.email}: {token[:8]}...")

        # The body is optional; anything unparsable counts as "no options".
        options = request.get_json(silent=True) or {}
        email_sent = False
        if options.get('send_email', False):
            import email_service
            if not email_service.is_configured():
                logger.warning("Email service not configured, cannot send reset email")
            else:
                email_sent = email_service.send_password_reset_email(
                    account.email, reset_url, admin_initiated=True
                )
                if email_sent:
                    logger.info(f"Password reset email sent to {account.email} by admin {current_user.email}")
                else:
                    logger.warning(f"Failed to send password reset email to {account.email}")

        if email_sent:
            message = "Email z linkiem do resetu hasła wysłany"
        else:
            message = "Link do resetu hasła wygenerowany (ważny 24 godziny)"

        return jsonify({
            'success': True,
            'reset_url': reset_url,
            'email_sent': email_sent,
            'message': message,
        })
    finally:
        session.close()
@bp.route('/users/<int:user_id>/set-password', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_user_set_password(user_id):
    """Set a new password for a user directly (admin only).

    Reads ``password`` from the JSON payload, strips surrounding
    whitespace and requires at least 8 characters. On success the password
    hash is replaced and any outstanding reset token is cleared.

    Responds with JSON; an unknown user yields 404, a missing, non-string
    or too-short password yields 400, and unexpected errors yield 500.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            return jsonify({'success': False, 'error': 'Użytkownik nie znaleziony'}), 404

        # A missing, malformed or non-object body must end in the 400
        # validation error below, not in an AttributeError turned into 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        raw_password = data.get('password', '')
        new_password = raw_password.strip() if isinstance(raw_password, str) else ''

        if len(new_password) < 8:
            return jsonify({'success': False, 'error': 'Hasło musi mieć min. 8 znaków'}), 400

        user.password_hash = generate_password_hash(new_password, method='pbkdf2:sha256')
        user.reset_token = None
        user.reset_token_expires = None
        db.commit()

        logger.info(f"Admin {current_user.email} set new password for user {user.email}")
        return jsonify({'success': True, 'message': 'Hasło zostało zmienione'})
    except Exception as e:
        db.rollback()
        logger.exception(f"Error setting password for user {user_id}: {e}")
        return jsonify({'success': False, 'error': f'Błąd: {str(e)}'}), 500
    finally:
        db.close()
# ============================================================
# MEMBERSHIP FEES ADMIN
# ============================================================
@bp.route('/fees')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_fees():
    """Render the membership fee management panel.

    Query parameters:
        year: fee year (defaults to the current year).
        month: optional month; switches from the yearly grid to a
            single-month list.
        status: optional ``paid`` / ``pending`` / ``overdue`` filter, only
            applied in the single-month view. ``pending`` also matches
            companies without a fee record (status ``brak``).

    The summary figures (paid/pending counts and totals) always describe
    the whole selected period; the status filter only narrows the rows
    that are listed.
    """
    db = SessionLocal()
    try:
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)
        status_filter = request.args.get('status', '')

        companies = db.query(Company).filter(Company.status == 'active').order_by(Company.name).all()

        fee_query = db.query(MembershipFee).filter(MembershipFee.fee_year == year)
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)

        # Index fee records by (company id, month) for O(1) lookups below.
        fees = {(f.company_id, f.fee_month): f for f in fee_query.all()}

        companies_fees = []
        for company in companies:
            if month:
                fee = fees.get((company.id, month))
                companies_fees.append({
                    'company': company,
                    'fee': fee,
                    'status': fee.status if fee else 'brak'
                })
            else:
                companies_fees.append({
                    'company': company,
                    'months': {m: fees.get((company.id, m)) for m in range(1, 13)}
                })

        total_companies = len(companies)

        # Statistics are computed BEFORE the status filter is applied so
        # that the counters stay consistent with total_companies.
        if month:
            counted_fees = [entry['fee'] for entry in companies_fees if entry['fee']]
            paid_count = sum(1 for f in counted_fees if f.status == 'paid')
            pending_count = total_companies - paid_count
        else:
            counted_fees = list(fees.values())
            paid_count = sum(1 for f in counted_fees if f.status == 'paid')
            pending_count = len(counted_fees) - paid_count

        # Totals are always floats (0.0 when there are no fee records).
        total_due = sum((float(f.amount) for f in counted_fees), 0.0)
        total_paid = sum((float(f.amount_paid or 0) for f in counted_fees), 0.0)

        if status_filter and month:
            accepted_statuses = {
                'paid': ('paid',),
                'pending': ('pending', 'brak'),
                'overdue': ('overdue',),
            }.get(status_filter)
            # Unknown filter values leave the list untouched.
            if accepted_statuses:
                companies_fees = [
                    entry for entry in companies_fees
                    if entry.get('status') in accepted_statuses
                ]

        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = float(fee_config.monthly_amount) if fee_config else 100.00

        return render_template(
            'admin/fees.html',
            companies_fees=companies_fees,
            year=year,
            month=month,
            status_filter=status_filter,
            total_companies=total_companies,
            paid_count=paid_count,
            pending_count=pending_count,
            total_due=total_due,
            total_paid=total_paid,
            default_fee=default_fee,
            years=list(range(2024, datetime.now().year + 2)),
            months=MONTHS_PL
        )
    finally:
        db.close()
@bp.route('/fees/generate', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_fees_generate():
    """Create missing fee records for all active companies for one month.

    Reads ``year`` and ``month`` from the form. Every active company that
    has no fee record for that period gets a ``pending`` record with the
    global default amount (100.00 when no global config is open).

    Responds with JSON reporting how many records were created; missing or
    out-of-range input yields 400 and unexpected errors yield 500.
    """
    db = SessionLocal()
    try:
        year = request.form.get('year', type=int)
        month = request.form.get('month', type=int)

        if not year or not month:
            return jsonify({'success': False, 'error': 'Brak roku lub miesiaca'}), 400
        if not 1 <= month <= 12:
            return jsonify({'success': False, 'error': 'Nieprawidlowy miesiac'}), 400

        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = fee_config.monthly_amount if fee_config else 100.00

        # One query for all existing records of the period instead of one
        # existence check per company.
        already_billed = {
            row[0] for row in db.query(MembershipFee.company_id).filter(
                MembershipFee.fee_year == year,
                MembershipFee.fee_month == month
            ).all()
        }

        companies = db.query(Company).filter(Company.status == 'active').all()
        created = 0
        for company in companies:
            if company.id in already_billed:
                continue
            db.add(MembershipFee(
                company_id=company.id,
                fee_year=year,
                fee_month=month,
                amount=default_fee,
                status='pending'
            ))
            created += 1

        db.commit()
        return jsonify({
            'success': True,
            'message': f'Utworzono {created} rekordow skladek'
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error generating fees: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/fees/<int:fee_id>/mark-paid', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_fees_mark_paid(fee_id):
    """Record a payment against a membership fee.

    Form fields: ``amount_paid`` (defaults to the full fee amount when
    omitted or unparsable), ``payment_date`` as ``YYYY-MM-DD`` (defaults to
    today), ``payment_method`` (defaults to ``transfer``),
    ``payment_reference`` and ``notes``.

    The status becomes ``paid`` when the amount covers the fee, ``partial``
    when it is positive but lower, and is left unchanged for an explicit 0.

    Responds with JSON; an unknown fee yields 404, a negative amount or a
    malformed date yields 400, and unexpected errors yield 500.
    """
    db = SessionLocal()
    try:
        fee = db.query(MembershipFee).filter(MembershipFee.id == fee_id).first()
        if not fee:
            return jsonify({'success': False, 'error': 'Nie znaleziono skladki'}), 404

        amount_paid = request.form.get('amount_paid', type=float)
        if amount_paid is None:
            # Only a missing/unparsable amount falls back to the full fee;
            # an explicit 0 must stay 0.
            amount_paid = float(fee.amount)
        if not amount_paid >= 0:
            # Rejects negative values and NaN (every comparison with NaN is False).
            return jsonify({'success': False, 'error': 'Nieprawidlowa kwota'}), 400

        raw_payment_date = request.form.get('payment_date')
        if raw_payment_date:
            try:
                payment_day = datetime.strptime(raw_payment_date, '%Y-%m-%d').date()
            except ValueError:
                return jsonify({'success': False, 'error': 'Nieprawidlowy format daty'}), 400
        else:
            payment_day = datetime.now().date()

        fee.amount_paid = amount_paid
        fee.payment_date = payment_day
        fee.payment_method = request.form.get('payment_method', 'transfer')
        fee.payment_reference = request.form.get('payment_reference', '')
        fee.notes = request.form.get('notes', '')
        fee.recorded_by = current_user.id
        fee.recorded_at = datetime.now()

        if fee.amount_paid >= float(fee.amount):
            fee.status = 'paid'
        elif fee.amount_paid > 0:
            fee.status = 'partial'

        db.commit()
        return jsonify({
            'success': True,
            'message': 'Skladka zostala zarejestrowana'
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error marking fee as paid: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/fees/bulk-mark-paid', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_fees_bulk_mark_paid():
    """Mark several membership fees as fully paid in one request.

    Reads the ``fee_ids[]`` form list. Every matching fee that is not yet
    ``paid`` is marked paid in full with today's date and the current user
    as recorder; unknown ids and already-paid fees are skipped.

    Responds with JSON reporting the number of updated records; an empty
    selection yields 400 and unexpected errors yield 500.
    """
    db = SessionLocal()
    try:
        fee_ids = request.form.getlist('fee_ids[]', type=int)
        if not fee_ids:
            return jsonify({'success': False, 'error': 'Brak wybranych skladek'}), 400

        # Fetch all selected fees with a single IN query instead of one
        # query per id. The status check stays in Python so that rows with
        # a NULL status are still treated as "not paid".
        selected_fees = db.query(MembershipFee).filter(
            MembershipFee.id.in_(fee_ids)
        ).all()

        today = datetime.now().date()
        updated = 0
        for fee in selected_fees:
            if fee.status == 'paid':
                continue
            fee.status = 'paid'
            fee.amount_paid = fee.amount
            fee.payment_date = today
            fee.recorded_by = current_user.id
            fee.recorded_at = datetime.now()
            updated += 1

        db.commit()
        return jsonify({
            'success': True,
            'message': f'Zaktualizowano {updated} rekordow'
        })
    except Exception as e:
        db.rollback()
        logger.exception(f"Error in bulk action: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@bp.route('/fees/export')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_fees_export():
    """Download membership fees as a CSV attachment.

    Query parameters: ``year`` (defaults to the current year) and an
    optional ``month``. Rows are ordered by company name, then fee month.
    """
    session = SessionLocal()
    try:
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)

        fee_query = session.query(MembershipFee).join(Company).filter(
            MembershipFee.fee_year == year
        )
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)
        records = fee_query.order_by(Company.name, MembershipFee.fee_month).all()

        header = [
            'Firma', 'NIP', 'Rok', 'Miesiac', 'Kwota', 'Zaplacono',
            'Status', 'Data platnosci', 'Metoda', 'Referencja', 'Notatki'
        ]

        buffer = StringIO()
        csv_writer = csv.writer(buffer)
        csv_writer.writerow(header)
        csv_writer.writerows(
            [
                record.company.name,
                record.company.nip,
                record.fee_year,
                record.fee_month,
                record.amount,
                record.amount_paid,
                record.status,
                record.payment_date,
                record.payment_method,
                record.payment_reference,
                record.notes,
            ]
            for record in records
        )

        period_suffix = month or "all"
        disposition = f'attachment; filename=skladki_{year}_{period_suffix}.csv'
        return Response(
            buffer.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': disposition},
        )
    finally:
        session.close()
# ============================================================
# CALENDAR ADMIN ROUTES
# ============================================================
@bp.route('/kalendarz')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_calendar():
    """Admin panel: list all events, most recent event date first."""
    session = SessionLocal()
    try:
        newest_first = NordaEvent.event_date.desc()
        all_events = session.query(NordaEvent).order_by(newest_first).all()
        return render_template('calendar/admin.html', events=all_events)
    finally:
        session.close()
# File extensions accepted for event attachments (PDF and DOCX documents).
_EVENT_ATTACHMENT_EXTENSIONS = frozenset({'.pdf', '.docx'})


@bp.route('/kalendarz/nowy', methods=['GET', 'POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_calendar_new():
    """Show the new-event form (GET) or create an event from it (POST).

    On POST the event is built from the submitted form; events of type
    ``rada`` are always stored with the ``rada_only`` access level. An
    optional ``attachment`` upload is accepted only for PDF/DOCX files and
    is stored under ``static/uploads/events`` with a random prefix.

    On success the user is redirected to the calendar admin list. On any
    error a flash message is shown, an already-saved attachment is removed
    again, and the empty form is re-rendered.
    """
    if request.method == 'POST':
        db = SessionLocal()
        saved_attachment = None  # path of the uploaded file, for cleanup on failure
        try:
            # Access level comes from the form, overridden by the event type.
            access_level = request.form.get('access_level', 'members_only')
            event_type = request.form.get('event_type', 'meeting')

            # Events of type "rada" are always restricted to rada_only.
            if event_type == 'rada' and access_level != 'rada_only':
                access_level = 'rada_only'

            event = NordaEvent(
                title=request.form.get('title', '').strip(),
                description=sanitize_html(request.form.get('description', '').strip()),
                event_date=datetime.strptime(request.form.get('event_date'), '%Y-%m-%d').date(),
                time_start=request.form.get('time_start') or None,
                time_end=request.form.get('time_end') or None,
                location=request.form.get('location', '').strip() or None,
                event_type=event_type,
                max_attendees=request.form.get('max_attendees', type=int) or None,
                access_level=access_level,
                created_by=current_user.id,
                source='manual'
            )

            # Handle file attachment
            attachment = request.files.get('attachment')
            if attachment and attachment.filename:
                from werkzeug.utils import secure_filename
                import uuid

                filename = secure_filename(attachment.filename)
                extension = os.path.splitext(filename)[1].lower()
                if extension not in _EVENT_ATTACHMENT_EXTENSIONS:
                    # The file would be written below static/, so anything
                    # other than the supported document types is refused.
                    flash('Dozwolone formaty załącznika: PDF, DOCX.', 'error')
                    return render_template('calendar/admin_new.html', event=None)

                stored_name = f"{uuid.uuid4().hex}_{filename}"
                upload_dir = os.path.join('static', 'uploads', 'events')
                os.makedirs(upload_dir, exist_ok=True)
                saved_attachment = os.path.join(upload_dir, stored_name)
                attachment.save(saved_attachment)
                event.attachment_filename = filename
                event.attachment_path = f"static/uploads/events/{stored_name}"

            db.add(event)
            db.commit()

            flash('Wydarzenie zostało dodane.', 'success')
            return redirect(url_for('.admin_calendar'))
        except Exception as e:
            db.rollback()
            # Do not leave an orphaned upload behind when the event could
            # not be stored.
            if saved_attachment and os.path.exists(saved_attachment):
                try:
                    os.remove(saved_attachment)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove orphaned attachment {saved_attachment}: {cleanup_error}")
            flash(f'Błąd: {str(e)}', 'error')
        finally:
            db.close()

    return render_template('calendar/admin_new.html', event=None)
@bp.route('/kalendarz/<int:event_id>/delete', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_calendar_delete(event_id):
    """Delete an event together with its attendee (RSVP) records.

    Responds with JSON; an unknown event yields 404 and unexpected errors
    yield 500.
    """
    session = SessionLocal()
    try:
        target = session.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if not target:
            return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404

        # Attendee rows go first, then the event itself.
        attendee_rows = session.query(EventAttendee).filter(
            EventAttendee.event_id == event_id
        )
        attendee_rows.delete()
        session.delete(target)
        session.commit()

        return jsonify({'success': True, 'message': 'Wydarzenie usunięte'})
    except Exception as exc:
        session.rollback()
        logger.error(f"Error deleting event: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
    finally:
        session.close()
# ============================================================
# RELEASE NOTIFICATIONS
# ============================================================
@bp.route('/notify-release', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def admin_notify_release():
    """Notify all users about a newly deployed release.

    Triggered manually by an admin after a deployment. The JSON payload
    must carry ``version`` and may carry a ``highlights`` list; a missing
    version yields a 400 JSON error. The response reports how many
    notifications were sent.
    """
    payload = request.get_json() or {}
    version = payload.get('version')
    highlights = payload.get('highlights', [])

    if not version:
        return jsonify({'success': False, 'error': 'Brak wersji'}), 400

    from utils.notifications import notify_all_users_release

    sent_total = notify_all_users_release(version=version, highlights=highlights)
    confirmation = f'Wysłano {sent_total} powiadomień o wersji {version}'
    return jsonify({'success': True, 'message': confirmation})