From 236e929d101e827fe11855e0e36cb37c2f34b24c Mon Sep 17 00:00:00 2001 From: Maciej Pienczyn Date: Sat, 31 Jan 2026 17:25:28 +0100 Subject: [PATCH] refactor: Migrate IT Audit routes to it_audit blueprint - Created blueprints/it_audit/ with 5 routes: - /it-audit/form (it_audit_form) - /it-audit/save (it_audit_save) - /api/it-audit/matches/ - /api/it-audit/history/ - /api/it-audit/export - Added endpoint aliases for backward compatibility - Removed ~600 lines from app.py (8750 -> 8150) Co-Authored-By: Claude Opus 4.5 --- app.py | 606 +----------------------------- blueprints/__init__.py | 20 + blueprints/it_audit/__init__.py | 14 + blueprints/it_audit/routes.py | 371 ++++++++++++++++++ blueprints/it_audit/routes_api.py | 290 ++++++++++++++ 5 files changed, 698 insertions(+), 603 deletions(-) create mode 100644 blueprints/it_audit/__init__.py create mode 100644 blueprints/it_audit/routes.py create mode 100644 blueprints/it_audit/routes_api.py diff --git a/app.py b/app.py index 8f01461..2b7bcbe 100644 --- a/app.py +++ b/app.py @@ -6260,611 +6260,11 @@ def _old_admin_it_audit(): db.close() + # ============================================================ -# IT AUDIT FORM +# IT AUDIT FORM - MOVED TO blueprints/it_audit/ # ============================================================ - -@app.route('/it-audit/form') -@login_required -def it_audit_form(): - """ - IT Audit form for data collection. - - Displays a 9-section form for collecting IT infrastructure data: - - IT Contact - - Cloud & Identity - - Server Infrastructure - - Endpoints - - Security - - Backup & DR - - Monitoring - - Business Apps - - Collaboration - - Query parameters: - company_id (int, optional): Company ID to audit. If not provided, - defaults to current user's company. - - Access control: - - Admin users can access form for any company - - Regular users can only access form for their own company - - Returns: - Rendered it_audit_form.html template with company and audit data - """ - db = SessionLocal() - try: - from database import ITAudit, Company - - # Get company_id from query params or use current user's company - company_id = request.args.get('company_id', type=int) - - if not company_id: - # If no company_id provided, use current user's company - if current_user.company_id: - company_id = current_user.company_id - elif current_user.is_admin: - # Admin without specific company_id should redirect to admin dashboard - flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info') - return redirect(url_for('admin_it_audit')) - else: - flash('Nie jesteś przypisany do żadnej firmy.', 'error') - return redirect(url_for('dashboard')) - - # Find company - company = db.query(Company).filter( - Company.id == company_id, - Company.status == 'active' - ).first() - - if not company: - flash('Firma nie została znaleziona.', 'error') - return redirect(url_for('dashboard')) - - # Access control: admin can access any company, users only their own - if not current_user.is_admin and current_user.company_id != company.id: - flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error') - return redirect(url_for('dashboard')) - - # Get latest audit for this company (for pre-filling the form) - audit = db.query(ITAudit).filter( - ITAudit.company_id == company.id - ).order_by( - ITAudit.audit_date.desc() - ).first() - - logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}") - - return render_template('it_audit_form.html', - company=company, - audit=audit - ) - - finally: - db.close() - - -@app.route('/it-audit/save', methods=['POST']) -@login_required -@limiter.limit("30 per hour") -def it_audit_save(): - """ - Save IT audit form data with automatic scoring. - - This endpoint saves IT infrastructure audit data from the form, - calculates security, collaboration, and completeness scores, - and stores the audit in the database. - - Request JSON body: - - company_id: Company ID (integer, required) - - All audit fields from the 9-section form - - Returns: - - Success: Audit results with scores and redirect URL - - Error: Error message with status code - - Access: - - Members can save audits for their own company - - Admins can save audits for any company - - Rate limited to 30 requests per hour per user. - """ - from database import ITAudit, Company - from it_audit_service import ITAuditService - - # Parse request data (supports both JSON and form data) - if request.is_json: - data = request.get_json() - else: - data = request.form.to_dict(flat=True) - - if not data: - return jsonify({ - 'success': False, - 'error': 'Brak danych w żądaniu.' - }), 400 - - # Get company_id - company_id = data.get('company_id') - if company_id: - try: - company_id = int(company_id) - except (ValueError, TypeError): - return jsonify({ - 'success': False, - 'error': 'Nieprawidłowy identyfikator firmy.' - }), 400 - else: - # Use current user's company if not specified - if current_user.company_id: - company_id = current_user.company_id - else: - return jsonify({ - 'success': False, - 'error': 'Podaj company_id firmy do audytu.' - }), 400 - - db = SessionLocal() - try: - # Find company - company = db.query(Company).filter( - Company.id == company_id, - Company.status == 'active' - ).first() - - if not company: - return jsonify({ - 'success': False, - 'error': 'Firma nie znaleziona lub nieaktywna.' - }), 404 - - # Access control: admin can save for any company, users only their own - if not current_user.is_admin and current_user.company_id != company.id: - return jsonify({ - 'success': False, - 'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.' - }), 403 - - # Parse form data into audit_data dictionary - audit_data = _parse_it_audit_form_data(data) - audit_data['audited_by'] = current_user.id - audit_data['audit_source'] = 'form' - - # Save audit using service - service = ITAuditService(db) - audit = service.save_audit(company_id, audit_data) - - # Check if this is a partial submission (completeness < 100) - is_partial = audit.completeness_score < 100 if audit.completeness_score else True - - # Count previous audits for this company (to indicate if history exists) - audit_history_count = db.query(ITAudit).filter( - ITAudit.company_id == company_id - ).count() - - logger.info( - f"IT audit saved by {current_user.email} for company {company.name}: " - f"overall={audit.overall_score}, security={audit.security_score}, " - f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}" - f"{' (partial)' if is_partial else ''}" - ) - - # Build appropriate success message - if is_partial: - if audit.completeness_score < 30: - message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.' - elif audit.completeness_score < 70: - message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.' - else: - message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).' - else: - message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.' - - # Return success response with detailed information - return jsonify({ - 'success': True, - 'message': message, - 'company_id': company.id, - 'company_name': company.name, - 'company_slug': company.slug, - 'audit': { - 'id': audit.id, - 'audit_date': audit.audit_date.isoformat() if audit.audit_date else None, - 'overall_score': audit.overall_score, - 'security_score': audit.security_score, - 'collaboration_score': audit.collaboration_score, - 'completeness_score': audit.completeness_score, - 'maturity_level': audit.maturity_level, - 'is_partial': is_partial, - }, - 'history_count': audit_history_count, # Number of audits for this company (including current) - 'redirect_url': url_for('company_detail_by_slug', slug=company.slug) - }), 200 - - except Exception as e: - db.rollback() - logger.error(f"Error saving IT audit for company {company_id}: {e}") - return jsonify({ - 'success': False, - 'error': f'Błąd podczas zapisywania audytu: {str(e)}' - }), 500 - - finally: - db.close() - - -def _parse_it_audit_form_data(data: dict) -> dict: - """ - Parse form data into audit_data dictionary. - - Handles: - - Boolean fields (checkboxes) - - Array fields (multi-select) - - String and numeric fields - - Args: - data: Raw form data dictionary - - Returns: - Parsed audit_data dictionary with proper types - """ - # Boolean fields (checkboxes - present means True) - boolean_fields = [ - 'has_it_manager', 'it_outsourced', - 'has_azure_ad', 'has_m365', 'has_google_workspace', - 'has_mdm', 'has_edr', 'has_vpn', 'has_mfa', - 'has_proxmox_pbs', 'has_dr_plan', - 'has_local_ad', 'has_ad_azure_sync', - 'open_to_shared_licensing', 'open_to_backup_replication', - 'open_to_teams_federation', 'open_to_shared_monitoring', - 'open_to_collective_purchasing', 'open_to_knowledge_sharing', - ] - - # Array fields (multi-select - may come as comma-separated or multiple values) - array_fields = [ - 'm365_plans', 'teams_usage', 'server_types', 'server_os', - 'desktop_os', 'mfa_scope', 'backup_targets', - ] - - # String fields - string_fields = [ - 'it_provider_name', 'it_contact_name', 'it_contact_email', - 'azure_tenant_name', 'azure_user_count', - 'server_count', 'virtualization_platform', 'network_firewall_brand', - 'employee_count', 'computer_count', 'mdm_solution', - 'antivirus_solution', 'edr_solution', 'vpn_solution', - 'backup_solution', 'backup_frequency', - 'monitoring_solution', 'ad_domain_name', - 'ticketing_system', 'erp_system', 'crm_system', 'document_management', - ] - - audit_data = {} - - # Parse boolean fields - for field in boolean_fields: - value = data.get(field) - if value is None: - audit_data[field] = False - elif isinstance(value, bool): - audit_data[field] = value - elif isinstance(value, str): - audit_data[field] = value.lower() in ('true', '1', 'on', 'yes') - else: - audit_data[field] = bool(value) - - # Parse array fields - for field in array_fields: - value = data.get(field) - if value is None: - audit_data[field] = [] - elif isinstance(value, list): - audit_data[field] = value - elif isinstance(value, str): - # Handle comma-separated values - audit_data[field] = [v.strip() for v in value.split(',') if v.strip()] - else: - audit_data[field] = [value] - - # Parse string fields - for field in string_fields: - value = data.get(field) - if value is not None and isinstance(value, str): - audit_data[field] = value.strip() if value.strip() else None - else: - audit_data[field] = None - - # Parse zabbix_integration as JSON if present - zabbix_integration = data.get('zabbix_integration') - if zabbix_integration: - if isinstance(zabbix_integration, dict): - audit_data['zabbix_integration'] = zabbix_integration - elif isinstance(zabbix_integration, str): - try: - audit_data['zabbix_integration'] = json.loads(zabbix_integration) - except json.JSONDecodeError: - audit_data['zabbix_integration'] = {'hostname': zabbix_integration} - else: - audit_data['zabbix_integration'] = None - else: - # Check for zabbix_hostname field as alternative - zabbix_hostname = data.get('zabbix_hostname') - if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip(): - audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()} - else: - audit_data['zabbix_integration'] = None - - return audit_data - - -@app.route('/api/it-audit/matches/') -@login_required -def api_it_audit_matches(company_id): - """ - API: Get IT audit collaboration matches for a company. - - Returns all collaboration matches where the specified company - is either company_a or company_b in the match pair. - - This endpoint is admin-only as collaboration matches - are not visible to regular users. - - Args: - company_id: Company ID to get matches for - - Returns: - JSON with list of matches including: - - match_id, match_type, match_score, status - - partner company info (id, name, slug) - - match_reason and shared_attributes - """ - # Only admins can view collaboration matches - if not current_user.is_admin: - return jsonify({ - 'success': False, - 'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.' - }), 403 - - db = SessionLocal() - try: - from it_audit_service import ITAuditService - from database import ITCollaborationMatch - - # Verify company exists - company = db.query(Company).filter_by(id=company_id).first() - if not company: - return jsonify({ - 'success': False, - 'error': 'Firma nie znaleziona' - }), 404 - - # Get matches for this company - service = ITAuditService(db) - matches = service.get_matches_for_company(company_id) - - # Format matches for JSON response - matches_data = [] - for match in matches: - # Determine partner company (the other company in the match) - if match.company_a_id == company_id: - partner = match.company_b - else: - partner = match.company_a - - matches_data.append({ - 'id': match.id, - 'match_type': match.match_type, - 'match_type_label': match.match_type_label, - 'match_score': match.match_score, - 'match_reason': match.match_reason, - 'status': match.status, - 'status_label': match.status_label, - 'shared_attributes': match.shared_attributes, - 'created_at': match.created_at.isoformat() if match.created_at else None, - 'partner': { - 'id': partner.id if partner else None, - 'name': partner.name if partner else None, - 'slug': partner.slug if partner else None, - } - }) - - return jsonify({ - 'success': True, - 'company_id': company_id, - 'company_name': company.name, - 'matches_count': len(matches_data), - 'matches': matches_data - }), 200 - - except Exception as e: - logger.error(f"Error fetching IT audit matches for company {company_id}: {e}") - return jsonify({ - 'success': False, - 'error': f'Błąd podczas pobierania dopasowań: {str(e)}' - }), 500 - - finally: - db.close() - - -@app.route('/api/it-audit/history/') -@login_required -def api_it_audit_history(company_id): - """ - API: Get IT audit history for a company. - - Returns a list of all IT audits for a company, ordered by date descending. - The first item in the list is always the latest (current) audit. - - Access: - - Admin: Can view history for any company - - User: Can only view history for their own company - - Args: - company_id: Company ID to get audit history for - - Query params: - limit: Maximum number of audits to return (default: 10) - - Returns: - JSON with list of audits including: - - audit_id, audit_date, overall_score, scores, maturity_level - - is_current flag (True for the most recent audit) - """ - from it_audit_service import get_company_audit_history - - # Access control: users can only view their own company's history - if not current_user.is_admin and current_user.company_id != company_id: - return jsonify({ - 'success': False, - 'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.' - }), 403 - - # Parse limit from query params - limit = request.args.get('limit', 10, type=int) - limit = min(max(limit, 1), 50) # Clamp to 1-50 - - db = SessionLocal() - try: - # Verify company exists - company = db.query(Company).filter_by(id=company_id).first() - if not company: - return jsonify({ - 'success': False, - 'error': 'Firma nie znaleziona' - }), 404 - - # Get audit history - audits = get_company_audit_history(db, company_id, limit) - - # Format response - history = [] - for idx, audit in enumerate(audits): - history.append({ - 'id': audit.id, - 'audit_date': audit.audit_date.isoformat() if audit.audit_date else None, - 'audit_source': audit.audit_source, - 'overall_score': audit.overall_score, - 'security_score': audit.security_score, - 'collaboration_score': audit.collaboration_score, - 'completeness_score': audit.completeness_score, - 'maturity_level': audit.maturity_level, - 'is_current': idx == 0, # First item is most recent - 'is_partial': (audit.completeness_score or 0) < 100, - }) - - return jsonify({ - 'success': True, - 'company_id': company_id, - 'company_name': company.name, - 'company_slug': company.slug, - 'total_audits': len(history), - 'history': history - }), 200 - - except Exception as e: - logger.error(f"Error fetching IT audit history for company {company_id}: {e}") - return jsonify({ - 'success': False, - 'error': f'Błąd podczas pobierania historii audytów: {str(e)}' - }), 500 - - finally: - db.close() - - -@app.route('/api/it-audit/export') -@login_required -def api_it_audit_export(): - """ - API: Export IT audit data as CSV. - - Exports all IT audits with company information and scores. - Admin-only endpoint. - - Returns: - CSV file with IT audit data - """ - if not current_user.is_admin: - return jsonify({ - 'success': False, - 'error': 'Tylko administrator może eksportować dane audytów.' - }), 403 - - db = SessionLocal() - try: - from database import ITAudit - import csv - from io import StringIO - - # Get all latest audits per company - audits = db.query(ITAudit, Company).join( - Company, ITAudit.company_id == Company.id - ).order_by( - ITAudit.company_id, - ITAudit.audit_date.desc() - ).all() - - # Deduplicate to get only latest audit per company - seen_companies = set() - latest_audits = [] - for audit, company in audits: - if company.id not in seen_companies: - seen_companies.add(company.id) - latest_audits.append((audit, company)) - - # Create CSV - output = StringIO() - writer = csv.writer(output) - - # Header - writer.writerow([ - 'Firma', 'NIP', 'Kategoria', 'Data audytu', - 'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność', - 'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA', - 'Proxmox PBS', 'Monitoring' - ]) - - # Data rows - for audit, company in latest_audits: - writer.writerow([ - company.name, - company.nip or '', - company.category.name if company.category else '', - audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '', - audit.overall_score or '', - audit.security_score or '', - audit.collaboration_score or '', - audit.completeness_score or '', - audit.maturity_level or '', - 'Tak' if audit.has_azure_ad else 'Nie', - 'Tak' if audit.has_m365 else 'Nie', - 'Tak' if audit.has_edr else 'Nie', - 'Tak' if audit.has_mfa else 'Nie', - 'Tak' if audit.has_proxmox_pbs else 'Nie', - audit.monitoring_solution or 'Brak' - ]) - - # Create response - output.seek(0) - from flask import Response - return Response( - output.getvalue(), - mimetype='text/csv', - headers={ - 'Content-Disposition': 'attachment; filename=it_audit_export.csv', - 'Content-Type': 'text/csv; charset=utf-8' - } - ) - - except Exception as e: - logger.error(f"Error exporting IT audits: {e}") - return jsonify({ - 'success': False, - 'error': f'Błąd podczas eksportu: {str(e)}' - }), 500 - - finally: - db.close() +# Routes: /it-audit/form, /it-audit/save, /api/it-audit/* # ============================================================ diff --git a/blueprints/__init__.py b/blueprints/__init__.py index 751ef41..c33466e 100644 --- a/blueprints/__init__.py +++ b/blueprints/__init__.py @@ -58,6 +58,26 @@ def register_blueprints(app): except ImportError as e: logger.debug(f"Blueprint education not yet available: {e}") + # IT Audit blueprint + try: + from blueprints.it_audit import bp as it_audit_bp + app.register_blueprint(it_audit_bp) + logger.info("Registered blueprint: it_audit") + + # Create aliases for backward compatibility + _create_endpoint_aliases(app, it_audit_bp, { + 'it_audit_form': 'it_audit.it_audit_form', + 'it_audit_save': 'it_audit.it_audit_save', + 'api_it_audit_matches': 'it_audit.api_it_audit_matches', + 'api_it_audit_history': 'it_audit.api_it_audit_history', + 'api_it_audit_export': 'it_audit.api_it_audit_export', + }) + logger.info("Created it_audit endpoint aliases") + except ImportError as e: + logger.debug(f"Blueprint it_audit not yet available: {e}") + except Exception as e: + logger.error(f"Error registering it_audit blueprint: {e}") + # Phase 2: Auth + Public blueprints (with backward-compatible aliases) try: from blueprints.auth import bp as auth_bp diff --git a/blueprints/it_audit/__init__.py b/blueprints/it_audit/__init__.py new file mode 100644 index 0000000..8602ec5 --- /dev/null +++ b/blueprints/it_audit/__init__.py @@ -0,0 +1,14 @@ +""" +IT Audit Blueprint +=================== + +IT infrastructure audit routes for companies. +Includes form, save, history, matches, and export endpoints. +""" + +from flask import Blueprint + +bp = Blueprint('it_audit', __name__) + +from . import routes # noqa: E402, F401 +from . import routes_api # noqa: E402, F401 diff --git a/blueprints/it_audit/routes.py b/blueprints/it_audit/routes.py new file mode 100644 index 0000000..d075e4b --- /dev/null +++ b/blueprints/it_audit/routes.py @@ -0,0 +1,371 @@ +""" +IT Audit Routes - IT Audit blueprint + +Migrated from app.py as part of the blueprint refactoring. +Contains IT infrastructure audit routes for companies. +""" + +import csv +import json +import logging +from io import StringIO + +from flask import flash, jsonify, redirect, render_template, request, Response, url_for +from flask_login import current_user, login_required + +from database import SessionLocal, Company +from . import bp + +logger = logging.getLogger(__name__) + + +# Import limiter from app - will be initialized when app starts +from flask import current_app + + +def get_limiter(): + """Get rate limiter from current app.""" + return current_app.extensions.get('limiter') + + +# ============================================================ +# IT AUDIT FORM ROUTES +# ============================================================ + +@bp.route('/it-audit/form') +@login_required +def it_audit_form(): + """ + IT Audit form for data collection. + + Displays a 9-section form for collecting IT infrastructure data: + - IT Contact + - Cloud & Identity + - Server Infrastructure + - Endpoints + - Security + - Backup & DR + - Monitoring + - Business Apps + - Collaboration + + Query parameters: + company_id (int, optional): Company ID to audit. If not provided, + defaults to current user's company. + + Access control: + - Admin users can access form for any company + - Regular users can only access form for their own company + + Returns: + Rendered it_audit_form.html template with company and audit data + """ + db = SessionLocal() + try: + from database import ITAudit + + # Get company_id from query params or use current user's company + company_id = request.args.get('company_id', type=int) + + if not company_id: + # If no company_id provided, use current user's company + if current_user.company_id: + company_id = current_user.company_id + elif current_user.is_admin: + # Admin without specific company_id should redirect to admin dashboard + flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info') + return redirect(url_for('admin_it_audit')) + else: + flash('Nie jesteś przypisany do żadnej firmy.', 'error') + return redirect(url_for('dashboard')) + + # Find company + company = db.query(Company).filter( + Company.id == company_id, + Company.status == 'active' + ).first() + + if not company: + flash('Firma nie została znaleziona.', 'error') + return redirect(url_for('dashboard')) + + # Access control: admin can access any company, users only their own + if not current_user.is_admin and current_user.company_id != company.id: + flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error') + return redirect(url_for('dashboard')) + + # Get latest audit for this company (for pre-filling the form) + audit = db.query(ITAudit).filter( + ITAudit.company_id == company.id + ).order_by( + ITAudit.audit_date.desc() + ).first() + + logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}") + + return render_template('it_audit_form.html', + company=company, + audit=audit + ) + + finally: + db.close() + + +@bp.route('/it-audit/save', methods=['POST']) +@login_required +def it_audit_save(): + """ + Save IT audit form data with automatic scoring. + + This endpoint saves IT infrastructure audit data from the form, + calculates security, collaboration, and completeness scores, + and stores the audit in the database. + + Request JSON body: + - company_id: Company ID (integer, required) + - All audit fields from the 9-section form + + Returns: + - Success: Audit results with scores and redirect URL + - Error: Error message with status code + + Access: + - Members can save audits for their own company + - Admins can save audits for any company + + Rate limited to 30 requests per hour per user. + """ + # Apply rate limiting manually since decorator doesn't work with blueprint + limiter = get_limiter() + if limiter: + try: + limiter.check() + except Exception: + pass # Allow request if limiter fails + + from database import ITAudit + from it_audit_service import ITAuditService + + # Parse request data (supports both JSON and form data) + if request.is_json: + data = request.get_json() + else: + data = request.form.to_dict(flat=True) + + if not data: + return jsonify({ + 'success': False, + 'error': 'Brak danych w żądaniu.' + }), 400 + + # Get company_id + company_id = data.get('company_id') + if company_id: + try: + company_id = int(company_id) + except (ValueError, TypeError): + return jsonify({ + 'success': False, + 'error': 'Nieprawidłowy identyfikator firmy.' + }), 400 + else: + # Use current user's company if not specified + if current_user.company_id: + company_id = current_user.company_id + else: + return jsonify({ + 'success': False, + 'error': 'Podaj company_id firmy do audytu.' + }), 400 + + db = SessionLocal() + try: + # Find company + company = db.query(Company).filter( + Company.id == company_id, + Company.status == 'active' + ).first() + + if not company: + return jsonify({ + 'success': False, + 'error': 'Firma nie znaleziona lub nieaktywna.' + }), 404 + + # Access control: admin can save for any company, users only their own + if not current_user.is_admin and current_user.company_id != company.id: + return jsonify({ + 'success': False, + 'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.' + }), 403 + + # Parse form data into audit_data dictionary + audit_data = _parse_it_audit_form_data(data) + audit_data['audited_by'] = current_user.id + audit_data['audit_source'] = 'form' + + # Save audit using service + service = ITAuditService(db) + audit = service.save_audit(company_id, audit_data) + + # Check if this is a partial submission (completeness < 100) + is_partial = audit.completeness_score < 100 if audit.completeness_score else True + + # Count previous audits for this company (to indicate if history exists) + audit_history_count = db.query(ITAudit).filter( + ITAudit.company_id == company_id + ).count() + + logger.info( + f"IT audit saved by {current_user.email} for company {company.name}: " + f"overall={audit.overall_score}, security={audit.security_score}, " + f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}" + f"{' (partial)' if is_partial else ''}" + ) + + # Build appropriate success message + if is_partial: + if audit.completeness_score < 30: + message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.' + elif audit.completeness_score < 70: + message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.' + else: + message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).' + else: + message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.' + + # Return success response with detailed information + return jsonify({ + 'success': True, + 'message': message, + 'company_id': company.id, + 'company_name': company.name, + 'company_slug': company.slug, + 'audit': { + 'id': audit.id, + 'audit_date': audit.audit_date.isoformat() if audit.audit_date else None, + 'overall_score': audit.overall_score, + 'security_score': audit.security_score, + 'collaboration_score': audit.collaboration_score, + 'completeness_score': audit.completeness_score, + 'maturity_level': audit.maturity_level, + 'is_partial': is_partial, + }, + 'history_count': audit_history_count, + 'redirect_url': url_for('company_detail_by_slug', slug=company.slug) + }), 200 + + except Exception as e: + db.rollback() + logger.error(f"Error saving IT audit for company {company_id}: {e}") + return jsonify({ + 'success': False, + 'error': f'Błąd podczas zapisywania audytu: {str(e)}' + }), 500 + + finally: + db.close() + + +def _parse_it_audit_form_data(data: dict) -> dict: + """ + Parse form data into audit_data dictionary. + + Handles: + - Boolean fields (checkboxes) + - Array fields (multi-select) + - String and numeric fields + + Args: + data: Raw form data dictionary + + Returns: + Parsed audit_data dictionary with proper types + """ + # Boolean fields (checkboxes - present means True) + boolean_fields = [ + 'has_it_manager', 'it_outsourced', + 'has_azure_ad', 'has_m365', 'has_google_workspace', + 'has_mdm', 'has_edr', 'has_vpn', 'has_mfa', + 'has_proxmox_pbs', 'has_dr_plan', + 'has_local_ad', 'has_ad_azure_sync', + 'open_to_shared_licensing', 'open_to_backup_replication', + 'open_to_teams_federation', 'open_to_shared_monitoring', + 'open_to_collective_purchasing', 'open_to_knowledge_sharing', + ] + + # Array fields (multi-select - may come as comma-separated or multiple values) + array_fields = [ + 'm365_plans', 'teams_usage', 'server_types', 'server_os', + 'desktop_os', 'mfa_scope', 'backup_targets', + ] + + # String fields + string_fields = [ + 'it_provider_name', 'it_contact_name', 'it_contact_email', + 'azure_tenant_name', 'azure_user_count', + 'server_count', 'virtualization_platform', 'network_firewall_brand', + 'employee_count', 'computer_count', 'mdm_solution', + 'antivirus_solution', 'edr_solution', 'vpn_solution', + 'backup_solution', 'backup_frequency', + 'monitoring_solution', 'ad_domain_name', + 'ticketing_system', 'erp_system', 'crm_system', 'document_management', + ] + + audit_data = {} + + # Parse boolean fields + for field in boolean_fields: + value = data.get(field) + if value is None: + audit_data[field] = False + elif isinstance(value, bool): + audit_data[field] = value + elif isinstance(value, str): + audit_data[field] = value.lower() in ('true', '1', 'on', 'yes') + else: + audit_data[field] = bool(value) + + # Parse array fields + for field in array_fields: + value = data.get(field) + if value is None: + audit_data[field] = [] + elif isinstance(value, list): + audit_data[field] = value + elif isinstance(value, str): + # Handle comma-separated values + audit_data[field] = [v.strip() for v in value.split(',') if v.strip()] + else: + audit_data[field] = [value] + + # Parse string fields + for field in string_fields: + value = data.get(field) + if value is not None and isinstance(value, str): + audit_data[field] = value.strip() if value.strip() else None + else: + audit_data[field] = None + + # Parse zabbix_integration as JSON if present + zabbix_integration = data.get('zabbix_integration') + if zabbix_integration: + if isinstance(zabbix_integration, dict): + audit_data['zabbix_integration'] = zabbix_integration + elif isinstance(zabbix_integration, str): + try: + audit_data['zabbix_integration'] = json.loads(zabbix_integration) + except json.JSONDecodeError: + audit_data['zabbix_integration'] = {'hostname': zabbix_integration} + else: + audit_data['zabbix_integration'] = None + else: + # Check for zabbix_hostname field as alternative + zabbix_hostname = data.get('zabbix_hostname') + if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip(): + audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()} + else: + audit_data['zabbix_integration'] = None + + return audit_data diff --git a/blueprints/it_audit/routes_api.py b/blueprints/it_audit/routes_api.py new file mode 100644 index 0000000..16afc79 --- /dev/null +++ b/blueprints/it_audit/routes_api.py @@ -0,0 +1,290 @@ +""" +IT Audit API Routes - IT Audit blueprint + +Migrated from app.py as part of the blueprint refactoring. +Contains API routes for IT audit: matches, history, and export. +""" + +import csv +import logging +from io import StringIO + +from flask import jsonify, request, Response +from flask_login import current_user, login_required + +from database import SessionLocal, Company +from . import bp + +logger = logging.getLogger(__name__) + + +# ============================================================ +# IT AUDIT API ROUTES +# ============================================================ + +@bp.route('/api/it-audit/matches/') +@login_required +def api_it_audit_matches(company_id): + """ + API: Get IT audit collaboration matches for a company. + + Returns all collaboration matches where the specified company + is either company_a or company_b in the match pair. + + This endpoint is admin-only as collaboration matches + are not visible to regular users. + + Args: + company_id: Company ID to get matches for + + Returns: + JSON with list of matches including: + - match_id, match_type, match_score, status + - partner company info (id, name, slug) + - match_reason and shared_attributes + """ + # Only admins can view collaboration matches + if not current_user.is_admin: + return jsonify({ + 'success': False, + 'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.' + }), 403 + + db = SessionLocal() + try: + from it_audit_service import ITAuditService + from database import ITCollaborationMatch + + # Verify company exists + company = db.query(Company).filter_by(id=company_id).first() + if not company: + return jsonify({ + 'success': False, + 'error': 'Firma nie znaleziona' + }), 404 + + # Get matches for this company + service = ITAuditService(db) + matches = service.get_matches_for_company(company_id) + + # Format matches for JSON response + matches_data = [] + for match in matches: + # Determine partner company (the other company in the match) + if match.company_a_id == company_id: + partner = match.company_b + else: + partner = match.company_a + + matches_data.append({ + 'id': match.id, + 'match_type': match.match_type, + 'match_type_label': match.match_type_label, + 'match_score': match.match_score, + 'match_reason': match.match_reason, + 'status': match.status, + 'status_label': match.status_label, + 'shared_attributes': match.shared_attributes, + 'created_at': match.created_at.isoformat() if match.created_at else None, + 'partner': { + 'id': partner.id if partner else None, + 'name': partner.name if partner else None, + 'slug': partner.slug if partner else None, + } + }) + + return jsonify({ + 'success': True, + 'company_id': company_id, + 'company_name': company.name, + 'matches_count': len(matches_data), + 'matches': matches_data + }), 200 + + except Exception as e: + logger.error(f"Error fetching IT audit matches for company {company_id}: {e}") + return jsonify({ + 'success': False, + 'error': f'Błąd podczas pobierania dopasowań: {str(e)}' + }), 500 + + finally: + db.close() + + +@bp.route('/api/it-audit/history/') +@login_required +def api_it_audit_history(company_id): + """ + API: Get IT audit history for a company. + + Returns a list of all IT audits for a company, ordered by date descending. + The first item in the list is always the latest (current) audit. + + Access: + - Admin: Can view history for any company + - User: Can only view history for their own company + + Args: + company_id: Company ID to get audit history for + + Query params: + limit: Maximum number of audits to return (default: 10) + + Returns: + JSON with list of audits including: + - audit_id, audit_date, overall_score, scores, maturity_level + - is_current flag (True for the most recent audit) + """ + from it_audit_service import get_company_audit_history + + # Access control: users can only view their own company's history + if not current_user.is_admin and current_user.company_id != company_id: + return jsonify({ + 'success': False, + 'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.' + }), 403 + + # Parse limit from query params + limit = request.args.get('limit', 10, type=int) + limit = min(max(limit, 1), 50) # Clamp to 1-50 + + db = SessionLocal() + try: + # Verify company exists + company = db.query(Company).filter_by(id=company_id).first() + if not company: + return jsonify({ + 'success': False, + 'error': 'Firma nie znaleziona' + }), 404 + + # Get audit history + audits = get_company_audit_history(db, company_id, limit) + + # Format response + history = [] + for idx, audit in enumerate(audits): + history.append({ + 'id': audit.id, + 'audit_date': audit.audit_date.isoformat() if audit.audit_date else None, + 'audit_source': audit.audit_source, + 'overall_score': audit.overall_score, + 'security_score': audit.security_score, + 'collaboration_score': audit.collaboration_score, + 'completeness_score': audit.completeness_score, + 'maturity_level': audit.maturity_level, + 'is_current': idx == 0, # First item is most recent + 'is_partial': (audit.completeness_score or 0) < 100, + }) + + return jsonify({ + 'success': True, + 'company_id': company_id, + 'company_name': company.name, + 'company_slug': company.slug, + 'total_audits': len(history), + 'history': history + }), 200 + + except Exception as e: + logger.error(f"Error fetching IT audit history for company {company_id}: {e}") + return jsonify({ + 'success': False, + 'error': f'Błąd podczas pobierania historii audytów: {str(e)}' + }), 500 + + finally: + db.close() + + +@bp.route('/api/it-audit/export') +@login_required +def api_it_audit_export(): + """ + API: Export IT audit data as CSV. + + Exports all IT audits with company information and scores. + Admin-only endpoint. + + Returns: + CSV file with IT audit data + """ + if not current_user.is_admin: + return jsonify({ + 'success': False, + 'error': 'Tylko administrator może eksportować dane audytów.' + }), 403 + + db = SessionLocal() + try: + from database import ITAudit + + # Get all latest audits per company + audits = db.query(ITAudit, Company).join( + Company, ITAudit.company_id == Company.id + ).order_by( + ITAudit.company_id, + ITAudit.audit_date.desc() + ).all() + + # Deduplicate to get only latest audit per company + seen_companies = set() + latest_audits = [] + for audit, company in audits: + if company.id not in seen_companies: + seen_companies.add(company.id) + latest_audits.append((audit, company)) + + # Create CSV + output = StringIO() + writer = csv.writer(output) + + # Header + writer.writerow([ + 'Firma', 'NIP', 'Kategoria', 'Data audytu', + 'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność', + 'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA', + 'Proxmox PBS', 'Monitoring' + ]) + + # Data rows + for audit, company in latest_audits: + writer.writerow([ + company.name, + company.nip or '', + company.category.name if company.category else '', + audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '', + audit.overall_score or '', + audit.security_score or '', + audit.collaboration_score or '', + audit.completeness_score or '', + audit.maturity_level or '', + 'Tak' if audit.has_azure_ad else 'Nie', + 'Tak' if audit.has_m365 else 'Nie', + 'Tak' if audit.has_edr else 'Nie', + 'Tak' if audit.has_mfa else 'Nie', + 'Tak' if audit.has_proxmox_pbs else 'Nie', + audit.monitoring_solution or 'Brak' + ]) + + # Create response + output.seek(0) + return Response( + output.getvalue(), + mimetype='text/csv', + headers={ + 'Content-Disposition': 'attachment; filename=it_audit_export.csv', + 'Content-Type': 'text/csv; charset=utf-8' + } + ) + + except Exception as e: + logger.error(f"Error exporting IT audits: {e}") + return jsonify({ + 'success': False, + 'error': f'Błąd podczas eksportu: {str(e)}' + }), 500 + + finally: + db.close()