feat(messages): add message upload service for file attachments
This commit is contained in:
parent
02ebc7db00
commit
a2ca231e6e
129
message_upload_service.py
Normal file
129
message_upload_service.py
Normal file
@ -0,0 +1,129 @@
|
||||
"""File upload service for message attachments."""
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
|
||||
# Allowed file types and their limits
|
||||
IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif'}
|
||||
DOCUMENT_EXTENSIONS = {'pdf', 'docx', 'xlsx'}
|
||||
ALLOWED_EXTENSIONS = IMAGE_EXTENSIONS | DOCUMENT_EXTENSIONS
|
||||
|
||||
MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image
|
||||
MAX_DOCUMENT_SIZE = 10 * 1024 * 1024 # 10MB per document
|
||||
MAX_TOTAL_SIZE = 15 * 1024 * 1024 # 15MB total per message
|
||||
MAX_FILES_PER_MESSAGE = 3
|
||||
|
||||
# Magic bytes signatures for validation
|
||||
FILE_SIGNATURES = {
|
||||
'jpg': [b'\xff\xd8\xff'],
|
||||
'jpeg': [b'\xff\xd8\xff'],
|
||||
'png': [b'\x89PNG\r\n\x1a\n'],
|
||||
'gif': [b'GIF87a', b'GIF89a'],
|
||||
'pdf': [b'%PDF'],
|
||||
'docx': [b'PK\x03\x04'], # ZIP-based format
|
||||
'xlsx': [b'PK\x03\x04'], # ZIP-based format
|
||||
}
|
||||
|
||||
MIME_TYPES = {
|
||||
'jpg': 'image/jpeg',
|
||||
'jpeg': 'image/jpeg',
|
||||
'png': 'image/png',
|
||||
'gif': 'image/gif',
|
||||
'pdf': 'application/pdf',
|
||||
'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
||||
'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||||
}
|
||||
|
||||
UPLOAD_BASE = 'static/uploads/messages'
|
||||
|
||||
|
||||
class MessageUploadService:
|
||||
"""Handles file uploads for message attachments."""
|
||||
|
||||
def __init__(self, app_root: str):
|
||||
self.app_root = app_root
|
||||
|
||||
def validate_files(self, files: list) -> tuple:
|
||||
"""Validate list of uploaded files. Returns (valid_files, errors)."""
|
||||
errors = []
|
||||
valid = []
|
||||
|
||||
if len(files) > MAX_FILES_PER_MESSAGE:
|
||||
errors.append(f'Maksymalnie {MAX_FILES_PER_MESSAGE} pliki na wiadomość.')
|
||||
return [], errors
|
||||
|
||||
total_size = 0
|
||||
for f in files:
|
||||
if not f or not f.filename:
|
||||
continue
|
||||
|
||||
filename = secure_filename(f.filename)
|
||||
ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
|
||||
|
||||
if ext not in ALLOWED_EXTENSIONS:
|
||||
errors.append(f'Niedozwolony typ pliku: {filename}')
|
||||
continue
|
||||
|
||||
# Read file content for size + magic bytes check
|
||||
content = f.read()
|
||||
f.seek(0)
|
||||
size = len(content)
|
||||
|
||||
max_size = MAX_IMAGE_SIZE if ext in IMAGE_EXTENSIONS else MAX_DOCUMENT_SIZE
|
||||
if size > max_size:
|
||||
limit_mb = max_size // (1024 * 1024)
|
||||
errors.append(f'Plik {filename} przekracza limit {limit_mb}MB.')
|
||||
continue
|
||||
|
||||
total_size += size
|
||||
|
||||
# Magic bytes validation
|
||||
if ext in FILE_SIGNATURES:
|
||||
valid_sig = False
|
||||
for sig in FILE_SIGNATURES[ext]:
|
||||
if content[:len(sig)] == sig:
|
||||
valid_sig = True
|
||||
break
|
||||
if not valid_sig:
|
||||
errors.append(f'Plik {filename} ma nieprawidłowy format.')
|
||||
continue
|
||||
|
||||
valid.append((f, filename, ext, size, content))
|
||||
|
||||
if total_size > MAX_TOTAL_SIZE:
|
||||
errors.append(f'Łączny rozmiar plików przekracza {MAX_TOTAL_SIZE // (1024*1024)}MB.')
|
||||
return [], errors
|
||||
|
||||
return valid, errors
|
||||
|
||||
def save_file(self, content: bytes, ext: str) -> tuple:
|
||||
"""Save file to disk. Returns (stored_filename, relative_path)."""
|
||||
now = datetime.now()
|
||||
subdir = os.path.join(UPLOAD_BASE, str(now.year), f'{now.month:02d}')
|
||||
full_dir = os.path.join(self.app_root, subdir)
|
||||
os.makedirs(full_dir, exist_ok=True)
|
||||
|
||||
stored_filename = f'{uuid.uuid4().hex}.{ext}'
|
||||
full_path = os.path.join(full_dir, stored_filename)
|
||||
|
||||
with open(full_path, 'wb') as out:
|
||||
out.write(content)
|
||||
|
||||
relative_path = os.path.join(subdir, stored_filename)
|
||||
return stored_filename, relative_path
|
||||
|
||||
def get_mime_type(self, ext: str) -> str:
|
||||
"""Get MIME type for extension."""
|
||||
return MIME_TYPES.get(ext, 'application/octet-stream')
|
||||
|
||||
def is_image(self, ext: str) -> bool:
|
||||
"""Check if extension is an image type."""
|
||||
return ext in IMAGE_EXTENSIONS
|
||||
|
||||
def delete_file(self, relative_path: str):
|
||||
"""Delete a file from disk."""
|
||||
full_path = os.path.join(self.app_root, relative_path)
|
||||
if os.path.exists(full_path):
|
||||
os.remove(full_path)
|
||||
Loading…
Reference in New Issue
Block a user