nordabiz/file_upload_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

302 lines
11 KiB
Python

"""
File Upload Service
===================
Secure file upload handling for forum and classified attachments.
Supports JPG, PNG, GIF images up to 5MB.
Features:
- File type validation (magic bytes + extension)
- Size limits
- EXIF data stripping for privacy
- UUID-based filenames for security
- Date-organized storage structure
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-01-10
"""
import os
import uuid
import logging
from datetime import datetime
from typing import Tuple, Optional
from werkzeug.datastructures import FileStorage
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# --- Upload policy ---------------------------------------------------------
# Filename extensions accepted by validate_file (compared in lower case).
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'}
# MIME types regarded as allowed (not referenced in the visible part of this
# file; presumably consumed by callers).
ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif'}
# Largest accepted upload in bytes (5 MB).
MAX_FILE_SIZE = 5 * 1024 ** 2
# Largest accepted image as (width, height) in pixels.
MAX_IMAGE_DIMENSIONS = (4096,) * 2

# --- Storage locations -----------------------------------------------------
# Everything is anchored at the directory containing this file, so the paths
# do not depend on the process working directory.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_BASE_PATH, UPLOAD_CLASSIFIEDS_PATH = (
    os.path.join(_BASE_DIR, 'static', 'uploads', leaf)
    for leaf in ('forum', 'classifieds')
)

# --- Content sniffing ------------------------------------------------------
# Leading bytes ("magic numbers") of each supported format, mapped to the
# short type name used when comparing against the filename extension.
IMAGE_SIGNATURES = {
    b'\xff\xd8\xff': 'jpg',        # JPEG start-of-image marker
    b'\x89PNG\r\n\x1a\n': 'png',   # PNG 8-byte signature
    b'GIF87a': 'gif',              # GIF, 1987 revision
    b'GIF89a': 'gif',              # GIF, 1989 revision
}
class FileUploadService:
    """Secure file upload service for forum and classified attachments.

    All methods are static; the class only serves as a namespace. Limits,
    storage roots and magic-byte signatures come from the module-level
    constants (``ALLOWED_EXTENSIONS``, ``MAX_FILE_SIZE``,
    ``MAX_IMAGE_DIMENSIONS``, ``UPLOAD_BASE_PATH``,
    ``UPLOAD_CLASSIFIEDS_PATH``, ``IMAGE_SIGNATURES``).

    ``attachment_type`` is ``'classified'``, ``'topic'`` or anything else;
    every value other than ``'classified'`` and ``'topic'`` is treated as a
    forum reply.
    """

    @staticmethod
    def _forum_subdir(attachment_type: str) -> str:
        """Return the forum sub-directory ('topics' or 'replies') for a type."""
        return 'topics' if attachment_type == 'topic' else 'replies'

    @staticmethod
    def validate_file(file: 'FileStorage') -> Tuple[bool, str]:
        """
        Validate an uploaded file.

        Checks, in order: a file was supplied, the extension is allowed, the
        size is within (0, MAX_FILE_SIZE], the leading bytes match a known
        image signature, the signature agrees with the extension, and (when
        Pillow is installed) the pixel dimensions fit MAX_IMAGE_DIMENSIONS.
        The stream is rewound to position 0 after every read.

        Args:
            file: Werkzeug FileStorage object

        Returns:
            Tuple of (is_valid, error_message); error_message is '' on success
        """
        if not file or file.filename == '':
            return False, 'Nie wybrano pliku'

        filename = file.filename
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        if ext not in ALLOWED_EXTENSIONS:
            return False, f'Niedozwolony format pliku. Dozwolone: {", ".join(sorted(ALLOWED_EXTENSIONS))}'

        # Measure the stream by seeking to its end, then rewind.
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(0)
        if size > MAX_FILE_SIZE:
            return False, f'Plik jest za duży (max {MAX_FILE_SIZE // 1024 // 1024}MB)'
        if size == 0:
            return False, 'Plik jest pusty'

        # Sniff the real content type from the magic bytes.
        header = file.read(16)
        file.seek(0)
        detected_type = next(
            (kind for signature, kind in IMAGE_SIGNATURES.items()
             if header.startswith(signature)),
            None,
        )
        if not detected_type:
            return False, 'Plik nie jest prawidłowym obrazem'

        # 'jpg' and 'jpeg' name the same format: normalize both sides so a
        # .jpg and a .jpeg upload are treated alike.
        if ext == 'jpg':
            ext = 'jpeg'
        if detected_type == 'jpg':
            detected_type = 'jpeg'
        if detected_type != ext:
            return False, f'Rozszerzenie pliku ({ext}) nie odpowiada zawartości ({detected_type})'

        # Dimension check is best-effort: it is skipped without Pillow.
        try:
            from PIL import Image
        except ImportError:
            logger.warning("PIL not available, skipping image dimension validation")
            return True, ''

        try:
            width, height = Image.open(file).size
        except Exception as e:
            # Any decoder failure means the upload is not a usable image.
            return False, f'Nie można odczytać obrazu: {str(e)}'
        finally:
            file.seek(0)

        max_width, max_height = MAX_IMAGE_DIMENSIONS
        if width > max_width or height > max_height:
            return False, f'Obraz jest za duży (max {MAX_IMAGE_DIMENSIONS[0]}x{MAX_IMAGE_DIMENSIONS[1]}px)'
        return True, ''

    @staticmethod
    def generate_stored_filename(original_filename: str) -> str:
        """
        Generate a secure UUID-based filename preserving the extension.

        The extension is taken from the (untrusted) original filename, so it
        is only kept when it consists purely of ASCII letters and digits;
        anything else (missing, empty, containing path separators, ...) is
        replaced by 'bin'. 'jpeg' is normalized to 'jpg'.

        Args:
            original_filename: Original filename from upload (None or ''
                is treated as having no extension)

        Returns:
            Filename of the form '<uuid4>.<ext>'
        """
        name = original_filename or ''
        ext = name.rsplit('.', 1)[-1].lower() if '.' in name else 'bin'
        if not (ext.isascii() and ext.isalnum()):
            # Never let separators or odd characters reach the filesystem.
            ext = 'bin'
        elif ext == 'jpeg':
            ext = 'jpg'
        return f"{uuid.uuid4()}.{ext}"

    @staticmethod
    def get_upload_path(attachment_type: str) -> str:
        """
        Get the upload directory for the current month, creating it if needed.

        Args:
            attachment_type: 'classified', 'topic', or anything else
                (treated as a forum reply)

        Returns:
            Full path to the upload directory:
            <classifieds root>/<YYYY>/<MM> for classifieds, otherwise
            <forum root>/<topics|replies>/<YYYY>/<MM>
        """
        now = datetime.now()
        year, month = str(now.year), f"{now.month:02d}"
        if attachment_type == 'classified':
            path = os.path.join(UPLOAD_CLASSIFIEDS_PATH, year, month)
        else:
            subdir = FileUploadService._forum_subdir(attachment_type)
            path = os.path.join(UPLOAD_BASE_PATH, subdir, year, month)
        os.makedirs(path, exist_ok=True)
        return path

    @staticmethod
    def _save_stripped(img, file_path: str, ext: str) -> None:
        """Re-encode ``img`` to ``file_path`` from its pixel data only.

        Building a new image from the raw pixel bytes leaves EXIF and other
        metadata behind. Palette and palette transparency are carried over
        explicitly because they are needed to render 'P' images correctly.
        """
        from PIL import Image

        is_jpeg = ext in ('jpg', 'jpeg')
        # Alpha / palette modes are kept only for formats that support them.
        if is_jpeg or img.mode not in ('RGBA', 'LA', 'P'):
            if img.mode != 'RGB':
                img = img.convert('RGB')

        save_kwargs = {'optimize': True}
        clean_img = Image.frombytes(img.mode, img.size, img.tobytes())
        if img.mode == 'P':
            palette = img.getpalette()
            if palette:
                clean_img.putpalette(palette)
            if 'transparency' in img.info:
                save_kwargs['transparency'] = img.info['transparency']

        if is_jpeg:
            save_kwargs['quality'] = 85
        elif ext == 'png':
            save_kwargs['compress_level'] = 6
        clean_img.save(file_path, **save_kwargs)

    @staticmethod
    def save_file(file: 'FileStorage', attachment_type: str) -> Tuple[str, str, int, str]:
        """
        Save a file under a generated name, stripping metadata when possible.

        With Pillow installed, still images are re-encoded from pixel data
        (dropping EXIF); animated GIFs are stored unmodified to preserve the
        animation. Without Pillow the upload is stored as-is. If writing
        fails, any partially written file is removed and the error is
        re-raised.

        Args:
            file: Werkzeug FileStorage object
            attachment_type: 'classified', 'topic', or anything else
                (treated as a forum reply)

        Returns:
            Tuple of (stored_filename, relative_path, file_size, mime_type);
            relative_path is relative to the 'static' directory
        """
        stored_filename = FileUploadService.generate_stored_filename(file.filename)
        upload_dir = FileUploadService.get_upload_path(attachment_type)
        file_path = os.path.join(upload_dir, stored_filename)

        # MIME type is derived from the stored extension.
        ext = stored_filename.rsplit('.', 1)[-1].lower()
        mime_type = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
        }.get(ext, 'application/octet-stream')

        try:
            from PIL import Image
        except ImportError:
            Image = None
            logger.warning("PIL not available, saving file without EXIF stripping")

        # Earlier reads (e.g. validation) may have moved the stream position.
        file.seek(0)
        try:
            if Image is None:
                file.save(file_path)
            else:
                img = Image.open(file)
                if ext == 'gif' and getattr(img, 'is_animated', False):
                    # Re-encoding would flatten the animation; store as-is.
                    file.seek(0)
                    file.save(file_path)
                else:
                    FileUploadService._save_stripped(img, file_path, ext)
        except Exception:
            # Do not leave a truncated file behind; then propagate the error.
            try:
                os.remove(file_path)
            except OSError:
                pass
            raise

        file_size = os.path.getsize(file_path)
        relative_path = os.path.relpath(file_path, os.path.join(_BASE_DIR, 'static'))
        logger.info(f"Saved forum attachment: {stored_filename} ({file_size} bytes)")
        return stored_filename, relative_path, file_size, mime_type

    @staticmethod
    def _remove_attachment(path: str, stored_filename: str) -> bool:
        """Remove ``path``; log the outcome and return True on success."""
        try:
            os.remove(path)
        except OSError as e:
            logger.error(f"Failed to delete {stored_filename}: {e}")
            return False
        logger.info(f"Deleted forum attachment: {stored_filename}")
        return True

    @staticmethod
    def delete_file(stored_filename: str, attachment_type: str, created_at: Optional[datetime] = None) -> bool:
        """
        Delete a file from storage.

        The year/month directory derived from ``created_at`` is tried first;
        if the file is not there (or no timestamp is given) every directory
        below the attachment type's root is searched.

        Args:
            stored_filename: UUID-based filename; must be a bare filename
                without any directory component
            attachment_type: 'classified', 'topic', or anything else
                (treated as a forum reply)
            created_at: Creation timestamp to determine the path

        Returns:
            True if deleted, False otherwise
        """
        # Refuse names that could escape the upload directory.
        if (
            not stored_filename
            or stored_filename in ('.', '..')
            or os.path.basename(stored_filename) != stored_filename
        ):
            logger.warning(f"Refusing to delete attachment with unsafe name: {stored_filename!r}")
            return False

        if attachment_type == 'classified':
            search_root = UPLOAD_CLASSIFIEDS_PATH
        else:
            search_root = os.path.join(
                UPLOAD_BASE_PATH, FileUploadService._forum_subdir(attachment_type)
            )

        if created_at:
            exact_path = os.path.join(
                search_root, str(created_at.year), f"{created_at.month:02d}", stored_filename
            )
            if os.path.exists(exact_path):
                return FileUploadService._remove_attachment(exact_path, stored_filename)

        # Fall back to scanning all date directories.
        for root, _dirs, files in os.walk(search_root):
            if stored_filename in files:
                return FileUploadService._remove_attachment(
                    os.path.join(root, stored_filename), stored_filename
                )

        logger.warning(f"Attachment not found for deletion: {stored_filename}")
        return False

    @staticmethod
    def get_file_url(stored_filename: str, attachment_type: str, created_at: datetime) -> str:
        """
        Get the URL for serving the file.

        Args:
            stored_filename: UUID-based filename
            attachment_type: 'classified', 'topic', or anything else
                (treated as a forum reply)
            created_at: Creation timestamp; its year and month select the
                date directory

        Returns:
            URL path to the file, rooted at /static/uploads/
        """
        dated = f"{created_at.year}/{created_at.month:02d}/{stored_filename}"
        if attachment_type == 'classified':
            return f"/static/uploads/classifieds/{dated}"
        subdir = FileUploadService._forum_subdir(attachment_type)
        return f"/static/uploads/forum/{subdir}/{dated}"