Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
302 lines
11 KiB
Python
302 lines
11 KiB
Python
"""
|
|
File Upload Service
|
|
===================
|
|
|
|
Secure file upload handling for forum and classified attachments.
|
|
Supports JPG, PNG, GIF images up to 5MB.
|
|
|
|
Features:
|
|
- File type validation (magic bytes + extension)
|
|
- Size limits
|
|
- EXIF data stripping for privacy
|
|
- UUID-based filenames for security
|
|
- Date-organized storage structure
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Created: 2026-01-10
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Tuple, Optional
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
logger = logging.getLogger(__name__)

# --- Upload policy ---------------------------------------------------------
# Accepted filename extensions and MIME types for uploaded images.
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'}
ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif'}

# Upper bounds: file size in bytes (5 MB) and (width, height) in pixels.
MAX_FILE_SIZE = 5 * 1024 * 1024
MAX_IMAGE_DIMENSIONS = (4096, 4096)

# --- Storage locations -----------------------------------------------------
# Anchored to the directory containing this module rather than the current
# working directory.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
_UPLOADS_DIR = os.path.join(_BASE_DIR, 'static', 'uploads')
UPLOAD_BASE_PATH = os.path.join(_UPLOADS_DIR, 'forum')
UPLOAD_CLASSIFIEDS_PATH = os.path.join(_UPLOADS_DIR, 'classifieds')

# --- Content sniffing ------------------------------------------------------
# Leading "magic" bytes of each supported image format, mapped to the
# format's short name.
IMAGE_SIGNATURES = {
    b'\xff\xd8\xff': 'jpg',        # JPEG
    b'\x89PNG\r\n\x1a\n': 'png',   # PNG
    b'GIF87a': 'gif',              # GIF, 87a revision
    b'GIF89a': 'gif',              # GIF, 89a revision
}
|
|
|
|
|
|
class FileUploadService:
    """Secure file upload service for forum and classified attachments.

    Groups stateless helpers (all static methods) for validating, storing,
    deleting and addressing uploaded images. Limits, accepted formats and
    storage roots are read from the module-level configuration constants.
    """

    @staticmethod
    def validate_file(file: "FileStorage") -> Tuple[bool, str]:
        """
        Validate an uploaded file.

        Checks, in order: a filename is present, the extension is allowed,
        the file is non-empty and no larger than MAX_FILE_SIZE, the leading
        bytes match a known image signature, the extension agrees with the
        detected format and, when Pillow is importable, the pixel dimensions
        fit MAX_IMAGE_DIMENSIONS. The stream is rewound to the start after
        every read so the caller can still save the file afterwards.

        Args:
            file: Werkzeug FileStorage object

        Returns:
            Tuple of (is_valid, error_message); error_message is '' when valid
        """
        # Check the filename explicitly so a missing (None) name is rejected
        # here instead of failing in the extension parsing below.
        if not file or not file.filename:
            return False, 'Nie wybrano pliku'

        # Check extension
        filename = file.filename
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        if ext not in ALLOWED_EXTENSIONS:
            return False, f'Niedozwolony format pliku. Dozwolone: {", ".join(sorted(ALLOWED_EXTENSIONS))}'

        # Check file size by seeking to the end of the stream
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(0)

        if size > MAX_FILE_SIZE:
            return False, f'Plik jest za duży (max {MAX_FILE_SIZE // 1024 // 1024}MB)'

        if size == 0:
            return False, 'Plik jest pusty'

        # Verify magic bytes (actual file type)
        header = file.read(16)
        file.seek(0)

        detected_type = next(
            (
                file_type
                for signature, file_type in IMAGE_SIGNATURES.items()
                if header.startswith(signature)
            ),
            None,
        )
        if not detected_type:
            return False, 'Plik nie jest prawidłowym obrazem'

        # "jpg" and "jpeg" name the same format: compare in one spelling.
        if ext == 'jpg':
            ext = 'jpeg'
        if detected_type == 'jpg':
            detected_type = 'jpeg'

        if detected_type != ext:
            return False, f'Rozszerzenie pliku ({ext}) nie odpowiada zawartości ({detected_type})'

        # Validate image dimensions using PIL (if available)
        try:
            from PIL import Image
        except ImportError:
            # PIL not available, skip dimension check
            logger.warning("PIL not available, skipping image dimension validation")
            return True, ''

        try:
            img = Image.open(file)
            width, height = img.size
        except Exception as e:
            # Any failure to parse the image is reported as a validation
            # error rather than propagated to the caller.
            return False, f'Nie można odczytać obrazu: {e}'
        finally:
            file.seek(0)

        if width > MAX_IMAGE_DIMENSIONS[0] or height > MAX_IMAGE_DIMENSIONS[1]:
            return False, f'Obraz jest za duży (max {MAX_IMAGE_DIMENSIONS[0]}x{MAX_IMAGE_DIMENSIONS[1]}px)'

        return True, ''

    @staticmethod
    def generate_stored_filename(original_filename: str) -> str:
        """
        Generate secure UUID-based filename preserving extension.

        The extension is lower-cased, 'jpeg' is normalized to 'jpg', and a
        name without any dot gets the extension 'bin'.

        Args:
            original_filename: Original filename from upload

        Returns:
            UUID-based filename with the (normalized) original extension
        """
        if '.' in original_filename:
            ext = original_filename.rsplit('.', 1)[-1].lower()
        else:
            ext = 'bin'
        if ext == 'jpeg':
            ext = 'jpg'  # Normalize to jpg
        return f"{uuid.uuid4()}.{ext}"

    @staticmethod
    def get_upload_path(attachment_type: str) -> str:
        """
        Get upload directory path with date-based organization.

        The directory is created if it does not exist yet. The year/month
        components come from the current local time.

        Args:
            attachment_type: 'classified', 'topic' or 'reply'; any value other
                than 'classified' and 'topic' is stored under 'replies'

        Returns:
            Full path to upload directory
        """
        now = datetime.now()
        year, month = str(now.year), f"{now.month:02d}"
        if attachment_type == 'classified':
            path = os.path.join(UPLOAD_CLASSIFIEDS_PATH, year, month)
        else:
            subdir = 'topics' if attachment_type == 'topic' else 'replies'
            path = os.path.join(UPLOAD_BASE_PATH, subdir, year, month)
        os.makedirs(path, exist_ok=True)
        return path

    @staticmethod
    def _save_without_metadata(img, file_path: str, ext: str) -> None:
        """Re-encode *img* to *file_path*, carrying over pixel data only.

        The pixels are copied into a freshly created image so that whatever
        the source image holds besides pixels (EXIF and similar) is not
        written out. Palette images additionally keep their palette and
        transparency entry, since their pixel values are palette indices.

        Args:
            img: Opened PIL image
            file_path: Destination path
            ext: Lower-case target extension, used to pick encoder options
        """
        from PIL import Image

        save_kwargs = {'optimize': True}

        if img.mode in ('RGBA', 'LA', 'P'):
            # Keep transparency for PNG
            clean_img = Image.new(img.mode, img.size)
            clean_img.paste(img)
            if img.mode == 'P':
                # Without the palette the copied indices would be rendered
                # with the new image's default palette (wrong colours).
                palette = img.getpalette()
                if palette is not None:
                    clean_img.putpalette(palette)
                if 'transparency' in img.info:
                    save_kwargs['transparency'] = img.info['transparency']
        else:
            # Convert to RGB for JPEG
            if img.mode != 'RGB':
                img = img.convert('RGB')
            clean_img = Image.new('RGB', img.size)
            clean_img.paste(img)

        # Save with optimization
        if ext in ('jpg', 'jpeg'):
            save_kwargs['quality'] = 85
        elif ext == 'png':
            save_kwargs['compress_level'] = 6

        clean_img.save(file_path, **save_kwargs)

    @staticmethod
    def save_file(file: "FileStorage", attachment_type: str) -> Tuple[str, str, int, str]:
        """
        Save file securely with EXIF stripping.

        Animated GIFs are written unmodified to preserve the animation. When
        Pillow is not importable the upload is written as-is and a warning is
        logged.

        Args:
            file: Werkzeug FileStorage object
            attachment_type: 'classified', 'topic' or 'reply'

        Returns:
            Tuple of (stored_filename, relative_path, file_size, mime_type);
            relative_path is relative to the 'static' directory and file_size
            is the size of the file as written to disk
        """
        stored_filename = FileUploadService.generate_stored_filename(file.filename)
        upload_dir = FileUploadService.get_upload_path(attachment_type)
        file_path = os.path.join(upload_dir, stored_filename)

        # Determine mime type
        ext = stored_filename.rsplit('.', 1)[-1].lower()
        mime_types = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
        }
        mime_type = mime_types.get(ext, 'application/octet-stream')

        try:
            from PIL import Image
        except ImportError:
            # PIL not available, save without processing
            logger.warning("PIL not available, saving file without EXIF stripping")
            file.seek(0)
            file.save(file_path)
        else:
            img = Image.open(file)
            if ext == 'gif' and getattr(img, 'is_animated', False):
                # Save animated GIF without modification
                file.seek(0)
                file.save(file_path)
            else:
                FileUploadService._save_without_metadata(img, file_path, ext)

        file_size = os.path.getsize(file_path)
        relative_path = os.path.relpath(file_path, os.path.join(_BASE_DIR, 'static'))

        logger.info(f"Saved forum attachment: {stored_filename} ({file_size} bytes)")
        return stored_filename, relative_path, file_size, mime_type

    @staticmethod
    def delete_file(stored_filename: str, attachment_type: str, created_at: Optional[datetime] = None) -> bool:
        """
        Delete file from storage.

        When created_at is given, the year/month directory derived from it is
        tried first; if the file is not there (or created_at is omitted) all
        date directories under the attachment type's root are searched.

        Args:
            stored_filename: UUID-based filename (a bare name, no directories)
            attachment_type: 'classified', 'topic' or 'reply'
            created_at: Creation timestamp to determine path

        Returns:
            True if deleted, False otherwise
        """
        # A stored name is a bare filename. Anything with a path component
        # could point outside the upload tree once joined below.
        if (
            not stored_filename
            or stored_filename in ('.', '..')
            or os.path.basename(stored_filename) != stored_filename
        ):
            logger.warning(f"Refusing to delete attachment with unsafe name: {stored_filename!r}")
            return False

        if attachment_type == 'classified':
            search_path = UPLOAD_CLASSIFIEDS_PATH
        else:
            subdir = 'topics' if attachment_type == 'topic' else 'replies'
            search_path = os.path.join(UPLOAD_BASE_PATH, subdir)

        if created_at:
            # Try exact path first
            path = os.path.join(
                search_path, str(created_at.year), f"{created_at.month:02d}", stored_filename
            )
            try:
                os.remove(path)
            except (FileNotFoundError, NotADirectoryError):
                pass  # Not at the expected location: fall back to the search
            except OSError as e:
                logger.error(f"Failed to delete {stored_filename}: {e}")
                return False
            else:
                logger.info(f"Deleted forum attachment: {stored_filename}")
                return True

        # Search in all date directories
        for root, _dirs, files in os.walk(search_path):
            if stored_filename not in files:
                continue
            try:
                os.remove(os.path.join(root, stored_filename))
            except OSError as e:
                logger.error(f"Failed to delete {stored_filename}: {e}")
                return False
            logger.info(f"Deleted forum attachment: {stored_filename}")
            return True

        logger.warning(f"Attachment not found for deletion: {stored_filename}")
        return False

    @staticmethod
    def get_file_url(stored_filename: str, attachment_type: str, created_at: datetime) -> str:
        """
        Get URL for serving the file.

        Args:
            stored_filename: UUID-based filename
            attachment_type: 'classified', 'topic' or 'reply'; any value other
                than 'classified' and 'topic' maps to the 'replies' directory
            created_at: Creation timestamp; its year and month select the
                date directory

        Returns:
            URL path to the file
        """
        dated_name = f"{created_at.year}/{created_at.month:02d}/{stored_filename}"
        if attachment_type == 'classified':
            return f"/static/uploads/classifieds/{dated_name}"
        subdir = 'topics' if attachment_type == 'topic' else 'replies'
        return f"/static/uploads/forum/{subdir}/{dated_name}"