feat(messages): add Redis service for pub/sub and presence
Singleton Redis client module with graceful fallback when Redis is unavailable. Provides pub/sub (SSE events) and presence tracking (online status, last_seen) for the messaging redesign. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d4fd1f3b06
commit
ccbbf0f141
3
app.py
3
app.py
@ -262,6 +262,9 @@ else:
|
||||
limiter._storage_uri = "memory://"
|
||||
limiter.init_app(app)
|
||||
|
||||
from redis_service import init_redis
|
||||
init_redis(app)
|
||||
|
||||
@limiter.request_filter
|
||||
def is_admin_exempt():
|
||||
"""Exempt logged-in admins from rate limiting."""
|
||||
|
||||
141
redis_service.py
Normal file
141
redis_service.py
Normal file
@ -0,0 +1,141 @@
|
||||
"""
|
||||
Redis service for pub/sub (SSE events) and presence tracking.
|
||||
Uses same Redis instance as Flask-Limiter (localhost:6379/0).
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_redis_client = None
|
||||
_redis_available = False
|
||||
|
||||
|
||||
def init_redis(app=None):
|
||||
"""Connect to Redis, set global _redis_client and _redis_available.
|
||||
Graceful fallback if Redis unavailable.
|
||||
"""
|
||||
global _redis_client, _redis_available
|
||||
try:
|
||||
import redis
|
||||
client = redis.Redis(host='localhost', port=6379, db=0,
|
||||
decode_responses=True,
|
||||
socket_connect_timeout=2,
|
||||
socket_timeout=2)
|
||||
client.ping()
|
||||
_redis_client = client
|
||||
_redis_available = True
|
||||
logger.info("Redis service initialized (localhost:6379/0)")
|
||||
except Exception as e:
|
||||
_redis_client = None
|
||||
_redis_available = False
|
||||
logger.warning(f"Redis service unavailable, pub/sub and presence disabled: {e}")
|
||||
|
||||
|
||||
def get_redis():
|
||||
"""Return Redis client or None if unavailable."""
|
||||
return _redis_client
|
||||
|
||||
|
||||
def is_available():
|
||||
"""Return True if Redis is available."""
|
||||
return _redis_available
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pub/Sub helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def publish_event(user_id, event_type, data):
|
||||
"""Publish a JSON event to user:{user_id}:events channel.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID.
|
||||
event_type: String event type (e.g. 'new_message', 'typing', 'reaction').
|
||||
data: Dict payload to include in the event.
|
||||
"""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return
|
||||
try:
|
||||
channel = f"user:{user_id}:events"
|
||||
payload = json.dumps({"type": event_type, **data})
|
||||
_redis_client.publish(channel, payload)
|
||||
except Exception as e:
|
||||
logger.error(f"publish_event failed for user {user_id}: {e}")
|
||||
|
||||
|
||||
def publish_to_conversation(db, conversation_id, event_type, data, exclude_user_id=None):
|
||||
"""Publish an event to all members of a conversation.
|
||||
|
||||
Queries ConversationMember for all members of the conversation and
|
||||
calls publish_event for each (except excluded user).
|
||||
|
||||
Args:
|
||||
db: SQLAlchemy db instance.
|
||||
conversation_id: ID of the conversation.
|
||||
event_type: String event type.
|
||||
data: Dict payload.
|
||||
exclude_user_id: Optional user ID to skip (e.g. the sender).
|
||||
"""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return
|
||||
try:
|
||||
from database import ConversationMember
|
||||
members = db.session.query(ConversationMember).filter_by(
|
||||
conversation_id=conversation_id
|
||||
).all()
|
||||
for member in members:
|
||||
if exclude_user_id is not None and member.user_id == exclude_user_id:
|
||||
continue
|
||||
publish_event(member.user_id, event_type, data)
|
||||
except Exception as e:
|
||||
logger.error(f"publish_to_conversation failed for conversation {conversation_id}: {e}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Presence helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def set_user_online(user_id):
|
||||
"""Mark user as online with 60s TTL via SETEX."""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return
|
||||
try:
|
||||
_redis_client.setex(f"user:{user_id}:online", 60, "1")
|
||||
except Exception as e:
|
||||
logger.error(f"set_user_online failed for user {user_id}: {e}")
|
||||
|
||||
|
||||
def is_user_online(user_id):
|
||||
"""Return True if user has an active online key in Redis."""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return False
|
||||
try:
|
||||
return _redis_client.exists(f"user:{user_id}:online") > 0
|
||||
except Exception as e:
|
||||
logger.error(f"is_user_online failed for user {user_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_user_last_seen(user_id):
|
||||
"""Return the stored last_seen ISO timestamp string, or None."""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return None
|
||||
try:
|
||||
return _redis_client.get(f"user:{user_id}:last_seen")
|
||||
except Exception as e:
|
||||
logger.error(f"get_user_last_seen failed for user {user_id}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def update_last_seen(user_id):
|
||||
"""Set user:{user_id}:last_seen to current UTC ISO timestamp."""
|
||||
if not _redis_available or _redis_client is None:
|
||||
return
|
||||
try:
|
||||
ts = datetime.now(timezone.utc).isoformat()
|
||||
_redis_client.set(f"user:{user_id}:last_seen", ts)
|
||||
except Exception as e:
|
||||
logger.error(f"update_last_seen failed for user {user_id}: {e}")
|
||||
Loading…
Reference in New Issue
Block a user