nordabiz/facebook_graph_service.py
Maciej Pienczyn 147f36ab75
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: use Page Access Token and fix deprecated Graph API fields
- Use config.access_token (Page Token) instead of User Token for feed/insights
- Remove deprecated 'type' field from feed, use attachments{media_type}
- Fetch insights individually to handle unavailable metrics gracefully

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 12:58:24 +01:00

645 lines
25 KiB
Python

"""
Facebook + Instagram Graph API Client
======================================
Uses OAuth 2.0 page tokens to access Facebook Page and Instagram Business data.
API docs: https://developers.facebook.com/docs/graph-api/
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
class FacebookGraphService:
"""Facebook + Instagram Graph API client."""
BASE_URL = "https://graph.facebook.com/v21.0"
def __init__(self, access_token: str):
self.access_token = access_token
self.session = requests.Session()
self.session.timeout = 15
def _get(self, endpoint: str, params: dict = None) -> Optional[Dict]:
"""Make authenticated GET request."""
params = params or {}
params['access_token'] = self.access_token
try:
resp = self.session.get(f"{self.BASE_URL}/{endpoint}", params=params)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Facebook API {endpoint} failed: {e}")
return None
def get_managed_pages(self) -> List[Dict]:
"""Get Facebook pages managed by the authenticated user."""
data = self._get('me/accounts', {'fields': 'id,name,category,access_token,fan_count'})
return data.get('data', []) if data else []
def get_page_info(self, page_id: str) -> Optional[Dict]:
"""Get detailed page information."""
return self._get(page_id, {
'fields': 'id,name,fan_count,category,link,about,description,website,phone,emails,'
'single_line_address,location,hours,followers_count,picture,cover,'
'rating_count,overall_star_rating,were_here_count,founded,mission'
})
def get_page_posts_stats(self, page_id: str) -> Dict:
"""Get post counts, engagement stats and last post date from page feed."""
result = {
'posts_30d': 0, 'posts_365d': 0, 'last_post_date': None,
'total_likes': 0, 'total_comments': 0, 'total_shares': 0,
'post_types': {}, # e.g. {'photo': 5, 'video': 2, 'link': 3}
'recent_posts': [], # last 5 posts with title/type/date/engagement
}
now = datetime.now()
since_365d = int((now - timedelta(days=365)).timestamp())
data = self._get(f'{page_id}/feed', {
'fields': 'created_time,message,shares,likes.summary(true),comments.summary(true),attachments{media_type}',
'since': since_365d,
'limit': 100,
})
if not data:
return result
posts = data.get('data', [])
cutoff_30d = now - timedelta(days=30)
for post in posts:
ct = post.get('created_time', '')
if not ct:
continue
try:
post_date = datetime.fromisoformat(ct.replace('+0000', '+00:00').replace('Z', '+00:00'))
post_date_naive = post_date.replace(tzinfo=None)
except (ValueError, TypeError):
result['posts_365d'] += 1
continue
result['posts_365d'] += 1
if post_date_naive >= cutoff_30d:
result['posts_30d'] += 1
if result['last_post_date'] is None or post_date_naive > result['last_post_date']:
result['last_post_date'] = post_date_naive
# Engagement
likes = post.get('likes', {}).get('summary', {}).get('total_count', 0)
comments = post.get('comments', {}).get('summary', {}).get('total_count', 0)
shares = post.get('shares', {}).get('count', 0)
result['total_likes'] += likes
result['total_comments'] += comments
result['total_shares'] += shares
# Post types (from attachments.media_type since 'type' is deprecated)
attachments = post.get('attachments', {}).get('data', [])
ptype = attachments[0].get('media_type', 'status') if attachments else 'status'
result['post_types'][ptype] = result['post_types'].get(ptype, 0) + 1
# Recent posts (top 5 by date)
if len(result['recent_posts']) < 5:
msg = post.get('message', '')
title = (msg[:80] + '...') if len(msg) > 80 else msg
result['recent_posts'].append({
'date': post_date_naive.strftime('%Y-%m-%d'),
'title': title,
'type': ptype,
'likes': likes,
'comments': comments,
'shares': shares,
})
return result
def get_page_insights(self, page_id: str, days: int = 28) -> Dict:
"""Get page insights (views, engagements, reactions).
Fetches metrics individually to handle deprecated/unavailable ones gracefully.
Requires page access token, not user access token.
"""
since = int((datetime.now() - timedelta(days=days)).timestamp())
until = int(datetime.now().timestamp())
all_metrics = [
'page_views_total',
'page_post_engagements',
'page_actions_post_reactions_total',
# These may be deprecated or require additional permissions:
'page_impressions',
'page_engaged_users',
'page_fans',
'page_fan_adds',
'page_fan_removes',
]
result = {}
for metric_name in all_metrics:
data = self._get(f'{page_id}/insights', {
'metric': metric_name,
'period': 'day',
'since': since,
'until': until,
})
if not data:
continue
for metric in data.get('data', []):
name = metric.get('name', '')
values = metric.get('values', [])
if values:
total = sum(v.get('value', 0) for v in values if isinstance(v.get('value'), (int, float)))
result[name] = total
return result
def get_instagram_account(self, page_id: str) -> Optional[str]:
"""Get linked Instagram Business account ID from a Facebook Page."""
data = self._get(page_id, {'fields': 'instagram_business_account'})
if data and 'instagram_business_account' in data:
return data['instagram_business_account'].get('id')
return None
def get_ig_media_insights(self, ig_account_id: str, days: int = 28) -> Dict:
"""Get Instagram account insights.
Returns follower_count, media_count, and recent media engagement.
"""
result = {}
# Basic account info
account_data = self._get(ig_account_id, {
'fields': 'followers_count,media_count,username,biography'
})
if account_data:
result['followers_count'] = account_data.get('followers_count', 0)
result['media_count'] = account_data.get('media_count', 0)
result['username'] = account_data.get('username', '')
# Account insights (reach, impressions)
since = datetime.now() - timedelta(days=days)
until = datetime.now()
insights_data = self._get(f'{ig_account_id}/insights', {
'metric': 'impressions,reach,follower_count',
'period': 'day',
'since': int(since.timestamp()),
'until': int(until.timestamp()),
})
if insights_data:
for metric in insights_data.get('data', []):
name = metric.get('name', '')
values = metric.get('values', [])
if values:
total = sum(v.get('value', 0) for v in values if isinstance(v.get('value'), (int, float)))
result[f'ig_{name}_total'] = total
return result
# ============================================================
# PUBLISHING METHODS (Social Publisher)
# ============================================================
def _post(self, endpoint: str, data: dict = None, files: dict = None) -> Optional[Dict]:
"""Make authenticated POST request."""
data = data or {}
params = {'access_token': self.access_token}
try:
if files:
resp = self.session.post(f"{self.BASE_URL}/{endpoint}", params=params, data=data, files=files)
else:
resp = self.session.post(f"{self.BASE_URL}/{endpoint}", params=params, data=data)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
error_data = None
try:
error_data = e.response.json()
except Exception:
pass
logger.error(f"Facebook API POST {endpoint} failed: {e}, response: {error_data}")
return None
except Exception as e:
logger.error(f"Facebook API POST {endpoint} failed: {e}")
return None
def _delete(self, endpoint: str) -> Optional[Dict]:
"""Make authenticated DELETE request."""
params = {'access_token': self.access_token}
try:
resp = self.session.delete(f"{self.BASE_URL}/{endpoint}", params=params)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Facebook API DELETE {endpoint} failed: {e}")
return None
def upload_photo_unpublished(self, page_id: str, image_path: str) -> Optional[str]:
"""Upload photo as unpublished to get photo_id for feed attachment.
Two-step process:
1. Upload photo with published=false -> get photo_id
2. Use photo_id in create_post() attached_media
Args:
page_id: Facebook Page ID
image_path: Local path to image file
Returns:
Photo ID (media_fbid) or None on failure
"""
try:
with open(image_path, 'rb') as f:
files = {'source': (image_path.split('/')[-1], f, 'image/png')}
result = self._post(f'{page_id}/photos', data={'published': 'false'}, files=files)
if result and 'id' in result:
logger.info(f"Uploaded unpublished photo: {result['id']}")
return result['id']
logger.error(f"Photo upload failed: {result}")
return None
except FileNotFoundError:
logger.error(f"Image file not found: {image_path}")
return None
def create_post(self, page_id: str, message: str, image_path: str = None,
published: bool = True, scheduled_time: int = None) -> Optional[Dict]:
"""Create a post on Facebook Page feed.
For posts with images, uses two-step process:
1. Upload photo as unpublished -> photo_id
2. Create feed post with attached_media[0]
Args:
page_id: Facebook Page ID
message: Post text content
image_path: Optional local path to image
published: If False, creates draft (visible only to page admins)
scheduled_time: Unix timestamp for scheduled publishing (min 10 min ahead)
Returns:
API response dict with 'id' key, or None on failure
"""
data = {'message': message}
# Handle image upload
if image_path:
photo_id = self.upload_photo_unpublished(page_id, image_path)
if photo_id:
data['attached_media[0]'] = json.dumps({'media_fbid': photo_id})
else:
logger.warning("Image upload failed, posting without image")
# Handle scheduling
if scheduled_time:
data['published'] = 'false'
data['scheduled_publish_time'] = str(scheduled_time)
elif not published:
data['published'] = 'false'
result = self._post(f'{page_id}/feed', data=data)
if result and 'id' in result:
logger.info(f"Created post: {result['id']} (published={published})")
return result
def publish_draft(self, post_id: str) -> Optional[Dict]:
"""Publish an unpublished (draft) post.
Args:
post_id: Facebook post ID (format: PAGE_ID_POST_ID)
Returns:
API response or None on failure
"""
return self._post(post_id, data={'is_published': 'true'})
def unpublish_post(self, post_id: str) -> Optional[Dict]:
"""Unpublish a public post (make it draft/hidden).
Args:
post_id: Facebook post ID (format: PAGE_ID_POST_ID)
Returns:
API response or None on failure
"""
return self._post(post_id, data={'is_published': 'false'})
def get_post_engagement(self, post_id: str) -> Optional[Dict]:
"""Get engagement metrics for a published post.
Args:
post_id: Facebook post ID
Returns:
Dict with likes, comments, shares, reactions_total or None
"""
fields = 'id,created_time,message,likes.summary(true),comments.summary(true),shares,reactions.summary(true).limit(0)'
result = self._get(post_id, {'fields': fields})
if not result:
return None
return {
'post_id': result.get('id'),
'created_time': result.get('created_time'),
'likes': result.get('likes', {}).get('summary', {}).get('total_count', 0),
'comments': result.get('comments', {}).get('summary', {}).get('total_count', 0),
'shares': result.get('shares', {}).get('count', 0) if result.get('shares') else 0,
'reactions_total': result.get('reactions', {}).get('summary', {}).get('total_count', 0),
}
def get_page_posts(self, page_id: str, limit: int = 10,
after: str = None) -> Optional[Dict]:
"""Get recent posts from a Facebook Page with engagement metrics.
Args:
page_id: Facebook Page ID
limit: Number of posts to fetch (max 100)
after: Pagination cursor for next page
Returns:
Dict with 'posts' list and 'next_cursor' (or None), or None on failure
"""
fields = (
'id,message,created_time,full_picture,permalink_url,status_type,'
'likes.summary(true).limit(0),comments.summary(true).limit(0),'
'shares,reactions.summary(true).limit(0)'
)
params = {'fields': fields, 'limit': limit}
if after:
params['after'] = after
result = self._get(f'{page_id}/posts', params)
if not result:
return None
posts = []
for item in result.get('data', []):
posts.append({
'id': item.get('id'),
'message': item.get('message', ''),
'created_time': item.get('created_time'),
'full_picture': item.get('full_picture'),
'permalink_url': item.get('permalink_url'),
'status_type': item.get('status_type', ''),
'likes': item.get('likes', {}).get('summary', {}).get('total_count', 0),
'comments': item.get('comments', {}).get('summary', {}).get('total_count', 0),
'shares': item.get('shares', {}).get('count', 0) if item.get('shares') else 0,
'reactions_total': item.get('reactions', {}).get('summary', {}).get('total_count', 0),
})
# Extract next page cursor
next_cursor = None
paging = result.get('paging', {})
if paging.get('next'):
next_cursor = paging.get('cursors', {}).get('after')
return {'posts': posts, 'next_cursor': next_cursor}
def get_post_insights_metrics(self, post_id: str) -> Optional[Dict]:
"""Get detailed insights metrics for a specific post.
Note: Only available for posts on Pages with 100+ fans.
Args:
post_id: Facebook post ID
Returns:
Dict with impressions, reach, engaged_users, clicks or None
"""
metrics = 'post_impressions,post_impressions_unique,post_engaged_users,post_clicks'
result = self._get(f'{post_id}/insights', {'metric': metrics})
if not result:
return None
insights = {}
for metric in result.get('data', []):
name = metric.get('name', '')
values = metric.get('values', [])
if values:
# Lifetime metrics have a single value
value = values[0].get('value', 0)
if name == 'post_impressions':
insights['impressions'] = value
elif name == 'post_impressions_unique':
insights['reach'] = value
elif name == 'post_engaged_users':
insights['engaged_users'] = value
elif name == 'post_clicks':
insights['clicks'] = value
return insights if insights else None
def delete_post(self, post_id: str) -> bool:
"""Delete a post (published or unpublished).
Args:
post_id: Facebook post ID
Returns:
True if deleted successfully
"""
result = self._delete(post_id)
return bool(result and result.get('success'))
# ============================================================
# SYNC: Facebook Page → CompanySocialMedia
# ============================================================
def sync_facebook_to_social_media(db, company_id: int) -> dict:
"""Fetch Facebook page stats via Graph API and upsert into CompanySocialMedia.
Requires an active OAuth token and SocialMediaConfig for the company.
Called automatically after page selection and manually via sync endpoint.
Args:
db: SQLAlchemy session
company_id: Company ID to sync data for
Returns:
dict with 'success' bool and either 'data' or 'error'
"""
from oauth_service import OAuthService
from database import SocialMediaConfig, CompanySocialMedia
# 1. Get page config
config = db.query(SocialMediaConfig).filter(
SocialMediaConfig.platform == 'facebook',
SocialMediaConfig.company_id == company_id,
SocialMediaConfig.is_active == True,
).first()
if not config or not config.page_id:
return {'success': False, 'error': 'no_page', 'message': 'Brak skonfigurowanej strony Facebook'}
page_id = config.page_id
# 2. Use Page Access Token (from config) — required for feed/insights.
# Fall back to User Access Token from OAuth if page token unavailable.
token = config.access_token
if not token:
oauth = OAuthService()
token = oauth.get_valid_token(db, company_id, 'meta', 'facebook')
if not token:
return {'success': False, 'error': 'no_token', 'message': 'Brak aktywnego tokenu Facebook'}
# 3. Fetch page info from Graph API
fb = FacebookGraphService(token)
page_info = fb.get_page_info(page_id)
if not page_info:
return {'success': False, 'error': 'api_failed', 'message': 'Nie udało się pobrać danych strony z Facebook API'}
# 4. Fetch page insights (best-effort, may be empty)
insights = fb.get_page_insights(page_id, 28)
# 4b. Fetch post stats (best-effort)
post_stats = fb.get_page_posts_stats(page_id)
# 5. Calculate metrics
followers = page_info.get('followers_count') or page_info.get('fan_count') or 0
engaged_users = insights.get('page_engaged_users', 0)
engagement_rate = None
if followers > 0 and engaged_users > 0:
engagement_rate = round((engaged_users / followers) * 100, 2)
# Profile completeness: 20 points each for 5 key fields
completeness = 0
if page_info.get('about'):
completeness += 20
if page_info.get('website'):
completeness += 20
if page_info.get('phone'):
completeness += 20
if page_info.get('single_line_address'):
completeness += 20
if page_info.get('picture', {}).get('data', {}).get('url'):
completeness += 20
# URL: prefer API vanity link, then existing URL, then numeric fallback
# Don't replace a vanity URL with a numeric one (e.g. facebook.com/inpipl → facebook.com/123456)
import re
api_link = page_info.get('link')
if api_link and re.search(r'facebook\.com/\d+$', api_link):
api_link = None # Numeric URL, not useful as replacement
# 6. Upsert CompanySocialMedia record
# Look for existing Facebook record for this company (any URL)
existing = db.query(CompanySocialMedia).filter(
CompanySocialMedia.company_id == company_id,
CompanySocialMedia.platform == 'facebook',
).first()
now = datetime.now()
if existing:
csm = existing
# Only update URL if API returned a proper link (not numeric fallback)
if api_link:
csm.url = api_link
# Keep existing URL if API didn't return link
else:
page_url = api_link or f"https://facebook.com/{page_id}"
csm = CompanySocialMedia(
company_id=company_id,
platform='facebook',
url=page_url,
)
db.add(csm)
csm.source = 'facebook_api'
csm.is_valid = True
csm.check_status = 'ok'
csm.page_name = page_info.get('name', '')
csm.followers_count = followers if followers > 0 else csm.followers_count
csm.has_bio = bool(page_info.get('about') or page_info.get('description'))
# Prefer 'about' (short), fallback to 'description' (long)
bio_text = page_info.get('about') or page_info.get('description') or ''
csm.profile_description = bio_text[:500]
# Engagement rate: prefer per-post calculation over insights
if post_stats.get('posts_365d', 0) > 0 and followers > 0:
total_engagement = post_stats['total_likes'] + post_stats['total_comments'] + post_stats['total_shares']
avg_engagement_per_post = total_engagement / post_stats['posts_365d']
csm.engagement_rate = round((avg_engagement_per_post / followers) * 100, 2)
elif engagement_rate is not None:
csm.engagement_rate = engagement_rate
csm.profile_completeness_score = completeness
csm.has_profile_photo = bool(page_info.get('picture', {}).get('data', {}).get('url'))
csm.has_cover_photo = bool(page_info.get('cover', {}).get('source'))
csm.posts_count_30d = post_stats.get('posts_30d', 0)
csm.posts_count_365d = post_stats.get('posts_365d', 0)
if post_stats.get('last_post_date'):
csm.last_post_date = post_stats['last_post_date']
# Posting frequency score: 0-10 based on posts per month
p30 = post_stats.get('posts_30d', 0)
csm.posting_frequency_score = min(10, p30) if p30 > 0 else 0
csm.verified_at = now
csm.last_checked_at = now
# Store all API fields in content_types JSONB (no migration needed)
extra = csm.content_types or {}
# Page info fields
for key in ('category', 'website', 'phone', 'founded', 'mission'):
if page_info.get(key):
extra[key] = page_info[key]
if page_info.get('single_line_address'):
extra['address'] = page_info['single_line_address']
if page_info.get('description'):
extra['description'] = page_info['description'][:500]
if page_info.get('emails'):
extra['emails'] = page_info['emails']
if page_info.get('hours'):
extra['hours'] = page_info['hours']
if page_info.get('location'):
loc = page_info['location']
extra['location'] = {k: loc[k] for k in ('city', 'country', 'latitude', 'longitude', 'street', 'zip') if k in loc}
if page_info.get('rating_count'):
extra['rating_count'] = page_info['rating_count']
if page_info.get('overall_star_rating'):
extra['overall_star_rating'] = page_info['overall_star_rating']
if page_info.get('were_here_count'):
extra['were_here_count'] = page_info['were_here_count']
# Post engagement stats
extra['total_likes'] = post_stats.get('total_likes', 0)
extra['total_comments'] = post_stats.get('total_comments', 0)
extra['total_shares'] = post_stats.get('total_shares', 0)
if post_stats.get('post_types'):
extra['post_types'] = post_stats['post_types']
if post_stats.get('recent_posts'):
extra['recent_posts'] = post_stats['recent_posts']
# Insights
for key in ('page_impressions', 'page_engaged_users', 'page_views_total',
'page_post_engagements', 'page_fan_adds', 'page_fan_removes',
'page_actions_post_reactions_total'):
if insights.get(key):
extra[f'insights_{key}'] = insights[key]
csm.content_types = extra
# Append to followers_history
history = csm.followers_history or []
if followers > 0:
today_str = now.strftime('%Y-%m-%d')
# Don't duplicate entries for the same day
if not history or history[-1].get('date') != today_str:
history.append({'date': today_str, 'count': followers})
csm.followers_history = history
db.commit()
logger.info(f"FB sync OK for company {company_id}: {followers} followers, engagement={engagement_rate}")
return {
'success': True,
'data': {
'page_name': csm.page_name,
'followers_count': followers,
'engagement_rate': engagement_rate,
'profile_completeness_score': completeness,
'source': 'facebook_api',
}
}