"""Web Push service — send_push() oraz obsługa whitelist / kill switch. send_push() nigdy nie rzuca w górę — wywołujące (trigger wiadomości) nie może awaritu z powodu błędu pusha. Wszystkie błędy logowane, martwe subskrypcje automatycznie kasowane (410/404/403). """ import base64 import json import logging import os from datetime import datetime from database import PushSubscription, SessionLocal logger = logging.getLogger(__name__) def _vapid_claims_email(): return 'mailto:' + os.getenv('VAPID_CONTACT_EMAIL', 'noreply@nordabiznes.pl') def _vapid_private_key(): """Return VAPID private key jako PEM (preferowane przez pywebpush) albo base64url raw scalar.""" raw = os.getenv('VAPID_PRIVATE_KEY', '').strip() return raw or None def _vapid_public_key(): return os.getenv('VAPID_PUBLIC_KEY', '').strip() or None def _kill_switch_enabled(): return os.getenv('PUSH_KILL_SWITCH', '').strip() in ('1', 'true', 'True', 'yes') def _parse_whitelist(): raw = os.getenv('PUSH_USER_WHITELIST', '').strip() if not raw: return set() return {int(x) for x in raw.split(',') if x.strip().isdigit()} def _user_allowed(user_id: int) -> bool: whitelist = _parse_whitelist() if not whitelist: return True return user_id in whitelist def send_push(user_id: int, title: str, body: str, url: str = '/', icon: str = '/static/img/favicon-192.png', tag: str | None = None, data: dict | None = None) -> dict: """Wyślij Web Push do wszystkich subskrypcji usera. Zwraca dict {'sent': int, 'failed': int, 'pruned': int}. Nigdy nie rzuca wyjątku. """ stats = {'sent': 0, 'failed': 0, 'pruned': 0} if _kill_switch_enabled(): logger.info("push kill switch ON, skipping user=%s", user_id) return stats if not _user_allowed(user_id): return stats # cichy opt-out — faza B: whitelist blokuje resztę userów private_key = _vapid_private_key() if not private_key: logger.warning("VAPID_PRIVATE_KEY not configured, skipping push") return stats try: from pywebpush import webpush, WebPushException except ImportError: logger.warning("pywebpush not installed, skipping push") return stats payload = { 'title': title, 'body': body, 'url': url, 'icon': icon, } if tag: payload['tag'] = tag if data: payload['data'] = data db = SessionLocal() try: subs = db.query(PushSubscription).filter_by(user_id=user_id).all() if not subs: return stats to_prune = [] for sub in subs: try: webpush( subscription_info={ 'endpoint': sub.endpoint, 'keys': {'p256dh': sub.p256dh, 'auth': sub.auth}, }, data=json.dumps(payload, ensure_ascii=False), vapid_private_key=private_key, vapid_claims={'sub': _vapid_claims_email()}, timeout=10, ) sub.last_used_at = datetime.now() stats['sent'] += 1 except WebPushException as e: status = getattr(e.response, 'status_code', None) if getattr(e, 'response', None) else None msg = str(e) if status in (404, 410) or 'InvalidRegistration' in msg or 'expired' in msg.lower(): to_prune.append(sub) stats['pruned'] += 1 elif status == 403: logger.warning("push VAPID rejected (403) sub=%s, pruning", sub.id) to_prune.append(sub) stats['pruned'] += 1 elif status == 413: logger.warning("push payload too large sub=%s", sub.id) stats['failed'] += 1 else: logger.warning("push transient error sub=%s status=%s err=%s", sub.id, status, msg[:200]) stats['failed'] += 1 except Exception as e: logger.warning("push unexpected error sub=%s err=%s", sub.id, str(e)[:200]) stats['failed'] += 1 for sub in to_prune: try: db.delete(sub) except Exception: pass db.commit() except Exception as e: logger.error("push service db error: %s", e) try: db.rollback() except Exception: pass finally: db.close() if stats['sent'] or stats['failed'] or stats['pruned']: logger.info("push sent user=%s sent=%d failed=%d pruned=%d", user_id, stats['sent'], stats['failed'], stats['pruned']) return stats # === iOS pending URL helper (Redis-backed, TTL 5 min) === def set_pending_url(user_id: int, url: str) -> bool: try: from redis_service import get_redis, is_available if not is_available(): return False r = get_redis() r.setex(f'push:pending_url:{user_id}', 300, url) return True except Exception as e: logger.debug("pending_url set failed: %s", e) return False def pop_pending_url(user_id: int) -> str | None: try: from redis_service import get_redis, is_available if not is_available(): return None r = get_redis() key = f'push:pending_url:{user_id}' val = r.get(key) if val: r.delete(key) return val.decode() if isinstance(val, (bytes, bytearray)) else val return None except Exception as e: logger.debug("pending_url pop failed: %s", e) return None