auto-claude: subtask-2-1 - Create it_audit_service.py with ITAuditResult dataclass
Added IT audit service module with:
- ITAuditResult dataclass with all required fields (overall_score, security_score,
collaboration_score, completeness_score, maturity_level, fields, recommendations)
- FieldStatus dataclass for individual field status tracking
- CollaborationMatch dataclass for collaboration matching results
- ITAuditService class with complete structure:
- Scoring methods: _calculate_security_score, _calculate_collaboration_score,
_calculate_completeness_score, calculate_scores
- CRUD methods: save_audit, get_latest_audit, get_audit_history
- Matching methods: find_collaboration_matches, save_collaboration_match,
get_matches_for_company
- Helper functions: get_maturity_level, get_maturity_level_label
- Scoring configuration constants following spec:
- Security (50% weight): EDR, MFA, firewall, backup, DR plan, VPN, monitoring
- Collaboration (30% weight): 6 flags + Azure AD, M365, PBS, Zabbix bonuses
- Completeness (20% weight): weighted field completion percentage
- Maturity levels: basic (0-39), developing (40-59), established (60-79), advanced (80-100)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
23b6425861
commit
e929a9e825
962
it_audit_service.py
Normal file
962
it_audit_service.py
Normal file
@ -0,0 +1,962 @@
|
||||
"""
|
||||
IT Audit Service for Norda Biznes Hub
|
||||
======================================
|
||||
|
||||
IT infrastructure audit service with:
|
||||
- Security posture assessment (50% weight)
|
||||
- Collaboration readiness scoring (30% weight)
|
||||
- Completeness scoring (20% weight)
|
||||
- Maturity level classification
|
||||
- Cross-company collaboration matching
|
||||
|
||||
Author: Norda Biznes Development Team
|
||||
Created: 2026-01-09
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from database import Company, ITAudit, ITCollaborationMatch, SessionLocal
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# === Scoring Configuration ===
|
||||
|
||||
# Security score elements (max 65 raw points, normalized to 100, then weighted 50%)
|
||||
SECURITY_SCORES = {
|
||||
'has_edr': 15, # EDR solution - highest security value
|
||||
'has_mfa': 10, # Multi-factor authentication
|
||||
'has_firewall': 10, # Network firewall
|
||||
'has_backup': 10, # Backup solution
|
||||
'has_dr_plan': 10, # Disaster recovery plan
|
||||
'has_vpn': 5, # VPN solution
|
||||
'has_monitoring': 5, # Monitoring solution
|
||||
}
|
||||
SECURITY_MAX_RAW = 65
|
||||
|
||||
# Collaboration score elements (max 100 raw points, weighted 30%)
|
||||
COLLABORATION_SCORES = {
|
||||
'open_to_shared_licensing': 10, # Open to shared M365/software licensing
|
||||
'open_to_backup_replication': 10, # Open to backup replication partnership
|
||||
'open_to_teams_federation': 10, # Open to Teams federation
|
||||
'open_to_shared_monitoring': 10, # Open to shared Zabbix monitoring
|
||||
'open_to_collective_purchasing': 10, # Open to collective purchasing
|
||||
'open_to_knowledge_sharing': 10, # Open to knowledge exchange
|
||||
'has_azure_ad': 10, # Azure AD (enables federation)
|
||||
'has_m365': 10, # Microsoft 365 (enables shared licensing)
|
||||
'has_proxmox_pbs': 10, # Proxmox PBS (enables backup replication)
|
||||
'has_zabbix': 10, # Zabbix monitoring (enables shared monitoring)
|
||||
}
|
||||
COLLABORATION_MAX_RAW = 100
|
||||
|
||||
# Completeness weights for different field categories
|
||||
COMPLETENESS_WEIGHTS = {
|
||||
# IT Contact (weight: 10)
|
||||
'it_contact_name': 3,
|
||||
'it_contact_email': 3,
|
||||
'has_it_manager': 2,
|
||||
'it_provider_name': 2,
|
||||
|
||||
# Cloud & Identity (weight: 15)
|
||||
'has_azure_ad': 4,
|
||||
'azure_tenant_name': 3,
|
||||
'azure_user_count': 3,
|
||||
'has_m365': 3,
|
||||
'has_google_workspace': 2,
|
||||
|
||||
# Server Infrastructure (weight: 15)
|
||||
'server_count': 3,
|
||||
'server_types': 3,
|
||||
'virtualization_platform': 3,
|
||||
'server_os': 3,
|
||||
'network_firewall_brand': 3,
|
||||
|
||||
# Endpoints (weight: 10)
|
||||
'employee_count': 3,
|
||||
'computer_count': 3,
|
||||
'desktop_os': 2,
|
||||
'mdm_solution': 2,
|
||||
|
||||
# Security (weight: 20)
|
||||
'antivirus_solution': 4,
|
||||
'has_edr': 4,
|
||||
'has_vpn': 4,
|
||||
'has_mfa': 4,
|
||||
'mfa_scope': 4,
|
||||
|
||||
# Backup & DR (weight: 15)
|
||||
'backup_solution': 4,
|
||||
'backup_targets': 3,
|
||||
'backup_frequency': 3,
|
||||
'has_proxmox_pbs': 3,
|
||||
'has_dr_plan': 2,
|
||||
|
||||
# Monitoring (weight: 5)
|
||||
'monitoring_solution': 3,
|
||||
'zabbix_integration': 2,
|
||||
|
||||
# Business Apps (weight: 5)
|
||||
'ticketing_system': 2,
|
||||
'erp_system': 2,
|
||||
'crm_system': 1,
|
||||
|
||||
# Collaboration (weight: 5)
|
||||
'open_to_shared_licensing': 1,
|
||||
'open_to_backup_replication': 1,
|
||||
'open_to_teams_federation': 1,
|
||||
'open_to_shared_monitoring': 1,
|
||||
'open_to_collective_purchasing': 1,
|
||||
'open_to_knowledge_sharing': 0, # Bonus, not required for completeness
|
||||
}
|
||||
|
||||
# Maturity level thresholds
|
||||
MATURITY_LEVELS = {
|
||||
'basic': (0, 39),
|
||||
'developing': (40, 59),
|
||||
'established': (60, 79),
|
||||
'advanced': (80, 100),
|
||||
}
|
||||
|
||||
# Collaboration match types with Polish labels
|
||||
MATCH_TYPES = {
|
||||
'shared_licensing': 'Wspólne licencje',
|
||||
'backup_replication': 'Replikacja backupów',
|
||||
'teams_federation': 'Federacja Teams',
|
||||
'shared_monitoring': 'Wspólny monitoring',
|
||||
'collective_purchasing': 'Zakupy grupowe',
|
||||
'knowledge_sharing': 'Wymiana wiedzy',
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class FieldStatus:
|
||||
"""Status of a single IT audit field"""
|
||||
field_name: str
|
||||
status: str # 'complete', 'partial', 'missing'
|
||||
value: Optional[Any] = None
|
||||
score: float = 0.0
|
||||
max_score: float = 0.0
|
||||
recommendation: Optional[str] = None
|
||||
details: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ITAuditResult:
|
||||
"""Complete IT audit result"""
|
||||
company_id: int
|
||||
overall_score: int
|
||||
security_score: int
|
||||
collaboration_score: int
|
||||
completeness_score: int
|
||||
maturity_level: str # basic, developing, established, advanced
|
||||
fields: Dict[str, FieldStatus] = field(default_factory=dict)
|
||||
recommendations: List[Dict[str, Any]] = field(default_factory=list)
|
||||
audit_errors: Optional[str] = None
|
||||
|
||||
# Section summaries
|
||||
section_scores: Dict[str, int] = field(default_factory=dict)
|
||||
|
||||
# Technology flags for quick access
|
||||
has_azure_ad: bool = False
|
||||
has_m365: bool = False
|
||||
has_proxmox_pbs: bool = False
|
||||
has_zabbix: bool = False
|
||||
has_edr: bool = False
|
||||
has_mfa: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class CollaborationMatch:
|
||||
"""A potential collaboration match between two companies"""
|
||||
company_a_id: int
|
||||
company_b_id: int
|
||||
company_a_name: str
|
||||
company_b_name: str
|
||||
match_type: str
|
||||
match_reason: str
|
||||
match_score: int
|
||||
shared_attributes: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
def get_maturity_level(score: int) -> str:
|
||||
"""
|
||||
Get maturity level label based on overall score.
|
||||
|
||||
Args:
|
||||
score: Overall score (0-100)
|
||||
|
||||
Returns:
|
||||
Maturity level: 'basic', 'developing', 'established', or 'advanced'
|
||||
"""
|
||||
for level, (min_score, max_score) in MATURITY_LEVELS.items():
|
||||
if min_score <= score <= max_score:
|
||||
return level
|
||||
return 'basic'
|
||||
|
||||
|
||||
def get_maturity_level_label(level: str) -> str:
|
||||
"""
|
||||
Get Polish label for maturity level.
|
||||
|
||||
Args:
|
||||
level: Maturity level key
|
||||
|
||||
Returns:
|
||||
Polish label for the maturity level
|
||||
"""
|
||||
labels = {
|
||||
'basic': 'Podstawowy',
|
||||
'developing': 'Rozwijający się',
|
||||
'established': 'Dojrzały',
|
||||
'advanced': 'Zaawansowany',
|
||||
}
|
||||
return labels.get(level, 'Nieznany')
|
||||
|
||||
|
||||
class ITAuditService:
|
||||
"""Service for IT infrastructure auditing and collaboration matching"""
|
||||
|
||||
def __init__(self, db: Session):
|
||||
"""
|
||||
Initialize IT Audit service.
|
||||
|
||||
Args:
|
||||
db: SQLAlchemy database session
|
||||
"""
|
||||
self.db = db
|
||||
|
||||
# === Scoring Methods ===
|
||||
|
||||
def _calculate_security_score(self, audit_data: dict) -> int:
|
||||
"""
|
||||
Calculate security score from audit data.
|
||||
|
||||
Security elements and their point values:
|
||||
- has_edr: +15 pts
|
||||
- has_mfa: +10 pts
|
||||
- has_firewall (network_firewall_brand): +10 pts
|
||||
- has_backup (backup_solution): +10 pts
|
||||
- has_dr_plan: +10 pts
|
||||
- has_vpn: +5 pts
|
||||
- has_monitoring (monitoring_solution): +5 pts
|
||||
|
||||
Max raw: 65 points, normalized to 100
|
||||
|
||||
Args:
|
||||
audit_data: Dictionary with audit field values
|
||||
|
||||
Returns:
|
||||
Normalized security score (0-100)
|
||||
"""
|
||||
raw_score = 0
|
||||
|
||||
# EDR
|
||||
if audit_data.get('has_edr'):
|
||||
raw_score += SECURITY_SCORES['has_edr']
|
||||
|
||||
# MFA
|
||||
if audit_data.get('has_mfa'):
|
||||
raw_score += SECURITY_SCORES['has_mfa']
|
||||
|
||||
# Firewall (check for brand name presence)
|
||||
if audit_data.get('network_firewall_brand') or audit_data.get('has_firewall'):
|
||||
raw_score += SECURITY_SCORES['has_firewall']
|
||||
|
||||
# Backup solution
|
||||
if audit_data.get('backup_solution') or audit_data.get('has_backup'):
|
||||
raw_score += SECURITY_SCORES['has_backup']
|
||||
|
||||
# DR plan
|
||||
if audit_data.get('has_dr_plan'):
|
||||
raw_score += SECURITY_SCORES['has_dr_plan']
|
||||
|
||||
# VPN
|
||||
if audit_data.get('has_vpn'):
|
||||
raw_score += SECURITY_SCORES['has_vpn']
|
||||
|
||||
# Monitoring
|
||||
if audit_data.get('monitoring_solution') or audit_data.get('has_monitoring'):
|
||||
raw_score += SECURITY_SCORES['has_monitoring']
|
||||
|
||||
# Normalize to 0-100
|
||||
normalized = min(round(raw_score / SECURITY_MAX_RAW * 100), 100)
|
||||
return normalized
|
||||
|
||||
def _calculate_collaboration_score(self, audit_data: dict) -> int:
|
||||
"""
|
||||
Calculate collaboration readiness score from audit data.
|
||||
|
||||
Collaboration elements and their point values:
|
||||
- Each open_to_* flag: +10 pts (max 60 from flags)
|
||||
- has_azure_ad: +10 pts (enables Teams federation)
|
||||
- has_m365: +10 pts (enables shared licensing)
|
||||
- has_proxmox_pbs: +10 pts (enables backup replication)
|
||||
- has_zabbix: +10 pts (enables shared monitoring)
|
||||
|
||||
Max raw: 100 points
|
||||
|
||||
Args:
|
||||
audit_data: Dictionary with audit field values
|
||||
|
||||
Returns:
|
||||
Collaboration score (0-100)
|
||||
"""
|
||||
raw_score = 0
|
||||
|
||||
# Collaboration flags
|
||||
collaboration_flags = [
|
||||
'open_to_shared_licensing',
|
||||
'open_to_backup_replication',
|
||||
'open_to_teams_federation',
|
||||
'open_to_shared_monitoring',
|
||||
'open_to_collective_purchasing',
|
||||
'open_to_knowledge_sharing',
|
||||
]
|
||||
|
||||
for flag in collaboration_flags:
|
||||
if audit_data.get(flag):
|
||||
raw_score += COLLABORATION_SCORES[flag]
|
||||
|
||||
# Technology bonuses
|
||||
if audit_data.get('has_azure_ad'):
|
||||
raw_score += COLLABORATION_SCORES['has_azure_ad']
|
||||
|
||||
if audit_data.get('has_m365'):
|
||||
raw_score += COLLABORATION_SCORES['has_m365']
|
||||
|
||||
if audit_data.get('has_proxmox_pbs'):
|
||||
raw_score += COLLABORATION_SCORES['has_proxmox_pbs']
|
||||
|
||||
# Zabbix check (can be in monitoring_solution or zabbix_integration)
|
||||
monitoring = audit_data.get('monitoring_solution', '').lower() if audit_data.get('monitoring_solution') else ''
|
||||
has_zabbix = audit_data.get('has_zabbix') or 'zabbix' in monitoring
|
||||
if has_zabbix:
|
||||
raw_score += COLLABORATION_SCORES['has_zabbix']
|
||||
|
||||
# Cap at 100
|
||||
return min(raw_score, 100)
|
||||
|
||||
def _calculate_completeness_score(self, audit_data: dict) -> int:
|
||||
"""
|
||||
Calculate completeness score based on filled fields with weights.
|
||||
|
||||
Args:
|
||||
audit_data: Dictionary with audit field values
|
||||
|
||||
Returns:
|
||||
Completeness percentage (0-100)
|
||||
"""
|
||||
total_weight = sum(COMPLETENESS_WEIGHTS.values())
|
||||
achieved_weight = 0
|
||||
|
||||
for field_name, weight in COMPLETENESS_WEIGHTS.items():
|
||||
value = audit_data.get(field_name)
|
||||
|
||||
# Check if field has a meaningful value
|
||||
if value is not None:
|
||||
if isinstance(value, bool):
|
||||
# Boolean fields count if True
|
||||
if value:
|
||||
achieved_weight += weight
|
||||
elif isinstance(value, str):
|
||||
# String fields count if non-empty
|
||||
if value.strip():
|
||||
achieved_weight += weight
|
||||
elif isinstance(value, (list, dict)):
|
||||
# List/dict fields count if non-empty
|
||||
if len(value) > 0:
|
||||
achieved_weight += weight
|
||||
elif isinstance(value, (int, float)):
|
||||
# Numeric fields count if > 0
|
||||
if value > 0:
|
||||
achieved_weight += weight
|
||||
else:
|
||||
# Any other truthy value counts
|
||||
achieved_weight += weight
|
||||
|
||||
if total_weight == 0:
|
||||
return 0
|
||||
|
||||
return round(achieved_weight / total_weight * 100)
|
||||
|
||||
def calculate_scores(self, audit_data: dict) -> ITAuditResult:
|
||||
"""
|
||||
Calculate all scores from form data.
|
||||
|
||||
Overall score formula:
|
||||
- Security: 50% weight
|
||||
- Collaboration: 30% weight
|
||||
- Completeness: 20% weight
|
||||
|
||||
Args:
|
||||
audit_data: Dictionary with all form field values
|
||||
|
||||
Returns:
|
||||
ITAuditResult with all scores and maturity level
|
||||
"""
|
||||
company_id = audit_data.get('company_id', 0)
|
||||
|
||||
# Calculate component scores
|
||||
security_score = self._calculate_security_score(audit_data)
|
||||
collaboration_score = self._calculate_collaboration_score(audit_data)
|
||||
completeness_score = self._calculate_completeness_score(audit_data)
|
||||
|
||||
# Calculate weighted overall score
|
||||
overall_score = round(
|
||||
security_score * 0.50 +
|
||||
collaboration_score * 0.30 +
|
||||
completeness_score * 0.20
|
||||
)
|
||||
|
||||
# Determine maturity level
|
||||
maturity_level = get_maturity_level(overall_score)
|
||||
|
||||
# Build result
|
||||
result = ITAuditResult(
|
||||
company_id=company_id,
|
||||
overall_score=overall_score,
|
||||
security_score=security_score,
|
||||
collaboration_score=collaboration_score,
|
||||
completeness_score=completeness_score,
|
||||
maturity_level=maturity_level,
|
||||
has_azure_ad=bool(audit_data.get('has_azure_ad')),
|
||||
has_m365=bool(audit_data.get('has_m365')),
|
||||
has_proxmox_pbs=bool(audit_data.get('has_proxmox_pbs')),
|
||||
has_zabbix=self._has_zabbix(audit_data),
|
||||
has_edr=bool(audit_data.get('has_edr')),
|
||||
has_mfa=bool(audit_data.get('has_mfa')),
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def _has_zabbix(self, audit_data: dict) -> bool:
|
||||
"""Check if company uses Zabbix monitoring."""
|
||||
monitoring = audit_data.get('monitoring_solution', '').lower() if audit_data.get('monitoring_solution') else ''
|
||||
return audit_data.get('has_zabbix') or 'zabbix' in monitoring
|
||||
|
||||
# === CRUD Methods ===
|
||||
|
||||
def save_audit(self, company_id: int, audit_data: dict) -> ITAudit:
|
||||
"""
|
||||
Save audit to database.
|
||||
|
||||
Args:
|
||||
company_id: Company ID to save audit for
|
||||
audit_data: Dictionary with all form field values
|
||||
|
||||
Returns:
|
||||
Saved ITAudit record
|
||||
"""
|
||||
# Calculate scores
|
||||
audit_data['company_id'] = company_id
|
||||
result = self.calculate_scores(audit_data)
|
||||
|
||||
# Create audit record
|
||||
audit = ITAudit(
|
||||
company_id=company_id,
|
||||
audit_date=datetime.now(),
|
||||
audit_source=audit_data.get('audit_source', 'form'),
|
||||
audited_by=audit_data.get('audited_by'),
|
||||
|
||||
# Scores
|
||||
overall_score=result.overall_score,
|
||||
security_score=result.security_score,
|
||||
collaboration_score=result.collaboration_score,
|
||||
completeness_score=result.completeness_score,
|
||||
maturity_level=result.maturity_level,
|
||||
|
||||
# Cloud & Identity
|
||||
has_azure_ad=audit_data.get('has_azure_ad', False),
|
||||
azure_tenant_name=audit_data.get('azure_tenant_name'),
|
||||
azure_user_count=audit_data.get('azure_user_count'),
|
||||
has_m365=audit_data.get('has_m365', False),
|
||||
m365_plans=audit_data.get('m365_plans'),
|
||||
teams_usage=audit_data.get('teams_usage'),
|
||||
has_google_workspace=audit_data.get('has_google_workspace', False),
|
||||
|
||||
# Infrastructure
|
||||
server_count=audit_data.get('server_count'),
|
||||
server_types=audit_data.get('server_types'),
|
||||
virtualization_platform=audit_data.get('virtualization_platform'),
|
||||
server_os=audit_data.get('server_os'),
|
||||
network_firewall_brand=audit_data.get('network_firewall_brand'),
|
||||
|
||||
# Endpoints
|
||||
employee_count=audit_data.get('employee_count'),
|
||||
computer_count=audit_data.get('computer_count'),
|
||||
desktop_os=audit_data.get('desktop_os'),
|
||||
has_mdm=audit_data.get('has_mdm', False),
|
||||
mdm_solution=audit_data.get('mdm_solution'),
|
||||
|
||||
# Security
|
||||
antivirus_solution=audit_data.get('antivirus_solution'),
|
||||
has_edr=audit_data.get('has_edr', False),
|
||||
edr_solution=audit_data.get('edr_solution'),
|
||||
has_vpn=audit_data.get('has_vpn', False),
|
||||
vpn_solution=audit_data.get('vpn_solution'),
|
||||
has_mfa=audit_data.get('has_mfa', False),
|
||||
mfa_scope=audit_data.get('mfa_scope'),
|
||||
|
||||
# Backup & DR
|
||||
backup_solution=audit_data.get('backup_solution'),
|
||||
backup_targets=audit_data.get('backup_targets'),
|
||||
backup_frequency=audit_data.get('backup_frequency'),
|
||||
has_proxmox_pbs=audit_data.get('has_proxmox_pbs', False),
|
||||
has_dr_plan=audit_data.get('has_dr_plan', False),
|
||||
|
||||
# Monitoring
|
||||
monitoring_solution=audit_data.get('monitoring_solution'),
|
||||
zabbix_integration=audit_data.get('zabbix_integration'),
|
||||
|
||||
# Business Apps
|
||||
ticketing_system=audit_data.get('ticketing_system'),
|
||||
erp_system=audit_data.get('erp_system'),
|
||||
crm_system=audit_data.get('crm_system'),
|
||||
|
||||
# Active Directory
|
||||
has_local_ad=audit_data.get('has_local_ad', False),
|
||||
ad_domain_name=audit_data.get('ad_domain_name'),
|
||||
has_ad_azure_sync=audit_data.get('has_ad_azure_sync', False),
|
||||
|
||||
# Collaboration Flags
|
||||
open_to_shared_licensing=audit_data.get('open_to_shared_licensing', False),
|
||||
open_to_backup_replication=audit_data.get('open_to_backup_replication', False),
|
||||
open_to_teams_federation=audit_data.get('open_to_teams_federation', False),
|
||||
open_to_shared_monitoring=audit_data.get('open_to_shared_monitoring', False),
|
||||
open_to_collective_purchasing=audit_data.get('open_to_collective_purchasing', False),
|
||||
open_to_knowledge_sharing=audit_data.get('open_to_knowledge_sharing', False),
|
||||
|
||||
# IT Contact
|
||||
it_contact_name=audit_data.get('it_contact_name'),
|
||||
it_contact_email=audit_data.get('it_contact_email'),
|
||||
has_it_manager=audit_data.get('has_it_manager', False),
|
||||
it_outsourced=audit_data.get('it_outsourced', False),
|
||||
it_provider_name=audit_data.get('it_provider_name'),
|
||||
|
||||
# Raw Data
|
||||
form_data=audit_data,
|
||||
recommendations=result.recommendations if result.recommendations else None,
|
||||
audit_errors=result.audit_errors,
|
||||
)
|
||||
|
||||
self.db.add(audit)
|
||||
self.db.commit()
|
||||
self.db.refresh(audit)
|
||||
|
||||
logger.info(
|
||||
f"IT audit saved for company {company_id}: "
|
||||
f"overall={result.overall_score}, security={result.security_score}, "
|
||||
f"collaboration={result.collaboration_score}, completeness={result.completeness_score}"
|
||||
)
|
||||
|
||||
return audit
|
||||
|
||||
def get_latest_audit(self, company_id: int) -> Optional[ITAudit]:
|
||||
"""
|
||||
Get the most recent audit for a company.
|
||||
|
||||
Args:
|
||||
company_id: Company ID
|
||||
|
||||
Returns:
|
||||
Latest ITAudit or None
|
||||
"""
|
||||
return self.db.query(ITAudit).filter(
|
||||
ITAudit.company_id == company_id
|
||||
).order_by(ITAudit.audit_date.desc()).first()
|
||||
|
||||
def get_audit_history(self, company_id: int, limit: int = 10) -> List[ITAudit]:
|
||||
"""
|
||||
Get audit history for a company.
|
||||
|
||||
Args:
|
||||
company_id: Company ID
|
||||
limit: Maximum number of audits to return
|
||||
|
||||
Returns:
|
||||
List of ITAudit records ordered by date descending
|
||||
"""
|
||||
return self.db.query(ITAudit).filter(
|
||||
ITAudit.company_id == company_id
|
||||
).order_by(ITAudit.audit_date.desc()).limit(limit).all()
|
||||
|
||||
# === Collaboration Matching Methods ===
|
||||
|
||||
def find_collaboration_matches(self, company_id: int) -> List[CollaborationMatch]:
|
||||
"""
|
||||
Find potential collaboration partners for a company.
|
||||
|
||||
Match types:
|
||||
- shared_licensing: Both have M365 with same plans
|
||||
- backup_replication: Both have Proxmox PBS + open flag
|
||||
- teams_federation: Both have Azure AD + open flag
|
||||
- shared_monitoring: Both use Zabbix + open flag
|
||||
- collective_purchasing: Similar company size + open flag
|
||||
- knowledge_sharing: Similar tech stack + open flag
|
||||
|
||||
Args:
|
||||
company_id: Company ID to find matches for
|
||||
|
||||
Returns:
|
||||
List of CollaborationMatch objects
|
||||
"""
|
||||
# Get the company's audit
|
||||
company_audit = self.get_latest_audit(company_id)
|
||||
if not company_audit:
|
||||
return []
|
||||
|
||||
company = self.db.query(Company).filter(Company.id == company_id).first()
|
||||
if not company:
|
||||
return []
|
||||
|
||||
# Get all other companies with audits
|
||||
other_audits = self.db.query(ITAudit).filter(
|
||||
ITAudit.company_id != company_id
|
||||
).order_by(ITAudit.audit_date.desc()).all()
|
||||
|
||||
# Deduplicate by company_id (keep latest audit per company)
|
||||
seen_companies = set()
|
||||
latest_audits = []
|
||||
for audit in other_audits:
|
||||
if audit.company_id not in seen_companies:
|
||||
seen_companies.add(audit.company_id)
|
||||
latest_audits.append(audit)
|
||||
|
||||
matches = []
|
||||
|
||||
for other_audit in latest_audits:
|
||||
other_company = self.db.query(Company).filter(
|
||||
Company.id == other_audit.company_id
|
||||
).first()
|
||||
|
||||
if not other_company:
|
||||
continue
|
||||
|
||||
# Check each match type
|
||||
match_list = self._check_all_match_types(
|
||||
company, company_audit, other_company, other_audit
|
||||
)
|
||||
matches.extend(match_list)
|
||||
|
||||
return matches
|
||||
|
||||
def _check_all_match_types(
|
||||
self,
|
||||
company_a: Company,
|
||||
audit_a: ITAudit,
|
||||
company_b: Company,
|
||||
audit_b: ITAudit
|
||||
) -> List[CollaborationMatch]:
|
||||
"""Check all match types between two companies."""
|
||||
matches = []
|
||||
|
||||
# Shared Licensing (M365)
|
||||
if (audit_a.has_m365 and audit_b.has_m365 and
|
||||
audit_a.open_to_shared_licensing and audit_b.open_to_shared_licensing):
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='shared_licensing',
|
||||
match_reason='Obie firmy korzystają z Microsoft 365 i są otwarte na wspólne licencjonowanie',
|
||||
match_score=80,
|
||||
shared_attributes={'m365_plans_a': audit_a.m365_plans, 'm365_plans_b': audit_b.m365_plans}
|
||||
))
|
||||
|
||||
# Backup Replication (Proxmox PBS)
|
||||
if (audit_a.has_proxmox_pbs and audit_b.has_proxmox_pbs and
|
||||
audit_a.open_to_backup_replication and audit_b.open_to_backup_replication):
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='backup_replication',
|
||||
match_reason='Obie firmy używają Proxmox PBS i są otwarte na replikację backupów',
|
||||
match_score=90,
|
||||
shared_attributes={'pbs': True}
|
||||
))
|
||||
|
||||
# Teams Federation (Azure AD)
|
||||
if (audit_a.has_azure_ad and audit_b.has_azure_ad and
|
||||
audit_a.open_to_teams_federation and audit_b.open_to_teams_federation):
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='teams_federation',
|
||||
match_reason='Obie firmy mają Azure AD i są otwarte na federację Teams',
|
||||
match_score=85,
|
||||
shared_attributes={
|
||||
'tenant_a': audit_a.azure_tenant_name,
|
||||
'tenant_b': audit_b.azure_tenant_name
|
||||
}
|
||||
))
|
||||
|
||||
# Shared Monitoring (Zabbix)
|
||||
monitoring_a = (audit_a.monitoring_solution or '').lower()
|
||||
monitoring_b = (audit_b.monitoring_solution or '').lower()
|
||||
has_zabbix_a = 'zabbix' in monitoring_a
|
||||
has_zabbix_b = 'zabbix' in monitoring_b
|
||||
|
||||
if (has_zabbix_a and has_zabbix_b and
|
||||
audit_a.open_to_shared_monitoring and audit_b.open_to_shared_monitoring):
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='shared_monitoring',
|
||||
match_reason='Obie firmy używają Zabbix i są otwarte na wspólny monitoring',
|
||||
match_score=75,
|
||||
shared_attributes={'monitoring': 'zabbix'}
|
||||
))
|
||||
|
||||
# Collective Purchasing (similar size)
|
||||
if (audit_a.open_to_collective_purchasing and audit_b.open_to_collective_purchasing):
|
||||
# Compare employee count ranges
|
||||
size_a = self._parse_count_range(audit_a.employee_count)
|
||||
size_b = self._parse_count_range(audit_b.employee_count)
|
||||
|
||||
if size_a and size_b and self._sizes_similar(size_a, size_b):
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='collective_purchasing',
|
||||
match_reason='Firmy o podobnej wielkości, otwarte na zakupy grupowe',
|
||||
match_score=70,
|
||||
shared_attributes={
|
||||
'size_a': audit_a.employee_count,
|
||||
'size_b': audit_b.employee_count
|
||||
}
|
||||
))
|
||||
|
||||
# Knowledge Sharing (similar tech stack)
|
||||
if (audit_a.open_to_knowledge_sharing and audit_b.open_to_knowledge_sharing):
|
||||
common_tech = self._find_common_tech(audit_a, audit_b)
|
||||
if len(common_tech) >= 2:
|
||||
matches.append(CollaborationMatch(
|
||||
company_a_id=company_a.id,
|
||||
company_b_id=company_b.id,
|
||||
company_a_name=company_a.name,
|
||||
company_b_name=company_b.name,
|
||||
match_type='knowledge_sharing',
|
||||
match_reason=f'Wspólny stack technologiczny: {", ".join(common_tech)}',
|
||||
match_score=65,
|
||||
shared_attributes={'common_tech': common_tech}
|
||||
))
|
||||
|
||||
return matches
|
||||
|
||||
def _parse_count_range(self, count_str: Optional[str]) -> Optional[int]:
|
||||
"""Parse employee count string to approximate number."""
|
||||
if not count_str:
|
||||
return None
|
||||
|
||||
# Handle ranges like "1-10", "11-50", etc.
|
||||
ranges = {
|
||||
'1-10': 5,
|
||||
'11-50': 30,
|
||||
'51-100': 75,
|
||||
'101-250': 175,
|
||||
'251-500': 375,
|
||||
'500+': 750,
|
||||
}
|
||||
|
||||
return ranges.get(count_str, None)
|
||||
|
||||
def _sizes_similar(self, size_a: int, size_b: int) -> bool:
|
||||
"""Check if two company sizes are similar (within 2x)."""
|
||||
if size_a == 0 or size_b == 0:
|
||||
return False
|
||||
ratio = max(size_a, size_b) / min(size_a, size_b)
|
||||
return ratio <= 3 # Within 3x difference
|
||||
|
||||
def _find_common_tech(self, audit_a: ITAudit, audit_b: ITAudit) -> List[str]:
|
||||
"""Find common technologies between two audits."""
|
||||
common = []
|
||||
|
||||
# Check virtualization platform
|
||||
if audit_a.virtualization_platform and audit_b.virtualization_platform:
|
||||
if audit_a.virtualization_platform.lower() == audit_b.virtualization_platform.lower():
|
||||
common.append(audit_a.virtualization_platform)
|
||||
|
||||
# Check backup solution
|
||||
if audit_a.backup_solution and audit_b.backup_solution:
|
||||
if audit_a.backup_solution.lower() == audit_b.backup_solution.lower():
|
||||
common.append(f"Backup: {audit_a.backup_solution}")
|
||||
|
||||
# Check firewall brand
|
||||
if audit_a.network_firewall_brand and audit_b.network_firewall_brand:
|
||||
if audit_a.network_firewall_brand.lower() == audit_b.network_firewall_brand.lower():
|
||||
common.append(f"Firewall: {audit_a.network_firewall_brand}")
|
||||
|
||||
# Check ERP
|
||||
if audit_a.erp_system and audit_b.erp_system:
|
||||
if audit_a.erp_system.lower() == audit_b.erp_system.lower():
|
||||
common.append(f"ERP: {audit_a.erp_system}")
|
||||
|
||||
# Check cloud platforms
|
||||
if audit_a.has_azure_ad and audit_b.has_azure_ad:
|
||||
common.append("Azure AD")
|
||||
|
||||
if audit_a.has_m365 and audit_b.has_m365:
|
||||
common.append("Microsoft 365")
|
||||
|
||||
return common
|
||||
|
||||
def save_collaboration_match(self, match: CollaborationMatch) -> ITCollaborationMatch:
|
||||
"""
|
||||
Save a collaboration match to the database.
|
||||
|
||||
Args:
|
||||
match: CollaborationMatch to save
|
||||
|
||||
Returns:
|
||||
Saved ITCollaborationMatch record
|
||||
"""
|
||||
db_match = ITCollaborationMatch(
|
||||
company_a_id=match.company_a_id,
|
||||
company_b_id=match.company_b_id,
|
||||
match_type=match.match_type,
|
||||
match_reason=match.match_reason,
|
||||
match_score=match.match_score,
|
||||
status='suggested',
|
||||
shared_attributes=match.shared_attributes,
|
||||
)
|
||||
|
||||
self.db.add(db_match)
|
||||
self.db.commit()
|
||||
self.db.refresh(db_match)
|
||||
|
||||
logger.info(
|
||||
f"Collaboration match saved: {match.company_a_name} <-> {match.company_b_name} "
|
||||
f"({match.match_type})"
|
||||
)
|
||||
|
||||
return db_match
|
||||
|
||||
def get_matches_for_company(self, company_id: int) -> List[ITCollaborationMatch]:
|
||||
"""
|
||||
Get all collaboration matches for a company.
|
||||
|
||||
Args:
|
||||
company_id: Company ID
|
||||
|
||||
Returns:
|
||||
List of ITCollaborationMatch records where company is either A or B
|
||||
"""
|
||||
return self.db.query(ITCollaborationMatch).filter(
|
||||
(ITCollaborationMatch.company_a_id == company_id) |
|
||||
(ITCollaborationMatch.company_b_id == company_id)
|
||||
).order_by(ITCollaborationMatch.match_score.desc()).all()
|
||||
|
||||
|
||||
# === Convenience Functions ===
|
||||
|
||||
def audit_company(db: Session, company_id: int, audit_data: dict) -> ITAudit:
|
||||
"""
|
||||
Audit a company's IT infrastructure.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
company_id: Company ID to audit
|
||||
audit_data: Dictionary with form field values
|
||||
|
||||
Returns:
|
||||
Saved ITAudit record
|
||||
"""
|
||||
service = ITAuditService(db)
|
||||
return service.save_audit(company_id, audit_data)
|
||||
|
||||
|
||||
def get_company_audit(db: Session, company_id: int) -> Optional[ITAudit]:
|
||||
"""
|
||||
Get the latest audit for a company.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
company_id: Company ID
|
||||
|
||||
Returns:
|
||||
Latest ITAudit or None
|
||||
"""
|
||||
service = ITAuditService(db)
|
||||
return service.get_latest_audit(company_id)
|
||||
|
||||
|
||||
def calculate_scores(audit_data: dict) -> ITAuditResult:
|
||||
"""
|
||||
Calculate IT audit scores without saving.
|
||||
|
||||
Args:
|
||||
audit_data: Dictionary with form field values
|
||||
|
||||
Returns:
|
||||
ITAuditResult with all scores
|
||||
"""
|
||||
# Create a temporary service without DB connection
|
||||
service = ITAuditService.__new__(ITAuditService)
|
||||
return service.calculate_scores(audit_data)
|
||||
|
||||
|
||||
# === Main for Testing ===
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
# Test scoring calculation
|
||||
print("Testing IT Audit Service")
|
||||
print("-" * 50)
|
||||
|
||||
# Test security scoring
|
||||
test_data = {
|
||||
'has_edr': True,
|
||||
'has_mfa': True,
|
||||
'network_firewall_brand': 'Fortinet',
|
||||
'backup_solution': 'Veeam',
|
||||
'has_dr_plan': True,
|
||||
'has_vpn': True,
|
||||
'monitoring_solution': 'Zabbix',
|
||||
}
|
||||
|
||||
service = ITAuditService.__new__(ITAuditService)
|
||||
security_score = service._calculate_security_score(test_data)
|
||||
print(f"Security score (all elements): {security_score}/100")
|
||||
|
||||
# Test with minimal data
|
||||
minimal_data = {'has_edr': True, 'has_mfa': True}
|
||||
security_minimal = service._calculate_security_score(minimal_data)
|
||||
print(f"Security score (EDR + MFA only): {security_minimal}/100")
|
||||
|
||||
# Test collaboration scoring
|
||||
collab_data = {
|
||||
'has_azure_ad': True,
|
||||
'has_m365': True,
|
||||
'open_to_shared_licensing': True,
|
||||
'open_to_teams_federation': True,
|
||||
}
|
||||
collab_score = service._calculate_collaboration_score(collab_data)
|
||||
print(f"Collaboration score: {collab_score}/100")
|
||||
|
||||
# Test maturity levels
|
||||
print("\nMaturity levels:")
|
||||
for score in [25, 50, 70, 90]:
|
||||
level = get_maturity_level(score)
|
||||
label = get_maturity_level_label(level)
|
||||
print(f" Score {score}: {level} ({label})")
|
||||
|
||||
print("\n" + "-" * 50)
|
||||
print("IT Audit Service loaded successfully!")
|
||||
Loading…
Reference in New Issue
Block a user