""" Shared OAuth 2.0 Service for NordaBiz ===================================== Supports: - Google OAuth 2.0 (GBP Business Profile API, Search Console API) - Meta OAuth 2.0 (Facebook Graph API, Instagram Graph API) Config via environment variables: - GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET - META_APP_ID, META_APP_SECRET - OAUTH_REDIRECT_BASE_URL (e.g., https://nordabiznes.pl) """ import os import logging from datetime import datetime, timedelta from typing import Optional, Dict, Tuple import requests logger = logging.getLogger(__name__) # OAuth Provider Configurations OAUTH_PROVIDERS = { 'google': { 'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth', 'token_url': 'https://oauth2.googleapis.com/token', 'scopes': { 'gbp': 'https://www.googleapis.com/auth/business.manage', 'search_console': 'https://www.googleapis.com/auth/webmasters', }, }, 'meta': { 'auth_url': 'https://www.facebook.com/v21.0/dialog/oauth', 'token_url': 'https://graph.facebook.com/v21.0/oauth/access_token', 'scopes': { 'facebook': 'pages_show_list,pages_read_engagement,pages_manage_posts,read_insights', 'instagram': 'instagram_basic,instagram_manage_insights,pages_show_list', }, }, } class OAuthService: """Shared OAuth 2.0 service for external API integrations.""" def __init__(self): self.google_client_id = os.getenv('GOOGLE_OAUTH_CLIENT_ID') self.google_client_secret = os.getenv('GOOGLE_OAUTH_CLIENT_SECRET') self.meta_app_id = os.getenv('META_APP_ID') self.meta_app_secret = os.getenv('META_APP_SECRET') self.redirect_base = os.getenv('OAUTH_REDIRECT_BASE_URL', 'https://nordabiznes.pl') def get_authorization_url(self, provider: str, service: str, state: str) -> Optional[str]: """Generate OAuth authorization URL for a provider/service. Args: provider: 'google' or 'meta' service: 'gbp', 'search_console', 'facebook', 'instagram' state: CSRF state token (include company_id and user_id encoded) Returns: Authorization URL or None if provider not configured """ config = OAUTH_PROVIDERS.get(provider) if not config: logger.error(f"Unknown OAuth provider: {provider}") return None scopes = config['scopes'].get(service, '') redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}" if provider == 'google': if not self.google_client_id: logger.warning("Google OAuth not configured (GOOGLE_OAUTH_CLIENT_ID)") return None params = { 'client_id': self.google_client_id, 'redirect_uri': redirect_uri, 'response_type': 'code', 'scope': scopes, 'state': state, 'access_type': 'offline', 'prompt': 'consent', } elif provider == 'meta': if not self.meta_app_id: logger.warning("Meta OAuth not configured (META_APP_ID)") return None params = { 'client_id': self.meta_app_id, 'redirect_uri': redirect_uri, 'response_type': 'code', 'scope': scopes, 'state': state, } else: return None query = '&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items()) return f"{config['auth_url']}?{query}" def exchange_code(self, provider: str, code: str) -> Optional[Dict]: """Exchange authorization code for access token. Args: provider: 'google' or 'meta' code: Authorization code from callback Returns: Dict with access_token, refresh_token, expires_in or None on error """ config = OAUTH_PROVIDERS.get(provider) if not config: return None redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}" if provider == 'google': data = { 'code': code, 'client_id': self.google_client_id, 'client_secret': self.google_client_secret, 'redirect_uri': redirect_uri, 'grant_type': 'authorization_code', } elif provider == 'meta': data = { 'code': code, 'client_id': self.meta_app_id, 'client_secret': self.meta_app_secret, 'redirect_uri': redirect_uri, } else: return None try: response = requests.post(config['token_url'], data=data, timeout=15) if response.status_code != 200: logger.error(f"OAuth token exchange failed for {provider}: {response.status_code} - {response.text}") return None return response.json() except Exception as e: logger.error(f"OAuth token exchange error for {provider}: {e}") return None def refresh_access_token(self, provider: str, refresh_token: str) -> Optional[Dict]: """Refresh an expired access token. Args: provider: 'google' or 'meta' refresh_token: The refresh token Returns: Dict with new access_token, expires_in or None """ if provider == 'google': data = { 'client_id': self.google_client_id, 'client_secret': self.google_client_secret, 'refresh_token': refresh_token, 'grant_type': 'refresh_token', } token_url = OAUTH_PROVIDERS['google']['token_url'] elif provider == 'meta': # Meta long-lived tokens: exchange short-lived for long-lived params = { 'grant_type': 'fb_exchange_token', 'client_id': self.meta_app_id, 'client_secret': self.meta_app_secret, 'fb_exchange_token': refresh_token, } try: response = requests.get( OAUTH_PROVIDERS['meta']['token_url'], params=params, timeout=15 ) if response.status_code == 200: return response.json() return None except Exception as e: logger.error(f"Meta token refresh error: {e}") return None else: return None try: response = requests.post(token_url, data=data, timeout=15) if response.status_code == 200: return response.json() logger.error(f"Token refresh failed for {provider}: {response.status_code}") return None except Exception as e: logger.error(f"Token refresh error for {provider}: {e}") return None def _try_refresh_token(self, db, token) -> bool: """Attempt to refresh an expired token in-place. Returns True on success.""" if not token.refresh_token: return False try: new_data = self.refresh_access_token(token.provider, token.refresh_token) if new_data and new_data.get('access_token'): token.access_token = new_data['access_token'] expires_in = new_data.get('expires_in') if expires_in: token.expires_at = datetime.now() + timedelta(seconds=int(expires_in)) token.updated_at = datetime.now() db.commit() logger.info(f"Auto-refreshed {token.provider}/{token.service} for company {token.company_id}") return True return False except Exception as e: logger.error(f"Token auto-refresh error {token.provider}/{token.service}: {e}") db.rollback() return False def save_token(self, db, company_id: int, user_id: int, provider: str, service: str, token_data: Dict) -> bool: """Save or update OAuth token in database. Args: db: Database session company_id: Company ID user_id: User who authorized provider: 'google' or 'meta' service: 'gbp', 'search_console', 'facebook', 'instagram' token_data: Dict from exchange_code() or refresh_access_token() Returns: True if saved successfully """ try: from database import OAuthToken # Find existing token or create new token = db.query(OAuthToken).filter( OAuthToken.company_id == company_id, OAuthToken.provider == provider, OAuthToken.service == service, ).first() if not token: token = OAuthToken( company_id=company_id, user_id=user_id, provider=provider, service=service, ) db.add(token) token.access_token = token_data.get('access_token', '') token.refresh_token = token_data.get('refresh_token', token.refresh_token) token.token_type = token_data.get('token_type', 'Bearer') expires_in = token_data.get('expires_in') if expires_in: token.expires_at = datetime.now() + timedelta(seconds=int(expires_in)) token.scopes = OAUTH_PROVIDERS.get(provider, {}).get('scopes', {}).get(service, '') token.is_active = True token.updated_at = datetime.now() db.commit() logger.info(f"OAuth token saved: {provider}/{service} for company {company_id}") return True except Exception as e: db.rollback() logger.error(f"Error saving OAuth token: {e}") return False def get_valid_token(self, db, company_id: int, provider: str, service: str) -> Optional[str]: """Get a valid access token, refreshing if needed. Args: db: Database session company_id: Company ID provider: 'google' or 'meta' service: Service name Returns: Valid access token string or None """ try: from database import OAuthToken token = db.query(OAuthToken).filter( OAuthToken.company_id == company_id, OAuthToken.provider == provider, OAuthToken.service == service, OAuthToken.is_active == True, ).first() if not token: return None # Check if token is expired if token.is_expired: if token.refresh_token: if not self._try_refresh_token(db, token): token.is_active = False db.commit() return None else: return None return token.access_token except Exception as e: logger.error(f"Error getting valid token: {e}") return None def get_connected_services(self, db, company_id: int) -> Dict[str, Dict]: """Get status of all connected OAuth services for a company. Returns: Dict like {'google/gbp': {'connected': True, 'account_name': '...', 'expires_at': ...}, ...} """ try: from database import OAuthToken tokens = db.query(OAuthToken).filter( OAuthToken.company_id == company_id, OAuthToken.is_active == True, ).all() result = {} for token in tokens: key = f"{token.provider}/{token.service}" # Auto-refresh expired tokens refresh_failed = False if token.is_expired and token.refresh_token: if not self._try_refresh_token(db, token): refresh_failed = True result[key] = { 'connected': True, 'account_name': token.account_name, 'account_id': token.account_id, 'expires_at': token.expires_at.isoformat() if token.expires_at else None, 'is_expired': token.is_expired, 'refresh_failed': refresh_failed, 'updated_at': token.updated_at.isoformat() if token.updated_at else None, } return result except Exception as e: logger.error(f"Error getting connected services: {e}") return {}