nordabiz/facebook_graph_service.py
Maciej Pienczyn 599e4bde83
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore(facebook): migrate to *_media_view metrics (Meta deprecation 2026-06-30)
Meta deprecates page_impressions, post_impressions, page_video_views et al.
on 2026-06-30. Replaced by *_media_view family. Both old and new metrics
are requested during the transition window so historical data and fresh
data coexist without UI gaps.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 12:59:45 +02:00

918 lines
36 KiB
Python

"""
Facebook + Instagram Graph API Client
======================================
Uses OAuth 2.0 page tokens to access Facebook Page and Instagram Business data.
API docs: https://developers.facebook.com/docs/graph-api/
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
class FacebookGraphService:
    """Facebook + Instagram Graph API client.

    Wraps a ``requests`` session and a single access token. The low-level
    HTTP helpers (``_get``, ``_post``, ``_delete``) never raise: on any
    failure they log the error and return ``None``, and the public methods
    translate that into empty results / ``None`` / ``False``.
    """

    BASE_URL = "https://graph.facebook.com/v21.0"
    # Seconds to wait for the Graph API. Passed explicitly on every request
    # because requests ignores a ``timeout`` attribute set on a Session.
    REQUEST_TIMEOUT = 15

    def __init__(self, access_token: str):
        """Store the access token and open a reusable HTTP session."""
        self.access_token = access_token
        self.session = requests.Session()
        # Has no effect on requests itself; kept only so that existing code
        # reading ``service.session.timeout`` keeps working.
        self.session.timeout = self.REQUEST_TIMEOUT

    # ------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------

    @staticmethod
    def _redact(text) -> str:
        """Return ``text`` as a string with any ``access_token=...`` value masked.

        Exception messages produced by requests embed the full request URL,
        query string included, so they must be scrubbed before being logged.
        """
        import re  # local import: ``re`` is not imported at module level

        return re.sub(r'(access_token=)[^&\s\'"]+', r'\1***', str(text))

    @staticmethod
    def _sum_numeric_values(values) -> float:
        """Sum the numeric ``value`` entries of an insights ``values`` list.

        Entries whose value is not an int/float (e.g. breakdown dicts) are skipped.
        """
        return sum(v.get('value', 0) for v in values if isinstance(v.get('value'), (int, float)))

    @staticmethod
    def _engagement_counts(item: Dict) -> Dict:
        """Extract likes/comments/shares/reactions totals from a post payload."""
        shares = item.get('shares')
        return {
            'likes': item.get('likes', {}).get('summary', {}).get('total_count', 0),
            'comments': item.get('comments', {}).get('summary', {}).get('total_count', 0),
            'shares': shares.get('count', 0) if shares else 0,
            'reactions_total': item.get('reactions', {}).get('summary', {}).get('total_count', 0),
        }

    def _get(self, endpoint: str, params: dict = None) -> Optional[Dict]:
        """Make authenticated GET request.

        Args:
            endpoint: Path relative to ``BASE_URL``.
            params: Optional query parameters; the dict is not modified.
        Returns:
            Decoded JSON response, or None on any failure (logged).
        """
        # Copy so the access token never leaks into the caller's dict.
        query = dict(params or {})
        query['access_token'] = self.access_token
        try:
            resp = self.session.get(
                f"{self.BASE_URL}/{endpoint}",
                params=query,
                timeout=self.REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Facebook API {endpoint} failed: {self._redact(e)}")
            return None

    # ------------------------------------------------------------
    # Read methods
    # ------------------------------------------------------------

    def get_managed_pages(self) -> List[Dict]:
        """Get Facebook pages managed by the authenticated user."""
        data = self._get('me/accounts', {'fields': 'id,name,category,access_token,fan_count'})
        if not data:
            return []
        return data.get('data', [])

    def get_page_info(self, page_id: str) -> Optional[Dict]:
        """Get detailed page information, or None if the request failed."""
        fields = (
            'id,name,fan_count,category,category_list,link,about,description,website,phone,emails,'
            'single_line_address,location,hours,followers_count,picture,cover,'
            'rating_count,overall_star_rating,were_here_count,founded,mission,'
            'talking_about_count,username,verification_status,is_verified,price_range'
        )
        return self._get(page_id, {'fields': fields})

    def get_page_posts_stats(self, page_id: str) -> Dict:
        """Get post counts, engagement stats and last post date from page feed.

        Only the first page of the feed (up to 100 posts from the last 365
        days) is inspected. ``last_post_date`` is a naive datetime in UTC.
        On request failure the zeroed result dict is returned.
        """
        from datetime import timezone  # module level only imports datetime/timedelta

        result = {
            'posts_30d': 0, 'posts_365d': 0, 'last_post_date': None,
            'total_likes': 0, 'total_comments': 0, 'total_shares': 0,
            'post_types': {},  # e.g. {'photo': 5, 'video': 2, 'link': 3}
            'recent_posts': [],  # first 5 posts of the feed with title/type/date/engagement
        }
        # Work with aware UTC datetimes: the API reports created_time in UTC,
        # so comparing against local wall-clock time would shift the 30-day
        # window by the server's UTC offset.
        now_utc = datetime.now(timezone.utc)
        since_365d = int((now_utc - timedelta(days=365)).timestamp())
        data = self._get(f'{page_id}/feed', {
            'fields': 'created_time,message,shares,likes.summary(true),comments.summary(true),attachments{media_type}',
            'since': since_365d,
            'limit': 100,
        })
        if not data:
            return result

        cutoff_30d = now_utc - timedelta(days=30)
        for post in data.get('data', []):
            ct = post.get('created_time', '')
            if not ct:
                continue
            # A post with an unparseable date still counts towards the yearly total.
            result['posts_365d'] += 1
            try:
                post_date = datetime.fromisoformat(ct.replace('+0000', '+00:00').replace('Z', '+00:00'))
            except (ValueError, TypeError, AttributeError):
                continue
            if post_date.tzinfo is None:
                post_date = post_date.replace(tzinfo=timezone.utc)
            post_date_naive = post_date.astimezone(timezone.utc).replace(tzinfo=None)

            if post_date >= cutoff_30d:
                result['posts_30d'] += 1
            if result['last_post_date'] is None or post_date_naive > result['last_post_date']:
                result['last_post_date'] = post_date_naive

            # Engagement
            likes = post.get('likes', {}).get('summary', {}).get('total_count', 0)
            comments = post.get('comments', {}).get('summary', {}).get('total_count', 0)
            shares = post.get('shares', {}).get('count', 0)
            result['total_likes'] += likes
            result['total_comments'] += comments
            result['total_shares'] += shares

            # Post types (from attachments.media_type since 'type' is deprecated)
            attachments = post.get('attachments', {}).get('data', [])
            ptype = attachments[0].get('media_type', 'status') if attachments else 'status'
            result['post_types'][ptype] = result['post_types'].get(ptype, 0) + 1

            # Recent posts: the first 5 entries in feed order
            if len(result['recent_posts']) < 5:
                msg = post.get('message', '')
                title = (msg[:80] + '...') if len(msg) > 80 else msg
                result['recent_posts'].append({
                    'date': post_date_naive.strftime('%Y-%m-%d'),
                    'title': title,
                    'type': ptype,
                    'likes': likes,
                    'comments': comments,
                    'shares': shares,
                })
        return result

    def get_page_insights(self, page_id: str, days: int = 28) -> Dict:
        """Get page insights (views, engagements, reactions).

        Fetches metrics individually to handle deprecated/unavailable ones gracefully.
        Requires page access token, not user access token.

        Returns:
            Dict mapping metric name to the sum of its daily values over the
            last ``days`` days; metrics whose request failed are omitted.
        """
        now = datetime.now()
        since = int((now - timedelta(days=days)).timestamp())
        until = int(now.timestamp())
        # Meta deprecated impressions/views metrics on 2026-06-30.
        # Migrated to *_media_view family. Old keys kept temporarily for
        # transition (Meta serves both until cutoff).
        all_metrics = [
            'page_views_total',
            'page_post_engagements',
            'page_actions_post_reactions_total',
            'page_media_view',
            'page_total_media_view_unique',
            'page_daily_follows',
            'page_daily_unfollows',
            'page_fans',
            'page_fan_adds',
            'page_fan_removes',
            # Deprecated 2026-06-30 — keep until migration verified end-to-end:
            'page_posts_impressions',
            'page_video_views',
            'page_impressions',
            'page_engaged_users',
        ]
        result = {}
        for metric_name in all_metrics:
            data = self._get(f'{page_id}/insights', {
                'metric': metric_name,
                'period': 'day',
                'since': since,
                'until': until,
            })
            if not data:
                continue
            for metric in data.get('data', []):
                values = metric.get('values', [])
                if values:
                    result[metric.get('name', '')] = self._sum_numeric_values(values)
        return result

    def get_instagram_account(self, page_id: str) -> Optional[str]:
        """Get linked Instagram Business account ID from a Facebook Page."""
        data = self._get(page_id, {'fields': 'instagram_business_account'})
        if data and 'instagram_business_account' in data:
            return data['instagram_business_account'].get('id')
        return None

    def get_ig_account_info(self, ig_account_id: str) -> Dict:
        """Get Instagram Business account profile info.

        Returns username, name, bio, followers, following, media count,
        profile pic, website; an empty dict if the request failed.
        """
        fields = (
            'username,name,biography,followers_count,follows_count,'
            'media_count,profile_picture_url,website,ig_id'
        )
        return self._get(ig_account_id, {'fields': fields}) or {}

    def get_ig_media_insights(self, ig_account_id: str, days: int = 28) -> Dict:
        """Get Instagram account insights.

        Returns followers_count, media_count, username and, for each insight
        metric that could be fetched, ``ig_<metric>_total`` summed over the
        last ``days`` days.
        """
        result = {}
        # Basic account info
        account_data = self._get(ig_account_id, {
            'fields': 'followers_count,media_count,username,biography'
        })
        if account_data:
            result['followers_count'] = account_data.get('followers_count', 0)
            result['media_count'] = account_data.get('media_count', 0)
            result['username'] = account_data.get('username', '')

        # Account insights (reach, impressions) — fetch individually for robustness
        now = datetime.now()
        since = int((now - timedelta(days=days)).timestamp())
        until = int(now.timestamp())
        for metric_name in ('impressions', 'reach', 'follower_count'):
            try:
                insights_data = self._get(f'{ig_account_id}/insights', {
                    'metric': metric_name,
                    'period': 'day',
                    'since': since,
                    'until': until,
                })
                if not insights_data:
                    continue
                for metric in insights_data.get('data', []):
                    values = metric.get('values', [])
                    if values:
                        name = metric.get('name', '')
                        result[f'ig_{name}_total'] = self._sum_numeric_values(values)
            except Exception as e:
                # A malformed payload for one metric must not lose the others.
                logger.debug(f"IG insight {metric_name} failed: {e}")
        return result

    def get_ig_recent_media(self, ig_account_id: str, limit: int = 25) -> list:
        """Get recent media from Instagram Business account.

        Returns list of media with engagement data (empty on failure).
        """
        data = self._get(f'{ig_account_id}/media', {
            'fields': 'id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count',
            'limit': limit,
        })
        if not data:
            return []
        return data.get('data', [])

    # ============================================================
    # PUBLISHING METHODS (Social Publisher)
    # ============================================================

    def _post(self, endpoint: str, data: dict = None, files: dict = None) -> Optional[Dict]:
        """Make authenticated POST request.

        Args:
            endpoint: Path relative to ``BASE_URL``.
            data: Form fields to send.
            files: Optional multipart file payload.
        Returns:
            Decoded JSON response, or None on any failure (logged together
            with the API's JSON error body when one is available).
        """
        request_kwargs = {
            'params': {'access_token': self.access_token},
            'data': data or {},
            'timeout': self.REQUEST_TIMEOUT,
        }
        if files:
            request_kwargs['files'] = files
        try:
            resp = self.session.post(f"{self.BASE_URL}/{endpoint}", **request_kwargs)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.HTTPError as e:
            error_data = None
            try:
                error_data = e.response.json()
            except Exception:
                # Error body is not JSON; log without it.
                pass
            logger.error(f"Facebook API POST {endpoint} failed: {self._redact(e)}, response: {error_data}")
            return None
        except Exception as e:
            logger.error(f"Facebook API POST {endpoint} failed: {self._redact(e)}")
            return None

    def _delete(self, endpoint: str) -> Optional[Dict]:
        """Make authenticated DELETE request; returns JSON or None on failure."""
        try:
            resp = self.session.delete(
                f"{self.BASE_URL}/{endpoint}",
                params={'access_token': self.access_token},
                timeout=self.REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Facebook API DELETE {endpoint} failed: {self._redact(e)}")
            return None

    def upload_photo_unpublished(self, page_id: str, image_path: str) -> Optional[str]:
        """Upload photo as unpublished to get photo_id for feed attachment.

        Two-step process:
        1. Upload photo with published=false -> get photo_id
        2. Use photo_id in create_post() attached_media

        Args:
            page_id: Facebook Page ID
            image_path: Local path to image file
        Returns:
            Photo ID (media_fbid) or None on failure
        """
        import mimetypes
        import os

        filename = os.path.basename(image_path)
        # Announce the real image type; fall back to the historical default
        # when the extension is unknown or not an image.
        guessed_type = mimetypes.guess_type(filename)[0] or ''
        content_type = guessed_type if guessed_type.startswith('image/') else 'image/png'
        try:
            with open(image_path, 'rb') as f:
                files = {'source': (filename, f, content_type)}
                result = self._post(f'{page_id}/photos', data={'published': 'false'}, files=files)
        except FileNotFoundError:
            logger.error(f"Image file not found: {image_path}")
            return None
        except OSError as e:
            # Unreadable file, directory, permission problem, ...
            logger.error(f"Image file could not be read: {image_path} ({e})")
            return None

        if result and 'id' in result:
            logger.info(f"Uploaded unpublished photo: {result['id']}")
            return result['id']
        logger.error(f"Photo upload failed: {result}")
        return None

    def create_post(self, page_id: str, message: str, image_path: str = None,
                    published: bool = True, scheduled_time: int = None) -> Optional[Dict]:
        """Create a post on Facebook Page feed.

        For posts with images, uses two-step process:
        1. Upload photo as unpublished -> photo_id
        2. Create feed post with attached_media[0]
        If the image upload fails the post is still created, without the image.

        Args:
            page_id: Facebook Page ID
            message: Post text content
            image_path: Optional local path to image
            published: If False, creates draft (visible only to page admins)
            scheduled_time: Unix timestamp for scheduled publishing (min 10 min ahead)
        Returns:
            API response dict with 'id' key, or None on failure
        """
        data = {'message': message}

        # Handle image upload
        if image_path:
            photo_id = self.upload_photo_unpublished(page_id, image_path)
            if photo_id:
                data['attached_media[0]'] = json.dumps({'media_fbid': photo_id})
            else:
                logger.warning("Image upload failed, posting without image")

        # Handle scheduling: a scheduled post is created unpublished with a publish time
        if scheduled_time:
            data['published'] = 'false'
            data['scheduled_publish_time'] = str(scheduled_time)
        elif not published:
            data['published'] = 'false'

        result = self._post(f'{page_id}/feed', data=data)
        if result and 'id' in result:
            logger.info(f"Created post: {result['id']} (published={published})")
        return result

    def publish_draft(self, post_id: str) -> Optional[Dict]:
        """Publish an unpublished (draft) post.

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)
        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'true'})

    def unpublish_post(self, post_id: str) -> Optional[Dict]:
        """Unpublish a public post (make it draft/hidden).

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)
        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'false'})

    def get_post_engagement(self, post_id: str) -> Optional[Dict]:
        """Get engagement metrics for a published post.

        Args:
            post_id: Facebook post ID
        Returns:
            Dict with likes, comments, shares, reactions_total or None
        """
        fields = 'id,created_time,message,likes.summary(true),comments.summary(true),shares,reactions.summary(true).limit(0)'
        result = self._get(post_id, {'fields': fields})
        if not result:
            return None
        return {
            'post_id': result.get('id'),
            'created_time': result.get('created_time'),
            **self._engagement_counts(result),
        }

    def get_page_posts(self, page_id: str, limit: int = 10,
                       after: str = None) -> Optional[Dict]:
        """Get recent posts from a Facebook Page with engagement metrics.

        Args:
            page_id: Facebook Page ID
            limit: Number of posts to fetch (max 100)
            after: Pagination cursor for next page
        Returns:
            Dict with 'posts' list and 'next_cursor' (or None), or None on failure
        """
        fields = (
            'id,message,created_time,full_picture,permalink_url,status_type,'
            'likes.summary(true).limit(0),comments.summary(true).limit(0),'
            'shares,reactions.summary(true).limit(0)'
        )
        params = {'fields': fields, 'limit': limit}
        if after:
            params['after'] = after
        result = self._get(f'{page_id}/posts', params)
        if not result:
            return None

        posts = [
            {
                'id': item.get('id'),
                'message': item.get('message', ''),
                'created_time': item.get('created_time'),
                'full_picture': item.get('full_picture'),
                'permalink_url': item.get('permalink_url'),
                'status_type': item.get('status_type', ''),
                **self._engagement_counts(item),
            }
            for item in result.get('data', [])
        ]

        # Extract next page cursor; only meaningful when the API reports a next page
        next_cursor = None
        paging = result.get('paging', {})
        if paging.get('next'):
            next_cursor = paging.get('cursors', {}).get('after')
        return {'posts': posts, 'next_cursor': next_cursor}

    def get_post_insights_metrics(self, post_id: str) -> Optional[Dict]:
        """Get detailed insights metrics for a specific post.

        Note: Only available for posts on Pages with 100+ fans.

        Args:
            post_id: Facebook post ID
        Returns:
            Dict with impressions, reach, engaged_users, clicks or None.
            When both a deprecated metric and its replacement are returned,
            the replacement value is used regardless of response order.
        """
        # post_impressions/post_impressions_unique deprecated 2026-06-30.
        # Replaced by post_media_view / post_total_media_view_unique.
        # Both old and new requested during transition window.
        metrics = ('post_media_view,post_total_media_view_unique,'
                   'post_impressions,post_impressions_unique,'
                   'post_engaged_users,post_clicks')
        result = self._get(f'{post_id}/insights', {'metric': metrics})
        if not result:
            return None

        # metric name -> (output key, True when it is the current metric)
        metric_map = {
            'post_media_view': ('impressions', True),
            'post_impressions': ('impressions', False),
            'post_total_media_view_unique': ('reach', True),
            'post_impressions_unique': ('reach', False),
            'post_engaged_users': ('engaged_users', True),
            'post_clicks': ('clicks', True),
        }
        insights = {}
        for metric in result.get('data', []):
            values = metric.get('values', [])
            mapping = metric_map.get(metric.get('name', ''))
            if not values or mapping is None:
                continue
            key, is_current = mapping
            # A deprecated metric only fills a gap; it never overrides its replacement.
            if is_current or key not in insights:
                insights[key] = values[0].get('value', 0)
        return insights if insights else None

    def delete_post(self, post_id: str) -> bool:
        """Delete a post (published or unpublished).

        Args:
            post_id: Facebook post ID
        Returns:
            True if deleted successfully
        """
        result = self._delete(post_id)
        return bool(result and result.get('success'))
# ============================================================
# SYNC: Facebook Page → CompanySocialMedia
# ============================================================
def sync_facebook_to_social_media(db, company_id: int) -> dict:
    """Fetch Facebook page stats via Graph API and upsert into CompanySocialMedia.

    Requires an active OAuth token and SocialMediaConfig for the company.
    Called automatically after page selection and manually via sync endpoint.

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for
    Returns:
        dict with 'success' bool and either 'data' or 'error'. The
        'engagement_rate' in 'data' is the rate computed during this sync
        (per-post based when posts exist, otherwise insights based, otherwise None).
    Raises:
        Whatever ``db.commit()`` raises; the session is rolled back first.
    """
    import re

    from oauth_service import OAuthService
    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get page config
    config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 - SQLAlchemy column expression
    ).first()
    if not config or not config.page_id:
        return {'success': False, 'error': 'no_page', 'message': 'Brak skonfigurowanej strony Facebook'}
    page_id = config.page_id

    # 2. Use Page Access Token (from config) — required for feed/insights.
    #    Fall back to User Access Token from OAuth if page token unavailable.
    token = config.access_token
    if not token:
        oauth = OAuthService()
        token = oauth.get_valid_token(db, company_id, 'meta', 'facebook')
    if not token:
        return {'success': False, 'error': 'no_token', 'message': 'Brak aktywnego tokenu Facebook'}

    # 3. Fetch page info from Graph API
    fb = FacebookGraphService(token)
    page_info = fb.get_page_info(page_id)
    if not page_info:
        return {'success': False, 'error': 'api_failed', 'message': 'Nie udało się pobrać danych strony z Facebook API'}

    # 4. Fetch page insights (best-effort, may be empty)
    insights = fb.get_page_insights(page_id, 28)
    # 4b. Fetch post stats (best-effort)
    post_stats = fb.get_page_posts_stats(page_id)

    # 5. Calculate metrics
    followers = page_info.get('followers_count') or page_info.get('fan_count') or 0
    engaged_users = insights.get('page_engaged_users', 0)

    # Engagement rate: prefer per-post calculation over insights
    engagement_rate = None
    if post_stats.get('posts_365d', 0) > 0 and followers > 0:
        total_engagement = post_stats['total_likes'] + post_stats['total_comments'] + post_stats['total_shares']
        avg_engagement_per_post = total_engagement / post_stats['posts_365d']
        engagement_rate = round((avg_engagement_per_post / followers) * 100, 2)
    elif followers > 0 and engaged_users > 0:
        engagement_rate = round((engaged_users / followers) * 100, 2)

    # Profile completeness: 20 points each for 5 key fields
    picture_url = page_info.get('picture', {}).get('data', {}).get('url')
    completeness = 20 * sum(
        1 for present in (
            page_info.get('about'),
            page_info.get('website'),
            page_info.get('phone'),
            page_info.get('single_line_address'),
            picture_url,
        ) if present
    )

    # URL: prefer API vanity link, then existing URL, then numeric fallback
    # Don't replace a vanity URL with a numeric one (e.g. facebook.com/inpipl → facebook.com/123456)
    api_link = page_info.get('link')
    if api_link and re.search(r'facebook\.com/\d+$', api_link):
        api_link = None  # Numeric URL, not useful as replacement

    # 6. Upsert CompanySocialMedia record
    # Look for existing Facebook record for this company (any URL)
    csm = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'facebook',
    ).first()
    now = datetime.now()
    if csm is not None:
        # Only update URL if API returned a proper link (not numeric fallback);
        # otherwise keep the existing URL.
        if api_link:
            csm.url = api_link
    else:
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='facebook',
            url=api_link or f"https://facebook.com/{page_id}",
        )
        db.add(csm)

    csm.source = 'facebook_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = page_info.get('name', '')
    csm.followers_count = followers if followers > 0 else csm.followers_count
    # Prefer 'about' (short), fallback to 'description' (long)
    bio_text = page_info.get('about') or page_info.get('description') or ''
    csm.has_bio = bool(bio_text)
    csm.profile_description = bio_text[:500]
    # Keep the previously stored rate when nothing could be computed this time.
    if engagement_rate is not None:
        csm.engagement_rate = engagement_rate
    csm.profile_completeness_score = completeness
    csm.has_profile_photo = bool(picture_url)
    csm.has_cover_photo = bool(page_info.get('cover', {}).get('source'))
    csm.posts_count_30d = post_stats.get('posts_30d', 0)
    csm.posts_count_365d = post_stats.get('posts_365d', 0)
    if post_stats.get('last_post_date'):
        csm.last_post_date = post_stats['last_post_date']
    # Posting frequency score: 0-10 based on posts per month
    p30 = post_stats.get('posts_30d', 0)
    csm.posting_frequency_score = min(10, p30) if p30 > 0 else 0
    csm.verified_at = now
    csm.last_checked_at = now

    # Store all API fields in content_types JSONB (no migration needed).
    # A fresh dict is assigned back so the ORM sees the change.
    extra = dict(csm.content_types or {})
    # Page info fields copied verbatim when present
    for key in ('category', 'website', 'phone', 'founded', 'mission',
                'emails', 'hours', 'rating_count', 'overall_star_rating',
                'were_here_count', 'talking_about_count', 'username',
                'verification_status', 'price_range'):
        if page_info.get(key):
            extra[key] = page_info[key]
    if page_info.get('single_line_address'):
        extra['address'] = page_info['single_line_address']
    if page_info.get('description'):
        extra['description'] = page_info['description'][:500]
    if page_info.get('location'):
        loc = page_info['location']
        extra['location'] = {k: loc[k] for k in ('city', 'country', 'latitude', 'longitude', 'street', 'zip') if k in loc}
    # False is a meaningful value here, so only skip when the field is absent
    if page_info.get('is_verified') is not None:
        extra['is_verified'] = page_info['is_verified']
    if page_info.get('category_list'):
        extra['category_list'] = [c.get('name') for c in page_info['category_list'] if c.get('name')]

    # Post engagement stats
    extra['total_likes'] = post_stats.get('total_likes', 0)
    extra['total_comments'] = post_stats.get('total_comments', 0)
    extra['total_shares'] = post_stats.get('total_shares', 0)
    if post_stats.get('post_types'):
        extra['post_types'] = post_stats['post_types']
    if post_stats.get('recent_posts'):
        extra['recent_posts'] = post_stats['recent_posts']

    # Insights
    for key in ('page_media_view', 'page_total_media_view_unique',
                'page_views_total', 'page_post_engagements',
                'page_fan_adds', 'page_fan_removes',
                'page_actions_post_reactions_total',
                'page_daily_follows', 'page_daily_unfollows',
                # Deprecated 2026-06-30, kept for backward compat:
                'page_impressions', 'page_engaged_users',
                'page_posts_impressions', 'page_video_views'):
        if insights.get(key):
            extra[f'insights_{key}'] = insights[key]
    csm.content_types = extra

    # Append to followers_history
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        # Don't duplicate entries for the same day
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
    csm.followers_history = history

    try:
        db.commit()
    except Exception:
        # Leave the session usable for the caller before propagating.
        db.rollback()
        raise

    logger.info(f"FB sync OK for company {company_id}: {followers} followers, engagement={engagement_rate}")
    return {
        'success': True,
        'data': {
            'page_name': csm.page_name,
            'followers_count': followers,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'facebook_api',
        }
    }
def sync_instagram_to_social_media(db, company_id: int) -> dict:
    """Fetch Instagram stats via Graph API and upsert into CompanySocialMedia.

    Uses the Facebook Page token to access the linked Instagram Business account.
    Requires:
    - Active Facebook OAuth with instagram_basic permission
    - Instagram Business/Creator account linked to a Facebook Page

    Also creates/updates the company's Instagram SocialMediaConfig row.

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for
    Returns:
        dict with 'success' bool and either 'data' or 'error'
    Raises:
        Whatever ``db.commit()`` raises; the session is rolled back first.
    """
    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get Facebook page config (Instagram uses the same Page token)
    fb_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 - SQLAlchemy column expression
    ).first()
    if not fb_config or not fb_config.page_id or not fb_config.access_token:
        return {'success': False, 'error': 'no_fb_config',
                'message': 'Brak skonfigurowanej strony Facebook (wymagana do Instagram API)'}
    token = fb_config.access_token
    fb = FacebookGraphService(token)

    # 2. Get linked Instagram Business account ID
    ig_account_id = fb.get_instagram_account(fb_config.page_id)
    if not ig_account_id:
        return {'success': False, 'error': 'no_ig_linked',
                'message': 'Brak powiązanego konta Instagram Business ze stroną Facebook'}

    # 3. Fetch Instagram profile data
    ig_info = fb.get_ig_account_info(ig_account_id)
    if not ig_info:
        return {'success': False, 'error': 'api_failed',
                'message': 'Nie udało się pobrać danych profilu Instagram'}

    # 4. Fetch insights (best-effort)
    ig_insights = fb.get_ig_media_insights(ig_account_id, 28)
    # 5. Fetch recent media for engagement calculation
    recent_media = fb.get_ig_recent_media(ig_account_id, 25)

    # 6. Calculate metrics
    # ``or 0`` guards against an explicit null in the payload, which would
    # otherwise break the numeric comparisons below.
    followers = ig_info.get('followers_count') or 0
    media_count = ig_info.get('media_count') or 0
    username = ig_info.get('username', '')

    # Engagement rate from recent posts
    engagement_rate = None
    posts_30d = 0
    posts_365d = 0
    last_post_date = None
    total_engagement = 0
    recent_posts_data = []
    now = datetime.now()
    for media in recent_media:
        ts = media.get('timestamp', '')
        try:
            # Normalise '+0000' as the Facebook feed parser does:
            # datetime.fromisoformat() rejects a colon-less offset before Python 3.11.
            media_date = datetime.fromisoformat(
                ts.replace('+0000', '+00:00').replace('Z', '+00:00')
            ).replace(tzinfo=None)
        except (ValueError, AttributeError):
            continue
        days_ago = (now - media_date).days
        if days_ago > 365:
            continue
        posts_365d += 1
        likes = media.get('like_count', 0)
        comments = media.get('comments_count', 0)
        total_engagement += likes + comments
        if days_ago <= 30:
            posts_30d += 1
        if last_post_date is None or media_date > last_post_date:
            last_post_date = media_date
        if len(recent_posts_data) < 5:
            caption = media.get('caption', '') or ''
            recent_posts_data.append({
                'date': media_date.strftime('%Y-%m-%d'),
                'type': media.get('media_type', 'UNKNOWN'),
                'likes': likes,
                'comments': comments,
                'caption': caption[:100],
                'permalink': media.get('permalink', ''),
            })
    if posts_365d > 0 and followers > 0:
        avg_engagement = total_engagement / posts_365d
        engagement_rate = round((avg_engagement / followers) * 100, 2)

    # Profile completeness: 20 points per satisfied criterion
    completeness = 20 * sum(
        1 for satisfied in (
            ig_info.get('biography'),
            ig_info.get('website'),
            ig_info.get('profile_picture_url'),
            followers > 10,
            posts_30d > 0,
        ) if satisfied
    )

    # 7. Upsert CompanySocialMedia record
    csm = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'instagram',
    ).first()
    if csm is None:
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='instagram',
            url=f'https://instagram.com/{username}' if username else 'https://instagram.com/',
        )
        db.add(csm)
    if username:
        csm.url = f'https://instagram.com/{username}'
    csm.source = 'instagram_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = ig_info.get('name', '') or username
    csm.followers_count = followers if followers > 0 else csm.followers_count
    csm.has_bio = bool(ig_info.get('biography'))
    csm.profile_description = (ig_info.get('biography') or '')[:500]
    csm.has_profile_photo = bool(ig_info.get('profile_picture_url'))
    csm.engagement_rate = engagement_rate
    csm.profile_completeness_score = completeness
    csm.posts_count_30d = posts_30d
    csm.posts_count_365d = posts_365d
    if last_post_date:
        csm.last_post_date = last_post_date
    csm.posting_frequency_score = min(10, posts_30d) if posts_30d > 0 else 0
    csm.verified_at = now
    csm.last_checked_at = now

    # Extra data in content_types JSONB; a fresh dict is assigned back so
    # the ORM sees the change.
    extra = dict(csm.content_types or {})
    extra['ig_account_id'] = ig_account_id
    extra['media_count'] = media_count
    extra['follows_count'] = ig_info.get('follows_count', 0)
    for key in ('website', 'profile_picture_url'):
        if ig_info.get(key):
            extra[key] = ig_info[key]
    if recent_posts_data:
        extra['recent_posts'] = recent_posts_data

    # Media type breakdown from recent posts
    media_types = {}
    for media in recent_media:
        mt = media.get('media_type', 'UNKNOWN')
        media_types[mt] = media_types.get(mt, 0) + 1
    if media_types:
        extra['media_types'] = media_types
    extra['total_likes'] = sum(m.get('like_count', 0) for m in recent_media)
    extra['total_comments'] = sum(m.get('comments_count', 0) for m in recent_media)

    # Insights
    for key in ('ig_impressions_total', 'ig_reach_total', 'ig_follower_count_total'):
        if ig_insights.get(key):
            extra[key] = ig_insights[key]
    csm.content_types = extra

    # Followers history: at most one entry per day
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
    csm.followers_history = history

    # Save Instagram config (so enrichment knows it's API-managed)
    ig_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'instagram',
        SocialMediaConfig.company_id == company_id,
    ).first()
    if not ig_config:
        ig_config = SocialMediaConfig(platform='instagram', company_id=company_id)
        db.add(ig_config)
    ig_config.page_id = ig_account_id
    ig_config.page_name = username or ig_info.get('name', '')
    ig_config.access_token = token
    ig_config.is_active = True

    try:
        db.commit()
    except Exception:
        # Leave the session usable for the caller before propagating.
        db.rollback()
        raise

    logger.info(f"IG sync OK for company {company_id}: @{username}, "
                f"{followers} followers, {media_count} posts, engagement={engagement_rate}")
    return {
        'success': True,
        'data': {
            'username': username,
            'followers_count': followers,
            'media_count': media_count,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'instagram_api',
        }
    }