nordabiz/blueprints/admin/routes_people.py
Maciej Pienczyn 4181a2e760 refactor: Migrate access control from is_admin to role-based system
Replace ~170 manual `if not current_user.is_admin` checks with:
- @role_required(SystemRole.ADMIN) for user management, security, ZOPK
- @role_required(SystemRole.OFFICE_MANAGER) for content management
- current_user.can_access_admin_panel() for admin UI access
- current_user.can_moderate_forum() for forum moderation
- current_user.can_edit_company(id) for company permissions

Add @office_manager_required decorator shortcut.
Add SQL migration to sync existing users' role field.

Role hierarchy: UNAFFILIATED(10) < MEMBER(20) < EMPLOYEE(30) < MANAGER(40) < OFFICE_MANAGER(50) < ADMIN(100)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:05:22 +01:00

450 lines
15 KiB
Python

"""
Admin Routes - People
=====================
CRUD operations for person management in admin panel.
People are linked to companies via CompanyPerson relationships.
"""
import logging
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, Company, Person, CompanyPerson, SystemRole
from utils.decorators import role_required
# Logger
logger = logging.getLogger(__name__)
# ============================================================
# PEOPLE ADMIN ROUTES
# ============================================================
@bp.route('/people')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_people():
    """Render the admin panel listing people and their company roles.

    Query parameters:
        q: optional text matched case-insensitively (substring) against
            ``Person.imiona`` or ``Person.nazwisko``.
        role: optional ``role_category`` value; when given, only people
            holding at least one company role in that category are listed.

    Returns:
        The rendered ``admin/people.html`` template with the (filtered)
        people list, global statistics and the active filter values.
    """
    db = SessionLocal()
    try:
        search_query = request.args.get('q', '').strip()
        role_filter = request.args.get('role', '')

        query = db.query(Person)
        if search_query:
            search_pattern = f'%{search_query}%'
            query = query.filter(
                Person.imiona.ilike(search_pattern)
                | Person.nazwisko.ilike(search_pattern)
            )
        if role_filter:
            # Filter in SQL with a correlated EXISTS instead of loading every
            # person and walking person.company_roles in Python, which issued
            # one lazy-load query per person.
            has_role = db.query(CompanyPerson.id).filter(
                CompanyPerson.person_id == Person.id,
                CompanyPerson.role_category == role_filter,
            ).exists()
            query = query.filter(has_role)

        people = query.order_by(Person.nazwisko, Person.imiona).all()

        # Statistics are global: they ignore the search and role filters.
        total_people = db.query(Person).count()
        with_companies = db.query(Person).join(CompanyPerson).distinct().count()
        zarzad_count = db.query(CompanyPerson).filter(
            CompanyPerson.role_category == 'zarzad'
        ).distinct(CompanyPerson.person_id).count()
        wspolnik_count = db.query(CompanyPerson).filter(
            CompanyPerson.role_category == 'wspolnik'
        ).distinct(CompanyPerson.person_id).count()

        # Flatten ORM objects into plain dicts for the template.
        people_data = []
        for person in people:
            roles_list = [
                {
                    'role': cp.role,
                    'role_category': cp.role_category,
                    'company_name': cp.company.name if cp.company else 'Nieznana',
                    'company_id': cp.company_id,
                }
                for cp in person.company_roles
            ]
            people_data.append({
                'id': person.id,
                'imiona': person.imiona,
                'nazwisko': person.nazwisko,
                # Only the first four PESEL characters are exposed to the UI.
                'pesel_masked': f"{person.pesel[:4]}******" if person.pesel else None,
                'full_name': person.full_name(),
                'roles': roles_list,
                'roles_count': len(roles_list),
            })

        logger.info(
            "Admin %s accessed people panel - %s people",
            current_user.email, total_people,
        )
        return render_template(
            'admin/people.html',
            people=people_data,
            total_people=total_people,
            with_companies=with_companies,
            zarzad_count=zarzad_count,
            wspolnik_count=wspolnik_count,
            search_query=search_query,
            current_role=role_filter,
        )
    finally:
        db.close()
@bp.route('/people/add', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_add():
    """Create a new person from a JSON request body.

    Expected JSON keys:
        imiona: required, non-empty string (first names).
        nazwisko: required, non-empty string (surname).
        pesel: optional string of exactly 11 ASCII digits; must not already
            be assigned to another person.

    Returns:
        JSON ``{'success': True, 'person_id', 'message'}`` on success,
        HTTP 400 with an error message on invalid input, or HTTP 500 with a
        generic error message when the database operation fails.
    """
    db = SessionLocal()
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # (and therefore a 400 below) instead of an exception that the broad
        # handler at the bottom would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        raw_imiona = data.get('imiona')
        raw_nazwisko = data.get('nazwisko')
        # Non-string values (null, numbers, ...) are treated as missing.
        imiona = raw_imiona.strip() if isinstance(raw_imiona, str) else ''
        nazwisko = raw_nazwisko.strip() if isinstance(raw_nazwisko, str) else ''
        if not imiona or not nazwisko:
            return jsonify({'success': False, 'error': 'Imiona i nazwisko są wymagane'}), 400

        raw_pesel = data.get('pesel')
        pesel = None
        if raw_pesel:
            if not isinstance(raw_pesel, str):
                return jsonify({'success': False, 'error': 'PESEL musi mieć 11 cyfr'}), 400
            # Whitespace-only input collapses to None rather than ''.
            pesel = raw_pesel.strip() or None

        if pesel:
            # isascii() guards against Unicode digits that str.isdigit() accepts.
            if len(pesel) != 11 or not (pesel.isascii() and pesel.isdigit()):
                return jsonify({'success': False, 'error': 'PESEL musi mieć 11 cyfr'}), 400
            existing = db.query(Person).filter(Person.pesel == pesel).first()
            if existing:
                return jsonify({
                    'success': False,
                    'error': f'Osoba z tym PESEL już istnieje: {existing.full_name()}'
                }), 400

        new_person = Person(
            imiona=imiona,
            nazwisko=nazwisko,
            pesel=pesel
        )
        db.add(new_person)
        db.commit()
        # Reload so the generated primary key is available for the response.
        db.refresh(new_person)

        logger.info(
            "Admin %s created new person: %s (ID: %s)",
            current_user.email, new_person.full_name(), new_person.id,
        )
        return jsonify({
            'success': True,
            'person_id': new_person.id,
            'message': f'Osoba "{new_person.full_name()}" została utworzona'
        })
    except Exception:
        db.rollback()
        # logger.exception records the traceback; the client only gets a
        # generic message.
        logger.exception("Error creating person")
        return jsonify({'success': False, 'error': 'Błąd podczas tworzenia osoby'}), 500
    finally:
        db.close()
@bp.route('/people/<int:person_id>')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_get(person_id):
    """Return basic data of a single person as JSON.

    Responds with HTTP 404 and an error payload when no person with the
    given ID exists. The PESEL is never returned in full: only its first
    four characters followed by a fixed mask.
    """
    session = SessionLocal()
    try:
        record = session.query(Person).filter(Person.id == person_id).first()
        if record is None:
            not_found = {'success': False, 'error': 'Osoba nie istnieje'}
            return jsonify(not_found), 404

        if record.pesel:
            masked = f"{record.pesel[:4]}******"
        else:
            masked = None

        details = {
            'id': record.id,
            'imiona': record.imiona,
            'nazwisko': record.nazwisko,
            'pesel_masked': masked,
            'full_name': record.full_name(),
        }
        return jsonify({'success': True, 'person': details})
    finally:
        session.close()
@bp.route('/people/<int:person_id>/update', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_update(person_id):
    """Update a person's first names, surname and/or PESEL.

    Only keys present in the JSON body are changed:
        imiona: non-empty string.
        nazwisko: non-empty string.
        pesel: string of exactly 11 ASCII digits, unique among other people;
            a falsy or whitespace-only value clears the stored PESEL.

    Returns:
        JSON success payload, HTTP 404 if the person does not exist,
        HTTP 400 on invalid input, or HTTP 500 with a generic message when
        the database operation fails.
    """
    db = SessionLocal()
    try:
        person = db.query(Person).filter(Person.id == person_id).first()
        if not person:
            return jsonify({'success': False, 'error': 'Osoba nie istnieje'}), 404

        # silent=True: a missing or malformed JSON body is treated as "no changes"
        # instead of raising into the broad handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate everything first and collect the changes, so the ORM object
        # is only touched once the whole request is known to be valid.
        changes = {}

        if 'imiona' in data:
            raw = data['imiona']
            imiona = raw.strip() if isinstance(raw, str) else ''
            if not imiona:
                return jsonify({'success': False, 'error': 'Imiona są wymagane'}), 400
            changes['imiona'] = imiona

        if 'nazwisko' in data:
            raw = data['nazwisko']
            nazwisko = raw.strip() if isinstance(raw, str) else ''
            if not nazwisko:
                return jsonify({'success': False, 'error': 'Nazwisko jest wymagane'}), 400
            changes['nazwisko'] = nazwisko

        if 'pesel' in data:
            raw = data['pesel']
            pesel = None
            if raw:
                if not isinstance(raw, str):
                    return jsonify({'success': False, 'error': 'PESEL musi mieć 11 cyfr'}), 400
                # Whitespace-only input collapses to None rather than ''.
                pesel = raw.strip() or None
            if pesel:
                # isascii() guards against Unicode digits accepted by isdigit().
                if len(pesel) != 11 or not (pesel.isascii() and pesel.isdigit()):
                    return jsonify({'success': False, 'error': 'PESEL musi mieć 11 cyfr'}), 400
                existing = db.query(Person).filter(
                    Person.pesel == pesel, Person.id != person_id
                ).first()
                if existing:
                    return jsonify({
                        'success': False,
                        'error': f'Osoba z tym PESEL już istnieje: {existing.full_name()}'
                    }), 400
            changes['pesel'] = pesel

        for field, value in changes.items():
            setattr(person, field, value)
        # NOTE(review): naive UTC timestamp kept as in the original;
        # datetime.utcnow() is deprecated since Python 3.12 - confirm the
        # column's timezone handling before switching to an aware datetime.
        person.updated_at = datetime.utcnow()
        db.commit()

        logger.info(
            "Admin %s updated person %s (ID: %s)",
            current_user.email, person.full_name(), person_id,
        )
        return jsonify({
            'success': True,
            'message': 'Dane osoby zaktualizowane'
        })
    except Exception:
        db.rollback()
        # Details go to the log (with traceback); the client gets a generic
        # message so internal/database error text is not leaked.
        logger.exception("Error updating person %s", person_id)
        return jsonify({'success': False, 'error': 'Błąd podczas aktualizacji osoby'}), 500
    finally:
        db.close()
@bp.route('/people/<int:person_id>/delete', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_delete(person_id):
    """Hard-delete a person.

    Removal of the person's CompanyPerson links is expected to be handled by
    a cascade configured on the model/database (not visible in this module).

    Returns:
        JSON success payload, HTTP 404 if the person does not exist, or
        HTTP 500 with a generic message when the deletion fails.
    """
    db = SessionLocal()
    try:
        person = db.query(Person).filter(Person.id == person_id).first()
        if not person:
            return jsonify({'success': False, 'error': 'Osoba nie istnieje'}), 404

        # Capture what the log and response need before the object is deleted.
        person_name = person.full_name()
        roles_count = len(person.company_roles)

        db.delete(person)
        db.commit()

        logger.info(
            "Admin %s deleted person %s (ID: %s, had %s roles)",
            current_user.email, person_name, person_id, roles_count,
        )
        return jsonify({
            'success': True,
            'message': f'Osoba "{person_name}" została usunięta'
        })
    except Exception:
        # Mirror the other mutating endpoints: roll back and answer with JSON
        # instead of letting a database error surface as an unhandled 500.
        db.rollback()
        logger.exception("Error deleting person %s", person_id)
        return jsonify({'success': False, 'error': 'Błąd podczas usuwania osoby'}), 500
    finally:
        db.close()
@bp.route('/people/<int:person_id>/companies')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_companies(person_id):
    """Return the companies linked to a person as JSON.

    Each entry describes one CompanyPerson link: its ID, the company's ID,
    name and NIP, the role, the role category and the share percentage.
    Links whose company cannot be resolved are reported with the placeholder
    name 'Nieznana' and null company fields.

    Returns:
        JSON ``{'success': True, 'person_name', 'companies'}`` or HTTP 404
        if the person does not exist.
    """
    db = SessionLocal()
    try:
        person = db.query(Person).filter(Person.id == person_id).first()
        if not person:
            return jsonify({'success': False, 'error': 'Osoba nie istnieje'}), 404

        companies_list = []
        for cp in person.company_roles:
            company = cp.company
            shares = cp.shares_percent
            companies_list.append({
                'link_id': cp.id,
                'company_id': company.id if company else None,
                'company_name': company.name if company else 'Nieznana',
                'company_nip': company.nip if company else None,
                'role': cp.role,
                'role_category': cp.role_category,
                # Compare against None explicitly so a stored share of 0 is
                # reported as 0.0 rather than being mistaken for "no value".
                'shares_percent': float(shares) if shares is not None else None,
            })

        return jsonify({
            'success': True,
            'person_name': person.full_name(),
            'companies': companies_list
        })
    finally:
        db.close()
@bp.route('/people/<int:person_id>/link-company', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_link_company(person_id):
    """Link a person to a company in a given role.

    Expected JSON keys:
        company_id: required ID of an existing company.
        role: required, non-empty string.
        role_category: required; one of 'zarzad', 'wspolnik', 'prokurent'.
        shares_percent: optional; stored only when role_category is
            'wspolnik'.

    Returns:
        JSON success payload, HTTP 404 if the person or company does not
        exist, HTTP 400 on invalid input or a duplicate link, or HTTP 500
        with a generic message when the database operation fails.
    """
    db = SessionLocal()
    try:
        # silent=True: a missing or malformed JSON body becomes an empty dict
        # and is rejected by the validation below with a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        person = db.query(Person).filter(Person.id == person_id).first()
        if not person:
            return jsonify({'success': False, 'error': 'Osoba nie istnieje'}), 404

        company_id = data.get('company_id')
        if not company_id:
            return jsonify({'success': False, 'error': 'Nie podano firmy'}), 400
        company = db.query(Company).filter(Company.id == company_id).first()
        if not company:
            return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404

        raw_role = data.get('role')
        raw_category = data.get('role_category')
        # Non-string values (null, numbers, ...) are treated as missing.
        role = raw_role.strip() if isinstance(raw_role, str) else ''
        role_category = raw_category.strip() if isinstance(raw_category, str) else ''
        if not role or not role_category:
            return jsonify({'success': False, 'error': 'Rola i kategoria roli są wymagane'}), 400
        if role_category not in ('zarzad', 'wspolnik', 'prokurent'):
            return jsonify({'success': False, 'error': 'Nieprawidłowa kategoria roli'}), 400

        # Refuse to create an exact duplicate of an existing link.
        existing = db.query(CompanyPerson).filter(
            CompanyPerson.company_id == company_id,
            CompanyPerson.person_id == person_id,
            CompanyPerson.role_category == role_category,
            CompanyPerson.role == role
        ).first()
        if existing:
            return jsonify({
                'success': False,
                'error': 'Ta osoba jest już powiązana z firmą w tej roli'
            }), 400

        # NOTE(review): shares_percent is passed through unvalidated, as in
        # the original; consider validating type/range once the column
        # definition has been checked.
        new_link = CompanyPerson(
            company_id=company_id,
            person_id=person_id,
            role=role,
            role_category=role_category,
            shares_percent=data.get('shares_percent') if role_category == 'wspolnik' else None,
            source='admin_panel'
        )
        db.add(new_link)
        db.commit()

        logger.info(
            "Admin %s linked person %s to company %s as %s",
            current_user.email, person.full_name(), company.name, role,
        )
        return jsonify({
            'success': True,
            'message': f'Powiązano {person.full_name()} z {company.name} jako {role}'
        })
    except Exception:
        db.rollback()
        # Details go to the log (with traceback); the client gets a generic
        # message so internal/database error text is not leaked.
        logger.exception("Error linking person %s to company", person_id)
        return jsonify({'success': False, 'error': 'Błąd podczas tworzenia powiązania'}), 500
    finally:
        db.close()
@bp.route('/people/<int:person_id>/unlink-company/<int:company_id>', methods=['POST'])
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_person_unlink_company(person_id, company_id):
    """Remove a link between a person and a company.

    The JSON body is optional. When it contains ``link_id``, exactly that
    link is removed (it must belong to the given person and company);
    otherwise the first link found for the person/company pair is removed.

    Returns:
        JSON success payload, HTTP 404 if no matching link exists, or
        HTTP 500 with a generic message when the deletion fails.
    """
    db = SessionLocal()
    try:
        # silent=True: the body is optional, so a POST without JSON must fall
        # through to the "first matching link" branch instead of erroring.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        link_id = data.get('link_id')

        link_query = db.query(CompanyPerson).filter(
            CompanyPerson.person_id == person_id,
            CompanyPerson.company_id == company_id
        )
        if link_id:
            link_query = link_query.filter(CompanyPerson.id == link_id)
        link = link_query.first()
        if not link:
            return jsonify({'success': False, 'error': 'Powiązanie nie istnieje'}), 404

        # Read everything needed for the log/response BEFORE deleting, and
        # tolerate links whose person/company cannot be resolved, so a
        # successful deletion can never end in an AttributeError afterwards.
        person = link.person
        company = link.company
        person_name = person.full_name() if person else 'Nieznana'
        company_name = company.name if company else 'Nieznana'
        role = link.role

        db.delete(link)
        db.commit()

        logger.info(
            "Admin %s unlinked person %s from company %s (role: %s)",
            current_user.email, person_name, company_name, role,
        )
        return jsonify({
            'success': True,
            'message': f'Usunięto powiązanie z firmą {company_name}'
        })
    except Exception:
        # Mirror the other mutating endpoints: roll back and answer with JSON.
        db.rollback()
        logger.exception(
            "Error unlinking person %s from company %s", person_id, company_id
        )
        return jsonify({'success': False, 'error': 'Błąd podczas usuwania powiązania'}), 500
    finally:
        db.close()
@bp.route('/people/search')
@login_required
@role_required(SystemRole.OFFICE_MANAGER)
def admin_people_search():
    """Autocomplete lookup of people by first names or surname.

    Reads the ``q`` query parameter; terms shorter than two characters
    yield an empty result list. Otherwise at most ten people whose
    ``imiona`` or ``nazwisko`` contain the term (case-insensitive) are
    returned, each with its ID, display name and masked PESEL.
    """
    session = SessionLocal()
    try:
        term = request.args.get('q', '').strip()
        if len(term) < 2:
            return jsonify({'success': True, 'results': []})

        like_expr = f'%{term}%'
        name_matches = (
            (Person.imiona.ilike(like_expr))
            | (Person.nazwisko.ilike(like_expr))
        )
        matches = session.query(Person).filter(name_matches).limit(10).all()

        suggestions = []
        for match in matches:
            # Expose only the first four PESEL characters.
            masked = f"{match.pesel[:4]}******" if match.pesel else None
            suggestions.append({
                'id': match.id,
                'text': match.full_name(),
                'pesel_masked': masked,
            })
        return jsonify({'success': True, 'results': suggestions})
    finally:
        session.close()