nordabiz/redis_service.py
Maciej Pienczyn ccbbf0f141 feat(messages): add Redis service for pub/sub and presence
Singleton Redis client module with graceful fallback when Redis is
unavailable. Provides pub/sub (SSE events) and presence tracking
(online status, last_seen) for the messaging redesign.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 12:59:21 +01:00

142 lines
4.7 KiB
Python

"""
Redis service for pub/sub (SSE events) and presence tracking.
Uses same Redis instance as Flask-Limiter (localhost:6379/0).
"""
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
_redis_client = None
_redis_available = False
def init_redis(app=None):
"""Connect to Redis, set global _redis_client and _redis_available.
Graceful fallback if Redis unavailable.
"""
global _redis_client, _redis_available
try:
import redis
client = redis.Redis(host='localhost', port=6379, db=0,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2)
client.ping()
_redis_client = client
_redis_available = True
logger.info("Redis service initialized (localhost:6379/0)")
except Exception as e:
_redis_client = None
_redis_available = False
logger.warning(f"Redis service unavailable, pub/sub and presence disabled: {e}")
def get_redis():
"""Return Redis client or None if unavailable."""
return _redis_client
def is_available():
"""Return True if Redis is available."""
return _redis_available
# ---------------------------------------------------------------------------
# Pub/Sub helpers
# ---------------------------------------------------------------------------
def publish_event(user_id, event_type, data):
"""Publish a JSON event to user:{user_id}:events channel.
Args:
user_id: Target user ID.
event_type: String event type (e.g. 'new_message', 'typing', 'reaction').
data: Dict payload to include in the event.
"""
if not _redis_available or _redis_client is None:
return
try:
channel = f"user:{user_id}:events"
payload = json.dumps({"type": event_type, **data})
_redis_client.publish(channel, payload)
except Exception as e:
logger.error(f"publish_event failed for user {user_id}: {e}")
def publish_to_conversation(db, conversation_id, event_type, data, exclude_user_id=None):
"""Publish an event to all members of a conversation.
Queries ConversationMember for all members of the conversation and
calls publish_event for each (except excluded user).
Args:
db: SQLAlchemy db instance.
conversation_id: ID of the conversation.
event_type: String event type.
data: Dict payload.
exclude_user_id: Optional user ID to skip (e.g. the sender).
"""
if not _redis_available or _redis_client is None:
return
try:
from database import ConversationMember
members = db.session.query(ConversationMember).filter_by(
conversation_id=conversation_id
).all()
for member in members:
if exclude_user_id is not None and member.user_id == exclude_user_id:
continue
publish_event(member.user_id, event_type, data)
except Exception as e:
logger.error(f"publish_to_conversation failed for conversation {conversation_id}: {e}")
# ---------------------------------------------------------------------------
# Presence helpers
# ---------------------------------------------------------------------------
def set_user_online(user_id):
"""Mark user as online with 60s TTL via SETEX."""
if not _redis_available or _redis_client is None:
return
try:
_redis_client.setex(f"user:{user_id}:online", 60, "1")
except Exception as e:
logger.error(f"set_user_online failed for user {user_id}: {e}")
def is_user_online(user_id):
"""Return True if user has an active online key in Redis."""
if not _redis_available or _redis_client is None:
return False
try:
return _redis_client.exists(f"user:{user_id}:online") > 0
except Exception as e:
logger.error(f"is_user_online failed for user {user_id}: {e}")
return False
def get_user_last_seen(user_id):
"""Return the stored last_seen ISO timestamp string, or None."""
if not _redis_available or _redis_client is None:
return None
try:
return _redis_client.get(f"user:{user_id}:last_seen")
except Exception as e:
logger.error(f"get_user_last_seen failed for user {user_id}: {e}")
return None
def update_last_seen(user_id):
"""Set user:{user_id}:last_seen to current UTC ISO timestamp."""
if not _redis_available or _redis_client is None:
return
try:
ts = datetime.now(timezone.utc).isoformat()
_redis_client.set(f"user:{user_id}:last_seen", ts)
except Exception as e:
logger.error(f"update_last_seen failed for user {user_id}: {e}")