130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
"""File upload service for message attachments."""
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
|
# Allowed file types and their limits
|
|
IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'}
|
|
DOCUMENT_EXTENSIONS = {'pdf', 'docx', 'xlsx'}
|
|
ALLOWED_EXTENSIONS = IMAGE_EXTENSIONS | DOCUMENT_EXTENSIONS
|
|
|
|
MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image
|
|
MAX_DOCUMENT_SIZE = 10 * 1024 * 1024 # 10MB per document
|
|
MAX_TOTAL_SIZE = 15 * 1024 * 1024 # 15MB total per message
|
|
MAX_FILES_PER_MESSAGE = 3
|
|
|
|
# Magic bytes signatures for validation
|
|
FILE_SIGNATURES = {
|
|
'jpg': [b'\xff\xd8\xff'],
|
|
'jpeg': [b'\xff\xd8\xff'],
|
|
'png': [b'\x89PNG\r\n\x1a\n'],
|
|
'gif': [b'GIF87a', b'GIF89a'],
|
|
'pdf': [b'%PDF'],
|
|
'docx': [b'PK\x03\x04'], # ZIP-based format
|
|
'xlsx': [b'PK\x03\x04'], # ZIP-based format
|
|
}
|
|
|
|
MIME_TYPES = {
|
|
'jpg': 'image/jpeg',
|
|
'jpeg': 'image/jpeg',
|
|
'png': 'image/png',
|
|
'gif': 'image/gif',
|
|
'pdf': 'application/pdf',
|
|
'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
|
'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
|
}
|
|
|
|
UPLOAD_BASE = 'static/uploads/messages'
|
|
|
|
|
|
class MessageUploadService:
|
|
"""Handles file uploads for message attachments."""
|
|
|
|
def __init__(self, app_root: str):
|
|
self.app_root = app_root
|
|
|
|
def validate_files(self, files: list) -> tuple:
|
|
"""Validate list of uploaded files. Returns (valid_files, errors)."""
|
|
errors = []
|
|
valid = []
|
|
|
|
if len(files) > MAX_FILES_PER_MESSAGE:
|
|
errors.append(f'Maksymalnie {MAX_FILES_PER_MESSAGE} pliki na wiadomość.')
|
|
return [], errors
|
|
|
|
total_size = 0
|
|
for f in files:
|
|
if not f or not f.filename:
|
|
continue
|
|
|
|
filename = secure_filename(f.filename)
|
|
ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
|
|
|
|
if ext not in ALLOWED_EXTENSIONS:
|
|
errors.append(f'Niedozwolony typ pliku: {filename}')
|
|
continue
|
|
|
|
# Read file content for size + magic bytes check
|
|
content = f.read()
|
|
f.seek(0)
|
|
size = len(content)
|
|
|
|
max_size = MAX_IMAGE_SIZE if ext in IMAGE_EXTENSIONS else MAX_DOCUMENT_SIZE
|
|
if size > max_size:
|
|
limit_mb = max_size // (1024 * 1024)
|
|
errors.append(f'Plik {filename} przekracza limit {limit_mb}MB.')
|
|
continue
|
|
|
|
total_size += size
|
|
|
|
# Magic bytes validation
|
|
if ext in FILE_SIGNATURES:
|
|
valid_sig = False
|
|
for sig in FILE_SIGNATURES[ext]:
|
|
if content[:len(sig)] == sig:
|
|
valid_sig = True
|
|
break
|
|
if not valid_sig:
|
|
errors.append(f'Plik {filename} ma nieprawidłowy format.')
|
|
continue
|
|
|
|
valid.append((f, filename, ext, size, content))
|
|
|
|
if total_size > MAX_TOTAL_SIZE:
|
|
errors.append(f'Łączny rozmiar plików przekracza {MAX_TOTAL_SIZE // (1024*1024)}MB.')
|
|
return [], errors
|
|
|
|
return valid, errors
|
|
|
|
def save_file(self, content: bytes, ext: str) -> tuple:
|
|
"""Save file to disk. Returns (stored_filename, relative_path)."""
|
|
now = datetime.now()
|
|
subdir = os.path.join(UPLOAD_BASE, str(now.year), f'{now.month:02d}')
|
|
full_dir = os.path.join(self.app_root, subdir)
|
|
os.makedirs(full_dir, exist_ok=True)
|
|
|
|
stored_filename = f'{uuid.uuid4().hex}.{ext}'
|
|
full_path = os.path.join(full_dir, stored_filename)
|
|
|
|
with open(full_path, 'wb') as out:
|
|
out.write(content)
|
|
|
|
relative_path = os.path.join(subdir, stored_filename)
|
|
return stored_filename, relative_path
|
|
|
|
def get_mime_type(self, ext: str) -> str:
|
|
"""Get MIME type for extension."""
|
|
return MIME_TYPES.get(ext, 'application/octet-stream')
|
|
|
|
def is_image(self, ext: str) -> bool:
|
|
"""Check if extension is an image type."""
|
|
return ext in IMAGE_EXTENSIONS
|
|
|
|
def delete_file(self, relative_path: str):
|
|
"""Delete a file from disk."""
|
|
full_path = os.path.join(self.app_root, relative_path)
|
|
if os.path.exists(full_path):
|
|
os.remove(full_path)
|