Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
995 lines
34 KiB
Python
995 lines
34 KiB
Python
"""
|
|
IT Audit Service for Norda Biznes Partner
|
|
======================================
|
|
|
|
IT infrastructure audit service with:
|
|
- Security posture assessment (50% weight)
|
|
- Collaboration readiness scoring (30% weight)
|
|
- Completeness scoring (20% weight)
|
|
- Maturity level classification
|
|
- Cross-company collaboration matching
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-01-09
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import Company, ITAudit, ITCollaborationMatch, SessionLocal
|
|
|
|
# Module-level logger, named after this module
logger = logging.getLogger(__name__)


# === Scoring Configuration ===

# Security score elements: raw points awarded per security control.
# The raw total is normalized to 0-100 and then weighted 50% in the
# overall score.
SECURITY_SCORES = {
    'has_edr': 15,         # EDR solution - highest security value
    'has_mfa': 10,         # Multi-factor authentication
    'has_firewall': 10,    # Network firewall
    'has_backup': 10,      # Backup solution
    'has_dr_plan': 10,     # Disaster recovery plan
    'has_vpn': 5,          # VPN solution
    'has_monitoring': 5,   # Monitoring solution
}
# Derived from the table (currently 65) so the normalization divisor can
# never drift from the point values above.
SECURITY_MAX_RAW = sum(SECURITY_SCORES.values())
|
|
|
|
# Collaboration score elements: raw points per readiness flag or enabling
# technology. The total is weighted 30% in the overall score.
COLLABORATION_SCORES = {
    'open_to_shared_licensing': 10,       # Open to shared M365/software licensing
    'open_to_backup_replication': 10,     # Open to backup replication partnership
    'open_to_teams_federation': 10,       # Open to Teams federation
    'open_to_shared_monitoring': 10,      # Open to shared Zabbix monitoring
    'open_to_collective_purchasing': 10,  # Open to collective purchasing
    'open_to_knowledge_sharing': 10,      # Open to knowledge exchange
    'has_azure_ad': 10,                   # Azure AD (enables federation)
    'has_m365': 10,                       # Microsoft 365 (enables shared licensing)
    'has_proxmox_pbs': 10,                # Proxmox PBS (enables backup replication)
    'has_zabbix': 10,                     # Zabbix monitoring (enables shared monitoring)
}
# Derived from the table (currently 100) so the cap always matches the
# point values above.
COLLABORATION_MAX_RAW = sum(COLLABORATION_SCORES.values())
|
|
|
|
# Completeness weights: how much each form field contributes to the
# completeness percentage. The per-category subtotals are noted in the
# section comments; all weights together add up to 100.
COMPLETENESS_WEIGHTS = dict(
    # IT Contact (subtotal: 10)
    it_contact_name=3,
    it_contact_email=3,
    has_it_manager=2,
    it_provider_name=2,

    # Cloud & Identity (subtotal: 15)
    has_azure_ad=4,
    azure_tenant_name=3,
    azure_user_count=3,
    has_m365=3,
    has_google_workspace=2,

    # Server Infrastructure (subtotal: 15)
    server_count=3,
    server_types=3,
    virtualization_platform=3,
    server_os=3,
    network_firewall_brand=3,

    # Endpoints (subtotal: 10)
    employee_count=3,
    computer_count=3,
    desktop_os=2,
    mdm_solution=2,

    # Security (subtotal: 20)
    antivirus_solution=4,
    has_edr=4,
    has_vpn=4,
    has_mfa=4,
    mfa_scope=4,

    # Backup & DR (subtotal: 15)
    backup_solution=4,
    backup_targets=3,
    backup_frequency=3,
    has_proxmox_pbs=3,
    has_dr_plan=2,

    # Monitoring (subtotal: 5)
    monitoring_solution=3,
    zabbix_integration=2,

    # Business Apps (subtotal: 5)
    ticketing_system=2,
    erp_system=2,
    crm_system=1,

    # Collaboration (subtotal: 5)
    open_to_shared_licensing=1,
    open_to_backup_replication=1,
    open_to_teams_federation=1,
    open_to_shared_monitoring=1,
    open_to_collective_purchasing=1,
    open_to_knowledge_sharing=0,  # Bonus, not required for completeness
)
|
|
|
|
# Maturity bands: level key -> inclusive (lowest, highest) overall score
MATURITY_LEVELS = dict(
    basic=(0, 39),
    developing=(40, 59),
    established=(60, 79),
    advanced=(80, 100),
)
|
|
|
|
# Collaboration match type keys mapped to their Polish display labels
MATCH_TYPES = dict([
    ('shared_licensing', 'Wspólne licencje'),
    ('backup_replication', 'Replikacja backupów'),
    ('teams_federation', 'Federacja Teams'),
    ('shared_monitoring', 'Wspólny monitoring'),
    ('collective_purchasing', 'Zakupy grupowe'),
    ('knowledge_sharing', 'Wymiana wiedzy'),
])
|
|
|
|
|
|
@dataclass
class FieldStatus:
    """Evaluation outcome for one IT audit form field.

    Only ``field_name`` and ``status`` are required; everything else
    defaults to "nothing recorded".
    """

    # Name of the audited form field
    field_name: str
    # One of 'complete', 'partial', 'missing'
    status: str
    # Value recorded for the field, if any
    value: Optional[Any] = None
    # Points earned for this field and the most it could have earned
    score: float = 0.0
    max_score: float = 0.0
    # Optional recommendation text for this field
    recommendation: Optional[str] = None
    # Optional free-form extra information
    details: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
@dataclass
class ITAuditResult:
    """Aggregated outcome of scoring one company's IT audit form.

    The six leading fields are required; the containers default to fresh
    empty objects per instance and the technology flags default to False.
    """

    # --- Required: identity and scores ---
    company_id: int
    overall_score: int
    security_score: int
    collaboration_score: int
    completeness_score: int
    maturity_level: str  # basic, developing, established, advanced

    # --- Optional detail ---
    fields: Dict[str, FieldStatus] = field(default_factory=dict)
    recommendations: List[Dict[str, Any]] = field(default_factory=list)
    audit_errors: Optional[str] = None

    # Per-section score summaries
    section_scores: Dict[str, int] = field(default_factory=dict)

    # --- Technology flags for quick access ---
    has_azure_ad: bool = False
    has_m365: bool = False
    has_proxmox_pbs: bool = False
    has_zabbix: bool = False
    has_edr: bool = False
    has_mfa: bool = False
|
|
|
|
|
|
@dataclass
class CollaborationMatch:
    """Suggested collaboration between two companies (not yet persisted)."""

    # The two companies involved: ids first, then display names
    company_a_id: int
    company_b_id: int
    company_a_name: str
    company_b_name: str

    # Kind of collaboration, human-readable justification, and match strength
    match_type: str
    match_reason: str
    match_score: int

    # Data backing the match; a fresh dict per instance when omitted
    shared_attributes: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
def get_maturity_level(score: int) -> str:
    """
    Get maturity level label based on overall score.

    The score is matched against the lower bound of each band in
    MATURITY_LEVELS, highest band first. Compared with testing
    ``min <= score <= max`` per band, this keeps every integer score in
    0-100 in the same band while also classifying sensibly when the score
    is above the top band or is a fraction that falls between two bands
    (e.g. 59.5), instead of dropping those to 'basic'.

    Args:
        score: Overall score (nominally 0-100)

    Returns:
        Maturity level: 'basic', 'developing', 'established', or 'advanced'.
        Scores below every band's lower bound yield 'basic'.
    """
    bands_high_to_low = sorted(
        MATURITY_LEVELS.items(),
        key=lambda band: band[1][0],
        reverse=True,
    )
    for level, (min_score, _max_score) in bands_high_to_low:
        if score >= min_score:
            return level
    return 'basic'
|
|
|
|
|
|
def get_maturity_level_label(level: str) -> str:
    """
    Translate a maturity level key into its Polish display label.

    Args:
        level: Maturity level key ('basic', 'developing', 'established',
            'advanced')

    Returns:
        The Polish label, or 'Nieznany' when the key is not recognized
    """
    polish_names = dict(
        basic='Podstawowy',
        developing='Rozwijający się',
        established='Dojrzały',
        advanced='Zaawansowany',
    )
    try:
        return polish_names[level]
    except KeyError:
        # Unrecognized key: fall back to the "unknown" label
        return 'Nieznany'
|
|
|
|
|
|
class ITAuditService:
    """Service for IT infrastructure auditing and collaboration matching.

    The scoring methods only read the submitted form data; the CRUD and
    matching methods read and write through the SQLAlchemy session held
    in ``self.db``.
    """

    # Form fields copied verbatim onto the ITAudit row (missing -> None)
    _AUDIT_VALUE_FIELDS = (
        # Cloud & Identity
        'azure_tenant_name', 'azure_user_count', 'm365_plans', 'teams_usage',
        # Infrastructure
        'server_count', 'server_types', 'virtualization_platform',
        'server_os', 'network_firewall_brand',
        # Endpoints
        'employee_count', 'computer_count', 'desktop_os', 'mdm_solution',
        # Security
        'antivirus_solution', 'edr_solution', 'vpn_solution', 'mfa_scope',
        # Backup & DR
        'backup_solution', 'backup_targets', 'backup_frequency',
        # Monitoring
        'monitoring_solution', 'zabbix_integration',
        # Business Apps
        'ticketing_system', 'erp_system', 'crm_system',
        # Active Directory
        'ad_domain_name',
        # IT Contact
        'it_contact_name', 'it_contact_email', 'it_provider_name',
    )

    # Boolean form fields copied onto the ITAudit row (missing -> False)
    _AUDIT_FLAG_FIELDS = (
        'has_azure_ad', 'has_m365', 'has_google_workspace',
        'has_mdm',
        'has_edr', 'has_vpn', 'has_mfa',
        'has_proxmox_pbs', 'has_dr_plan',
        'has_local_ad', 'has_ad_azure_sync',
        'open_to_shared_licensing', 'open_to_backup_replication',
        'open_to_teams_federation', 'open_to_shared_monitoring',
        'open_to_collective_purchasing', 'open_to_knowledge_sharing',
        'has_it_manager', 'it_outsourced',
    )

    # Employee-count range labels mapped to an approximate headcount
    _EMPLOYEE_RANGE_MIDPOINTS = {
        '1-10': 5,
        '11-50': 30,
        '51-100': 75,
        '101-250': 175,
        '251-500': 375,
        '500+': 750,
    }

    def __init__(self, db: Session):
        """
        Initialize IT Audit service.

        Args:
            db: SQLAlchemy database session
        """
        self.db = db

    # === Scoring Methods ===

    def _calculate_security_score(self, audit_data: dict) -> int:
        """
        Calculate the raw security score from audit data.

        Each control earns its SECURITY_SCORES points when any of the form
        fields listed for it is truthy:
        - has_edr: has_edr
        - has_mfa: has_mfa
        - has_firewall: network_firewall_brand or has_firewall
        - has_backup: backup_solution or has_backup
        - has_dr_plan: has_dr_plan
        - has_vpn: has_vpn
        - has_monitoring: monitoring_solution or has_monitoring

        The raw score is returned as-is; calculate_scores() normalizes it
        against SECURITY_MAX_RAW before applying the 50% weight.

        Args:
            audit_data: Dictionary with audit field values

        Returns:
            Raw security score (0 to SECURITY_MAX_RAW)
        """
        # (points key, form fields any one of which earns the points)
        controls = (
            ('has_edr', ('has_edr',)),
            ('has_mfa', ('has_mfa',)),
            ('has_firewall', ('network_firewall_brand', 'has_firewall')),
            ('has_backup', ('backup_solution', 'has_backup')),
            ('has_dr_plan', ('has_dr_plan',)),
            ('has_vpn', ('has_vpn',)),
            ('has_monitoring', ('monitoring_solution', 'has_monitoring')),
        )
        return sum(
            SECURITY_SCORES[points_key]
            for points_key, form_fields in controls
            if any(audit_data.get(name) for name in form_fields)
        )

    def _calculate_collaboration_score(self, audit_data: dict) -> int:
        """
        Calculate collaboration readiness score from audit data.

        Every truthy open_to_* flag and every enabling technology
        (has_azure_ad, has_m365, has_proxmox_pbs, Zabbix) earns its
        COLLABORATION_SCORES points. Zabbix is detected by _has_zabbix(),
        i.e. via the has_zabbix flag or the monitoring solution name.

        Args:
            audit_data: Dictionary with audit field values

        Returns:
            Collaboration score, capped at COLLABORATION_MAX_RAW
        """
        direct_keys = (
            # Collaboration flags
            'open_to_shared_licensing',
            'open_to_backup_replication',
            'open_to_teams_federation',
            'open_to_shared_monitoring',
            'open_to_collective_purchasing',
            'open_to_knowledge_sharing',
            # Technology bonuses
            'has_azure_ad',
            'has_m365',
            'has_proxmox_pbs',
        )
        raw_score = sum(
            COLLABORATION_SCORES[key] for key in direct_keys if audit_data.get(key)
        )

        if self._has_zabbix(audit_data):
            raw_score += COLLABORATION_SCORES['has_zabbix']

        return min(raw_score, COLLABORATION_MAX_RAW)

    @staticmethod
    def _is_filled(value: Any) -> bool:
        """Return True when a form value counts as filled in for completeness."""
        if value is None:
            return False
        # bool must be tested before int: True/False are ints too
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return bool(value.strip())
        if isinstance(value, (list, dict)):
            return len(value) > 0
        if isinstance(value, (int, float)):
            return value > 0
        # Any other non-None value counts as filled
        return True

    def _calculate_completeness_score(self, audit_data: dict) -> int:
        """
        Calculate completeness score based on filled fields with weights.

        A field counts when it is True, a non-blank string, a non-empty
        list/dict, a number greater than zero, or any other non-None value.

        Args:
            audit_data: Dictionary with audit field values

        Returns:
            Completeness percentage (0-100)
        """
        total_weight = sum(COMPLETENESS_WEIGHTS.values())
        if total_weight == 0:
            return 0

        achieved_weight = sum(
            weight
            for field_name, weight in COMPLETENESS_WEIGHTS.items()
            if self._is_filled(audit_data.get(field_name))
        )
        return round(achieved_weight / total_weight * 100)

    def calculate_scores(self, audit_data: dict) -> ITAuditResult:
        """
        Calculate all scores from form data.

        Overall score formula:
        - Security: 50% weight (raw score normalized to 0-100)
        - Collaboration: 30% weight (already 0-100)
        - Completeness: 20% weight (already 0-100)

        Note that the returned ``security_score`` is the raw value
        (0 to SECURITY_MAX_RAW); only the overall score uses the
        normalized value.

        Args:
            audit_data: Dictionary with all form field values

        Returns:
            ITAuditResult with all scores and maturity level
        """
        company_id = audit_data.get('company_id', 0)

        # Component scores
        security_score = self._calculate_security_score(audit_data)
        collaboration_score = self._calculate_collaboration_score(audit_data)
        completeness_score = self._calculate_completeness_score(audit_data)

        # Bring security onto the same 0-100 scale as the other components
        security_normalized = (security_score / SECURITY_MAX_RAW) * 100

        overall_score = round(
            security_normalized * 0.50 +
            collaboration_score * 0.30 +
            completeness_score * 0.20
        )

        return ITAuditResult(
            company_id=company_id,
            overall_score=overall_score,
            security_score=security_score,
            collaboration_score=collaboration_score,
            completeness_score=completeness_score,
            maturity_level=get_maturity_level(overall_score),
            has_azure_ad=bool(audit_data.get('has_azure_ad')),
            has_m365=bool(audit_data.get('has_m365')),
            has_proxmox_pbs=bool(audit_data.get('has_proxmox_pbs')),
            has_zabbix=self._has_zabbix(audit_data),
            has_edr=bool(audit_data.get('has_edr')),
            has_mfa=bool(audit_data.get('has_mfa')),
        )

    def _has_zabbix(self, audit_data: dict) -> bool:
        """
        Check if company uses Zabbix monitoring.

        True when the has_zabbix flag is truthy or the monitoring solution
        name contains "zabbix" (case-insensitive). Always returns a real
        bool, whatever the type of the submitted values.
        """
        if audit_data.get('has_zabbix'):
            return True
        monitoring = audit_data.get('monitoring_solution') or ''
        # str() keeps a non-string value from raising on .lower()
        return 'zabbix' in str(monitoring).lower()

    # === CRUD Methods ===

    def save_audit(self, company_id: int, audit_data: dict) -> ITAudit:
        """
        Score the form data and save it as a new ITAudit row.

        The caller's dictionary is not modified; a copy with ``company_id``
        added is scored and stored as ``form_data``.

        Args:
            company_id: Company ID to save audit for
            audit_data: Dictionary with all form field values

        Returns:
            Saved ITAudit record (refreshed from the database)

        Raises:
            Exception: Whatever the session raises on commit; the session
                is rolled back before the error is re-raised.
        """
        # Work on a copy so the caller's dict is left untouched
        audit_data = {**audit_data, 'company_id': company_id}
        result = self.calculate_scores(audit_data)

        columns = {name: audit_data.get(name) for name in self._AUDIT_VALUE_FIELDS}
        columns.update(
            (name, audit_data.get(name, False)) for name in self._AUDIT_FLAG_FIELDS
        )

        audit = ITAudit(
            company_id=company_id,
            audit_date=datetime.now(),
            audit_source=audit_data.get('audit_source', 'form'),
            audited_by=audit_data.get('audited_by'),

            # Scores
            overall_score=result.overall_score,
            security_score=result.security_score,
            collaboration_score=result.collaboration_score,
            completeness_score=result.completeness_score,
            maturity_level=result.maturity_level,

            # Raw Data
            form_data=audit_data,
            recommendations=result.recommendations if result.recommendations else None,
            audit_errors=result.audit_errors,

            # Form fields stored as individual columns
            **columns,
        )

        self.db.add(audit)
        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error
            self.db.rollback()
            raise
        self.db.refresh(audit)

        logger.info(
            "IT audit saved for company %s: "
            "overall=%s, security=%s, "
            "collaboration=%s, completeness=%s",
            company_id,
            result.overall_score,
            result.security_score,
            result.collaboration_score,
            result.completeness_score,
        )

        return audit

    def get_latest_audit(self, company_id: int) -> Optional[ITAudit]:
        """
        Get the most recent audit for a company.

        Args:
            company_id: Company ID

        Returns:
            The ITAudit with the newest audit_date, or None
        """
        return (
            self.db.query(ITAudit)
            .filter(ITAudit.company_id == company_id)
            .order_by(ITAudit.audit_date.desc())
            .first()
        )

    def get_audit_history(self, company_id: int, limit: int = 10) -> List[ITAudit]:
        """
        Get audit history for a company.

        Args:
            company_id: Company ID
            limit: Maximum number of audits to return

        Returns:
            List of ITAudit records ordered by date descending
        """
        return (
            self.db.query(ITAudit)
            .filter(ITAudit.company_id == company_id)
            .order_by(ITAudit.audit_date.desc())
            .limit(limit)
            .all()
        )

    # === Collaboration Matching Methods ===

    def find_collaboration_matches(self, company_id: int) -> List[CollaborationMatch]:
        """
        Find potential collaboration partners for a company.

        The company's latest audit is compared against the latest audit of
        every other audited company; see _check_all_match_types() for the
        individual match rules.

        Args:
            company_id: Company ID to find matches for

        Returns:
            List of CollaborationMatch objects; empty when the company or
            its audit does not exist
        """
        company_audit = self.get_latest_audit(company_id)
        if not company_audit:
            return []

        company = self.db.query(Company).filter(Company.id == company_id).first()
        if not company:
            return []

        other_audits = (
            self.db.query(ITAudit)
            .filter(ITAudit.company_id != company_id)
            .order_by(ITAudit.audit_date.desc())
            .all()
        )

        # Audits arrive newest first, so the first one seen per company is
        # that company's latest
        latest_by_company = {}
        for audit in other_audits:
            latest_by_company.setdefault(audit.company_id, audit)
        if not latest_by_company:
            return []

        # Load all partner companies in one query instead of one per audit
        partners = (
            self.db.query(Company)
            .filter(Company.id.in_(list(latest_by_company)))
            .all()
        )
        partners_by_id = {partner.id: partner for partner in partners}

        matches = []
        for other_id, other_audit in latest_by_company.items():
            other_company = partners_by_id.get(other_id)
            if not other_company:
                continue
            matches.extend(
                self._check_all_match_types(
                    company, company_audit, other_company, other_audit
                )
            )
        return matches

    def _check_all_match_types(
        self,
        company_a: Company,
        audit_a: ITAudit,
        company_b: Company,
        audit_b: ITAudit
    ) -> List[CollaborationMatch]:
        """
        Check all match types between two companies.

        Rules, each requiring the matching open_to_* flag on both audits:
        - shared_licensing: both have M365
        - backup_replication: both have Proxmox PBS
        - teams_federation: both have Azure AD
        - shared_monitoring: both name Zabbix as monitoring solution
        - collective_purchasing: employee counts within 3x of each other
        - knowledge_sharing: at least two technologies in common

        Returns:
            Matches in the rule order listed above
        """
        def build(match_type, reason, score, attributes):
            # Every match shares the same company identity fields
            return CollaborationMatch(
                company_a_id=company_a.id,
                company_b_id=company_b.id,
                company_a_name=company_a.name,
                company_b_name=company_b.name,
                match_type=match_type,
                match_reason=reason,
                match_score=score,
                shared_attributes=attributes,
            )

        def both(attribute):
            # True when the attribute is truthy on both audits
            return bool(getattr(audit_a, attribute) and getattr(audit_b, attribute))

        matches = []

        # Shared Licensing (M365)
        if both('has_m365') and both('open_to_shared_licensing'):
            matches.append(build(
                'shared_licensing',
                'Obie firmy korzystają z Microsoft 365 i są otwarte na wspólne licencjonowanie',
                80,
                {'m365_plans_a': audit_a.m365_plans, 'm365_plans_b': audit_b.m365_plans},
            ))

        # Backup Replication (Proxmox PBS)
        if both('has_proxmox_pbs') and both('open_to_backup_replication'):
            matches.append(build(
                'backup_replication',
                'Obie firmy używają Proxmox PBS i są otwarte na replikację backupów',
                90,
                {'pbs': True},
            ))

        # Teams Federation (Azure AD)
        if both('has_azure_ad') and both('open_to_teams_federation'):
            matches.append(build(
                'teams_federation',
                'Obie firmy mają Azure AD i są otwarte na federację Teams',
                85,
                {
                    'tenant_a': audit_a.azure_tenant_name,
                    'tenant_b': audit_b.azure_tenant_name,
                },
            ))

        # Shared Monitoring (Zabbix), detected from the monitoring solution name
        zabbix_a = 'zabbix' in (audit_a.monitoring_solution or '').lower()
        zabbix_b = 'zabbix' in (audit_b.monitoring_solution or '').lower()
        if zabbix_a and zabbix_b and both('open_to_shared_monitoring'):
            matches.append(build(
                'shared_monitoring',
                'Obie firmy używają Zabbix i są otwarte na wspólny monitoring',
                75,
                {'monitoring': 'zabbix'},
            ))

        # Collective Purchasing (similar size)
        if both('open_to_collective_purchasing'):
            size_a = self._parse_count_range(audit_a.employee_count)
            size_b = self._parse_count_range(audit_b.employee_count)
            if size_a and size_b and self._sizes_similar(size_a, size_b):
                matches.append(build(
                    'collective_purchasing',
                    'Firmy o podobnej wielkości, otwarte na zakupy grupowe',
                    70,
                    {
                        'size_a': audit_a.employee_count,
                        'size_b': audit_b.employee_count,
                    },
                ))

        # Knowledge Sharing (similar tech stack)
        if both('open_to_knowledge_sharing'):
            common_tech = self._find_common_tech(audit_a, audit_b)
            if len(common_tech) >= 2:
                matches.append(build(
                    'knowledge_sharing',
                    f'Wspólny stack technologiczny: {", ".join(common_tech)}',
                    65,
                    {'common_tech': common_tech},
                ))

        return matches

    def _parse_count_range(self, count_str: Optional[str]) -> Optional[int]:
        """
        Parse employee count string to approximate number.

        Recognizes the range labels "1-10", "11-50", "51-100", "101-250",
        "251-500" and "500+"; anything else, including an empty value,
        yields None.
        """
        if not count_str:
            return None
        return self._EMPLOYEE_RANGE_MIDPOINTS.get(count_str)

    def _sizes_similar(self, size_a: int, size_b: int) -> bool:
        """
        Check if two company sizes are similar (within 3x of each other).

        A size of zero is never considered similar to anything.
        """
        if size_a == 0 or size_b == 0:
            return False
        ratio = max(size_a, size_b) / min(size_a, size_b)
        return ratio <= 3

    def _find_common_tech(self, audit_a: ITAudit, audit_b: ITAudit) -> List[str]:
        """
        Find common technologies between two audits.

        Named products match case-insensitively and are reported using
        company A's spelling; Azure AD and Microsoft 365 are reported when
        both audits have the corresponding flag set.

        Returns:
            Labels of the shared technologies, in a fixed order
        """
        # (audit attribute, label prefix) for name-based comparisons
        named_products = (
            ('virtualization_platform', ''),
            ('backup_solution', 'Backup: '),
            ('network_firewall_brand', 'Firewall: '),
            ('erp_system', 'ERP: '),
        )

        common = []
        for attribute, prefix in named_products:
            name_a = getattr(audit_a, attribute)
            name_b = getattr(audit_b, attribute)
            if name_a and name_b and name_a.lower() == name_b.lower():
                common.append(f"{prefix}{name_a}")

        # Cloud platforms
        if audit_a.has_azure_ad and audit_b.has_azure_ad:
            common.append("Azure AD")
        if audit_a.has_m365 and audit_b.has_m365:
            common.append("Microsoft 365")

        return common

    def save_collaboration_match(self, match: CollaborationMatch) -> ITCollaborationMatch:
        """
        Save a collaboration match to the database with status 'suggested'.

        Args:
            match: CollaborationMatch to save

        Returns:
            Saved ITCollaborationMatch record (refreshed from the database)

        Raises:
            Exception: Whatever the session raises on commit; the session
                is rolled back before the error is re-raised.
        """
        db_match = ITCollaborationMatch(
            company_a_id=match.company_a_id,
            company_b_id=match.company_b_id,
            match_type=match.match_type,
            match_reason=match.match_reason,
            match_score=match.match_score,
            status='suggested',
            shared_attributes=match.shared_attributes,
        )

        self.db.add(db_match)
        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error
            self.db.rollback()
            raise
        self.db.refresh(db_match)

        logger.info(
            "Collaboration match saved: %s <-> %s (%s)",
            match.company_a_name,
            match.company_b_name,
            match.match_type,
        )

        return db_match

    def get_matches_for_company(self, company_id: int) -> List[ITCollaborationMatch]:
        """
        Get all collaboration matches for a company.

        Args:
            company_id: Company ID

        Returns:
            ITCollaborationMatch records where the company is either side
            of the match, highest match_score first
        """
        involves_company = (
            (ITCollaborationMatch.company_a_id == company_id) |
            (ITCollaborationMatch.company_b_id == company_id)
        )
        return (
            self.db.query(ITCollaborationMatch)
            .filter(involves_company)
            .order_by(ITCollaborationMatch.match_score.desc())
            .all()
        )
|
|
|
|
|
|
# === Convenience Functions ===
|
|
|
|
def audit_company(db: Session, company_id: int, audit_data: dict) -> ITAudit:
    """
    Score and store an IT audit for a company.

    Shortcut for ``ITAuditService(db).save_audit(...)``.

    Args:
        db: Database session
        company_id: Company ID to audit
        audit_data: Dictionary with form field values

    Returns:
        Saved ITAudit record
    """
    return ITAuditService(db).save_audit(company_id, audit_data)
|
|
|
|
|
|
def get_company_audit(db: Session, company_id: int) -> Optional[ITAudit]:
    """
    Fetch a company's most recent IT audit.

    Shortcut for ``ITAuditService(db).get_latest_audit(...)``.

    Args:
        db: Database session
        company_id: Company ID

    Returns:
        Latest ITAudit or None
    """
    return ITAuditService(db).get_latest_audit(company_id)
|
|
|
|
|
|
def calculate_scores(audit_data: dict) -> ITAuditResult:
    """
    Score IT audit form data without persisting anything.

    Args:
        audit_data: Dictionary with form field values

    Returns:
        ITAuditResult with all scores
    """
    # Bypass __init__ (which expects a database session): the instance is
    # created without one and only used for scoring
    scorer = ITAuditService.__new__(ITAuditService)
    result = scorer.calculate_scores(audit_data)
    return result
|
|
|
|
|
|
def get_company_audit_history(db: Session, company_id: int, limit: int = 10) -> List[ITAudit]:
    """
    Fetch a company's past IT audits, newest first.

    Shortcut for ``ITAuditService(db).get_audit_history(...)``.

    Args:
        db: Database session
        company_id: Company ID
        limit: Maximum number of audits to return

    Returns:
        List of ITAudit records ordered by date descending
    """
    return ITAuditService(db).get_audit_history(company_id, limit)
|
|
|
|
|
|
def has_company_audit(db: Session, company_id: int) -> bool:
    """
    Tell whether any IT audit exists for a company.

    Args:
        db: Database session
        company_id: Company ID

    Returns:
        True if company has at least one audit, False otherwise
    """
    audits_for_company = db.query(ITAudit).filter(ITAudit.company_id == company_id)
    any_audit = audits_for_company.first()
    return any_audit is not None
|
|
|
|
|
|
# === Main for Testing ===
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test of the scoring functions; no database is needed.
    logging.basicConfig(level=logging.INFO)

    print("Testing IT Audit Service")
    print("-" * 50)

    # Scoring does not touch the database, so skip __init__
    service = ITAuditService.__new__(ITAuditService)

    # Security scoring with every control present
    test_data = {
        'has_edr': True,
        'has_mfa': True,
        'network_firewall_brand': 'Fortinet',
        'backup_solution': 'Veeam',
        'has_dr_plan': True,
        'has_vpn': True,
        'monitoring_solution': 'Zabbix',
    }
    security_score = service._calculate_security_score(test_data)
    # The security score is raw, so its scale is SECURITY_MAX_RAW, not 100
    print(f"Security score (all elements): {security_score}/{SECURITY_MAX_RAW}")

    # Security scoring with minimal data
    minimal_data = {'has_edr': True, 'has_mfa': True}
    security_minimal = service._calculate_security_score(minimal_data)
    print(f"Security score (EDR + MFA only): {security_minimal}/{SECURITY_MAX_RAW}")

    # Collaboration scoring
    collab_data = {
        'has_azure_ad': True,
        'has_m365': True,
        'open_to_shared_licensing': True,
        'open_to_teams_federation': True,
    }
    collab_score = service._calculate_collaboration_score(collab_data)
    print(f"Collaboration score: {collab_score}/100")

    # Maturity levels, one sample score per band
    print("\nMaturity levels:")
    for score in [25, 50, 70, 90]:
        level = get_maturity_level(score)
        label = get_maturity_level_label(level)
        print(f"  Score {score}: {level} ({label})")

    print("\n" + "-" * 50)
    print("IT Audit Service loaded successfully!")
|