Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
339 lines
11 KiB
Python
339 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sensitive Data Detection and Sanitization Service
|
|
==================================================
|
|
|
|
Automatically detects and masks sensitive data in user messages.
|
|
RODO/GDPR compliant - prevents storage of sensitive personal data.
|
|
|
|
Detected data types:
|
|
- PESEL (Polish national ID)
|
|
- Credit card numbers (Luhn validated)
|
|
- IBAN bank account numbers
|
|
- Passwords (contextual detection)
|
|
- Phone numbers (optional)
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-01-28
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
from typing import Dict, List, Tuple, Optional
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SensitiveDataType(Enum):
|
|
"""Types of sensitive data that can be detected"""
|
|
PESEL = "pesel"
|
|
CREDIT_CARD = "credit_card"
|
|
IBAN = "iban"
|
|
PASSWORD = "password"
|
|
NIP = "nip"
|
|
REGON = "regon"
|
|
ID_CARD = "id_card"
|
|
PASSPORT = "passport"
|
|
|
|
|
|
@dataclass
|
|
class SensitiveDataMatch:
|
|
"""Represents a detected sensitive data match"""
|
|
data_type: SensitiveDataType
|
|
original: str
|
|
masked: str
|
|
start_pos: int
|
|
end_pos: int
|
|
confidence: float # 0.0 to 1.0
|
|
|
|
|
|
class SensitiveDataService:
|
|
"""
|
|
Service for detecting and sanitizing sensitive data in text.
|
|
|
|
Usage:
|
|
service = SensitiveDataService()
|
|
sanitized, matches = service.sanitize("Mój PESEL to 12345678901")
|
|
# sanitized = "Mój PESEL to [PESEL UKRYTY]"
|
|
"""
|
|
|
|
# Masking templates
|
|
MASKS = {
|
|
SensitiveDataType.PESEL: "[PESEL UKRYTY]",
|
|
SensitiveDataType.CREDIT_CARD: "[KARTA UKRYTA]",
|
|
SensitiveDataType.IBAN: "[KONTO UKRYTE]",
|
|
SensitiveDataType.PASSWORD: "[HASŁO UKRYTE]",
|
|
SensitiveDataType.NIP: "[NIP UKRYTY]",
|
|
SensitiveDataType.REGON: "[REGON UKRYTY]",
|
|
SensitiveDataType.ID_CARD: "[DOWÓD UKRYTY]",
|
|
SensitiveDataType.PASSPORT: "[PASZPORT UKRYTY]",
|
|
}
|
|
|
|
# Regex patterns
|
|
PATTERNS = {
|
|
# PESEL: 11 digits, often written with spaces
|
|
SensitiveDataType.PESEL: r'\b(\d{2})[\s-]?(\d{2})[\s-]?(\d{2})[\s-]?(\d{5})\b',
|
|
|
|
# Credit cards: 13-19 digits, often grouped by 4
|
|
SensitiveDataType.CREDIT_CARD: r'\b(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{1,7})\b',
|
|
|
|
# IBAN Poland: PL + 26 digits
|
|
SensitiveDataType.IBAN: r'\b(PL)?\s?(\d{2})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})\b',
|
|
|
|
# Password patterns (contextual)
|
|
SensitiveDataType.PASSWORD: r'(?:hasło|password|pass|pwd|pin)[\s:=]+["\']?([^\s"\']{4,})["\']?',
|
|
|
|
# NIP: 10 digits
|
|
SensitiveDataType.NIP: r'\b(\d{3})[\s-]?(\d{3})[\s-]?(\d{2})[\s-]?(\d{2})\b',
|
|
|
|
# REGON: 9 or 14 digits
|
|
SensitiveDataType.REGON: r'\b(\d{9}|\d{14})\b',
|
|
|
|
# Polish ID card: 3 letters + 6 digits
|
|
SensitiveDataType.ID_CARD: r'\b([A-Z]{3})[\s-]?(\d{6})\b',
|
|
|
|
# Passport: 2 letters + 7 digits
|
|
SensitiveDataType.PASSPORT: r'\b([A-Z]{2})[\s-]?(\d{7})\b',
|
|
}
|
|
|
|
# Context keywords that increase confidence
|
|
CONTEXT_KEYWORDS = {
|
|
SensitiveDataType.PESEL: ['pesel', 'numer pesel', 'nr pesel', 'identyfikacyjny'],
|
|
SensitiveDataType.CREDIT_CARD: ['karta', 'kredytowa', 'debetowa', 'visa', 'mastercard', 'card'],
|
|
SensitiveDataType.IBAN: ['konto', 'bankowe', 'przelew', 'iban', 'numer konta', 'rachunek'],
|
|
SensitiveDataType.PASSWORD: ['hasło', 'password', 'login', 'logowanie'],
|
|
SensitiveDataType.NIP: ['nip', 'podatnik', 'faktura'],
|
|
SensitiveDataType.REGON: ['regon', 'rejestr'],
|
|
SensitiveDataType.ID_CARD: ['dowód', 'osobisty', 'dokument'],
|
|
SensitiveDataType.PASSPORT: ['paszport', 'passport'],
|
|
}
|
|
|
|
def __init__(self, enabled_types: Optional[List[SensitiveDataType]] = None):
|
|
"""
|
|
Initialize service with optional list of data types to detect.
|
|
|
|
Args:
|
|
enabled_types: List of SensitiveDataType to detect.
|
|
If None, detects all types except NIP (often public in business context).
|
|
"""
|
|
if enabled_types is None:
|
|
# Default: detect all except NIP (public for companies)
|
|
self.enabled_types = [
|
|
SensitiveDataType.PESEL,
|
|
SensitiveDataType.CREDIT_CARD,
|
|
SensitiveDataType.IBAN,
|
|
SensitiveDataType.PASSWORD,
|
|
SensitiveDataType.ID_CARD,
|
|
SensitiveDataType.PASSPORT,
|
|
]
|
|
else:
|
|
self.enabled_types = enabled_types
|
|
|
|
def detect(self, text: str) -> List[SensitiveDataMatch]:
|
|
"""
|
|
Detect all sensitive data in text.
|
|
|
|
Args:
|
|
text: Input text to scan
|
|
|
|
Returns:
|
|
List of SensitiveDataMatch objects
|
|
"""
|
|
matches = []
|
|
text_lower = text.lower()
|
|
|
|
for data_type in self.enabled_types:
|
|
pattern = self.PATTERNS.get(data_type)
|
|
if not pattern:
|
|
continue
|
|
|
|
for match in re.finditer(pattern, text, re.IGNORECASE):
|
|
original = match.group(0)
|
|
|
|
# Calculate confidence based on context and validation
|
|
confidence = self._calculate_confidence(data_type, original, text_lower, match.start())
|
|
|
|
# Skip low-confidence matches
|
|
if confidence < 0.5:
|
|
continue
|
|
|
|
matches.append(SensitiveDataMatch(
|
|
data_type=data_type,
|
|
original=original,
|
|
masked=self.MASKS[data_type],
|
|
start_pos=match.start(),
|
|
end_pos=match.end(),
|
|
confidence=confidence
|
|
))
|
|
|
|
# Sort by position (reverse for safe replacement)
|
|
matches.sort(key=lambda m: m.start_pos, reverse=True)
|
|
|
|
return matches
|
|
|
|
def sanitize(self, text: str) -> Tuple[str, List[SensitiveDataMatch]]:
|
|
"""
|
|
Detect and mask sensitive data in text.
|
|
|
|
Args:
|
|
text: Input text to sanitize
|
|
|
|
Returns:
|
|
Tuple of (sanitized_text, list_of_matches)
|
|
"""
|
|
matches = self.detect(text)
|
|
|
|
sanitized = text
|
|
for match in matches:
|
|
sanitized = (
|
|
sanitized[:match.start_pos] +
|
|
match.masked +
|
|
sanitized[match.end_pos:]
|
|
)
|
|
|
|
if matches:
|
|
logger.info(
|
|
f"SENSITIVE_DATA: Sanitized {len(matches)} sensitive data items: "
|
|
f"{[m.data_type.value for m in matches]}"
|
|
)
|
|
|
|
return sanitized, matches
|
|
|
|
def _calculate_confidence(
|
|
self,
|
|
data_type: SensitiveDataType,
|
|
value: str,
|
|
text_lower: str,
|
|
position: int
|
|
) -> float:
|
|
"""
|
|
Calculate confidence score for a match.
|
|
|
|
Args:
|
|
data_type: Type of detected data
|
|
value: The matched value
|
|
text_lower: Lowercase version of full text (for context search)
|
|
position: Position of match in text
|
|
|
|
Returns:
|
|
Confidence score 0.0 to 1.0
|
|
"""
|
|
confidence = 0.5 # Base confidence
|
|
|
|
# Check for context keywords nearby (within 50 chars before match)
|
|
context_start = max(0, position - 50)
|
|
context = text_lower[context_start:position]
|
|
|
|
keywords = self.CONTEXT_KEYWORDS.get(data_type, [])
|
|
for keyword in keywords:
|
|
if keyword in context:
|
|
confidence += 0.3
|
|
break
|
|
|
|
# Validate specific formats
|
|
clean_value = re.sub(r'[\s-]', '', value)
|
|
|
|
if data_type == SensitiveDataType.PESEL:
|
|
if self._validate_pesel(clean_value):
|
|
confidence += 0.2
|
|
|
|
elif data_type == SensitiveDataType.CREDIT_CARD:
|
|
if self._validate_luhn(clean_value):
|
|
confidence += 0.3
|
|
|
|
elif data_type == SensitiveDataType.IBAN:
|
|
if clean_value.upper().startswith('PL') or len(clean_value) == 26:
|
|
confidence += 0.2
|
|
|
|
elif data_type == SensitiveDataType.NIP:
|
|
if self._validate_nip(clean_value):
|
|
confidence += 0.2
|
|
|
|
return min(confidence, 1.0)
|
|
|
|
def _validate_pesel(self, pesel: str) -> bool:
|
|
"""Validate PESEL checksum"""
|
|
if len(pesel) != 11 or not pesel.isdigit():
|
|
return False
|
|
|
|
weights = [1, 3, 7, 9, 1, 3, 7, 9, 1, 3]
|
|
checksum = sum(int(pesel[i]) * weights[i] for i in range(10))
|
|
control = (10 - (checksum % 10)) % 10
|
|
|
|
return control == int(pesel[10])
|
|
|
|
def _validate_luhn(self, number: str) -> bool:
|
|
"""Validate credit card number using Luhn algorithm"""
|
|
if not number.isdigit() or len(number) < 13 or len(number) > 19:
|
|
return False
|
|
|
|
digits = [int(d) for d in number]
|
|
odd_digits = digits[-1::-2]
|
|
even_digits = digits[-2::-2]
|
|
|
|
checksum = sum(odd_digits)
|
|
for d in even_digits:
|
|
checksum += sum(divmod(d * 2, 10))
|
|
|
|
return checksum % 10 == 0
|
|
|
|
def _validate_nip(self, nip: str) -> bool:
|
|
"""Validate Polish NIP checksum"""
|
|
if len(nip) != 10 or not nip.isdigit():
|
|
return False
|
|
|
|
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
|
|
checksum = sum(int(nip[i]) * weights[i] for i in range(9))
|
|
control = checksum % 11
|
|
|
|
return control == int(nip[9])
|
|
|
|
|
|
# Global instance for easy import
|
|
_service_instance: Optional[SensitiveDataService] = None
|
|
|
|
|
|
def get_sensitive_data_service() -> SensitiveDataService:
|
|
"""Get or create global SensitiveDataService instance"""
|
|
global _service_instance
|
|
if _service_instance is None:
|
|
_service_instance = SensitiveDataService()
|
|
return _service_instance
|
|
|
|
|
|
def sanitize_message(text: str) -> Tuple[str, List[SensitiveDataMatch]]:
|
|
"""
|
|
Convenience function to sanitize text using global service.
|
|
|
|
Args:
|
|
text: Input text to sanitize
|
|
|
|
Returns:
|
|
Tuple of (sanitized_text, list_of_matches)
|
|
"""
|
|
return get_sensitive_data_service().sanitize(text)
|
|
|
|
|
|
# Quick test
|
|
if __name__ == "__main__":
|
|
service = SensitiveDataService()
|
|
|
|
test_cases = [
|
|
"Mój PESEL to 44051401359",
|
|
"Przelej na konto PL61 1090 1014 0000 0712 1981 2874",
|
|
"Numer karty: 4532015112830366",
|
|
"Moje hasło: SuperSecret123!",
|
|
"Dowód osobisty: ABC123456",
|
|
"Napisz na email@example.com", # Should NOT be masked (intentional)
|
|
]
|
|
|
|
for test in test_cases:
|
|
sanitized, matches = service.sanitize(test)
|
|
print(f"Input: {test}")
|
|
print(f"Output: {sanitized}")
|
|
if matches:
|
|
print(f"Found: {[(m.data_type.value, m.confidence) for m in matches]}")
|
|
print()
|