Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Without email scope, userinfo endpoint returns 401 and account email cannot be captured during OAuth flow. Added openid+email to both GBP and Search Console scopes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
377 lines
14 KiB
Python
377 lines
14 KiB
Python
"""
|
|
Shared OAuth 2.0 Service for NordaBiz
|
|
=====================================
|
|
|
|
Supports:
|
|
- Google OAuth 2.0 (GBP Business Profile API, Search Console API)
|
|
- Meta OAuth 2.0 (Facebook Graph API, Instagram Graph API)
|
|
|
|
Config via environment variables:
|
|
- GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET
|
|
- META_APP_ID, META_APP_SECRET
|
|
- OAUTH_REDIRECT_BASE_URL (e.g., https://nordabiznes.pl)
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Tuple
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# OAuth Provider Configurations
|
|
OAUTH_PROVIDERS = {
|
|
'google': {
|
|
'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth',
|
|
'token_url': 'https://oauth2.googleapis.com/token',
|
|
'scopes': {
|
|
'gbp': 'https://www.googleapis.com/auth/business.manage openid email',
|
|
'search_console': 'https://www.googleapis.com/auth/webmasters openid email',
|
|
},
|
|
},
|
|
'meta': {
|
|
'auth_url': 'https://www.facebook.com/v21.0/dialog/oauth',
|
|
'token_url': 'https://graph.facebook.com/v21.0/oauth/access_token',
|
|
'scopes': {
|
|
'facebook': 'pages_show_list,pages_read_engagement,pages_manage_posts,read_insights',
|
|
'instagram': 'instagram_basic,instagram_manage_insights,pages_show_list',
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
class OAuthService:
|
|
"""Shared OAuth 2.0 service for external API integrations."""
|
|
|
|
def __init__(self):
|
|
self.google_client_id = os.getenv('GOOGLE_OAUTH_CLIENT_ID')
|
|
self.google_client_secret = os.getenv('GOOGLE_OAUTH_CLIENT_SECRET')
|
|
self.meta_app_id = os.getenv('META_APP_ID')
|
|
self.meta_app_secret = os.getenv('META_APP_SECRET')
|
|
self.redirect_base = os.getenv('OAUTH_REDIRECT_BASE_URL', 'https://nordabiznes.pl')
|
|
|
|
def get_authorization_url(self, provider: str, service: str, state: str) -> Optional[str]:
|
|
"""Generate OAuth authorization URL for a provider/service.
|
|
|
|
Args:
|
|
provider: 'google' or 'meta'
|
|
service: 'gbp', 'search_console', 'facebook', 'instagram'
|
|
state: CSRF state token (include company_id and user_id encoded)
|
|
|
|
Returns:
|
|
Authorization URL or None if provider not configured
|
|
"""
|
|
config = OAUTH_PROVIDERS.get(provider)
|
|
if not config:
|
|
logger.error(f"Unknown OAuth provider: {provider}")
|
|
return None
|
|
|
|
scopes = config['scopes'].get(service, '')
|
|
redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}"
|
|
|
|
if provider == 'google':
|
|
if not self.google_client_id:
|
|
logger.warning("Google OAuth not configured (GOOGLE_OAUTH_CLIENT_ID)")
|
|
return None
|
|
params = {
|
|
'client_id': self.google_client_id,
|
|
'redirect_uri': redirect_uri,
|
|
'response_type': 'code',
|
|
'scope': scopes,
|
|
'state': state,
|
|
'access_type': 'offline',
|
|
'prompt': 'consent',
|
|
}
|
|
elif provider == 'meta':
|
|
if not self.meta_app_id:
|
|
logger.warning("Meta OAuth not configured (META_APP_ID)")
|
|
return None
|
|
params = {
|
|
'client_id': self.meta_app_id,
|
|
'redirect_uri': redirect_uri,
|
|
'response_type': 'code',
|
|
'scope': scopes,
|
|
'state': state,
|
|
}
|
|
else:
|
|
return None
|
|
|
|
query = '&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())
|
|
return f"{config['auth_url']}?{query}"
|
|
|
|
def exchange_code(self, provider: str, code: str) -> Optional[Dict]:
|
|
"""Exchange authorization code for access token.
|
|
|
|
Args:
|
|
provider: 'google' or 'meta'
|
|
code: Authorization code from callback
|
|
|
|
Returns:
|
|
Dict with access_token, refresh_token, expires_in or None on error
|
|
"""
|
|
config = OAUTH_PROVIDERS.get(provider)
|
|
if not config:
|
|
return None
|
|
|
|
redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}"
|
|
|
|
if provider == 'google':
|
|
data = {
|
|
'code': code,
|
|
'client_id': self.google_client_id,
|
|
'client_secret': self.google_client_secret,
|
|
'redirect_uri': redirect_uri,
|
|
'grant_type': 'authorization_code',
|
|
}
|
|
elif provider == 'meta':
|
|
data = {
|
|
'code': code,
|
|
'client_id': self.meta_app_id,
|
|
'client_secret': self.meta_app_secret,
|
|
'redirect_uri': redirect_uri,
|
|
}
|
|
else:
|
|
return None
|
|
|
|
try:
|
|
response = requests.post(config['token_url'], data=data, timeout=15)
|
|
if response.status_code != 200:
|
|
logger.error(f"OAuth token exchange failed for {provider}: {response.status_code} - {response.text}")
|
|
return None
|
|
token_data = response.json()
|
|
|
|
# For Google: fetch user email to identify which account was used
|
|
if provider == 'google' and token_data.get('access_token'):
|
|
try:
|
|
userinfo_resp = requests.get(
|
|
'https://www.googleapis.com/oauth2/v2/userinfo',
|
|
headers={'Authorization': f"Bearer {token_data['access_token']}"},
|
|
timeout=10
|
|
)
|
|
if userinfo_resp.status_code == 200:
|
|
userinfo = userinfo_resp.json()
|
|
token_data['google_email'] = userinfo.get('email')
|
|
token_data['google_name'] = userinfo.get('name')
|
|
except Exception as e:
|
|
logger.warning(f"Could not fetch Google userinfo: {e}")
|
|
|
|
return token_data
|
|
except Exception as e:
|
|
logger.error(f"OAuth token exchange error for {provider}: {e}")
|
|
return None
|
|
|
|
def refresh_access_token(self, provider: str, refresh_token: str) -> Optional[Dict]:
|
|
"""Refresh an expired access token.
|
|
|
|
Args:
|
|
provider: 'google' or 'meta'
|
|
refresh_token: The refresh token
|
|
|
|
Returns:
|
|
Dict with new access_token, expires_in or None
|
|
"""
|
|
if provider == 'google':
|
|
data = {
|
|
'client_id': self.google_client_id,
|
|
'client_secret': self.google_client_secret,
|
|
'refresh_token': refresh_token,
|
|
'grant_type': 'refresh_token',
|
|
}
|
|
token_url = OAUTH_PROVIDERS['google']['token_url']
|
|
elif provider == 'meta':
|
|
# Meta long-lived tokens: exchange short-lived for long-lived
|
|
params = {
|
|
'grant_type': 'fb_exchange_token',
|
|
'client_id': self.meta_app_id,
|
|
'client_secret': self.meta_app_secret,
|
|
'fb_exchange_token': refresh_token,
|
|
}
|
|
try:
|
|
response = requests.get(
|
|
OAUTH_PROVIDERS['meta']['token_url'],
|
|
params=params,
|
|
timeout=15
|
|
)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Meta token refresh error: {e}")
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
try:
|
|
response = requests.post(token_url, data=data, timeout=15)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
logger.error(f"Token refresh failed for {provider}: {response.status_code}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Token refresh error for {provider}: {e}")
|
|
return None
|
|
|
|
def _try_refresh_token(self, db, token) -> bool:
|
|
"""Attempt to refresh an expired token in-place. Returns True on success."""
|
|
if not token.refresh_token:
|
|
return False
|
|
try:
|
|
new_data = self.refresh_access_token(token.provider, token.refresh_token)
|
|
if new_data and new_data.get('access_token'):
|
|
token.access_token = new_data['access_token']
|
|
expires_in = new_data.get('expires_in')
|
|
if expires_in:
|
|
token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
|
|
token.updated_at = datetime.now()
|
|
db.commit()
|
|
logger.info(f"Auto-refreshed {token.provider}/{token.service} for company {token.company_id}")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Token auto-refresh error {token.provider}/{token.service}: {e}")
|
|
db.rollback()
|
|
return False
|
|
|
|
def save_token(self, db, company_id: int, user_id: int, provider: str,
|
|
service: str, token_data: Dict) -> bool:
|
|
"""Save or update OAuth token in database.
|
|
|
|
Args:
|
|
db: Database session
|
|
company_id: Company ID
|
|
user_id: User who authorized
|
|
provider: 'google' or 'meta'
|
|
service: 'gbp', 'search_console', 'facebook', 'instagram'
|
|
token_data: Dict from exchange_code() or refresh_access_token()
|
|
|
|
Returns:
|
|
True if saved successfully
|
|
"""
|
|
try:
|
|
from database import OAuthToken
|
|
|
|
# Find existing token or create new
|
|
token = db.query(OAuthToken).filter(
|
|
OAuthToken.company_id == company_id,
|
|
OAuthToken.provider == provider,
|
|
OAuthToken.service == service,
|
|
).first()
|
|
|
|
if not token:
|
|
token = OAuthToken(
|
|
company_id=company_id,
|
|
user_id=user_id,
|
|
provider=provider,
|
|
service=service,
|
|
)
|
|
db.add(token)
|
|
|
|
token.access_token = token_data.get('access_token', '')
|
|
token.refresh_token = token_data.get('refresh_token', token.refresh_token)
|
|
token.token_type = token_data.get('token_type', 'Bearer')
|
|
|
|
expires_in = token_data.get('expires_in')
|
|
if expires_in:
|
|
token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
|
|
|
|
token.scopes = OAUTH_PROVIDERS.get(provider, {}).get('scopes', {}).get(service, '')
|
|
token.is_active = True
|
|
token.updated_at = datetime.now()
|
|
|
|
# Save Google account identity if available
|
|
if token_data.get('google_email'):
|
|
token.account_id = token_data['google_email']
|
|
token.account_name = token_data.get('google_name', token_data['google_email'])
|
|
|
|
db.commit()
|
|
logger.info(f"OAuth token saved: {provider}/{service} for company {company_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error saving OAuth token: {e}")
|
|
return False
|
|
|
|
def get_valid_token(self, db, company_id: int, provider: str, service: str) -> Optional[str]:
|
|
"""Get a valid access token, refreshing if needed.
|
|
|
|
Args:
|
|
db: Database session
|
|
company_id: Company ID
|
|
provider: 'google' or 'meta'
|
|
service: Service name
|
|
|
|
Returns:
|
|
Valid access token string or None
|
|
"""
|
|
try:
|
|
from database import OAuthToken
|
|
|
|
token = db.query(OAuthToken).filter(
|
|
OAuthToken.company_id == company_id,
|
|
OAuthToken.provider == provider,
|
|
OAuthToken.service == service,
|
|
OAuthToken.is_active == True,
|
|
).first()
|
|
|
|
if not token:
|
|
return None
|
|
|
|
# Check if token is expired
|
|
if token.is_expired:
|
|
if token.refresh_token:
|
|
if not self._try_refresh_token(db, token):
|
|
token.is_active = False
|
|
db.commit()
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
return token.access_token
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting valid token: {e}")
|
|
return None
|
|
|
|
def get_connected_services(self, db, company_id: int) -> Dict[str, Dict]:
|
|
"""Get status of all connected OAuth services for a company.
|
|
|
|
Returns:
|
|
Dict like {'google/gbp': {'connected': True, 'account_name': '...', 'expires_at': ...}, ...}
|
|
"""
|
|
try:
|
|
from database import OAuthToken
|
|
|
|
tokens = db.query(OAuthToken).filter(
|
|
OAuthToken.company_id == company_id,
|
|
OAuthToken.is_active == True,
|
|
).all()
|
|
|
|
result = {}
|
|
for token in tokens:
|
|
key = f"{token.provider}/{token.service}"
|
|
|
|
# Auto-refresh expired tokens
|
|
refresh_failed = False
|
|
if token.is_expired and token.refresh_token:
|
|
if not self._try_refresh_token(db, token):
|
|
refresh_failed = True
|
|
|
|
result[key] = {
|
|
'connected': True,
|
|
'account_name': token.account_name,
|
|
'account_id': token.account_id,
|
|
'expires_at': token.expires_at.isoformat() if token.expires_at else None,
|
|
'is_expired': token.is_expired,
|
|
'refresh_failed': refresh_failed,
|
|
'updated_at': token.updated_at.isoformat() if token.updated_at else None,
|
|
}
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting connected services: {e}")
|
|
return {}
|