Singleton Redis client module with graceful fallback when Redis is unavailable. Provides pub/sub (SSE events) and presence tracking (online status, last_seen) for the messaging redesign. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
142 lines
4.7 KiB
Python
142 lines
4.7 KiB
Python
"""
Redis service for pub/sub (SSE events) and presence tracking.

Uses the same Redis instance as Flask-Limiter (localhost:6379/0).
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
# Singleton connection state shared by every helper in this module. Both
# values stay at their "disabled" defaults until init_redis() manages to
# ping the server.
_redis_available = False
_redis_client = None

# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def init_redis(app=None):
    """Open the shared Redis connection and record whether it is usable.

    Builds a client for localhost:6379 (db 0) with decoded responses and
    2-second connect/read timeouts, then pings it. On success the
    module-level ``_redis_client`` and ``_redis_available`` are set to the
    client and ``True``. On any failure (including the ``redis`` package
    not being importable) they are reset to ``None`` and ``False`` and a
    warning is logged instead of raising.

    Args:
        app: Optional application object; not read by this function.
    """
    global _redis_client, _redis_available

    connection_options = {
        "host": 'localhost',
        "port": 6379,
        "db": 0,
        "decode_responses": True,
        "socket_connect_timeout": 2,
        "socket_timeout": 2,
    }
    try:
        # Imported lazily so a missing package is handled by the same
        # fallback path as an unreachable server.
        import redis

        candidate = redis.Redis(**connection_options)
        candidate.ping()
        _redis_client, _redis_available = candidate, True
        logger.info("Redis service initialized (localhost:6379/0)")
    except Exception as e:
        _redis_client, _redis_available = None, False
        logger.warning(f"Redis service unavailable, pub/sub and presence disabled: {e}")
|
|
|
|
|
|
def get_redis():
    """Expose the module's shared Redis client.

    Returns:
        The current value of the module-level ``_redis_client``: the client
        stored by ``init_redis()``, or ``None`` when no connection is held.
    """
    shared_client = _redis_client
    return shared_client
|
|
|
|
|
|
def is_available():
    """Report the module-level Redis availability flag.

    Returns:
        The current value of ``_redis_available`` (``True`` after a
        successful ``init_redis()``, ``False`` otherwise).
    """
    redis_ready = _redis_available
    return redis_ready
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Pub/Sub helpers
# ---------------------------------------------------------------------------
|
|
|
|
def publish_event(user_id, event_type, data):
    """Publish a JSON event to the ``user:{user_id}:events`` channel.

    The message is a flat JSON object made of the entries of ``data`` plus a
    ``"type"`` key holding ``event_type``. ``event_type`` always wins: a
    payload that carries its own ``"type"`` entry cannot overwrite the event
    type.

    Does nothing when Redis is unavailable. Serialization and Redis errors
    are logged and swallowed, never raised to the caller.

    Args:
        user_id: Target user ID.
        event_type: String event type (e.g. 'new_message', 'typing', 'reaction').
        data: Dict payload to include in the event. ``None`` is treated as
            an empty payload.
    """
    if not _redis_available or _redis_client is None:
        return
    try:
        channel = f"user:{user_id}:events"
        event = {"type": event_type, **(data or {})}
        # The merge above lets a "type" key inside ``data`` clobber the
        # event type; re-assert it so the published type is always the
        # one the caller asked for (and stays the first key).
        event["type"] = event_type
        _redis_client.publish(channel, json.dumps(event))
    except Exception as e:
        logger.error(f"publish_event failed for user {user_id}: {e}")
|
|
|
|
|
|
def publish_to_conversation(db, conversation_id, event_type, data, exclude_user_id=None):
    """Fan an event out to every member of a conversation.

    Looks up the ``ConversationMember`` rows for ``conversation_id`` and
    calls ``publish_event`` once per member, skipping ``exclude_user_id``
    when one is given. Does nothing when Redis is unavailable; any error
    during the lookup or fan-out is logged rather than raised.

    Args:
        db: SQLAlchemy db instance (its ``session`` is used for the query).
        conversation_id: ID of the conversation.
        event_type: String event type.
        data: Dict payload.
        exclude_user_id: Optional user ID to skip (e.g. the sender).
    """
    redis_ready = _redis_available and _redis_client is not None
    if not redis_ready:
        return
    try:
        from database import ConversationMember

        membership_query = db.session.query(ConversationMember)
        rows = membership_query.filter_by(conversation_id=conversation_id).all()
        for row in rows:
            is_excluded = exclude_user_id is not None and row.user_id == exclude_user_id
            if not is_excluded:
                publish_event(row.user_id, event_type, data)
    except Exception as e:
        logger.error(f"publish_to_conversation failed for conversation {conversation_id}: {e}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Presence helpers
# ---------------------------------------------------------------------------
|
|
|
|
def set_user_online(user_id, ttl_seconds=60):
    """Mark a user as online by writing an expiring key via SETEX.

    Sets ``user:{user_id}:online`` to ``"1"`` with the given time-to-live,
    so the key disappears unless this is called again before it expires.
    Does nothing when Redis is unavailable; Redis errors are logged rather
    than raised.

    Args:
        user_id: ID of the user to mark online.
        ttl_seconds: Lifetime of the online marker in seconds. Defaults to
            60, the value this function previously hard-coded.
    """
    if not _redis_available or _redis_client is None:
        return
    try:
        _redis_client.setex(f"user:{user_id}:online", ttl_seconds, "1")
    except Exception as e:
        logger.error(f"set_user_online failed for user {user_id}: {e}")
|
|
|
|
|
|
def is_user_online(user_id):
    """Check whether the user's online marker key currently exists.

    Returns:
        ``True`` when ``user:{user_id}:online`` exists in Redis; ``False``
        when it does not, when Redis is unavailable, or when the lookup
        fails (the failure is logged).
    """
    if _redis_available and _redis_client is not None:
        try:
            return _redis_client.exists(f"user:{user_id}:online") > 0
        except Exception as e:
            logger.error(f"is_user_online failed for user {user_id}: {e}")
    # Reached when Redis is disabled or the lookup above raised.
    return False
|
|
|
|
|
|
def get_user_last_seen(user_id):
    """Fetch the value stored under ``user:{user_id}:last_seen``.

    Returns:
        Whatever Redis holds for that key (``update_last_seen`` writes a UTC
        ISO timestamp string there), or ``None`` when the key is absent,
        Redis is unavailable, or the read fails (the failure is logged).
    """
    stored = None
    if _redis_available and _redis_client is not None:
        try:
            stored = _redis_client.get(f"user:{user_id}:last_seen")
        except Exception as e:
            logger.error(f"get_user_last_seen failed for user {user_id}: {e}")
    return stored
|
|
|
|
|
|
def update_last_seen(user_id):
    """Record "now" as the user's last-seen time.

    Writes the current UTC time, as an ISO 8601 string, to
    ``user:{user_id}:last_seen``. Does nothing when Redis is unavailable;
    Redis errors are logged rather than raised.
    """
    redis_ready = _redis_available and _redis_client is not None
    if not redis_ready:
        return
    try:
        now_iso = datetime.now(tz=timezone.utc).isoformat()
        last_seen_key = f"user:{user_id}:last_seen"
        _redis_client.set(last_seen_key, now_iso)
    except Exception as e:
        logger.error(f"update_last_seen failed for user {user_id}: {e}")
|