nordabiz/blueprints/messages/routes.py
Maciej Pienczyn fcea91fb2a
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: unread badge counts messages from new conversation system
The badge endpoint api_unread_count only counted legacy private_messages
and group_messages. Now also counts unread conv_messages from the new
conversations system, fixing phantom unread counts for users.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 14:16:05 +02:00

813 lines
30 KiB
Python

"""
Messages Routes
===============
Private messages and notifications API.
"""
from datetime import datetime
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from sqlalchemy.orm import joinedload
from database import SessionLocal, User, Company, UserCompanyPermissions, PrivateMessage, UserNotification, UserBlock, Classified
from extensions import limiter
from utils.helpers import sanitize_input, sanitize_html
from utils.decorators import member_required
from email_service import send_email, build_message_notification_email
from message_upload_service import MessageUploadService
# ============================================================
# PRIVATE MESSAGES ROUTES
# ============================================================
@bp.route('/wiadomosci/stare')
@login_required
@member_required
def messages_inbox():
"""Skrzynka odbiorcza"""
page = request.args.get('page', 1, type=int)
per_page = 20
search_query = request.args.get('q', '').strip()
db = SessionLocal()
try:
query = db.query(PrivateMessage).options(
joinedload(PrivateMessage.attachments)
).filter(
PrivateMessage.recipient_id == current_user.id
)
# Search filtering
if search_query:
from sqlalchemy import or_, func
search_pattern = f'%{search_query}%'
query = query.join(User, PrivateMessage.sender_id == User.id).filter(
or_(
PrivateMessage.subject.ilike(search_pattern),
func.regexp_replace(PrivateMessage.content, '<[^>]+>', '', 'g').ilike(search_pattern),
User.name.ilike(search_pattern),
User.email.ilike(search_pattern)
)
)
query = query.order_by(PrivateMessage.created_at.desc())
total = query.count()
messages = query.limit(per_page).offset((page - 1) * per_page).all()
unread_count = db.query(PrivateMessage).filter(
PrivateMessage.recipient_id == current_user.id,
PrivateMessage.is_read == False
).count()
# Fetch group chats for current user
from database import MessageGroupMember, MessageGroup, GroupMessage
group_items = []
group_memberships = db.query(MessageGroupMember).filter(
MessageGroupMember.user_id == current_user.id
).all()
for ms in group_memberships:
group = db.query(MessageGroup).filter(MessageGroup.id == ms.group_id).first()
if not group:
continue
# Apply search filter to groups
if search_query:
from sqlalchemy import or_, func
search_pattern = f'%{search_query}%'
# Check group name or any message content
name_match = group.name and search_query.lower() in group.name.lower()
content_match = db.query(GroupMessage).filter(
GroupMessage.group_id == group.id,
func.regexp_replace(GroupMessage.content, '<[^>]+>', '', 'g').ilike(search_pattern)
).first()
if not name_match and not content_match:
continue
last_msg = db.query(GroupMessage).filter(
GroupMessage.group_id == group.id
).order_by(GroupMessage.created_at.desc()).first()
grp_unread = 0
if ms.last_read_at:
grp_unread = db.query(GroupMessage).filter(
GroupMessage.group_id == group.id,
GroupMessage.sender_id != current_user.id,
GroupMessage.created_at > ms.last_read_at
).count()
else:
grp_unread = db.query(GroupMessage).filter(
GroupMessage.group_id == group.id,
GroupMessage.sender_id != current_user.id
).count()
group_items.append({
'type': 'group',
'group': group,
'last_message': last_msg,
'unread_count': grp_unread,
'membership': ms,
'sort_date': last_msg.created_at if last_msg else group.created_at
})
return render_template('messages/inbox.html',
messages=messages,
page=page,
total_pages=(total + per_page - 1) // per_page,
unread_count=unread_count,
search_query=search_query,
group_items=group_items
)
finally:
db.close()
@bp.route('/wiadomosci/stare/wyslane')
@login_required
@member_required
def messages_sent():
"""Wysłane wiadomości"""
page = request.args.get('page', 1, type=int)
per_page = 20
search_query = request.args.get('q', '').strip()
db = SessionLocal()
try:
query = db.query(PrivateMessage).options(
joinedload(PrivateMessage.attachments)
).filter(
PrivateMessage.sender_id == current_user.id
)
if search_query:
from sqlalchemy import or_, func
search_pattern = f'%{search_query}%'
query = query.join(User, PrivateMessage.recipient_id == User.id).filter(
or_(
PrivateMessage.subject.ilike(search_pattern),
func.regexp_replace(PrivateMessage.content, '<[^>]+>', '', 'g').ilike(search_pattern),
User.name.ilike(search_pattern),
User.email.ilike(search_pattern)
)
)
query = query.order_by(PrivateMessage.created_at.desc())
total = query.count()
messages = query.limit(per_page).offset((page - 1) * per_page).all()
return render_template('messages/sent.html',
messages=messages,
page=page,
total_pages=(total + per_page - 1) // per_page,
search_query=search_query
)
finally:
db.close()
@bp.route('/wiadomosci/nowa')
@login_required
@member_required
def messages_new():
"""Formularz nowej wiadomości"""
recipient_id = request.args.get('to', type=int)
context_type = request.args.get('context_type')
context_id = request.args.get('context_id', type=int)
from_company_id = request.args.get('company', type=int)
db = SessionLocal()
try:
# Lista użytkowników do wyboru
users_with_companies = db.query(
User,
Company.name.label('company_name'),
Company.slug.label('company_slug')
).outerjoin(
UserCompanyPermissions,
(UserCompanyPermissions.user_id == User.id)
).outerjoin(
Company,
(Company.id == UserCompanyPermissions.company_id) & (Company.status == 'active')
).filter(
User.is_active == True,
User.is_verified == True,
User.id != current_user.id
).order_by(User.name).all()
# Deduplicate users (one user may have multiple company permissions)
seen_ids = set()
users = []
for user, company_name, company_slug in users_with_companies:
if user.id not in seen_ids:
seen_ids.add(user.id)
user._company_name = company_name
user._company_slug = company_slug
user._position = user.company_role if user.company_role != 'NONE' else None
users.append(user)
recipient = None
if recipient_id:
recipient = db.query(User).filter(User.id == recipient_id).first()
# Pobierz firmę źródłową (jeśli użytkownik przyszedł z profilu firmy)
from_company = None
if from_company_id:
from_company = db.query(Company).filter(Company.id == from_company_id).first()
# Pobierz kontekst (np. ogłoszenie B2B)
context = None
context_subject = None
if context_type == 'classified' and context_id:
classified = db.query(Classified).filter(Classified.id == context_id).first()
if classified:
context = {
'type': 'classified',
'id': classified.id,
'title': classified.title,
'url': url_for('classifieds.classifieds_view', classified_id=classified.id)
}
context_subject = f"Dotyczy: {classified.title}"
return render_template('messages/compose.html',
users=users,
recipient=recipient,
from_company=from_company,
context=context,
context_type=context_type,
context_id=context_id,
context_subject=context_subject
)
finally:
db.close()
@bp.route('/wiadomosci/wyslij', methods=['POST'])
@login_required
@member_required
def messages_send():
"""Wyślij wiadomość"""
recipient_id = request.form.get('recipient_id', type=int)
subject = sanitize_input(request.form.get('subject', ''), 255)
content = sanitize_html(request.form.get('content', '').strip())
context_type = request.form.get('context_type')
context_id = request.form.get('context_id', type=int)
if not recipient_id or not content:
flash('Odbiorca i treść są wymagane.', 'error')
return redirect(url_for('.messages_new'))
db = SessionLocal()
try:
recipient = db.query(User).filter(User.id == recipient_id).first()
if not recipient:
flash('Odbiorca nie istnieje.', 'error')
return redirect(url_for('.messages_new'))
# Check if either user has blocked the other
block_exists = db.query(UserBlock).filter(
((UserBlock.user_id == current_user.id) & (UserBlock.blocked_user_id == recipient_id)) |
((UserBlock.user_id == recipient_id) & (UserBlock.blocked_user_id == current_user.id))
).first()
if block_exists:
flash('Nie można wysłać wiadomości do tego użytkownika.', 'error')
return redirect(url_for('.messages_new'))
message = PrivateMessage(
sender_id=current_user.id,
recipient_id=recipient_id,
subject=subject,
content=content,
context_type=context_type if context_type else None,
context_id=context_id if context_id else None
)
db.add(message)
db.flush() # get message.id before creating notification
# Process file attachments
if request.files.getlist('attachments'):
import os
from database import MessageAttachment
upload_service = MessageUploadService(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
files = [f for f in request.files.getlist('attachments') if f and f.filename]
if files:
valid_files, attachment_errors = upload_service.validate_files(files)
if attachment_errors:
db.rollback()
for err in attachment_errors:
flash(err, 'error')
return redirect(url_for('.messages_new'))
for f, filename, ext, size, content in valid_files:
stored_filename, relative_path = upload_service.save_file(content, ext)
attachment = MessageAttachment(
message_id=message.id,
filename=filename,
stored_filename=stored_filename,
file_size=size,
mime_type=upload_service.get_mime_type(ext)
)
db.add(attachment)
# Create notification for recipient
sender_name = current_user.name or current_user.email.split('@')[0]
notif_title = f'Nowa wiadomość od {sender_name}'
notif_message = subject if subject else (content[:100] + '...' if len(content) > 100 else content)
notification = UserNotification(
user_id=recipient_id,
title=notif_title,
message=notif_message,
notification_type='message',
related_type='message',
related_id=message.id,
action_url=url_for('.messages_view', message_id=message.id)
)
db.add(notification)
db.commit()
# Send email notification if recipient has it enabled
if recipient.notify_email_messages != False and recipient.email:
try:
message_url = url_for('.messages_view', message_id=message.id, _external=True)
settings_url = url_for('auth.konto_prywatnosc', _external=True)
sender_name = current_user.name or current_user.email.split('@')[0]
preview = (content[:200] + '...') if len(content) > 200 else content
subject_line = f'Nowa wiadomość od {sender_name} — Norda Biznes'
email_html, email_text = build_message_notification_email(
sender_name=sender_name,
subject=subject,
content_preview=preview,
message_url=message_url,
settings_url=settings_url
)
send_email(
to=[recipient.email],
subject=subject_line,
body_text=email_text,
body_html=email_html,
email_type='message_notification',
user_id=recipient.id,
recipient_name=recipient.name
)
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"Failed to send message email notification: {e}")
if recipient.notify_email_messages != False and recipient.email:
flash('Wiadomość wysłana! Odbiorca zostanie powiadomiony emailem.', 'success')
else:
flash('Wiadomość wysłana!', 'success')
return redirect(url_for('.messages_sent'))
finally:
db.close()
@bp.route('/wiadomosci/<int:message_id>')
@login_required
@member_required
def messages_view(message_id):
"""Czytaj wiadomość"""
db = SessionLocal()
try:
message = db.query(PrivateMessage).options(
joinedload(PrivateMessage.attachments)
).filter(
PrivateMessage.id == message_id
).first()
if not message:
flash('Wiadomość nie istnieje.', 'error')
return redirect(url_for('.messages_inbox'))
# Sprawdź dostęp
if message.recipient_id != current_user.id and message.sender_id != current_user.id:
flash('Brak dostępu do tej wiadomości.', 'error')
return redirect(url_for('.messages_inbox'))
# Oznacz jako przeczytaną
if message.recipient_id == current_user.id and not message.is_read:
message.is_read = True
message.read_at = datetime.now()
db.commit()
# Pobierz kontekst (np. ogłoszenie B2B)
context = None
if message.context_type == 'classified' and message.context_id:
classified = db.query(Classified).filter(Classified.id == message.context_id).first()
if classified:
context = {
'type': 'classified',
'id': classified.id,
'title': classified.title,
'url': url_for('classifieds.classifieds_view', classified_id=classified.id),
'is_active': classified.is_active
}
# Pobierz cały wątek (oryginał + odpowiedzi)
root_id = message.parent_id or message.id
thread = db.query(PrivateMessage).options(
joinedload(PrivateMessage.attachments)
).filter(
(PrivateMessage.id == root_id) | (PrivateMessage.parent_id == root_id)
).order_by(PrivateMessage.created_at.asc()).all()
# Jeśli wątek ma tylko jedną wiadomość, nie pokazuj jako wątek
if len(thread) <= 1:
thread = None
return render_template('messages/view.html', message=message, context=context, thread=thread)
finally:
db.close()
@bp.route('/wiadomosci/<int:message_id>/usun', methods=['POST'])
@login_required
@member_required
def messages_delete(message_id):
"""Usuń wiadomość (nadawca lub odbiorca)"""
db = SessionLocal()
try:
message = db.query(PrivateMessage).options(
joinedload(PrivateMessage.attachments)
).filter(PrivateMessage.id == message_id).first()
if not message:
flash('Wiadomość nie istnieje.', 'error')
return redirect(url_for('.messages_inbox'))
if message.sender_id != current_user.id and message.recipient_id != current_user.id:
flash('Brak dostępu do tej wiadomości.', 'error')
return redirect(url_for('.messages_inbox'))
# Delete attachments from disk
import os
from database import MessageAttachment
for att in message.attachments:
try:
filepath = os.path.join('static', 'uploads', 'messages',
att.created_at.strftime('%Y'), att.created_at.strftime('%m'), att.stored_filename)
if os.path.exists(filepath):
os.remove(filepath)
except Exception:
pass
# Delete replies if this is root message
replies = db.query(PrivateMessage).filter(PrivateMessage.parent_id == message_id).all()
for reply in replies:
for att in reply.attachments:
try:
filepath = os.path.join('static', 'uploads', 'messages',
att.created_at.strftime('%Y'), att.created_at.strftime('%m'), att.stored_filename)
if os.path.exists(filepath):
os.remove(filepath)
except Exception:
pass
db.delete(reply)
db.delete(message)
db.commit()
flash('Wiadomość usunięta.', 'success')
return redirect(url_for('.messages_inbox'))
finally:
db.close()
@bp.route('/api/messages/upload-image', methods=['POST'])
@login_required
@member_required
def messages_upload_image():
"""Upload inline image for Quill editor in messages."""
import os
import uuid
from datetime import datetime as dt
from werkzeug.utils import secure_filename
file = request.files.get('image')
if not file or not file.filename:
return jsonify({'error': 'Brak pliku'}), 400
filename = secure_filename(file.filename)
ext = os.path.splitext(filename)[1].lower()
if ext not in ('.jpg', '.jpeg', '.png', '.gif', '.webp'):
return jsonify({'error': 'Dozwolone formaty: JPG, PNG, GIF, WebP'}), 400
# Read and check size (max 5MB)
data = file.read()
if len(data) > 5 * 1024 * 1024:
return jsonify({'error': 'Maksymalny rozmiar obrazka: 5 MB'}), 400
# Save to uploads
now = dt.now()
upload_dir = os.path.join('static', 'uploads', 'messages', str(now.year), f'{now.month:02d}')
os.makedirs(upload_dir, exist_ok=True)
stored_name = f'{uuid.uuid4().hex}{ext}'
filepath = os.path.join(upload_dir, stored_name)
with open(filepath, 'wb') as f:
f.write(data)
url = f'/{filepath}'
return jsonify({'url': url})
@bp.route('/wiadomosci/<int:message_id>/odpowiedz', methods=['POST'])
@login_required
@member_required
def messages_reply(message_id):
"""Odpowiedz na wiadomość"""
content = sanitize_html(request.form.get('content', '').strip())
if not content:
flash('Treść jest wymagana.', 'error')
return redirect(url_for('.messages_view', message_id=message_id))
db = SessionLocal()
try:
original = db.query(PrivateMessage).filter(
PrivateMessage.id == message_id
).first()
if not original:
flash('Wiadomość nie istnieje.', 'error')
return redirect(url_for('.messages_inbox'))
# Odpowiedz do nadawcy oryginalnej wiadomości
recipient_id = original.sender_id if original.sender_id != current_user.id else original.recipient_id
# Check if either user has blocked the other
block_exists = db.query(UserBlock).filter(
((UserBlock.user_id == current_user.id) & (UserBlock.blocked_user_id == recipient_id)) |
((UserBlock.user_id == recipient_id) & (UserBlock.blocked_user_id == current_user.id))
).first()
if block_exists:
flash('Nie można wysłać wiadomości do tego użytkownika.', 'error')
return redirect(url_for('.messages_inbox'))
recipient = db.query(User).filter(User.id == recipient_id).first()
reply = PrivateMessage(
sender_id=current_user.id,
recipient_id=recipient_id,
subject=f"Re: {original.subject}" if original.subject else None,
content=content,
parent_id=message_id
)
db.add(reply)
db.flush() # Get reply.id for notification link
# Process file attachments
if request.files.getlist('attachments'):
import os
from database import MessageAttachment
upload_service = MessageUploadService(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
files = [f for f in request.files.getlist('attachments') if f and f.filename]
if files:
valid_files, attachment_errors = upload_service.validate_files(files)
if attachment_errors:
db.rollback()
for err in attachment_errors:
flash(err, 'error')
return redirect(url_for('.messages_view', message_id=message_id))
for f, filename, ext, size, content in valid_files:
stored_filename, relative_path = upload_service.save_file(content, ext)
attachment = MessageAttachment(
message_id=reply.id,
filename=filename,
stored_filename=stored_filename,
file_size=size,
mime_type=upload_service.get_mime_type(ext)
)
db.add(attachment)
# Create in-app notification
sender_name = current_user.name or current_user.email.split('@')[0]
notification = UserNotification(
user_id=recipient_id,
title=f'Nowa odpowiedź od {sender_name}',
message=f'{sender_name} odpowiedział(a) na wiadomość' + (f': {original.subject}' if original.subject else ''),
notification_type='message',
related_type='message',
related_id=reply.id,
action_url=url_for('.messages_view', message_id=message_id)
)
db.add(notification)
# Send email notification
if recipient and recipient.notify_email_messages != False and recipient.email:
try:
message_url = url_for('.messages_view', message_id=message_id, _external=True)
settings_url = url_for('auth.konto_prywatnosc', _external=True)
preview = (content[:200] + '...') if len(content) > 200 else content
subject_line = f'Nowa odpowiedź od {sender_name} — Norda Biznes'
email_html, email_text = build_message_notification_email(
sender_name=sender_name,
subject=f"Re: {original.subject}" if original.subject else None,
content_preview=preview,
message_url=message_url,
settings_url=settings_url
)
send_email(
to=[recipient.email],
subject=subject_line,
body_text=email_text,
body_html=email_html,
email_type='message_notification',
user_id=recipient_id,
recipient_name=recipient.name
)
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"Failed to send reply email notification: {e}")
db.commit()
if recipient and recipient.notify_email_messages != False and recipient.email:
flash('Odpowiedź wysłana! Odbiorca zostanie powiadomiony emailem.', 'success')
else:
flash('Odpowiedź wysłana!', 'success')
return redirect(url_for('.messages_view', message_id=message_id))
finally:
db.close()
@bp.route('/api/messages/unread-count')
@limiter.exempt
@login_required
@member_required
def api_unread_count():
"""API: Liczba nieprzeczytanych wiadomości (1:1 + grupowe)"""
db = SessionLocal()
try:
from sqlalchemy import func
from database import MessageGroupMember, GroupMessage
# 1:1 unread
pm_count = db.query(PrivateMessage).filter(
PrivateMessage.recipient_id == current_user.id,
PrivateMessage.is_read == False
).count()
# Group unread
group_count = 0
memberships = db.query(MessageGroupMember).filter(
MessageGroupMember.user_id == current_user.id
).all()
for m in memberships:
q = db.query(func.count(GroupMessage.id)).filter(
GroupMessage.group_id == m.group_id,
GroupMessage.sender_id != current_user.id
)
if m.last_read_at:
q = q.filter(GroupMessage.created_at > m.last_read_at)
group_count += q.scalar() or 0
# Conversations unread (new messaging system)
from database import ConversationMember, ConvMessage
conv_count = 0
conv_memberships = db.query(ConversationMember).filter(
ConversationMember.user_id == current_user.id,
ConversationMember.is_archived == False
).all()
for cm in conv_memberships:
q = db.query(func.count(ConvMessage.id)).filter(
ConvMessage.conversation_id == cm.conversation_id,
ConvMessage.sender_id != current_user.id
)
if cm.last_read_at:
q = q.filter(ConvMessage.created_at > cm.last_read_at)
conv_count += q.scalar() or 0
return jsonify({'count': pm_count + group_count + conv_count})
finally:
db.close()
# ============================================================
# NOTIFICATIONS API ROUTES
# ============================================================
@bp.route('/api/notifications')
@login_required
@member_required
def api_notifications():
"""API: Get user notifications"""
limit = request.args.get('limit', 20, type=int)
offset = request.args.get('offset', 0, type=int)
unread_only = request.args.get('unread_only', 'false').lower() == 'true'
db = SessionLocal()
try:
query = db.query(UserNotification).filter(
UserNotification.user_id == current_user.id
)
if unread_only:
query = query.filter(UserNotification.is_read == False)
# Order by most recent first
query = query.order_by(UserNotification.created_at.desc())
total = query.count()
notifications = query.limit(limit).offset(offset).all()
return jsonify({
'success': True,
'notifications': [
{
'id': n.id,
'title': n.title,
'message': n.message,
'notification_type': n.notification_type,
'related_type': n.related_type,
'related_id': n.related_id,
'action_url': n.action_url,
'is_read': n.is_read,
'created_at': n.created_at.isoformat() if n.created_at else None
}
for n in notifications
],
'total': total,
'unread_count': db.query(UserNotification).filter(
UserNotification.user_id == current_user.id,
UserNotification.is_read == False
).count()
})
finally:
db.close()
@bp.route('/api/notifications/<int:notification_id>/read', methods=['POST'])
@login_required
@member_required
def api_notification_mark_read(notification_id):
"""API: Mark notification as read"""
db = SessionLocal()
try:
notification = db.query(UserNotification).filter(
UserNotification.id == notification_id,
UserNotification.user_id == current_user.id
).first()
if not notification:
return jsonify({'success': False, 'error': 'Powiadomienie nie znalezione'}), 404
notification.mark_as_read()
db.commit()
return jsonify({
'success': True,
'message': 'Oznaczono jako przeczytane'
})
finally:
db.close()
@bp.route('/api/notifications/read-all', methods=['POST'])
@login_required
@member_required
def api_notifications_mark_all_read():
"""API: Mark all notifications as read"""
db = SessionLocal()
try:
updated = db.query(UserNotification).filter(
UserNotification.user_id == current_user.id,
UserNotification.is_read == False
).update({
UserNotification.is_read: True,
UserNotification.read_at: datetime.now()
})
db.commit()
return jsonify({
'success': True,
'message': f'Oznaczono {updated} powiadomien jako przeczytane',
'count': updated
})
finally:
db.close()
@bp.route('/api/notifications/unread-count')
@limiter.exempt
@login_required
@member_required
def api_notifications_unread_count():
"""API: Get unread notifications count"""
db = SessionLocal()
try:
count = db.query(UserNotification).filter(
UserNotification.user_id == current_user.id,
UserNotification.is_read == False
).count()
return jsonify({'count': count})
finally:
db.close()