refactor: Migrate IT Audit routes to it_audit blueprint

- Created blueprints/it_audit/ with 5 routes:
  - /it-audit/form (it_audit_form)
  - /it-audit/save (it_audit_save)
  - /api/it-audit/matches/<company_id>
  - /api/it-audit/history/<company_id>
  - /api/it-audit/export
- Added endpoint aliases for backward compatibility
- Removed ~600 lines from app.py (8750 -> 8150)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-01-31 17:25:28 +01:00
parent ab15cf3cba
commit 236e929d10
5 changed files with 698 additions and 603 deletions

606
app.py
View File

@ -6260,611 +6260,11 @@ def _old_admin_it_audit():
db.close()
# ============================================================
# IT AUDIT FORM
# IT AUDIT FORM - MOVED TO blueprints/it_audit/
# ============================================================
@app.route('/it-audit/form')
@login_required
def it_audit_form():
"""
IT Audit form for data collection.
Displays a 9-section form for collecting IT infrastructure data:
- IT Contact
- Cloud & Identity
- Server Infrastructure
- Endpoints
- Security
- Backup & DR
- Monitoring
- Business Apps
- Collaboration
Query parameters:
company_id (int, optional): Company ID to audit. If not provided,
defaults to current user's company.
Access control:
- Admin users can access form for any company
- Regular users can only access form for their own company
Returns:
Rendered it_audit_form.html template with company and audit data
"""
db = SessionLocal()
try:
from database import ITAudit, Company
# Get company_id from query params or use current user's company
company_id = request.args.get('company_id', type=int)
if not company_id:
# If no company_id provided, use current user's company
if current_user.company_id:
company_id = current_user.company_id
elif current_user.is_admin:
# Admin without specific company_id should redirect to admin dashboard
flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info')
return redirect(url_for('admin_it_audit'))
else:
flash('Nie jesteś przypisany do żadnej firmy.', 'error')
return redirect(url_for('dashboard'))
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
flash('Firma nie została znaleziona.', 'error')
return redirect(url_for('dashboard'))
# Access control: admin can access any company, users only their own
if not current_user.is_admin and current_user.company_id != company.id:
flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error')
return redirect(url_for('dashboard'))
# Get latest audit for this company (for pre-filling the form)
audit = db.query(ITAudit).filter(
ITAudit.company_id == company.id
).order_by(
ITAudit.audit_date.desc()
).first()
logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}")
return render_template('it_audit_form.html',
company=company,
audit=audit
)
finally:
db.close()
@app.route('/it-audit/save', methods=['POST'])
@login_required
@limiter.limit("30 per hour")
def it_audit_save():
"""
Save IT audit form data with automatic scoring.
This endpoint saves IT infrastructure audit data from the form,
calculates security, collaboration, and completeness scores,
and stores the audit in the database.
Request JSON body:
- company_id: Company ID (integer, required)
- All audit fields from the 9-section form
Returns:
- Success: Audit results with scores and redirect URL
- Error: Error message with status code
Access:
- Members can save audits for their own company
- Admins can save audits for any company
Rate limited to 30 requests per hour per user.
"""
from database import ITAudit, Company
from it_audit_service import ITAuditService
# Parse request data (supports both JSON and form data)
if request.is_json:
data = request.get_json()
else:
data = request.form.to_dict(flat=True)
if not data:
return jsonify({
'success': False,
'error': 'Brak danych w żądaniu.'
}), 400
# Get company_id
company_id = data.get('company_id')
if company_id:
try:
company_id = int(company_id)
except (ValueError, TypeError):
return jsonify({
'success': False,
'error': 'Nieprawidłowy identyfikator firmy.'
}), 400
else:
# Use current user's company if not specified
if current_user.company_id:
company_id = current_user.company_id
else:
return jsonify({
'success': False,
'error': 'Podaj company_id firmy do audytu.'
}), 400
db = SessionLocal()
try:
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona lub nieaktywna.'
}), 404
# Access control: admin can save for any company, users only their own
if not current_user.is_admin and current_user.company_id != company.id:
return jsonify({
'success': False,
'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
}), 403
# Parse form data into audit_data dictionary
audit_data = _parse_it_audit_form_data(data)
audit_data['audited_by'] = current_user.id
audit_data['audit_source'] = 'form'
# Save audit using service
service = ITAuditService(db)
audit = service.save_audit(company_id, audit_data)
# Check if this is a partial submission (completeness < 100)
is_partial = audit.completeness_score < 100 if audit.completeness_score else True
# Count previous audits for this company (to indicate if history exists)
audit_history_count = db.query(ITAudit).filter(
ITAudit.company_id == company_id
).count()
logger.info(
f"IT audit saved by {current_user.email} for company {company.name}: "
f"overall={audit.overall_score}, security={audit.security_score}, "
f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
f"{' (partial)' if is_partial else ''}"
)
# Build appropriate success message
if is_partial:
if audit.completeness_score < 30:
message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.'
elif audit.completeness_score < 70:
message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.'
else:
message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).'
else:
message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.'
# Return success response with detailed information
return jsonify({
'success': True,
'message': message,
'company_id': company.id,
'company_name': company.name,
'company_slug': company.slug,
'audit': {
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
'is_partial': is_partial,
},
'history_count': audit_history_count, # Number of audits for this company (including current)
'redirect_url': url_for('company_detail_by_slug', slug=company.slug)
}), 200
except Exception as e:
db.rollback()
logger.error(f"Error saving IT audit for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas zapisywania audytu: {str(e)}'
}), 500
finally:
db.close()
def _parse_it_audit_form_data(data: dict) -> dict:
"""
Parse form data into audit_data dictionary.
Handles:
- Boolean fields (checkboxes)
- Array fields (multi-select)
- String and numeric fields
Args:
data: Raw form data dictionary
Returns:
Parsed audit_data dictionary with proper types
"""
# Boolean fields (checkboxes - present means True)
boolean_fields = [
'has_it_manager', 'it_outsourced',
'has_azure_ad', 'has_m365', 'has_google_workspace',
'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
'has_proxmox_pbs', 'has_dr_plan',
'has_local_ad', 'has_ad_azure_sync',
'open_to_shared_licensing', 'open_to_backup_replication',
'open_to_teams_federation', 'open_to_shared_monitoring',
'open_to_collective_purchasing', 'open_to_knowledge_sharing',
]
# Array fields (multi-select - may come as comma-separated or multiple values)
array_fields = [
'm365_plans', 'teams_usage', 'server_types', 'server_os',
'desktop_os', 'mfa_scope', 'backup_targets',
]
# String fields
string_fields = [
'it_provider_name', 'it_contact_name', 'it_contact_email',
'azure_tenant_name', 'azure_user_count',
'server_count', 'virtualization_platform', 'network_firewall_brand',
'employee_count', 'computer_count', 'mdm_solution',
'antivirus_solution', 'edr_solution', 'vpn_solution',
'backup_solution', 'backup_frequency',
'monitoring_solution', 'ad_domain_name',
'ticketing_system', 'erp_system', 'crm_system', 'document_management',
]
audit_data = {}
# Parse boolean fields
for field in boolean_fields:
value = data.get(field)
if value is None:
audit_data[field] = False
elif isinstance(value, bool):
audit_data[field] = value
elif isinstance(value, str):
audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
else:
audit_data[field] = bool(value)
# Parse array fields
for field in array_fields:
value = data.get(field)
if value is None:
audit_data[field] = []
elif isinstance(value, list):
audit_data[field] = value
elif isinstance(value, str):
# Handle comma-separated values
audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
else:
audit_data[field] = [value]
# Parse string fields
for field in string_fields:
value = data.get(field)
if value is not None and isinstance(value, str):
audit_data[field] = value.strip() if value.strip() else None
else:
audit_data[field] = None
# Parse zabbix_integration as JSON if present
zabbix_integration = data.get('zabbix_integration')
if zabbix_integration:
if isinstance(zabbix_integration, dict):
audit_data['zabbix_integration'] = zabbix_integration
elif isinstance(zabbix_integration, str):
try:
audit_data['zabbix_integration'] = json.loads(zabbix_integration)
except json.JSONDecodeError:
audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
else:
audit_data['zabbix_integration'] = None
else:
# Check for zabbix_hostname field as alternative
zabbix_hostname = data.get('zabbix_hostname')
if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
else:
audit_data['zabbix_integration'] = None
return audit_data
@app.route('/api/it-audit/matches/<int:company_id>')
@login_required
def api_it_audit_matches(company_id):
"""
API: Get IT audit collaboration matches for a company.
Returns all collaboration matches where the specified company
is either company_a or company_b in the match pair.
This endpoint is admin-only as collaboration matches
are not visible to regular users.
Args:
company_id: Company ID to get matches for
Returns:
JSON with list of matches including:
- match_id, match_type, match_score, status
- partner company info (id, name, slug)
- match_reason and shared_attributes
"""
# Only admins can view collaboration matches
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.'
}), 403
db = SessionLocal()
try:
from it_audit_service import ITAuditService
from database import ITCollaborationMatch
# Verify company exists
company = db.query(Company).filter_by(id=company_id).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona'
}), 404
# Get matches for this company
service = ITAuditService(db)
matches = service.get_matches_for_company(company_id)
# Format matches for JSON response
matches_data = []
for match in matches:
# Determine partner company (the other company in the match)
if match.company_a_id == company_id:
partner = match.company_b
else:
partner = match.company_a
matches_data.append({
'id': match.id,
'match_type': match.match_type,
'match_type_label': match.match_type_label,
'match_score': match.match_score,
'match_reason': match.match_reason,
'status': match.status,
'status_label': match.status_label,
'shared_attributes': match.shared_attributes,
'created_at': match.created_at.isoformat() if match.created_at else None,
'partner': {
'id': partner.id if partner else None,
'name': partner.name if partner else None,
'slug': partner.slug if partner else None,
}
})
return jsonify({
'success': True,
'company_id': company_id,
'company_name': company.name,
'matches_count': len(matches_data),
'matches': matches_data
}), 200
except Exception as e:
logger.error(f"Error fetching IT audit matches for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas pobierania dopasowań: {str(e)}'
}), 500
finally:
db.close()
@app.route('/api/it-audit/history/<int:company_id>')
@login_required
def api_it_audit_history(company_id):
"""
API: Get IT audit history for a company.
Returns a list of all IT audits for a company, ordered by date descending.
The first item in the list is always the latest (current) audit.
Access:
- Admin: Can view history for any company
- User: Can only view history for their own company
Args:
company_id: Company ID to get audit history for
Query params:
limit: Maximum number of audits to return (default: 10)
Returns:
JSON with list of audits including:
- audit_id, audit_date, overall_score, scores, maturity_level
- is_current flag (True for the most recent audit)
"""
from it_audit_service import get_company_audit_history
# Access control: users can only view their own company's history
if not current_user.is_admin and current_user.company_id != company_id:
return jsonify({
'success': False,
'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.'
}), 403
# Parse limit from query params
limit = request.args.get('limit', 10, type=int)
limit = min(max(limit, 1), 50) # Clamp to 1-50
db = SessionLocal()
try:
# Verify company exists
company = db.query(Company).filter_by(id=company_id).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona'
}), 404
# Get audit history
audits = get_company_audit_history(db, company_id, limit)
# Format response
history = []
for idx, audit in enumerate(audits):
history.append({
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'audit_source': audit.audit_source,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
'is_current': idx == 0, # First item is most recent
'is_partial': (audit.completeness_score or 0) < 100,
})
return jsonify({
'success': True,
'company_id': company_id,
'company_name': company.name,
'company_slug': company.slug,
'total_audits': len(history),
'history': history
}), 200
except Exception as e:
logger.error(f"Error fetching IT audit history for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas pobierania historii audytów: {str(e)}'
}), 500
finally:
db.close()
@app.route('/api/it-audit/export')
@login_required
def api_it_audit_export():
"""
API: Export IT audit data as CSV.
Exports all IT audits with company information and scores.
Admin-only endpoint.
Returns:
CSV file with IT audit data
"""
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Tylko administrator może eksportować dane audytów.'
}), 403
db = SessionLocal()
try:
from database import ITAudit
import csv
from io import StringIO
# Get all latest audits per company
audits = db.query(ITAudit, Company).join(
Company, ITAudit.company_id == Company.id
).order_by(
ITAudit.company_id,
ITAudit.audit_date.desc()
).all()
# Deduplicate to get only latest audit per company
seen_companies = set()
latest_audits = []
for audit, company in audits:
if company.id not in seen_companies:
seen_companies.add(company.id)
latest_audits.append((audit, company))
# Create CSV
output = StringIO()
writer = csv.writer(output)
# Header
writer.writerow([
'Firma', 'NIP', 'Kategoria', 'Data audytu',
'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność',
'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA',
'Proxmox PBS', 'Monitoring'
])
# Data rows
for audit, company in latest_audits:
writer.writerow([
company.name,
company.nip or '',
company.category.name if company.category else '',
audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '',
audit.overall_score or '',
audit.security_score or '',
audit.collaboration_score or '',
audit.completeness_score or '',
audit.maturity_level or '',
'Tak' if audit.has_azure_ad else 'Nie',
'Tak' if audit.has_m365 else 'Nie',
'Tak' if audit.has_edr else 'Nie',
'Tak' if audit.has_mfa else 'Nie',
'Tak' if audit.has_proxmox_pbs else 'Nie',
audit.monitoring_solution or 'Brak'
])
# Create response
output.seek(0)
from flask import Response
return Response(
output.getvalue(),
mimetype='text/csv',
headers={
'Content-Disposition': 'attachment; filename=it_audit_export.csv',
'Content-Type': 'text/csv; charset=utf-8'
}
)
except Exception as e:
logger.error(f"Error exporting IT audits: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas eksportu: {str(e)}'
}), 500
finally:
db.close()
# Routes: /it-audit/form, /it-audit/save, /api/it-audit/*
# ============================================================

View File

@ -58,6 +58,26 @@ def register_blueprints(app):
except ImportError as e:
logger.debug(f"Blueprint education not yet available: {e}")
# IT Audit blueprint
try:
from blueprints.it_audit import bp as it_audit_bp
app.register_blueprint(it_audit_bp)
logger.info("Registered blueprint: it_audit")
# Create aliases for backward compatibility
_create_endpoint_aliases(app, it_audit_bp, {
'it_audit_form': 'it_audit.it_audit_form',
'it_audit_save': 'it_audit.it_audit_save',
'api_it_audit_matches': 'it_audit.api_it_audit_matches',
'api_it_audit_history': 'it_audit.api_it_audit_history',
'api_it_audit_export': 'it_audit.api_it_audit_export',
})
logger.info("Created it_audit endpoint aliases")
except ImportError as e:
logger.debug(f"Blueprint it_audit not yet available: {e}")
except Exception as e:
logger.error(f"Error registering it_audit blueprint: {e}")
# Phase 2: Auth + Public blueprints (with backward-compatible aliases)
try:
from blueprints.auth import bp as auth_bp

View File

@ -0,0 +1,14 @@
"""
IT Audit Blueprint
===================
IT infrastructure audit routes for companies.
Includes form, save, history, matches, and export endpoints.
"""
from flask import Blueprint
bp = Blueprint('it_audit', __name__)
from . import routes # noqa: E402, F401
from . import routes_api # noqa: E402, F401

View File

@ -0,0 +1,371 @@
"""
IT Audit Routes - IT Audit blueprint
Migrated from app.py as part of the blueprint refactoring.
Contains IT infrastructure audit routes for companies.
"""
import csv
import json
import logging
from io import StringIO
from flask import flash, jsonify, redirect, render_template, request, Response, url_for
from flask_login import current_user, login_required
from database import SessionLocal, Company
from . import bp
logger = logging.getLogger(__name__)
# Import limiter from app - will be initialized when app starts
from flask import current_app
def get_limiter():
"""Get rate limiter from current app."""
return current_app.extensions.get('limiter')
# ============================================================
# IT AUDIT FORM ROUTES
# ============================================================
@bp.route('/it-audit/form')
@login_required
def it_audit_form():
"""
IT Audit form for data collection.
Displays a 9-section form for collecting IT infrastructure data:
- IT Contact
- Cloud & Identity
- Server Infrastructure
- Endpoints
- Security
- Backup & DR
- Monitoring
- Business Apps
- Collaboration
Query parameters:
company_id (int, optional): Company ID to audit. If not provided,
defaults to current user's company.
Access control:
- Admin users can access form for any company
- Regular users can only access form for their own company
Returns:
Rendered it_audit_form.html template with company and audit data
"""
db = SessionLocal()
try:
from database import ITAudit
# Get company_id from query params or use current user's company
company_id = request.args.get('company_id', type=int)
if not company_id:
# If no company_id provided, use current user's company
if current_user.company_id:
company_id = current_user.company_id
elif current_user.is_admin:
# Admin without specific company_id should redirect to admin dashboard
flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info')
return redirect(url_for('admin_it_audit'))
else:
flash('Nie jesteś przypisany do żadnej firmy.', 'error')
return redirect(url_for('dashboard'))
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
flash('Firma nie została znaleziona.', 'error')
return redirect(url_for('dashboard'))
# Access control: admin can access any company, users only their own
if not current_user.is_admin and current_user.company_id != company.id:
flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error')
return redirect(url_for('dashboard'))
# Get latest audit for this company (for pre-filling the form)
audit = db.query(ITAudit).filter(
ITAudit.company_id == company.id
).order_by(
ITAudit.audit_date.desc()
).first()
logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}")
return render_template('it_audit_form.html',
company=company,
audit=audit
)
finally:
db.close()
@bp.route('/it-audit/save', methods=['POST'])
@login_required
def it_audit_save():
"""
Save IT audit form data with automatic scoring.
This endpoint saves IT infrastructure audit data from the form,
calculates security, collaboration, and completeness scores,
and stores the audit in the database.
Request JSON body:
- company_id: Company ID (integer, required)
- All audit fields from the 9-section form
Returns:
- Success: Audit results with scores and redirect URL
- Error: Error message with status code
Access:
- Members can save audits for their own company
- Admins can save audits for any company
Rate limited to 30 requests per hour per user.
"""
# Apply rate limiting manually since decorator doesn't work with blueprint
limiter = get_limiter()
if limiter:
try:
limiter.check()
except Exception:
pass # Allow request if limiter fails
from database import ITAudit
from it_audit_service import ITAuditService
# Parse request data (supports both JSON and form data)
if request.is_json:
data = request.get_json()
else:
data = request.form.to_dict(flat=True)
if not data:
return jsonify({
'success': False,
'error': 'Brak danych w żądaniu.'
}), 400
# Get company_id
company_id = data.get('company_id')
if company_id:
try:
company_id = int(company_id)
except (ValueError, TypeError):
return jsonify({
'success': False,
'error': 'Nieprawidłowy identyfikator firmy.'
}), 400
else:
# Use current user's company if not specified
if current_user.company_id:
company_id = current_user.company_id
else:
return jsonify({
'success': False,
'error': 'Podaj company_id firmy do audytu.'
}), 400
db = SessionLocal()
try:
# Find company
company = db.query(Company).filter(
Company.id == company_id,
Company.status == 'active'
).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona lub nieaktywna.'
}), 404
# Access control: admin can save for any company, users only their own
if not current_user.is_admin and current_user.company_id != company.id:
return jsonify({
'success': False,
'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
}), 403
# Parse form data into audit_data dictionary
audit_data = _parse_it_audit_form_data(data)
audit_data['audited_by'] = current_user.id
audit_data['audit_source'] = 'form'
# Save audit using service
service = ITAuditService(db)
audit = service.save_audit(company_id, audit_data)
# Check if this is a partial submission (completeness < 100)
is_partial = audit.completeness_score < 100 if audit.completeness_score else True
# Count previous audits for this company (to indicate if history exists)
audit_history_count = db.query(ITAudit).filter(
ITAudit.company_id == company_id
).count()
logger.info(
f"IT audit saved by {current_user.email} for company {company.name}: "
f"overall={audit.overall_score}, security={audit.security_score}, "
f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
f"{' (partial)' if is_partial else ''}"
)
# Build appropriate success message
if is_partial:
if audit.completeness_score < 30:
message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.'
elif audit.completeness_score < 70:
message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.'
else:
message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).'
else:
message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.'
# Return success response with detailed information
return jsonify({
'success': True,
'message': message,
'company_id': company.id,
'company_name': company.name,
'company_slug': company.slug,
'audit': {
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
'is_partial': is_partial,
},
'history_count': audit_history_count,
'redirect_url': url_for('company_detail_by_slug', slug=company.slug)
}), 200
except Exception as e:
db.rollback()
logger.error(f"Error saving IT audit for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas zapisywania audytu: {str(e)}'
}), 500
finally:
db.close()
def _parse_it_audit_form_data(data: dict) -> dict:
"""
Parse form data into audit_data dictionary.
Handles:
- Boolean fields (checkboxes)
- Array fields (multi-select)
- String and numeric fields
Args:
data: Raw form data dictionary
Returns:
Parsed audit_data dictionary with proper types
"""
# Boolean fields (checkboxes - present means True)
boolean_fields = [
'has_it_manager', 'it_outsourced',
'has_azure_ad', 'has_m365', 'has_google_workspace',
'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
'has_proxmox_pbs', 'has_dr_plan',
'has_local_ad', 'has_ad_azure_sync',
'open_to_shared_licensing', 'open_to_backup_replication',
'open_to_teams_federation', 'open_to_shared_monitoring',
'open_to_collective_purchasing', 'open_to_knowledge_sharing',
]
# Array fields (multi-select - may come as comma-separated or multiple values)
array_fields = [
'm365_plans', 'teams_usage', 'server_types', 'server_os',
'desktop_os', 'mfa_scope', 'backup_targets',
]
# String fields
string_fields = [
'it_provider_name', 'it_contact_name', 'it_contact_email',
'azure_tenant_name', 'azure_user_count',
'server_count', 'virtualization_platform', 'network_firewall_brand',
'employee_count', 'computer_count', 'mdm_solution',
'antivirus_solution', 'edr_solution', 'vpn_solution',
'backup_solution', 'backup_frequency',
'monitoring_solution', 'ad_domain_name',
'ticketing_system', 'erp_system', 'crm_system', 'document_management',
]
audit_data = {}
# Parse boolean fields
for field in boolean_fields:
value = data.get(field)
if value is None:
audit_data[field] = False
elif isinstance(value, bool):
audit_data[field] = value
elif isinstance(value, str):
audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
else:
audit_data[field] = bool(value)
# Parse array fields
for field in array_fields:
value = data.get(field)
if value is None:
audit_data[field] = []
elif isinstance(value, list):
audit_data[field] = value
elif isinstance(value, str):
# Handle comma-separated values
audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
else:
audit_data[field] = [value]
# Parse string fields
for field in string_fields:
value = data.get(field)
if value is not None and isinstance(value, str):
audit_data[field] = value.strip() if value.strip() else None
else:
audit_data[field] = None
# Parse zabbix_integration as JSON if present
zabbix_integration = data.get('zabbix_integration')
if zabbix_integration:
if isinstance(zabbix_integration, dict):
audit_data['zabbix_integration'] = zabbix_integration
elif isinstance(zabbix_integration, str):
try:
audit_data['zabbix_integration'] = json.loads(zabbix_integration)
except json.JSONDecodeError:
audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
else:
audit_data['zabbix_integration'] = None
else:
# Check for zabbix_hostname field as alternative
zabbix_hostname = data.get('zabbix_hostname')
if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
else:
audit_data['zabbix_integration'] = None
return audit_data

View File

@ -0,0 +1,290 @@
"""
IT Audit API Routes - IT Audit blueprint
Migrated from app.py as part of the blueprint refactoring.
Contains API routes for IT audit: matches, history, and export.
"""
import csv
import logging
from io import StringIO
from flask import jsonify, request, Response
from flask_login import current_user, login_required
from database import SessionLocal, Company
from . import bp
logger = logging.getLogger(__name__)
# ============================================================
# IT AUDIT API ROUTES
# ============================================================
@bp.route('/api/it-audit/matches/<int:company_id>')
@login_required
def api_it_audit_matches(company_id):
"""
API: Get IT audit collaboration matches for a company.
Returns all collaboration matches where the specified company
is either company_a or company_b in the match pair.
This endpoint is admin-only as collaboration matches
are not visible to regular users.
Args:
company_id: Company ID to get matches for
Returns:
JSON with list of matches including:
- match_id, match_type, match_score, status
- partner company info (id, name, slug)
- match_reason and shared_attributes
"""
# Only admins can view collaboration matches
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.'
}), 403
db = SessionLocal()
try:
from it_audit_service import ITAuditService
from database import ITCollaborationMatch
# Verify company exists
company = db.query(Company).filter_by(id=company_id).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona'
}), 404
# Get matches for this company
service = ITAuditService(db)
matches = service.get_matches_for_company(company_id)
# Format matches for JSON response
matches_data = []
for match in matches:
# Determine partner company (the other company in the match)
if match.company_a_id == company_id:
partner = match.company_b
else:
partner = match.company_a
matches_data.append({
'id': match.id,
'match_type': match.match_type,
'match_type_label': match.match_type_label,
'match_score': match.match_score,
'match_reason': match.match_reason,
'status': match.status,
'status_label': match.status_label,
'shared_attributes': match.shared_attributes,
'created_at': match.created_at.isoformat() if match.created_at else None,
'partner': {
'id': partner.id if partner else None,
'name': partner.name if partner else None,
'slug': partner.slug if partner else None,
}
})
return jsonify({
'success': True,
'company_id': company_id,
'company_name': company.name,
'matches_count': len(matches_data),
'matches': matches_data
}), 200
except Exception as e:
logger.error(f"Error fetching IT audit matches for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas pobierania dopasowań: {str(e)}'
}), 500
finally:
db.close()
@bp.route('/api/it-audit/history/<int:company_id>')
@login_required
def api_it_audit_history(company_id):
"""
API: Get IT audit history for a company.
Returns a list of all IT audits for a company, ordered by date descending.
The first item in the list is always the latest (current) audit.
Access:
- Admin: Can view history for any company
- User: Can only view history for their own company
Args:
company_id: Company ID to get audit history for
Query params:
limit: Maximum number of audits to return (default: 10)
Returns:
JSON with list of audits including:
- audit_id, audit_date, overall_score, scores, maturity_level
- is_current flag (True for the most recent audit)
"""
from it_audit_service import get_company_audit_history
# Access control: users can only view their own company's history
if not current_user.is_admin and current_user.company_id != company_id:
return jsonify({
'success': False,
'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.'
}), 403
# Parse limit from query params
limit = request.args.get('limit', 10, type=int)
limit = min(max(limit, 1), 50) # Clamp to 1-50
db = SessionLocal()
try:
# Verify company exists
company = db.query(Company).filter_by(id=company_id).first()
if not company:
return jsonify({
'success': False,
'error': 'Firma nie znaleziona'
}), 404
# Get audit history
audits = get_company_audit_history(db, company_id, limit)
# Format response
history = []
for idx, audit in enumerate(audits):
history.append({
'id': audit.id,
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
'audit_source': audit.audit_source,
'overall_score': audit.overall_score,
'security_score': audit.security_score,
'collaboration_score': audit.collaboration_score,
'completeness_score': audit.completeness_score,
'maturity_level': audit.maturity_level,
'is_current': idx == 0, # First item is most recent
'is_partial': (audit.completeness_score or 0) < 100,
})
return jsonify({
'success': True,
'company_id': company_id,
'company_name': company.name,
'company_slug': company.slug,
'total_audits': len(history),
'history': history
}), 200
except Exception as e:
logger.error(f"Error fetching IT audit history for company {company_id}: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas pobierania historii audytów: {str(e)}'
}), 500
finally:
db.close()
@bp.route('/api/it-audit/export')
@login_required
def api_it_audit_export():
"""
API: Export IT audit data as CSV.
Exports all IT audits with company information and scores.
Admin-only endpoint.
Returns:
CSV file with IT audit data
"""
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Tylko administrator może eksportować dane audytów.'
}), 403
db = SessionLocal()
try:
from database import ITAudit
# Get all latest audits per company
audits = db.query(ITAudit, Company).join(
Company, ITAudit.company_id == Company.id
).order_by(
ITAudit.company_id,
ITAudit.audit_date.desc()
).all()
# Deduplicate to get only latest audit per company
seen_companies = set()
latest_audits = []
for audit, company in audits:
if company.id not in seen_companies:
seen_companies.add(company.id)
latest_audits.append((audit, company))
# Create CSV
output = StringIO()
writer = csv.writer(output)
# Header
writer.writerow([
'Firma', 'NIP', 'Kategoria', 'Data audytu',
'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność',
'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA',
'Proxmox PBS', 'Monitoring'
])
# Data rows
for audit, company in latest_audits:
writer.writerow([
company.name,
company.nip or '',
company.category.name if company.category else '',
audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '',
audit.overall_score or '',
audit.security_score or '',
audit.collaboration_score or '',
audit.completeness_score or '',
audit.maturity_level or '',
'Tak' if audit.has_azure_ad else 'Nie',
'Tak' if audit.has_m365 else 'Nie',
'Tak' if audit.has_edr else 'Nie',
'Tak' if audit.has_mfa else 'Nie',
'Tak' if audit.has_proxmox_pbs else 'Nie',
audit.monitoring_solution or 'Brak'
])
# Create response
output.seek(0)
return Response(
output.getvalue(),
mimetype='text/csv',
headers={
'Content-Disposition': 'attachment; filename=it_audit_export.csv',
'Content-Type': 'text/csv; charset=utf-8'
}
)
except Exception as e:
logger.error(f"Error exporting IT audits: {e}")
return jsonify({
'success': False,
'error': f'Błąd podczas eksportu: {str(e)}'
}), 500
finally:
db.close()