nordabiz/oauth_service.py
Maciej Pienczyn 6b9b8467b6
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: add openid email scopes to Google OAuth for account identification
Without email scope, userinfo endpoint returns 401 and account email
cannot be captured during OAuth flow. Added openid+email to both GBP
and Search Console scopes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 08:26:49 +01:00

377 lines
14 KiB
Python

"""
Shared OAuth 2.0 Service for NordaBiz
=====================================
Supports:
- Google OAuth 2.0 (GBP Business Profile API, Search Console API)
- Meta OAuth 2.0 (Facebook Graph API, Instagram Graph API)
Config via environment variables:
- GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET
- META_APP_ID, META_APP_SECRET
- OAUTH_REDIRECT_BASE_URL (e.g., https://nordabiznes.pl)
"""
import os
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Tuple
import requests
logger = logging.getLogger(__name__)

# Static per-provider OAuth configuration, keyed by provider name.
#   auth_url  - where the user is sent to grant consent
#   token_url - where authorization codes / refresh tokens are exchanged
#   scopes    - scope string to request for each supported service
OAUTH_PROVIDERS = {
    'google': {
        'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth',
        'token_url': 'https://oauth2.googleapis.com/token',
        # Space-delimited. 'openid email' accompanies every API scope so the
        # account e-mail can be read from the userinfo endpoint afterwards.
        'scopes': {
            'gbp': 'https://www.googleapis.com/auth/business.manage openid email',
            'search_console': 'https://www.googleapis.com/auth/webmasters openid email',
        },
    },
    'meta': {
        'auth_url': 'https://www.facebook.com/v21.0/dialog/oauth',
        'token_url': 'https://graph.facebook.com/v21.0/oauth/access_token',
        # Comma-delimited permission names.
        'scopes': {
            'facebook': 'pages_show_list,pages_read_engagement,pages_manage_posts,read_insights',
            'instagram': 'instagram_basic,instagram_manage_insights,pages_show_list',
        },
    },
}
class OAuthService:
    """Shared OAuth 2.0 service for external API integrations.

    Builds authorization URLs, exchanges and refreshes tokens over HTTP
    (via ``requests``) and persists them through the ``OAuthToken`` model,
    which is imported lazily from ``database`` inside the methods that
    need it.  Most failures are logged and reported through a ``None`` /
    ``False`` / ``{}`` return value instead of an exception.
    """

    def __init__(self):
        """Read client credentials and the redirect base URL from the environment."""
        self.google_client_id = os.getenv('GOOGLE_OAUTH_CLIENT_ID')
        self.google_client_secret = os.getenv('GOOGLE_OAUTH_CLIENT_SECRET')
        self.meta_app_id = os.getenv('META_APP_ID')
        self.meta_app_secret = os.getenv('META_APP_SECRET')
        # An empty value falls back to the default, and a trailing slash is
        # dropped so the callback URL never contains '//' before 'api'.
        redirect_base = os.getenv('OAUTH_REDIRECT_BASE_URL') or 'https://nordabiznes.pl'
        self.redirect_base = redirect_base.rstrip('/')

    def _redirect_uri(self, provider: str) -> str:
        """Return the callback URL used for *provider* in both OAuth legs."""
        return f"{self.redirect_base}/api/oauth/callback/{provider}"

    def _client_credentials(self, provider: str) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(client_id, client_secret)`` for *provider*, ``(None, None)`` if unknown."""
        if provider == 'google':
            return self.google_client_id, self.google_client_secret
        if provider == 'meta':
            return self.meta_app_id, self.meta_app_secret
        return None, None

    def get_authorization_url(self, provider: str, service: str, state: str) -> Optional[str]:
        """Generate OAuth authorization URL for a provider/service.

        Args:
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            state: CSRF state token (include company_id and user_id encoded)

        Returns:
            Authorization URL, or None if the provider or service is unknown
            or the provider's client id is not configured.
        """
        # Function-scope import, like the lazy ``database`` imports below.
        from urllib.parse import quote, urlencode

        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            logger.error(f"Unknown OAuth provider: {provider}")
            return None

        # A service without a scope entry would otherwise produce a URL with
        # an empty ``scope`` parameter.
        scopes = config['scopes'].get(service)
        if not scopes:
            logger.error(f"Unknown OAuth service for {provider}: {service}")
            return None

        if provider == 'google':
            if not self.google_client_id:
                logger.warning("Google OAuth not configured (GOOGLE_OAUTH_CLIENT_ID)")
                return None
            client_id = self.google_client_id
            # Request offline access and always show the consent screen.
            extra_params = {'access_type': 'offline', 'prompt': 'consent'}
        elif provider == 'meta':
            if not self.meta_app_id:
                logger.warning("Meta OAuth not configured (META_APP_ID)")
                return None
            client_id = self.meta_app_id
            extra_params = {}
        else:
            return None

        params = {
            'client_id': client_id,
            'redirect_uri': self._redirect_uri(provider),
            'response_type': 'code',
            'scope': scopes,
            'state': state,
        }
        params.update(extra_params)

        # quote() with '/' kept safe: spaces become %20 and slashes stay
        # literal, i.e. the same encoding as quoting each value by hand.
        query = urlencode(params, safe='/', quote_via=quote)
        return f"{config['auth_url']}?{query}"

    def _fetch_google_identity(self, access_token: str) -> Dict:
        """Best-effort lookup of the Google account behind *access_token*.

        Returns:
            ``{'google_email': ..., 'google_name': ...}`` when the userinfo
            endpoint answers 200 (either value may be None if the field is
            absent from the response), otherwise an empty dict.  Errors are
            logged as warnings and never propagated.
        """
        try:
            userinfo_resp = requests.get(
                'https://www.googleapis.com/oauth2/v2/userinfo',
                headers={'Authorization': f"Bearer {access_token}"},
                timeout=10
            )
            if userinfo_resp.status_code != 200:
                logger.warning(f"Google userinfo request failed: {userinfo_resp.status_code}")
                return {}
            userinfo = userinfo_resp.json()
            return {
                'google_email': userinfo.get('email'),
                'google_name': userinfo.get('name'),
            }
        except Exception as e:
            logger.warning(f"Could not fetch Google userinfo: {e}")
            return {}

    def exchange_code(self, provider: str, code: str) -> Optional[Dict]:
        """Exchange authorization code for access token.

        Args:
            provider: 'google' or 'meta'
            code: Authorization code from callback

        Returns:
            The token endpoint's JSON dict (for Google, extended with
            'google_email' / 'google_name' when the userinfo lookup
            succeeds), or None on error.
        """
        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            return None
        if provider not in ('google', 'meta'):
            return None

        client_id, client_secret = self._client_credentials(provider)
        if not client_id or not client_secret:
            # Without credentials the request can only fail; skip the round trip.
            logger.warning(f"OAuth client credentials not configured for {provider}")
            return None

        data = {
            'code': code,
            'client_id': client_id,
            'client_secret': client_secret,
            'redirect_uri': self._redirect_uri(provider),
        }
        if provider == 'google':
            data['grant_type'] = 'authorization_code'

        try:
            response = requests.post(config['token_url'], data=data, timeout=15)
            if response.status_code != 200:
                logger.error(f"OAuth token exchange failed for {provider}: {response.status_code} - {response.text}")
                return None
            token_data = response.json()
            if not isinstance(token_data, dict):
                logger.error(f"OAuth token exchange error for {provider}: unexpected response payload")
                return None
            # For Google: fetch user email to identify which account was used
            if provider == 'google' and token_data.get('access_token'):
                token_data.update(self._fetch_google_identity(token_data['access_token']))
            return token_data
        except Exception as e:
            logger.error(f"OAuth token exchange error for {provider}: {e}")
            return None

    def refresh_access_token(self, provider: str, refresh_token: str) -> Optional[Dict]:
        """Refresh an expired access token.

        Args:
            provider: 'google' or 'meta'
            refresh_token: The refresh token (for Meta: the token passed as
                ``fb_exchange_token``)

        Returns:
            The token endpoint's JSON dict on HTTP 200, otherwise None.
        """
        if provider == 'google':
            data = {
                'client_id': self.google_client_id,
                'client_secret': self.google_client_secret,
                'refresh_token': refresh_token,
                'grant_type': 'refresh_token',
            }
            try:
                response = requests.post(
                    OAUTH_PROVIDERS['google']['token_url'],
                    data=data,
                    timeout=15
                )
            except Exception as e:
                logger.error(f"Token refresh error for {provider}: {e}")
                return None
        elif provider == 'meta':
            # Meta long-lived tokens: exchange short-lived for long-lived
            params = {
                'grant_type': 'fb_exchange_token',
                'client_id': self.meta_app_id,
                'client_secret': self.meta_app_secret,
                'fb_exchange_token': refresh_token,
            }
            try:
                response = requests.get(
                    OAUTH_PROVIDERS['meta']['token_url'],
                    params=params,
                    timeout=15
                )
            except Exception as e:
                logger.error(f"Meta token refresh error: {e}")
                return None
        else:
            return None

        if response.status_code != 200:
            # Logged for both providers so failed refreshes are always visible.
            logger.error(f"Token refresh failed for {provider}: {response.status_code}")
            return None
        try:
            return response.json()
        except Exception as e:
            logger.error(f"Token refresh error for {provider}: {e}")
            return None

    def _try_refresh_token(self, db, token) -> bool:
        """Attempt to refresh an expired token in-place. Returns True on success.

        On success the token row's access token, expiry (when the response
        carries ``expires_in``) and ``updated_at`` are updated and committed.
        On an exception the session is rolled back.
        """
        if not token.refresh_token:
            return False
        try:
            new_data = self.refresh_access_token(token.provider, token.refresh_token)
            if not (new_data and new_data.get('access_token')):
                return False
            token.access_token = new_data['access_token']
            # If the server rotated the refresh token, the old one must be
            # replaced (RFC 6749, section 6).
            if new_data.get('refresh_token'):
                token.refresh_token = new_data['refresh_token']
            expires_in = new_data.get('expires_in')
            if expires_in:
                token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
            token.updated_at = datetime.now()
            db.commit()
            logger.info(f"Auto-refreshed {token.provider}/{token.service} for company {token.company_id}")
            return True
        except Exception as e:
            logger.error(f"Token auto-refresh error {token.provider}/{token.service}: {e}")
            db.rollback()
            return False

    def save_token(self, db, company_id: int, user_id: int, provider: str,
                   service: str, token_data: Dict) -> bool:
        """Save or update OAuth token in database.

        Args:
            db: Database session
            company_id: Company ID
            user_id: User who authorized
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            token_data: Dict from exchange_code() or refresh_access_token()

        Returns:
            True if saved successfully; False if *token_data* carries no
            access token or the database write failed (in which case the
            session is rolled back).
        """
        access_token = token_data.get('access_token') if token_data else None
        if not access_token:
            # Never overwrite a stored token with an empty one.
            logger.error(f"Error saving OAuth token: no access_token for {provider}/{service}")
            return False

        try:
            from database import OAuthToken

            # Find existing token or create new
            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
            ).first()
            if not token:
                token = OAuthToken(
                    company_id=company_id,
                    user_id=user_id,
                    provider=provider,
                    service=service,
                )
                db.add(token)

            token.access_token = access_token
            # Keep the stored refresh token when the response omits it or
            # carries an explicit null.
            token.refresh_token = token_data.get('refresh_token') or token.refresh_token
            token.token_type = token_data.get('token_type', 'Bearer')
            expires_in = token_data.get('expires_in')
            if expires_in:
                token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
            # NOTE(review): without 'expires_in' a previous expires_at is kept
            # as-is; confirm whether it should be cleared instead.
            token.scopes = OAUTH_PROVIDERS.get(provider, {}).get('scopes', {}).get(service, '')
            token.is_active = True
            token.updated_at = datetime.now()

            # Save Google account identity if available
            if token_data.get('google_email'):
                token.account_id = token_data['google_email']
                # exchange_code() always sets the 'google_name' key, possibly
                # to None, so fall back on a falsy value, not on a missing key.
                token.account_name = token_data.get('google_name') or token_data['google_email']

            db.commit()
            logger.info(f"OAuth token saved: {provider}/{service} for company {company_id}")
            return True
        except Exception as e:
            db.rollback()
            logger.error(f"Error saving OAuth token: {e}")
            return False

    def get_valid_token(self, db, company_id: int, provider: str, service: str) -> Optional[str]:
        """Get a valid access token, refreshing if needed.

        Args:
            db: Database session
            company_id: Company ID
            provider: 'google' or 'meta'
            service: Service name

        Returns:
            Valid access token string or None.  When an expired token cannot
            be refreshed, the row is marked inactive and None is returned.
        """
        try:
            from database import OAuthToken

            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
                OAuthToken.is_active == True,
            ).first()
            if not token:
                return None

            # Check if token is expired
            if token.is_expired:
                if not token.refresh_token:
                    return None
                if not self._try_refresh_token(db, token):
                    # NOTE(review): any refresh failure, including a transient
                    # network error, deactivates the token - confirm intended.
                    token.is_active = False
                    db.commit()
                    return None
            return token.access_token
        except Exception as e:
            logger.error(f"Error getting valid token: {e}")
            return None

    def get_connected_services(self, db, company_id: int) -> Dict[str, Dict]:
        """Get status of all connected OAuth services for a company.

        Expired tokens that have a refresh token are refreshed on the fly;
        a failed refresh is reported through 'refresh_failed'.

        Returns:
            Dict like {'google/gbp': {'connected': True, 'account_name': '...', 'expires_at': ...}, ...}
            or an empty dict on error.
        """
        try:
            from database import OAuthToken

            tokens = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.is_active == True,
            ).all()

            result = {}
            for token in tokens:
                key = f"{token.provider}/{token.service}"
                # Auto-refresh expired tokens
                refresh_failed = False
                if token.is_expired and token.refresh_token:
                    if not self._try_refresh_token(db, token):
                        refresh_failed = True
                result[key] = {
                    'connected': True,
                    'account_name': token.account_name,
                    'account_id': token.account_id,
                    'expires_at': token.expires_at.isoformat() if token.expires_at else None,
                    'is_expired': token.is_expired,
                    'refresh_failed': refresh_failed,
                    'updated_at': token.updated_at.isoformat() if token.updated_at else None,
                }
            return result
        except Exception as e:
            logger.error(f"Error getting connected services: {e}")
            return {}