nordabiz/oauth_service.py
Maciej Pienczyn 555cb99c86
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: capture and display Google account email for OAuth connections
After token exchange, fetches Google userinfo to save the email and
name of the Google account used for authorization. Displays this info
on the GBP audit page so users know which account to reconnect with.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 07:32:43 +01:00

377 lines
14 KiB
Python

"""
Shared OAuth 2.0 Service for NordaBiz
=====================================
Supports:
- Google OAuth 2.0 (GBP Business Profile API, Search Console API)
- Meta OAuth 2.0 (Facebook Graph API, Instagram Graph API)
Config via environment variables:
- GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET
- META_APP_ID, META_APP_SECRET
- OAUTH_REDIRECT_BASE_URL (e.g., https://nordabiznes.pl)
"""
import os
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Tuple
import requests
logger = logging.getLogger(__name__)

# OAuth provider configurations, keyed by provider name.
# Each entry carries:
#   auth_url  - endpoint the user's browser is sent to for authorization
#   token_url - endpoint used to exchange a code (or refresh) for a token
#   scopes    - map of service name -> scope string requested for that service
OAUTH_PROVIDERS = {
    'google': {
        'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth',
        'token_url': 'https://oauth2.googleapis.com/token',
        'scopes': {
            'gbp': 'https://www.googleapis.com/auth/business.manage',
            'search_console': 'https://www.googleapis.com/auth/webmasters',
        },
    },
    'meta': {
        'auth_url': 'https://www.facebook.com/v21.0/dialog/oauth',
        'token_url': 'https://graph.facebook.com/v21.0/oauth/access_token',
        'scopes': {
            # Meta scope strings are comma-separated permission lists.
            'facebook': ','.join([
                'pages_show_list',
                'pages_read_engagement',
                'pages_manage_posts',
                'read_insights',
            ]),
            'instagram': ','.join([
                'instagram_basic',
                'instagram_manage_insights',
                'pages_show_list',
            ]),
        },
    },
}
class OAuthService:
    """Shared OAuth 2.0 service for external API integrations.

    Implements the authorization-code flow for the providers listed in
    ``OAUTH_PROVIDERS`` (authorization URL, code exchange, token refresh) and
    stores the resulting tokens through the ``OAuthToken`` model imported
    from the project's ``database`` module.
    """

    def __init__(self):
        """Read client credentials and the redirect base URL from the environment."""
        self.google_client_id = os.getenv('GOOGLE_OAUTH_CLIENT_ID')
        self.google_client_secret = os.getenv('GOOGLE_OAUTH_CLIENT_SECRET')
        self.meta_app_id = os.getenv('META_APP_ID')
        self.meta_app_secret = os.getenv('META_APP_SECRET')
        self.redirect_base = os.getenv('OAUTH_REDIRECT_BASE_URL', 'https://nordabiznes.pl')

    def _redirect_uri(self, provider: str) -> str:
        """Return the OAuth callback URL for ``provider``.

        The authorization request and the token exchange must send the same
        redirect URI, so both are built here. A trailing slash on the
        configured base URL is dropped so the path never contains ``//``.
        """
        return f"{self.redirect_base.rstrip('/')}/api/oauth/callback/{provider}"

    @staticmethod
    def _account_identity(token_data: Dict) -> Optional[Tuple[str, str]]:
        """Extract ``(account_id, account_name)`` from exchanged token data.

        Returns None when no ``google_email`` is present. The name falls back
        to the email when ``google_name`` is missing *or* empty/None (the
        key is always set by ``exchange_code``, so a plain ``dict.get``
        default would never apply).
        """
        email = token_data.get('google_email')
        if not email:
            return None
        return email, token_data.get('google_name') or email

    def get_authorization_url(self, provider: str, service: str, state: str) -> Optional[str]:
        """Generate OAuth authorization URL for a provider/service.

        Args:
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            state: CSRF state token (include company_id and user_id encoded)

        Returns:
            Authorization URL, or None if the provider or service is unknown
            or the provider's client ID is not configured.
        """
        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            logger.error(f"Unknown OAuth provider: {provider}")
            return None

        # An unknown service would otherwise yield a request with an empty
        # scope parameter; refuse it explicitly instead.
        scopes = config['scopes'].get(service)
        if not scopes:
            logger.error(f"Unknown OAuth service for {provider}: {service}")
            return None

        if provider == 'google':
            if not self.google_client_id:
                logger.warning("Google OAuth not configured (GOOGLE_OAUTH_CLIENT_ID)")
                return None
            params = {
                'client_id': self.google_client_id,
                'redirect_uri': self._redirect_uri(provider),
                'response_type': 'code',
                'scope': scopes,
                'state': state,
                # Per Google's OAuth docs: offline access requests a refresh
                # token, and prompt=consent re-shows the consent screen.
                'access_type': 'offline',
                'prompt': 'consent',
            }
        elif provider == 'meta':
            if not self.meta_app_id:
                logger.warning("Meta OAuth not configured (META_APP_ID)")
                return None
            params = {
                'client_id': self.meta_app_id,
                'redirect_uri': self._redirect_uri(provider),
                'response_type': 'code',
                'scope': scopes,
                'state': state,
            }
        else:
            return None

        query = '&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())
        return f"{config['auth_url']}?{query}"

    def _attach_google_identity(self, token_data: Dict) -> None:
        """Best-effort: add ``google_email`` / ``google_name`` to ``token_data``.

        Calls Google's userinfo endpoint with the freshly issued access
        token. Any failure is logged and otherwise ignored so that it never
        blocks the token exchange itself.
        """
        # NOTE(review): the scopes requested in get_authorization_url contain
        # no email/profile scope; the userinfo endpoint may therefore not
        # return an email for these tokens - confirm against a live token.
        try:
            userinfo_resp = requests.get(
                'https://www.googleapis.com/oauth2/v2/userinfo',
                headers={'Authorization': f"Bearer {token_data['access_token']}"},
                timeout=10,
            )
            if userinfo_resp.status_code != 200:
                logger.warning(f"Google userinfo request failed: {userinfo_resp.status_code}")
                return
            userinfo = userinfo_resp.json()
            token_data['google_email'] = userinfo.get('email')
            token_data['google_name'] = userinfo.get('name')
        except Exception as e:
            logger.warning(f"Could not fetch Google userinfo: {e}")

    def exchange_code(self, provider: str, code: str) -> Optional[Dict]:
        """Exchange authorization code for access token.

        Args:
            provider: 'google' or 'meta'
            code: Authorization code from callback

        Returns:
            The provider's JSON token response as a dict (for Google,
            additionally ``google_email`` / ``google_name`` when the userinfo
            lookup succeeds), or None on error.
        """
        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            return None

        if provider == 'google':
            data = {
                'code': code,
                'client_id': self.google_client_id,
                'client_secret': self.google_client_secret,
                'redirect_uri': self._redirect_uri(provider),
                'grant_type': 'authorization_code',
            }
        elif provider == 'meta':
            data = {
                'code': code,
                'client_id': self.meta_app_id,
                'client_secret': self.meta_app_secret,
                'redirect_uri': self._redirect_uri(provider),
            }
        else:
            return None

        try:
            response = requests.post(config['token_url'], data=data, timeout=15)
            if response.status_code != 200:
                logger.error(f"OAuth token exchange failed for {provider}: {response.status_code} - {response.text}")
                return None
            token_data = response.json()
            # For Google: fetch user email to identify which account was used
            if provider == 'google' and token_data.get('access_token'):
                self._attach_google_identity(token_data)
            return token_data
        except Exception as e:
            logger.error(f"OAuth token exchange error for {provider}: {e}")
            return None

    def refresh_access_token(self, provider: str, refresh_token: str) -> Optional[Dict]:
        """Refresh an expired access token.

        Args:
            provider: 'google' or 'meta'
            refresh_token: The refresh token (for Meta: the token passed as
                ``fb_exchange_token``)

        Returns:
            The provider's JSON response as a dict, or None on any failure or
            for an unknown provider.
        """
        if provider not in ('google', 'meta'):
            return None

        if provider == 'meta':
            # Meta long-lived tokens: exchange short-lived for long-lived
            params = {
                'grant_type': 'fb_exchange_token',
                'client_id': self.meta_app_id,
                'client_secret': self.meta_app_secret,
                'fb_exchange_token': refresh_token,
            }
            try:
                response = requests.get(
                    OAUTH_PROVIDERS['meta']['token_url'],
                    params=params,
                    timeout=15,
                )
                if response.status_code == 200:
                    return response.json()
                # Log the failure the same way the Google path does.
                logger.error(f"Token refresh failed for {provider}: {response.status_code}")
                return None
            except Exception as e:
                logger.error(f"Meta token refresh error: {e}")
                return None

        # Google: standard refresh_token grant against the token endpoint.
        data = {
            'client_id': self.google_client_id,
            'client_secret': self.google_client_secret,
            'refresh_token': refresh_token,
            'grant_type': 'refresh_token',
        }
        token_url = OAUTH_PROVIDERS['google']['token_url']
        try:
            response = requests.post(token_url, data=data, timeout=15)
            if response.status_code == 200:
                return response.json()
            logger.error(f"Token refresh failed for {provider}: {response.status_code}")
            return None
        except Exception as e:
            logger.error(f"Token refresh error for {provider}: {e}")
            return None

    def _try_refresh_token(self, db, token) -> bool:
        """Attempt to refresh an expired token in-place. Returns True on success.

        On success the token row's ``access_token``, ``expires_at`` (when the
        response carries ``expires_in``) and ``updated_at`` are updated and
        committed. On an exception the session is rolled back.
        """
        if not token.refresh_token:
            return False
        try:
            new_data = self.refresh_access_token(token.provider, token.refresh_token)
            if not (new_data and new_data.get('access_token')):
                return False

            token.access_token = new_data['access_token']
            # RFC 6749 section 6: the server may issue a new refresh token,
            # which then replaces the old one.
            if new_data.get('refresh_token'):
                token.refresh_token = new_data['refresh_token']
            expires_in = new_data.get('expires_in')
            if expires_in:
                token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
            token.updated_at = datetime.now()
            db.commit()
            logger.info(f"Auto-refreshed {token.provider}/{token.service} for company {token.company_id}")
            return True
        except Exception as e:
            logger.error(f"Token auto-refresh error {token.provider}/{token.service}: {e}")
            db.rollback()
            return False

    def save_token(self, db, company_id: int, user_id: int, provider: str,
                   service: str, token_data: Dict) -> bool:
        """Save or update OAuth token in database.

        Args:
            db: Database session
            company_id: Company ID
            user_id: User who authorized
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            token_data: Dict from exchange_code() or refresh_access_token()

        Returns:
            True if saved successfully, False if anything failed (the
            session is rolled back in that case).
        """
        try:
            from database import OAuthToken

            # Find existing token or create new
            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
            ).first()
            if not token:
                token = OAuthToken(
                    company_id=company_id,
                    user_id=user_id,
                    provider=provider,
                    service=service,
                )
                db.add(token)

            token.access_token = token_data.get('access_token', '')
            # Keep the stored refresh token when the response carries none
            # (or an empty one), so a re-authorization cannot erase it.
            token.refresh_token = token_data.get('refresh_token') or token.refresh_token
            token.token_type = token_data.get('token_type', 'Bearer')
            expires_in = token_data.get('expires_in')
            if expires_in:
                token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
            token.scopes = OAUTH_PROVIDERS.get(provider, {}).get('scopes', {}).get(service, '')
            token.is_active = True
            token.updated_at = datetime.now()

            # Save Google account identity if available
            identity = self._account_identity(token_data)
            if identity:
                token.account_id, token.account_name = identity

            db.commit()
            logger.info(f"OAuth token saved: {provider}/{service} for company {company_id}")
            return True
        except Exception as e:
            db.rollback()
            logger.error(f"Error saving OAuth token: {e}")
            return False

    def get_valid_token(self, db, company_id: int, provider: str, service: str) -> Optional[str]:
        """Get a valid access token, refreshing if needed.

        Args:
            db: Database session
            company_id: Company ID
            provider: 'google' or 'meta'
            service: Service name

        Returns:
            Valid access token string, or None when there is no active token,
            the token is expired without a refresh token, or the refresh
            fails (the token is then marked inactive).
        """
        try:
            from database import OAuthToken

            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
                # `== True` is deliberate: it is passed to the query filter
                # as a column comparison, not evaluated as a Python bool.
                OAuthToken.is_active == True,  # noqa: E712
            ).first()
            if not token:
                return None

            # Check if token is expired
            if token.is_expired:
                if not token.refresh_token:
                    return None
                if not self._try_refresh_token(db, token):
                    token.is_active = False
                    db.commit()
                    return None

            return token.access_token
        except Exception as e:
            logger.error(f"Error getting valid token: {e}")
            return None

    def get_connected_services(self, db, company_id: int) -> Dict[str, Dict]:
        """Get status of all connected OAuth services for a company.

        Expired tokens that have a refresh token are refreshed on the fly
        before their status is reported.

        Returns:
            Dict like {'google/gbp': {'connected': True, 'account_name': '...', 'expires_at': ...}, ...};
            an empty dict if the lookup fails.
        """
        try:
            from database import OAuthToken

            tokens = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                # Column comparison for the query filter (see get_valid_token).
                OAuthToken.is_active == True,  # noqa: E712
            ).all()

            result = {}
            for token in tokens:
                key = f"{token.provider}/{token.service}"

                # Auto-refresh expired tokens
                refresh_failed = False
                if token.is_expired and token.refresh_token:
                    if not self._try_refresh_token(db, token):
                        refresh_failed = True

                # Fields are read after the refresh attempt so they reflect
                # the refreshed expiry when it succeeded.
                result[key] = {
                    'connected': True,
                    'account_name': token.account_name,
                    'account_id': token.account_id,
                    'expires_at': token.expires_at.isoformat() if token.expires_at else None,
                    'is_expired': token.is_expired,
                    'refresh_failed': refresh_failed,
                    'updated_at': token.updated_at.isoformat() if token.updated_at else None,
                }
            return result
        except Exception as e:
            logger.error(f"Error getting connected services: {e}")
            return {}