nordabiz/it_audit_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

995 lines
34 KiB
Python

"""
IT Audit Service for Norda Biznes Partner
======================================
IT infrastructure audit service with:
- Security posture assessment (50% weight)
- Collaboration readiness scoring (30% weight)
- Completeness scoring (20% weight)
- Maturity level classification
- Cross-company collaboration matching
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-01-09
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Any
from sqlalchemy.orm import Session
from database import Company, ITAudit, ITCollaborationMatch, SessionLocal
# Configure logging
logger = logging.getLogger(__name__)
# === Scoring Configuration ===
# Security score elements (max 65 raw points, normalized to 100, then weighted 50%)
SECURITY_SCORES = {
'has_edr': 15, # EDR solution - highest security value
'has_mfa': 10, # Multi-factor authentication
'has_firewall': 10, # Network firewall
'has_backup': 10, # Backup solution
'has_dr_plan': 10, # Disaster recovery plan
'has_vpn': 5, # VPN solution
'has_monitoring': 5, # Monitoring solution
}
SECURITY_MAX_RAW = 65
# Collaboration score elements (max 100 raw points, weighted 30%)
COLLABORATION_SCORES = {
'open_to_shared_licensing': 10, # Open to shared M365/software licensing
'open_to_backup_replication': 10, # Open to backup replication partnership
'open_to_teams_federation': 10, # Open to Teams federation
'open_to_shared_monitoring': 10, # Open to shared Zabbix monitoring
'open_to_collective_purchasing': 10, # Open to collective purchasing
'open_to_knowledge_sharing': 10, # Open to knowledge exchange
'has_azure_ad': 10, # Azure AD (enables federation)
'has_m365': 10, # Microsoft 365 (enables shared licensing)
'has_proxmox_pbs': 10, # Proxmox PBS (enables backup replication)
'has_zabbix': 10, # Zabbix monitoring (enables shared monitoring)
}
COLLABORATION_MAX_RAW = 100
# Completeness weights for different field categories
COMPLETENESS_WEIGHTS = {
# IT Contact (weight: 10)
'it_contact_name': 3,
'it_contact_email': 3,
'has_it_manager': 2,
'it_provider_name': 2,
# Cloud & Identity (weight: 15)
'has_azure_ad': 4,
'azure_tenant_name': 3,
'azure_user_count': 3,
'has_m365': 3,
'has_google_workspace': 2,
# Server Infrastructure (weight: 15)
'server_count': 3,
'server_types': 3,
'virtualization_platform': 3,
'server_os': 3,
'network_firewall_brand': 3,
# Endpoints (weight: 10)
'employee_count': 3,
'computer_count': 3,
'desktop_os': 2,
'mdm_solution': 2,
# Security (weight: 20)
'antivirus_solution': 4,
'has_edr': 4,
'has_vpn': 4,
'has_mfa': 4,
'mfa_scope': 4,
# Backup & DR (weight: 15)
'backup_solution': 4,
'backup_targets': 3,
'backup_frequency': 3,
'has_proxmox_pbs': 3,
'has_dr_plan': 2,
# Monitoring (weight: 5)
'monitoring_solution': 3,
'zabbix_integration': 2,
# Business Apps (weight: 5)
'ticketing_system': 2,
'erp_system': 2,
'crm_system': 1,
# Collaboration (weight: 5)
'open_to_shared_licensing': 1,
'open_to_backup_replication': 1,
'open_to_teams_federation': 1,
'open_to_shared_monitoring': 1,
'open_to_collective_purchasing': 1,
'open_to_knowledge_sharing': 0, # Bonus, not required for completeness
}
# Maturity level thresholds
MATURITY_LEVELS = {
'basic': (0, 39),
'developing': (40, 59),
'established': (60, 79),
'advanced': (80, 100),
}
# Collaboration match types with Polish labels
MATCH_TYPES = {
'shared_licensing': 'Wspólne licencje',
'backup_replication': 'Replikacja backupów',
'teams_federation': 'Federacja Teams',
'shared_monitoring': 'Wspólny monitoring',
'collective_purchasing': 'Zakupy grupowe',
'knowledge_sharing': 'Wymiana wiedzy',
}
@dataclass
class FieldStatus:
"""Status of a single IT audit field"""
field_name: str
status: str # 'complete', 'partial', 'missing'
value: Optional[Any] = None
score: float = 0.0
max_score: float = 0.0
recommendation: Optional[str] = None
details: Optional[Dict[str, Any]] = None
@dataclass
class ITAuditResult:
"""Complete IT audit result"""
company_id: int
overall_score: int
security_score: int
collaboration_score: int
completeness_score: int
maturity_level: str # basic, developing, established, advanced
fields: Dict[str, FieldStatus] = field(default_factory=dict)
recommendations: List[Dict[str, Any]] = field(default_factory=list)
audit_errors: Optional[str] = None
# Section summaries
section_scores: Dict[str, int] = field(default_factory=dict)
# Technology flags for quick access
has_azure_ad: bool = False
has_m365: bool = False
has_proxmox_pbs: bool = False
has_zabbix: bool = False
has_edr: bool = False
has_mfa: bool = False
@dataclass
class CollaborationMatch:
"""A potential collaboration match between two companies"""
company_a_id: int
company_b_id: int
company_a_name: str
company_b_name: str
match_type: str
match_reason: str
match_score: int
shared_attributes: Dict[str, Any] = field(default_factory=dict)
def get_maturity_level(score: int) -> str:
"""
Get maturity level label based on overall score.
Args:
score: Overall score (0-100)
Returns:
Maturity level: 'basic', 'developing', 'established', or 'advanced'
"""
for level, (min_score, max_score) in MATURITY_LEVELS.items():
if min_score <= score <= max_score:
return level
return 'basic'
def get_maturity_level_label(level: str) -> str:
"""
Get Polish label for maturity level.
Args:
level: Maturity level key
Returns:
Polish label for the maturity level
"""
labels = {
'basic': 'Podstawowy',
'developing': 'Rozwijający się',
'established': 'Dojrzały',
'advanced': 'Zaawansowany',
}
return labels.get(level, 'Nieznany')
class ITAuditService:
"""Service for IT infrastructure auditing and collaboration matching"""
def __init__(self, db: Session):
"""
Initialize IT Audit service.
Args:
db: SQLAlchemy database session
"""
self.db = db
# === Scoring Methods ===
def _calculate_security_score(self, audit_data: dict) -> int:
"""
Calculate security score from audit data.
Security elements and their point values (max 65 points):
- has_edr: +15 pts (EDR solution - highest security value)
- has_mfa: +10 pts (Multi-factor authentication)
- has_firewall: +10 pts (Network firewall)
- has_backup: +10 pts (Backup solution)
- has_dr_plan: +10 pts (Disaster recovery plan)
- has_vpn: +5 pts (VPN solution)
- has_monitoring: +5 pts (Monitoring solution)
The raw score (0-65) is returned directly. When calculating
the overall score, this is weighted at 50% and normalized.
Args:
audit_data: Dictionary with audit field values
Returns:
Raw security score (0-65)
"""
score = 0
# EDR - highest security value
if audit_data.get('has_edr'):
score += SECURITY_SCORES['has_edr']
# MFA - critical for identity security
if audit_data.get('has_mfa'):
score += SECURITY_SCORES['has_mfa']
# Firewall (check for brand name presence or explicit flag)
if audit_data.get('network_firewall_brand') or audit_data.get('has_firewall'):
score += SECURITY_SCORES['has_firewall']
# Backup solution (check for solution name or explicit flag)
if audit_data.get('backup_solution') or audit_data.get('has_backup'):
score += SECURITY_SCORES['has_backup']
# DR plan - disaster recovery readiness
if audit_data.get('has_dr_plan'):
score += SECURITY_SCORES['has_dr_plan']
# VPN - secure remote access
if audit_data.get('has_vpn'):
score += SECURITY_SCORES['has_vpn']
# Monitoring (check for solution name or explicit flag)
if audit_data.get('monitoring_solution') or audit_data.get('has_monitoring'):
score += SECURITY_SCORES['has_monitoring']
return score
def _calculate_collaboration_score(self, audit_data: dict) -> int:
"""
Calculate collaboration readiness score from audit data.
Collaboration elements and their point values:
- Each open_to_* flag: +10 pts (max 60 from flags)
- has_azure_ad: +10 pts (enables Teams federation)
- has_m365: +10 pts (enables shared licensing)
- has_proxmox_pbs: +10 pts (enables backup replication)
- has_zabbix: +10 pts (enables shared monitoring)
Max raw: 100 points
Args:
audit_data: Dictionary with audit field values
Returns:
Collaboration score (0-100)
"""
raw_score = 0
# Collaboration flags
collaboration_flags = [
'open_to_shared_licensing',
'open_to_backup_replication',
'open_to_teams_federation',
'open_to_shared_monitoring',
'open_to_collective_purchasing',
'open_to_knowledge_sharing',
]
for flag in collaboration_flags:
if audit_data.get(flag):
raw_score += COLLABORATION_SCORES[flag]
# Technology bonuses
if audit_data.get('has_azure_ad'):
raw_score += COLLABORATION_SCORES['has_azure_ad']
if audit_data.get('has_m365'):
raw_score += COLLABORATION_SCORES['has_m365']
if audit_data.get('has_proxmox_pbs'):
raw_score += COLLABORATION_SCORES['has_proxmox_pbs']
# Zabbix check (can be in monitoring_solution or zabbix_integration)
monitoring = audit_data.get('monitoring_solution', '').lower() if audit_data.get('monitoring_solution') else ''
has_zabbix = audit_data.get('has_zabbix') or 'zabbix' in monitoring
if has_zabbix:
raw_score += COLLABORATION_SCORES['has_zabbix']
# Cap at 100
return min(raw_score, 100)
def _calculate_completeness_score(self, audit_data: dict) -> int:
"""
Calculate completeness score based on filled fields with weights.
Args:
audit_data: Dictionary with audit field values
Returns:
Completeness percentage (0-100)
"""
total_weight = sum(COMPLETENESS_WEIGHTS.values())
achieved_weight = 0
for field_name, weight in COMPLETENESS_WEIGHTS.items():
value = audit_data.get(field_name)
# Check if field has a meaningful value
if value is not None:
if isinstance(value, bool):
# Boolean fields count if True
if value:
achieved_weight += weight
elif isinstance(value, str):
# String fields count if non-empty
if value.strip():
achieved_weight += weight
elif isinstance(value, (list, dict)):
# List/dict fields count if non-empty
if len(value) > 0:
achieved_weight += weight
elif isinstance(value, (int, float)):
# Numeric fields count if > 0
if value > 0:
achieved_weight += weight
else:
# Any other truthy value counts
achieved_weight += weight
if total_weight == 0:
return 0
return round(achieved_weight / total_weight * 100)
def calculate_scores(self, audit_data: dict) -> ITAuditResult:
"""
Calculate all scores from form data.
Overall score formula:
- Security: 50% weight (normalized from 0-65 to 0-100)
- Collaboration: 30% weight (already 0-100)
- Completeness: 20% weight (already 0-100)
Args:
audit_data: Dictionary with all form field values
Returns:
ITAuditResult with all scores and maturity level
"""
company_id = audit_data.get('company_id', 0)
# Calculate component scores
security_score = self._calculate_security_score(audit_data) # Returns 0-65
collaboration_score = self._calculate_collaboration_score(audit_data) # Returns 0-100
completeness_score = self._calculate_completeness_score(audit_data) # Returns 0-100
# Normalize security score to 0-100 for overall calculation
security_normalized = (security_score / SECURITY_MAX_RAW) * 100
# Calculate weighted overall score
overall_score = round(
security_normalized * 0.50 +
collaboration_score * 0.30 +
completeness_score * 0.20
)
# Determine maturity level
maturity_level = get_maturity_level(overall_score)
# Build result
result = ITAuditResult(
company_id=company_id,
overall_score=overall_score,
security_score=security_score,
collaboration_score=collaboration_score,
completeness_score=completeness_score,
maturity_level=maturity_level,
has_azure_ad=bool(audit_data.get('has_azure_ad')),
has_m365=bool(audit_data.get('has_m365')),
has_proxmox_pbs=bool(audit_data.get('has_proxmox_pbs')),
has_zabbix=self._has_zabbix(audit_data),
has_edr=bool(audit_data.get('has_edr')),
has_mfa=bool(audit_data.get('has_mfa')),
)
return result
def _has_zabbix(self, audit_data: dict) -> bool:
"""Check if company uses Zabbix monitoring."""
monitoring = audit_data.get('monitoring_solution', '').lower() if audit_data.get('monitoring_solution') else ''
return audit_data.get('has_zabbix') or 'zabbix' in monitoring
# === CRUD Methods ===
def save_audit(self, company_id: int, audit_data: dict) -> ITAudit:
"""
Save audit to database.
Args:
company_id: Company ID to save audit for
audit_data: Dictionary with all form field values
Returns:
Saved ITAudit record
"""
# Calculate scores
audit_data['company_id'] = company_id
result = self.calculate_scores(audit_data)
# Create audit record
audit = ITAudit(
company_id=company_id,
audit_date=datetime.now(),
audit_source=audit_data.get('audit_source', 'form'),
audited_by=audit_data.get('audited_by'),
# Scores
overall_score=result.overall_score,
security_score=result.security_score,
collaboration_score=result.collaboration_score,
completeness_score=result.completeness_score,
maturity_level=result.maturity_level,
# Cloud & Identity
has_azure_ad=audit_data.get('has_azure_ad', False),
azure_tenant_name=audit_data.get('azure_tenant_name'),
azure_user_count=audit_data.get('azure_user_count'),
has_m365=audit_data.get('has_m365', False),
m365_plans=audit_data.get('m365_plans'),
teams_usage=audit_data.get('teams_usage'),
has_google_workspace=audit_data.get('has_google_workspace', False),
# Infrastructure
server_count=audit_data.get('server_count'),
server_types=audit_data.get('server_types'),
virtualization_platform=audit_data.get('virtualization_platform'),
server_os=audit_data.get('server_os'),
network_firewall_brand=audit_data.get('network_firewall_brand'),
# Endpoints
employee_count=audit_data.get('employee_count'),
computer_count=audit_data.get('computer_count'),
desktop_os=audit_data.get('desktop_os'),
has_mdm=audit_data.get('has_mdm', False),
mdm_solution=audit_data.get('mdm_solution'),
# Security
antivirus_solution=audit_data.get('antivirus_solution'),
has_edr=audit_data.get('has_edr', False),
edr_solution=audit_data.get('edr_solution'),
has_vpn=audit_data.get('has_vpn', False),
vpn_solution=audit_data.get('vpn_solution'),
has_mfa=audit_data.get('has_mfa', False),
mfa_scope=audit_data.get('mfa_scope'),
# Backup & DR
backup_solution=audit_data.get('backup_solution'),
backup_targets=audit_data.get('backup_targets'),
backup_frequency=audit_data.get('backup_frequency'),
has_proxmox_pbs=audit_data.get('has_proxmox_pbs', False),
has_dr_plan=audit_data.get('has_dr_plan', False),
# Monitoring
monitoring_solution=audit_data.get('monitoring_solution'),
zabbix_integration=audit_data.get('zabbix_integration'),
# Business Apps
ticketing_system=audit_data.get('ticketing_system'),
erp_system=audit_data.get('erp_system'),
crm_system=audit_data.get('crm_system'),
# Active Directory
has_local_ad=audit_data.get('has_local_ad', False),
ad_domain_name=audit_data.get('ad_domain_name'),
has_ad_azure_sync=audit_data.get('has_ad_azure_sync', False),
# Collaboration Flags
open_to_shared_licensing=audit_data.get('open_to_shared_licensing', False),
open_to_backup_replication=audit_data.get('open_to_backup_replication', False),
open_to_teams_federation=audit_data.get('open_to_teams_federation', False),
open_to_shared_monitoring=audit_data.get('open_to_shared_monitoring', False),
open_to_collective_purchasing=audit_data.get('open_to_collective_purchasing', False),
open_to_knowledge_sharing=audit_data.get('open_to_knowledge_sharing', False),
# IT Contact
it_contact_name=audit_data.get('it_contact_name'),
it_contact_email=audit_data.get('it_contact_email'),
has_it_manager=audit_data.get('has_it_manager', False),
it_outsourced=audit_data.get('it_outsourced', False),
it_provider_name=audit_data.get('it_provider_name'),
# Raw Data
form_data=audit_data,
recommendations=result.recommendations if result.recommendations else None,
audit_errors=result.audit_errors,
)
self.db.add(audit)
self.db.commit()
self.db.refresh(audit)
logger.info(
f"IT audit saved for company {company_id}: "
f"overall={result.overall_score}, security={result.security_score}, "
f"collaboration={result.collaboration_score}, completeness={result.completeness_score}"
)
return audit
def get_latest_audit(self, company_id: int) -> Optional[ITAudit]:
"""
Get the most recent audit for a company.
Args:
company_id: Company ID
Returns:
Latest ITAudit or None
"""
return self.db.query(ITAudit).filter(
ITAudit.company_id == company_id
).order_by(ITAudit.audit_date.desc()).first()
def get_audit_history(self, company_id: int, limit: int = 10) -> List[ITAudit]:
"""
Get audit history for a company.
Args:
company_id: Company ID
limit: Maximum number of audits to return
Returns:
List of ITAudit records ordered by date descending
"""
return self.db.query(ITAudit).filter(
ITAudit.company_id == company_id
).order_by(ITAudit.audit_date.desc()).limit(limit).all()
# === Collaboration Matching Methods ===
def find_collaboration_matches(self, company_id: int) -> List[CollaborationMatch]:
"""
Find potential collaboration partners for a company.
Match types:
- shared_licensing: Both have M365 with same plans
- backup_replication: Both have Proxmox PBS + open flag
- teams_federation: Both have Azure AD + open flag
- shared_monitoring: Both use Zabbix + open flag
- collective_purchasing: Similar company size + open flag
- knowledge_sharing: Similar tech stack + open flag
Args:
company_id: Company ID to find matches for
Returns:
List of CollaborationMatch objects
"""
# Get the company's audit
company_audit = self.get_latest_audit(company_id)
if not company_audit:
return []
company = self.db.query(Company).filter(Company.id == company_id).first()
if not company:
return []
# Get all other companies with audits
other_audits = self.db.query(ITAudit).filter(
ITAudit.company_id != company_id
).order_by(ITAudit.audit_date.desc()).all()
# Deduplicate by company_id (keep latest audit per company)
seen_companies = set()
latest_audits = []
for audit in other_audits:
if audit.company_id not in seen_companies:
seen_companies.add(audit.company_id)
latest_audits.append(audit)
matches = []
for other_audit in latest_audits:
other_company = self.db.query(Company).filter(
Company.id == other_audit.company_id
).first()
if not other_company:
continue
# Check each match type
match_list = self._check_all_match_types(
company, company_audit, other_company, other_audit
)
matches.extend(match_list)
return matches
def _check_all_match_types(
self,
company_a: Company,
audit_a: ITAudit,
company_b: Company,
audit_b: ITAudit
) -> List[CollaborationMatch]:
"""Check all match types between two companies."""
matches = []
# Shared Licensing (M365)
if (audit_a.has_m365 and audit_b.has_m365 and
audit_a.open_to_shared_licensing and audit_b.open_to_shared_licensing):
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='shared_licensing',
match_reason='Obie firmy korzystają z Microsoft 365 i są otwarte na wspólne licencjonowanie',
match_score=80,
shared_attributes={'m365_plans_a': audit_a.m365_plans, 'm365_plans_b': audit_b.m365_plans}
))
# Backup Replication (Proxmox PBS)
if (audit_a.has_proxmox_pbs and audit_b.has_proxmox_pbs and
audit_a.open_to_backup_replication and audit_b.open_to_backup_replication):
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='backup_replication',
match_reason='Obie firmy używają Proxmox PBS i są otwarte na replikację backupów',
match_score=90,
shared_attributes={'pbs': True}
))
# Teams Federation (Azure AD)
if (audit_a.has_azure_ad and audit_b.has_azure_ad and
audit_a.open_to_teams_federation and audit_b.open_to_teams_federation):
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='teams_federation',
match_reason='Obie firmy mają Azure AD i są otwarte na federację Teams',
match_score=85,
shared_attributes={
'tenant_a': audit_a.azure_tenant_name,
'tenant_b': audit_b.azure_tenant_name
}
))
# Shared Monitoring (Zabbix)
monitoring_a = (audit_a.monitoring_solution or '').lower()
monitoring_b = (audit_b.monitoring_solution or '').lower()
has_zabbix_a = 'zabbix' in monitoring_a
has_zabbix_b = 'zabbix' in monitoring_b
if (has_zabbix_a and has_zabbix_b and
audit_a.open_to_shared_monitoring and audit_b.open_to_shared_monitoring):
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='shared_monitoring',
match_reason='Obie firmy używają Zabbix i są otwarte na wspólny monitoring',
match_score=75,
shared_attributes={'monitoring': 'zabbix'}
))
# Collective Purchasing (similar size)
if (audit_a.open_to_collective_purchasing and audit_b.open_to_collective_purchasing):
# Compare employee count ranges
size_a = self._parse_count_range(audit_a.employee_count)
size_b = self._parse_count_range(audit_b.employee_count)
if size_a and size_b and self._sizes_similar(size_a, size_b):
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='collective_purchasing',
match_reason='Firmy o podobnej wielkości, otwarte na zakupy grupowe',
match_score=70,
shared_attributes={
'size_a': audit_a.employee_count,
'size_b': audit_b.employee_count
}
))
# Knowledge Sharing (similar tech stack)
if (audit_a.open_to_knowledge_sharing and audit_b.open_to_knowledge_sharing):
common_tech = self._find_common_tech(audit_a, audit_b)
if len(common_tech) >= 2:
matches.append(CollaborationMatch(
company_a_id=company_a.id,
company_b_id=company_b.id,
company_a_name=company_a.name,
company_b_name=company_b.name,
match_type='knowledge_sharing',
match_reason=f'Wspólny stack technologiczny: {", ".join(common_tech)}',
match_score=65,
shared_attributes={'common_tech': common_tech}
))
return matches
def _parse_count_range(self, count_str: Optional[str]) -> Optional[int]:
"""Parse employee count string to approximate number."""
if not count_str:
return None
# Handle ranges like "1-10", "11-50", etc.
ranges = {
'1-10': 5,
'11-50': 30,
'51-100': 75,
'101-250': 175,
'251-500': 375,
'500+': 750,
}
return ranges.get(count_str, None)
def _sizes_similar(self, size_a: int, size_b: int) -> bool:
"""Check if two company sizes are similar (within 2x)."""
if size_a == 0 or size_b == 0:
return False
ratio = max(size_a, size_b) / min(size_a, size_b)
return ratio <= 3 # Within 3x difference
def _find_common_tech(self, audit_a: ITAudit, audit_b: ITAudit) -> List[str]:
"""Find common technologies between two audits."""
common = []
# Check virtualization platform
if audit_a.virtualization_platform and audit_b.virtualization_platform:
if audit_a.virtualization_platform.lower() == audit_b.virtualization_platform.lower():
common.append(audit_a.virtualization_platform)
# Check backup solution
if audit_a.backup_solution and audit_b.backup_solution:
if audit_a.backup_solution.lower() == audit_b.backup_solution.lower():
common.append(f"Backup: {audit_a.backup_solution}")
# Check firewall brand
if audit_a.network_firewall_brand and audit_b.network_firewall_brand:
if audit_a.network_firewall_brand.lower() == audit_b.network_firewall_brand.lower():
common.append(f"Firewall: {audit_a.network_firewall_brand}")
# Check ERP
if audit_a.erp_system and audit_b.erp_system:
if audit_a.erp_system.lower() == audit_b.erp_system.lower():
common.append(f"ERP: {audit_a.erp_system}")
# Check cloud platforms
if audit_a.has_azure_ad and audit_b.has_azure_ad:
common.append("Azure AD")
if audit_a.has_m365 and audit_b.has_m365:
common.append("Microsoft 365")
return common
def save_collaboration_match(self, match: CollaborationMatch) -> ITCollaborationMatch:
"""
Save a collaboration match to the database.
Args:
match: CollaborationMatch to save
Returns:
Saved ITCollaborationMatch record
"""
db_match = ITCollaborationMatch(
company_a_id=match.company_a_id,
company_b_id=match.company_b_id,
match_type=match.match_type,
match_reason=match.match_reason,
match_score=match.match_score,
status='suggested',
shared_attributes=match.shared_attributes,
)
self.db.add(db_match)
self.db.commit()
self.db.refresh(db_match)
logger.info(
f"Collaboration match saved: {match.company_a_name} <-> {match.company_b_name} "
f"({match.match_type})"
)
return db_match
def get_matches_for_company(self, company_id: int) -> List[ITCollaborationMatch]:
"""
Get all collaboration matches for a company.
Args:
company_id: Company ID
Returns:
List of ITCollaborationMatch records where company is either A or B
"""
return self.db.query(ITCollaborationMatch).filter(
(ITCollaborationMatch.company_a_id == company_id) |
(ITCollaborationMatch.company_b_id == company_id)
).order_by(ITCollaborationMatch.match_score.desc()).all()
# === Convenience Functions ===
def audit_company(db: Session, company_id: int, audit_data: dict) -> ITAudit:
"""
Audit a company's IT infrastructure.
Args:
db: Database session
company_id: Company ID to audit
audit_data: Dictionary with form field values
Returns:
Saved ITAudit record
"""
service = ITAuditService(db)
return service.save_audit(company_id, audit_data)
def get_company_audit(db: Session, company_id: int) -> Optional[ITAudit]:
"""
Get the latest audit for a company.
Args:
db: Database session
company_id: Company ID
Returns:
Latest ITAudit or None
"""
service = ITAuditService(db)
return service.get_latest_audit(company_id)
def calculate_scores(audit_data: dict) -> ITAuditResult:
"""
Calculate IT audit scores without saving.
Args:
audit_data: Dictionary with form field values
Returns:
ITAuditResult with all scores
"""
# Create a temporary service without DB connection
service = ITAuditService.__new__(ITAuditService)
return service.calculate_scores(audit_data)
def get_company_audit_history(db: Session, company_id: int, limit: int = 10) -> List[ITAudit]:
"""
Get audit history for a company.
Args:
db: Database session
company_id: Company ID
limit: Maximum number of audits to return
Returns:
List of ITAudit records ordered by date descending
"""
service = ITAuditService(db)
return service.get_audit_history(company_id, limit)
def has_company_audit(db: Session, company_id: int) -> bool:
"""
Check if a company has any IT audit.
Args:
db: Database session
company_id: Company ID
Returns:
True if company has at least one audit, False otherwise
"""
return db.query(ITAudit).filter(ITAudit.company_id == company_id).first() is not None
# === Main for Testing ===
if __name__ == '__main__':
import sys
logging.basicConfig(level=logging.INFO)
# Test scoring calculation
print("Testing IT Audit Service")
print("-" * 50)
# Test security scoring
test_data = {
'has_edr': True,
'has_mfa': True,
'network_firewall_brand': 'Fortinet',
'backup_solution': 'Veeam',
'has_dr_plan': True,
'has_vpn': True,
'monitoring_solution': 'Zabbix',
}
service = ITAuditService.__new__(ITAuditService)
security_score = service._calculate_security_score(test_data)
print(f"Security score (all elements): {security_score}/100")
# Test with minimal data
minimal_data = {'has_edr': True, 'has_mfa': True}
security_minimal = service._calculate_security_score(minimal_data)
print(f"Security score (EDR + MFA only): {security_minimal}/100")
# Test collaboration scoring
collab_data = {
'has_azure_ad': True,
'has_m365': True,
'open_to_shared_licensing': True,
'open_to_teams_federation': True,
}
collab_score = service._calculate_collaboration_score(collab_data)
print(f"Collaboration score: {collab_score}/100")
# Test maturity levels
print("\nMaturity levels:")
for score in [25, 50, 70, 90]:
level = get_maturity_level(score)
label = get_maturity_level_label(level)
print(f" Score {score}: {level} ({label})")
print("\n" + "-" * 50)
print("IT Audit Service loaded successfully!")