From ccbbf0f1415f2461b9824c011be2a12e16804c55 Mon Sep 17 00:00:00 2001 From: Maciej Pienczyn Date: Fri, 27 Mar 2026 12:59:21 +0100 Subject: [PATCH] feat(messages): add Redis service for pub/sub and presence Singleton Redis client module with graceful fallback when Redis is unavailable. Provides pub/sub (SSE events) and presence tracking (online status, last_seen) for the messaging redesign. Co-Authored-By: Claude Sonnet 4.6 --- app.py | 3 + redis_service.py | 141 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 redis_service.py diff --git a/app.py b/app.py index 679c61c..5437722 100644 --- a/app.py +++ b/app.py @@ -262,6 +262,9 @@ else: limiter._storage_uri = "memory://" limiter.init_app(app) +from redis_service import init_redis +init_redis(app) + @limiter.request_filter def is_admin_exempt(): """Exempt logged-in admins from rate limiting.""" diff --git a/redis_service.py b/redis_service.py new file mode 100644 index 0000000..11adc23 --- /dev/null +++ b/redis_service.py @@ -0,0 +1,141 @@ +""" +Redis service for pub/sub (SSE events) and presence tracking. +Uses same Redis instance as Flask-Limiter (localhost:6379/0). +""" + +import json +import logging +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + +_redis_client = None +_redis_available = False + + +def init_redis(app=None): + """Connect to Redis, set global _redis_client and _redis_available. + Graceful fallback if Redis unavailable. + """ + global _redis_client, _redis_available + try: + import redis + client = redis.Redis(host='localhost', port=6379, db=0, + decode_responses=True, + socket_connect_timeout=2, + socket_timeout=2) + client.ping() + _redis_client = client + _redis_available = True + logger.info("Redis service initialized (localhost:6379/0)") + except Exception as e: + _redis_client = None + _redis_available = False + logger.warning(f"Redis service unavailable, pub/sub and presence disabled: {e}") + + +def get_redis(): + """Return Redis client or None if unavailable.""" + return _redis_client + + +def is_available(): + """Return True if Redis is available.""" + return _redis_available + + +# --------------------------------------------------------------------------- +# Pub/Sub helpers +# --------------------------------------------------------------------------- + +def publish_event(user_id, event_type, data): + """Publish a JSON event to user:{user_id}:events channel. + + Args: + user_id: Target user ID. + event_type: String event type (e.g. 'new_message', 'typing', 'reaction'). + data: Dict payload to include in the event. + """ + if not _redis_available or _redis_client is None: + return + try: + channel = f"user:{user_id}:events" + payload = json.dumps({"type": event_type, **data}) + _redis_client.publish(channel, payload) + except Exception as e: + logger.error(f"publish_event failed for user {user_id}: {e}") + + +def publish_to_conversation(db, conversation_id, event_type, data, exclude_user_id=None): + """Publish an event to all members of a conversation. + + Queries ConversationMember for all members of the conversation and + calls publish_event for each (except excluded user). + + Args: + db: SQLAlchemy db instance. + conversation_id: ID of the conversation. + event_type: String event type. + data: Dict payload. + exclude_user_id: Optional user ID to skip (e.g. the sender). + """ + if not _redis_available or _redis_client is None: + return + try: + from database import ConversationMember + members = db.session.query(ConversationMember).filter_by( + conversation_id=conversation_id + ).all() + for member in members: + if exclude_user_id is not None and member.user_id == exclude_user_id: + continue + publish_event(member.user_id, event_type, data) + except Exception as e: + logger.error(f"publish_to_conversation failed for conversation {conversation_id}: {e}") + + +# --------------------------------------------------------------------------- +# Presence helpers +# --------------------------------------------------------------------------- + +def set_user_online(user_id): + """Mark user as online with 60s TTL via SETEX.""" + if not _redis_available or _redis_client is None: + return + try: + _redis_client.setex(f"user:{user_id}:online", 60, "1") + except Exception as e: + logger.error(f"set_user_online failed for user {user_id}: {e}") + + +def is_user_online(user_id): + """Return True if user has an active online key in Redis.""" + if not _redis_available or _redis_client is None: + return False + try: + return _redis_client.exists(f"user:{user_id}:online") > 0 + except Exception as e: + logger.error(f"is_user_online failed for user {user_id}: {e}") + return False + + +def get_user_last_seen(user_id): + """Return the stored last_seen ISO timestamp string, or None.""" + if not _redis_available or _redis_client is None: + return None + try: + return _redis_client.get(f"user:{user_id}:last_seen") + except Exception as e: + logger.error(f"get_user_last_seen failed for user {user_id}: {e}") + return None + + +def update_last_seen(user_id): + """Set user:{user_id}:last_seen to current UTC ISO timestamp.""" + if not _redis_available or _redis_client is None: + return + try: + ts = datetime.now(timezone.utc).isoformat() + _redis_client.set(f"user:{user_id}:last_seen", ts) + except Exception as e: + logger.error(f"update_last_seen failed for user {user_id}: {e}")