# nordabiz/blueprints/messages/sse_routes.py
# 2026-03-27 13:01:37 +01:00
#
# 173 lines
# 5.1 KiB
# Python

"""
SSE Stream, Typing Indicator, and Presence Routes
===================================================
Server-Sent Events stream for real-time messaging, presence tracking,
and typing indicators.
"""
import json
import logging
from flask import Response, jsonify, request, stream_with_context
from flask_login import current_user, login_required
from . import bp
from database import SessionLocal, ConversationMember
from utils.decorators import member_required
from redis_service import (
get_redis, is_available,
publish_event, publish_to_conversation,
set_user_online, update_last_seen,
is_user_online, get_user_last_seen,
)
logger = logging.getLogger(__name__)
# ============================================================
# SSE STREAM
# ============================================================
@bp.route('/api/messages/stream')
@login_required
def messages_stream():
    """Stream real-time events for the current user via Server-Sent Events.

    Subscribes to the ``user:{user_id}:events`` Redis pub/sub channel and
    forwards every JSON object published there as an SSE event. The event
    name is taken from the payload's ``type`` key (``message`` when absent).

    Returns:
        A ``text/event-stream`` response, or a 503 JSON error when
        ``is_available()`` reports that Redis cannot be used.
    """
    if not is_available():
        return jsonify({'error': 'Real-time streaming unavailable (Redis offline)'}), 503

    redis = get_redis()
    # Resolve the id up front so the generator closes over a plain value.
    user_id = current_user.id
    set_user_online(user_id)
    update_last_seen(user_id)

    def generate():
        channel = f"user:{user_id}:events"
        pubsub = redis.pubsub()
        try:
            # Subscribing inside the try block guarantees the pubsub object is
            # closed even when the subscription itself fails.
            pubsub.subscribe(channel)

            # Retry interval and initial connected event. The retry line has
            # no blank line after it, so it is sent as part of the first event.
            yield "retry: 30000\n"
            connected_event = json.dumps({"type": "connected", "user_id": user_id})
            yield f"event: connected\ndata: {connected_event}\n\n"

            for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations.
                if message['type'] != 'message':
                    continue
                try:
                    payload = json.loads(message['data'])
                except (json.JSONDecodeError, TypeError):
                    logger.warning("SSE: malformed message on channel %s", channel)
                    continue
                if not isinstance(payload, dict):
                    # Valid JSON that is not an object has no ``type`` key;
                    # treat it as malformed instead of crashing the stream.
                    logger.warning("SSE: malformed message on channel %s", channel)
                    continue
                event_type = payload.get('type', 'message')
                yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"
        finally:
            # Runs on client disconnect (GeneratorExit) and on any error
            # raised while listening, so the subscription is never leaked.
            try:
                pubsub.unsubscribe(channel)
            except Exception:
                # Best effort: the connection may already be gone.
                logger.debug("SSE: unsubscribe failed on channel %s", channel, exc_info=True)
            finally:
                pubsub.close()

    headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive',
    }
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers=headers,
    )
# ============================================================
# TYPING INDICATOR
# ============================================================
@bp.route('/api/conversations/<int:conversation_id>/typing', methods=['POST'])
@login_required
@member_required
def typing_indicator(conversation_id):
    """Notify the other members of a conversation that the caller is typing.

    Responds with 403 when the current user has no ``ConversationMember`` row
    for ``conversation_id``. Otherwise a ``typing`` event is published to
    every other member and ``{"ok": true}`` is returned.
    """
    session = SessionLocal()
    try:
        own_membership = (
            session.query(ConversationMember)
            .filter_by(conversation_id=conversation_id, user_id=current_user.id)
            .first()
        )
        if not own_membership:
            return jsonify({'error': 'Access denied'}), 403

        payload = {
            'conversation_id': conversation_id,
            'user_id': current_user.id,
            'user_name': current_user.name,
        }

        # Fan out to everyone in the conversation but the sender.
        everyone = (
            session.query(ConversationMember)
            .filter_by(conversation_id=conversation_id)
            .all()
        )
        for row in everyone:
            if row.user_id != current_user.id:
                publish_event(row.user_id, 'typing', payload)
    finally:
        session.close()

    return jsonify({'ok': True})
# ============================================================
# PRESENCE HEARTBEAT
# ============================================================
@bp.route('/api/messages/heartbeat', methods=['POST'])
@login_required
def messages_heartbeat():
    """Refresh the caller's presence: mark online and update last-seen."""
    uid = current_user.id
    set_user_online(uid)
    update_last_seen(uid)
    return jsonify({'ok': True})
# ============================================================
# BATCH PRESENCE CHECK
# ============================================================
def _parse_presence_ids(raw_items, limit=50):
    """Return up to *limit* unique user IDs parsed from raw ``ids`` values.

    Each item may hold a single ID or a comma-separated list. Tokens that are
    not plain decimal integers are ignored. First-seen order is preserved.
    """
    unique_ids = {}
    for item in raw_items:
        for token in item.split(','):
            if len(unique_ids) >= limit:
                return list(unique_ids)
            token = token.strip()
            # isdecimal(), unlike isdigit(), rejects characters such as
            # superscript or circled digits that int() cannot convert.
            if not token.isdecimal():
                continue
            try:
                uid = int(token)
            except ValueError:
                # e.g. a digit string longer than the interpreter's
                # integer string conversion limit.
                continue
            unique_ids.setdefault(uid, None)
    return list(unique_ids)


@bp.route('/api/users/presence')
@login_required
@member_required
def users_presence():
    """Return online status and last-seen value for a list of user IDs.

    Query param: ids — comma-separated or repeated user IDs (max 50).
    Supports both ``?ids=1&ids=2`` and ``?ids=1,2``; invalid tokens are
    skipped and duplicates are collapsed before the cap is applied.

    Returns:
        JSON object keyed by user ID, each value holding ``online`` and
        ``last_seen`` as reported by the presence helpers.
    """
    user_ids = _parse_presence_ids(request.args.getlist('ids'), limit=50)
    result = {}
    for uid in user_ids:
        online = is_user_online(uid)
        last_seen = get_user_last_seen(uid)
        result[uid] = {
            'online': online,
            'last_seen': last_seen,
        }
    return jsonify(result)