Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Meta deprecates page_impressions, post_impressions, page_video_views et al. on 2026-06-30. Replaced by *_media_view family. Both old and new metrics are requested during the transition window so historical data and fresh data coexist without UI gaps. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
918 lines
36 KiB
Python
918 lines
36 KiB
Python
"""
|
|
Facebook + Instagram Graph API Client
|
|
======================================
|
|
|
|
Uses OAuth 2.0 page tokens to access Facebook Page and Instagram Business data.
|
|
|
|
API docs: https://developers.facebook.com/docs/graph-api/
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
# Module-level logger named after this module; used by FacebookGraphService
# and by the sync_* functions defined later in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FacebookGraphService:
    """Facebook + Instagram Graph API client.

    Wraps a ``requests.Session`` and authenticates every call by sending
    ``access_token`` as a query parameter. The low-level helpers (``_get``,
    ``_post``, ``_delete``) never raise: on any failure they log the error
    (with the token redacted) and return ``None``.
    """

    BASE_URL = "https://graph.facebook.com/v21.0"

    # Seconds to wait for the server. requests does not honour a ``timeout``
    # attribute set on a Session, so the value is passed on every call.
    REQUEST_TIMEOUT = 15

    def __init__(self, access_token: str):
        """Store the token and open a reusable HTTP session."""
        self.access_token = access_token
        self.session = requests.Session()

    # ------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------

    def _redact(self, text) -> str:
        """Return ``str(text)`` with the access token masked.

        requests puts the full request URL (including the ``access_token``
        query parameter) into HTTP error messages; this keeps it out of logs.
        """
        text = str(text)
        token = getattr(self, 'access_token', None)
        return text.replace(token, '***') if token else text

    @staticmethod
    def _summary_total(node: dict, edge: str) -> int:
        """Return ``node[edge]['summary']['total_count']`` or 0 if absent."""
        summary = (node.get(edge) or {}).get('summary') or {}
        return summary.get('total_count', 0) or 0

    @staticmethod
    def _share_count(node: dict) -> int:
        """Return ``node['shares']['count']`` or 0 if absent."""
        return (node.get('shares') or {}).get('count', 0) or 0

    @staticmethod
    def _sum_numeric_values(values) -> float:
        """Sum the numeric ``value`` entries of an insights ``values`` list.

        Non-numeric values (e.g. per-type breakdown dicts) are skipped.
        """
        return sum(v.get('value', 0) for v in values
                   if isinstance(v.get('value'), (int, float)))

    def _get(self, endpoint: str, params: dict = None) -> Optional[Dict]:
        """Make an authenticated GET request.

        Args:
            endpoint: Path relative to ``BASE_URL``.
            params: Query parameters; the dict is copied, not modified.

        Returns:
            Decoded JSON response, or None on any failure.
        """
        query = dict(params or {})
        query['access_token'] = self.access_token
        try:
            resp = self.session.get(f"{self.BASE_URL}/{endpoint}", params=query,
                                    timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Facebook API {endpoint} failed: {self._redact(e)}")
            return None

    def get_managed_pages(self) -> List[Dict]:
        """Get Facebook pages managed by the authenticated user."""
        data = self._get('me/accounts', {'fields': 'id,name,category,access_token,fan_count'})
        return data.get('data', []) if data else []

    def get_page_info(self, page_id: str) -> Optional[Dict]:
        """Get detailed page information, or None if the request failed."""
        return self._get(page_id, {
            'fields': 'id,name,fan_count,category,category_list,link,about,description,website,phone,emails,'
                      'single_line_address,location,hours,followers_count,picture,cover,'
                      'rating_count,overall_star_rating,were_here_count,founded,mission,'
                      'talking_about_count,username,verification_status,is_verified,price_range'
        })

    def get_page_posts_stats(self, page_id: str) -> Dict:
        """Get post counts, engagement stats and last post date from page feed.

        Looks at up to 100 feed entries from the last 365 days (single request,
        no pagination).

        Returns:
            Dict with ``posts_30d``, ``posts_365d``, ``last_post_date`` (naive
            datetime in UTC, or None), ``total_likes``, ``total_comments``,
            ``total_shares``, ``post_types`` and ``recent_posts``. All counters
            are zero when the request fails.
        """
        from datetime import timezone  # module imports only datetime/timedelta

        result = {
            'posts_30d': 0, 'posts_365d': 0, 'last_post_date': None,
            'total_likes': 0, 'total_comments': 0, 'total_shares': 0,
            'post_types': {},    # e.g. {'photo': 5, 'video': 2, 'link': 3}
            'recent_posts': [],  # first 5 posts with title/type/date/engagement
        }
        now_utc = datetime.now(timezone.utc)
        since_365d = int((now_utc - timedelta(days=365)).timestamp())

        data = self._get(f'{page_id}/feed', {
            'fields': 'created_time,message,shares,likes.summary(true),comments.summary(true),attachments{media_type}',
            'since': since_365d,
            'limit': 100,
        })
        if not data:
            return result

        # Post dates are normalised to naive UTC below, so the cutoff must be
        # naive UTC as well (not local time).
        cutoff_30d = (now_utc - timedelta(days=30)).replace(tzinfo=None)
        for post in data.get('data', []):
            ct = post.get('created_time', '')
            if not ct:
                continue
            try:
                post_date = datetime.fromisoformat(ct.replace('+0000', '+00:00').replace('Z', '+00:00'))
            except (ValueError, TypeError, AttributeError):
                # Unparseable date: the post still counts towards the yearly
                # total (it was returned for the 365-day window).
                result['posts_365d'] += 1
                continue
            if post_date.tzinfo is not None:
                post_date = post_date.astimezone(timezone.utc)
            post_date_naive = post_date.replace(tzinfo=None)

            result['posts_365d'] += 1
            if post_date_naive >= cutoff_30d:
                result['posts_30d'] += 1
            if result['last_post_date'] is None or post_date_naive > result['last_post_date']:
                result['last_post_date'] = post_date_naive

            # Engagement
            likes = self._summary_total(post, 'likes')
            comments = self._summary_total(post, 'comments')
            shares = self._share_count(post)
            result['total_likes'] += likes
            result['total_comments'] += comments
            result['total_shares'] += shares

            # Post types (from attachments.media_type since 'type' is deprecated)
            attachments = (post.get('attachments') or {}).get('data', [])
            ptype = attachments[0].get('media_type', 'status') if attachments else 'status'
            result['post_types'][ptype] = result['post_types'].get(ptype, 0) + 1

            # Recent posts: the first 5 entries in the order the API returned them
            if len(result['recent_posts']) < 5:
                msg = post.get('message', '') or ''
                title = (msg[:80] + '...') if len(msg) > 80 else msg
                result['recent_posts'].append({
                    'date': post_date_naive.strftime('%Y-%m-%d'),
                    'title': title,
                    'type': ptype,
                    'likes': likes,
                    'comments': comments,
                    'shares': shares,
                })

        return result

    def get_page_insights(self, page_id: str, days: int = 28) -> Dict:
        """Get page insights (views, engagements, reactions).

        Fetches metrics individually to handle deprecated/unavailable ones gracefully.
        Requires page access token, not user access token.

        Args:
            page_id: Facebook Page ID.
            days: Size of the look-back window in days.

        Returns:
            Dict mapping metric name to the sum of its daily numeric values
            over the window. Metrics that failed or returned no values are
            absent.
        """
        current = datetime.now()
        since = int((current - timedelta(days=days)).timestamp())
        until = int(current.timestamp())

        # Meta deprecated impressions/views metrics on 2026-06-30.
        # Migrated to *_media_view family. Old keys kept temporarily for
        # transition (Meta serves both until cutoff).
        # NOTE(review): page_fans looks like a running total; summing its
        # daily values may not be meaningful — confirm before using it.
        all_metrics = (
            'page_views_total',
            'page_post_engagements',
            'page_actions_post_reactions_total',
            'page_media_view',
            'page_total_media_view_unique',
            'page_daily_follows',
            'page_daily_unfollows',
            'page_fans',
            'page_fan_adds',
            'page_fan_removes',
            # Deprecated 2026-06-30 — keep until migration verified end-to-end:
            'page_posts_impressions',
            'page_video_views',
            'page_impressions',
            'page_engaged_users',
        )

        result = {}
        for metric_name in all_metrics:
            data = self._get(f'{page_id}/insights', {
                'metric': metric_name,
                'period': 'day',
                'since': since,
                'until': until,
            })
            if not data:
                continue
            for metric in data.get('data', []):
                values = metric.get('values', [])
                if values:
                    result[metric.get('name', '')] = self._sum_numeric_values(values)

        return result

    def get_instagram_account(self, page_id: str) -> Optional[str]:
        """Get linked Instagram Business account ID from a Facebook Page."""
        data = self._get(page_id, {'fields': 'instagram_business_account'})
        if data and 'instagram_business_account' in data:
            return data['instagram_business_account'].get('id')
        return None

    def get_ig_account_info(self, ig_account_id: str) -> Dict:
        """Get Instagram Business account profile info.

        Returns username, name, bio, followers, following, media count,
        profile pic, website; an empty dict if the request failed.
        """
        data = self._get(ig_account_id, {
            'fields': (
                'username,name,biography,followers_count,follows_count,'
                'media_count,profile_picture_url,website,ig_id'
            )
        })
        return data or {}

    def get_ig_media_insights(self, ig_account_id: str, days: int = 28) -> Dict:
        """Get Instagram account insights.

        Returns:
            Dict with ``followers_count``, ``media_count`` and ``username``
            (when the account request succeeds) plus ``ig_<metric>_total``
            sums for each insight metric that returned values.
        """
        result = {}

        # Basic account info
        account_data = self._get(ig_account_id, {
            'fields': 'followers_count,media_count,username,biography'
        })
        if account_data:
            result['followers_count'] = account_data.get('followers_count', 0)
            result['media_count'] = account_data.get('media_count', 0)
            result['username'] = account_data.get('username', '')

        # Account insights (reach, impressions) — fetch individually for robustness
        current = datetime.now()
        since = int((current - timedelta(days=days)).timestamp())
        until = int(current.timestamp())
        for metric_name in ('impressions', 'reach', 'follower_count'):
            try:
                insights_data = self._get(f'{ig_account_id}/insights', {
                    'metric': metric_name,
                    'period': 'day',
                    'since': since,
                    'until': until,
                })
                if not insights_data:
                    continue
                for metric in insights_data.get('data', []):
                    values = metric.get('values', [])
                    if values:
                        name = metric.get('name', '')
                        result[f'ig_{name}_total'] = self._sum_numeric_values(values)
            except Exception as e:
                # _get already swallows request errors; this guards against
                # unexpected response shapes while aggregating.
                logger.debug(f"IG insight {metric_name} failed: {e}")

        return result

    def get_ig_recent_media(self, ig_account_id: str, limit: int = 25) -> list:
        """Get recent media from Instagram Business account.

        Returns list of media with engagement data (empty on failure).
        """
        data = self._get(f'{ig_account_id}/media', {
            'fields': 'id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count',
            'limit': limit,
        })
        if not data:
            return []
        return data.get('data', [])

    # ============================================================
    # PUBLISHING METHODS (Social Publisher)
    # ============================================================

    def _post(self, endpoint: str, data: dict = None, files: dict = None) -> Optional[Dict]:
        """Make an authenticated POST request.

        Args:
            endpoint: Path relative to ``BASE_URL``.
            data: Form fields to send.
            files: Optional multipart file mapping as accepted by requests.

        Returns:
            Decoded JSON response, or None on any failure.
        """
        data = data or {}
        params = {'access_token': self.access_token}
        try:
            resp = self.session.post(f"{self.BASE_URL}/{endpoint}", params=params,
                                     data=data, files=files or None,
                                     timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.HTTPError as e:
            # The API puts the useful diagnostics in the JSON error body.
            error_data = None
            try:
                error_data = e.response.json()
            except Exception:
                pass  # body is not JSON; log without it
            logger.error(f"Facebook API POST {endpoint} failed: {self._redact(e)}, response: {error_data}")
            return None
        except Exception as e:
            logger.error(f"Facebook API POST {endpoint} failed: {self._redact(e)}")
            return None

    def _delete(self, endpoint: str) -> Optional[Dict]:
        """Make an authenticated DELETE request; None on any failure."""
        params = {'access_token': self.access_token}
        try:
            resp = self.session.delete(f"{self.BASE_URL}/{endpoint}", params=params,
                                       timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Facebook API DELETE {endpoint} failed: {self._redact(e)}")
            return None

    def upload_photo_unpublished(self, page_id: str, image_path: str) -> Optional[str]:
        """Upload photo as unpublished to get photo_id for feed attachment.

        Two-step process:
        1. Upload photo with published=false -> get photo_id
        2. Use photo_id in create_post() attached_media

        Args:
            page_id: Facebook Page ID
            image_path: Local path to image file

        Returns:
            Photo ID (media_fbid) or None on failure
        """
        import mimetypes
        import os

        filename = os.path.basename(image_path)
        # Derive the content type from the extension; fall back to the
        # previously hard-coded PNG when it is unknown or not an image.
        guessed = mimetypes.guess_type(filename)[0]
        content_type = guessed if guessed and guessed.startswith('image/') else 'image/png'

        try:
            with open(image_path, 'rb') as f:
                files = {'source': (filename, f, content_type)}
                result = self._post(f'{page_id}/photos', data={'published': 'false'}, files=files)
        except FileNotFoundError:
            logger.error(f"Image file not found: {image_path}")
            return None
        except OSError as e:
            logger.error(f"Image file could not be read: {image_path} ({e})")
            return None

        if result and 'id' in result:
            logger.info(f"Uploaded unpublished photo: {result['id']}")
            return result['id']
        logger.error(f"Photo upload failed: {result}")
        return None

    def create_post(self, page_id: str, message: str, image_path: str = None,
                    published: bool = True, scheduled_time: int = None) -> Optional[Dict]:
        """Create a post on Facebook Page feed.

        For posts with images, uses two-step process:
        1. Upload photo as unpublished -> photo_id
        2. Create feed post with attached_media[0]

        If the image upload fails the post is still created, without image.

        Args:
            page_id: Facebook Page ID
            message: Post text content
            image_path: Optional local path to image
            published: If False, creates draft (visible only to page admins)
            scheduled_time: Unix timestamp for scheduled publishing (min 10 min ahead)

        Returns:
            API response dict with 'id' key, or None on failure
        """
        data = {'message': message}

        # Handle image upload
        if image_path:
            photo_id = self.upload_photo_unpublished(page_id, image_path)
            if photo_id:
                data['attached_media[0]'] = json.dumps({'media_fbid': photo_id})
            else:
                logger.warning("Image upload failed, posting without image")

        # Handle scheduling: a scheduled post is sent unpublished with a
        # publish time; scheduling takes precedence over ``published``.
        if scheduled_time:
            data['published'] = 'false'
            data['scheduled_publish_time'] = str(scheduled_time)
        elif not published:
            data['published'] = 'false'

        result = self._post(f'{page_id}/feed', data=data)
        if result and 'id' in result:
            logger.info(f"Created post: {result['id']} (published={published})")
        return result

    def publish_draft(self, post_id: str) -> Optional[Dict]:
        """Publish an unpublished (draft) post.

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)

        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'true'})

    def unpublish_post(self, post_id: str) -> Optional[Dict]:
        """Unpublish a public post (make it draft/hidden).

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)

        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'false'})

    def get_post_engagement(self, post_id: str) -> Optional[Dict]:
        """Get engagement metrics for a published post.

        Args:
            post_id: Facebook post ID

        Returns:
            Dict with likes, comments, shares, reactions_total or None
        """
        fields = 'id,created_time,message,likes.summary(true),comments.summary(true),shares,reactions.summary(true).limit(0)'
        result = self._get(post_id, {'fields': fields})
        if not result:
            return None

        return {
            'post_id': result.get('id'),
            'created_time': result.get('created_time'),
            'likes': self._summary_total(result, 'likes'),
            'comments': self._summary_total(result, 'comments'),
            'shares': self._share_count(result),
            'reactions_total': self._summary_total(result, 'reactions'),
        }

    def get_page_posts(self, page_id: str, limit: int = 10,
                       after: str = None) -> Optional[Dict]:
        """Get recent posts from a Facebook Page with engagement metrics.

        Args:
            page_id: Facebook Page ID
            limit: Number of posts to fetch (max 100)
            after: Pagination cursor for next page

        Returns:
            Dict with 'posts' list and 'next_cursor' (or None), or None on failure
        """
        fields = (
            'id,message,created_time,full_picture,permalink_url,status_type,'
            'likes.summary(true).limit(0),comments.summary(true).limit(0),'
            'shares,reactions.summary(true).limit(0)'
        )
        params = {'fields': fields, 'limit': limit}
        if after:
            params['after'] = after

        result = self._get(f'{page_id}/posts', params)
        if not result:
            return None

        posts = [
            {
                'id': item.get('id'),
                'message': item.get('message', ''),
                'created_time': item.get('created_time'),
                'full_picture': item.get('full_picture'),
                'permalink_url': item.get('permalink_url'),
                'status_type': item.get('status_type', ''),
                'likes': self._summary_total(item, 'likes'),
                'comments': self._summary_total(item, 'comments'),
                'shares': self._share_count(item),
                'reactions_total': self._summary_total(item, 'reactions'),
            }
            for item in result.get('data', [])
        ]

        # Only expose a cursor when the API says there is a next page.
        paging = result.get('paging', {})
        next_cursor = paging.get('cursors', {}).get('after') if paging.get('next') else None

        return {'posts': posts, 'next_cursor': next_cursor}

    def get_post_insights_metrics(self, post_id: str) -> Optional[Dict]:
        """Get detailed insights metrics for a specific post.

        Note: Only available for posts on Pages with 100+ fans.

        All metrics are requested in one call first. If that call fails
        (for instance because one metric is rejected), each metric is retried
        on its own so the remaining ones are still returned.

        Args:
            post_id: Facebook post ID

        Returns:
            Dict with impressions, reach, engaged_users, clicks or None
        """
        # post_impressions/post_impressions_unique deprecated 2026-06-30.
        # Replaced by post_media_view / post_total_media_view_unique.
        # Both old and new requested during transition window.
        metric_names = (
            'post_media_view', 'post_total_media_view_unique',
            'post_impressions', 'post_impressions_unique',
            'post_engaged_users', 'post_clicks',
        )
        endpoint = f'{post_id}/insights'

        combined = self._get(endpoint, {'metric': ','.join(metric_names)})
        if combined:
            rows = list(combined.get('data', []))
        else:
            rows = []
            for metric_name in metric_names:
                single = self._get(endpoint, {'metric': metric_name})
                if single:
                    rows.extend(single.get('data', []))

        # First reported value per metric name.
        raw = {}
        for metric in rows:
            values = metric.get('values', [])
            if values:
                raw[metric.get('name', '')] = values[0].get('value', 0)

        # Output key -> source metrics, new family first. The first source
        # with a non-zero value wins; otherwise the first one present.
        sources = (
            ('impressions', ('post_media_view', 'post_impressions')),
            ('reach', ('post_total_media_view_unique', 'post_impressions_unique')),
            ('engaged_users', ('post_engaged_users',)),
            ('clicks', ('post_clicks',)),
        )
        insights = {}
        for key, candidates in sources:
            present = [raw[name] for name in candidates if name in raw]
            if present:
                insights[key] = next((value for value in present if value), present[0])

        return insights if insights else None

    def delete_post(self, post_id: str) -> bool:
        """Delete a post (published or unpublished).

        Args:
            post_id: Facebook post ID

        Returns:
            True if deleted successfully
        """
        result = self._delete(post_id)
        return bool(result and result.get('success'))
|
|
|
|
|
|
# ============================================================
|
|
# SYNC: Facebook Page → CompanySocialMedia
|
|
# ============================================================
|
|
|
|
def sync_facebook_to_social_media(db, company_id: int) -> dict:
    """Fetch Facebook page stats via Graph API and upsert into CompanySocialMedia.

    Requires an active OAuth token and SocialMediaConfig for the company.
    Called automatically after page selection and manually via sync endpoint.

    The engagement rate that is stored (and reported back) is the average
    per-post engagement over the last 365 days relative to followers when
    posts are available; otherwise the insights-based rate
    (page_engaged_users / followers); otherwise the stored value is left
    untouched and None is reported.

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for

    Returns:
        dict with 'success' bool and either 'data' or 'error'

    Raises:
        Whatever ``db.commit()`` raises; the session is rolled back first.
    """
    import re

    from oauth_service import OAuthService
    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get page config
    config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 (SQL expression)
    ).first()

    if not config or not config.page_id:
        return {'success': False, 'error': 'no_page', 'message': 'Brak skonfigurowanej strony Facebook'}

    page_id = config.page_id

    # 2. Use Page Access Token (from config) — required for feed/insights.
    #    Fall back to User Access Token from OAuth if page token unavailable.
    token = config.access_token
    if not token:
        token = OAuthService().get_valid_token(db, company_id, 'meta', 'facebook')
    if not token:
        return {'success': False, 'error': 'no_token', 'message': 'Brak aktywnego tokenu Facebook'}

    # 3. Fetch page info from Graph API
    fb = FacebookGraphService(token)
    page_info = fb.get_page_info(page_id)
    if not page_info:
        return {'success': False, 'error': 'api_failed', 'message': 'Nie udało się pobrać danych strony z Facebook API'}

    # 4. Fetch page insights and post stats (both best-effort, may be empty)
    insights = fb.get_page_insights(page_id, 28)
    post_stats = fb.get_page_posts_stats(page_id)

    # 5. Calculate metrics
    followers = page_info.get('followers_count') or page_info.get('fan_count') or 0
    posts_365d = post_stats.get('posts_365d', 0)
    posts_30d = post_stats.get('posts_30d', 0)
    picture_url = page_info.get('picture', {}).get('data', {}).get('url')

    # Engagement rate: prefer per-post calculation over insights
    engagement_rate = None
    if posts_365d > 0 and followers > 0:
        total_engagement = post_stats['total_likes'] + post_stats['total_comments'] + post_stats['total_shares']
        engagement_rate = round((total_engagement / posts_365d / followers) * 100, 2)
    else:
        engaged_users = insights.get('page_engaged_users', 0)
        if followers > 0 and engaged_users > 0:
            engagement_rate = round((engaged_users / followers) * 100, 2)

    # Profile completeness: 20 points each for 5 key fields
    completeness = 20 * sum(1 for present in (
        page_info.get('about'),
        page_info.get('website'),
        page_info.get('phone'),
        page_info.get('single_line_address'),
        picture_url,
    ) if present)

    # URL: prefer API vanity link, then existing URL, then numeric fallback.
    # Don't replace a vanity URL with a numeric one
    # (e.g. facebook.com/inpipl → facebook.com/123456).
    api_link = page_info.get('link')
    if api_link and re.search(r'facebook\.com/\d+$', api_link):
        api_link = None  # Numeric URL, not useful as replacement

    # 6. Upsert CompanySocialMedia record
    # Look for existing Facebook record for this company (any URL)
    csm = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'facebook',
    ).first()

    now = datetime.now()

    if csm is None:
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='facebook',
            url=api_link or f"https://facebook.com/{page_id}",
        )
        db.add(csm)
    elif api_link:
        # Only update URL if API returned a proper link; otherwise keep it.
        csm.url = api_link

    # Prefer 'about' (short), fallback to 'description' (long)
    bio_text = page_info.get('about') or page_info.get('description') or ''

    csm.source = 'facebook_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = page_info.get('name', '')
    if followers > 0:
        csm.followers_count = followers
    csm.has_bio = bool(bio_text)
    csm.profile_description = bio_text[:500]
    if engagement_rate is not None:
        csm.engagement_rate = engagement_rate
    csm.profile_completeness_score = completeness
    csm.has_profile_photo = bool(picture_url)
    csm.has_cover_photo = bool(page_info.get('cover', {}).get('source'))
    csm.posts_count_30d = posts_30d
    csm.posts_count_365d = posts_365d
    if post_stats.get('last_post_date'):
        csm.last_post_date = post_stats['last_post_date']
    # Posting frequency score: 0-10 based on posts per month
    csm.posting_frequency_score = min(10, posts_30d) if posts_30d > 0 else 0
    csm.verified_at = now
    csm.last_checked_at = now

    # Store all API fields in content_types JSONB (no migration needed)
    extra = dict(csm.content_types or {})
    # Page info fields copied verbatim when the API returned a non-empty value
    for key in ('category', 'website', 'phone', 'founded', 'mission',
                'emails', 'hours', 'rating_count', 'overall_star_rating',
                'were_here_count', 'talking_about_count', 'username',
                'verification_status', 'price_range'):
        if page_info.get(key):
            extra[key] = page_info[key]
    if page_info.get('single_line_address'):
        extra['address'] = page_info['single_line_address']
    if page_info.get('description'):
        extra['description'] = page_info['description'][:500]
    if page_info.get('location'):
        loc = page_info['location']
        extra['location'] = {k: loc[k] for k in ('city', 'country', 'latitude', 'longitude', 'street', 'zip') if k in loc}
    # is_verified is a boolean: False is meaningful, so only skip when absent
    if page_info.get('is_verified') is not None:
        extra['is_verified'] = page_info['is_verified']
    if page_info.get('category_list'):
        extra['category_list'] = [c.get('name') for c in page_info['category_list'] if c.get('name')]
    # Post engagement stats
    extra['total_likes'] = post_stats.get('total_likes', 0)
    extra['total_comments'] = post_stats.get('total_comments', 0)
    extra['total_shares'] = post_stats.get('total_shares', 0)
    if post_stats.get('post_types'):
        extra['post_types'] = post_stats['post_types']
    if post_stats.get('recent_posts'):
        extra['recent_posts'] = post_stats['recent_posts']
    # Insights
    for key in ('page_media_view', 'page_total_media_view_unique',
                'page_views_total', 'page_post_engagements',
                'page_fan_adds', 'page_fan_removes',
                'page_actions_post_reactions_total',
                'page_daily_follows', 'page_daily_unfollows',
                # Deprecated 2026-06-30, kept for backward compat:
                'page_impressions', 'page_engaged_users',
                'page_posts_impressions', 'page_video_views'):
        if insights.get(key):
            extra[f'insights_{key}'] = insights[key]
    csm.content_types = extra

    # Append to followers_history
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        # Don't duplicate entries for the same day
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
    csm.followers_history = history

    try:
        db.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        db.rollback()
        raise

    logger.info(f"FB sync OK for company {company_id}: {followers} followers, engagement={engagement_rate}")

    return {
        'success': True,
        'data': {
            'page_name': csm.page_name,
            'followers_count': followers,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'facebook_api',
        }
    }
|
|
|
|
|
|
def sync_instagram_to_social_media(db, company_id: int) -> dict:
    """Fetch Instagram stats via Graph API and upsert into CompanySocialMedia.

    Uses the Facebook Page token to access the linked Instagram Business account.
    Requires:
    - Active Facebook OAuth with instagram_basic permission
    - Instagram Business/Creator account linked to a Facebook Page

    Besides the CompanySocialMedia record, an 'instagram' SocialMediaConfig
    row is created or updated with the account ID and the page token.

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for

    Returns:
        dict with 'success' bool and either 'data' or 'error'

    Raises:
        Whatever ``db.commit()`` raises; the session is rolled back first.
    """
    from datetime import timezone  # module imports only datetime/timedelta

    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get Facebook page config (Instagram uses the same Page token)
    fb_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 (SQL expression)
    ).first()

    if not fb_config or not fb_config.page_id or not fb_config.access_token:
        return {'success': False, 'error': 'no_fb_config',
                'message': 'Brak skonfigurowanej strony Facebook (wymagana do Instagram API)'}

    token = fb_config.access_token
    fb = FacebookGraphService(token)

    # 2. Get linked Instagram Business account ID
    ig_account_id = fb.get_instagram_account(fb_config.page_id)
    if not ig_account_id:
        return {'success': False, 'error': 'no_ig_linked',
                'message': 'Brak powiązanego konta Instagram Business ze stroną Facebook'}

    # 3. Fetch Instagram profile data
    ig_info = fb.get_ig_account_info(ig_account_id)
    if not ig_info:
        return {'success': False, 'error': 'api_failed',
                'message': 'Nie udało się pobrać danych profilu Instagram'}

    # 4. Fetch insights (best-effort)
    ig_insights = fb.get_ig_media_insights(ig_account_id, 28)

    # 5. Fetch recent media for engagement calculation
    recent_media = fb.get_ig_recent_media(ig_account_id, 25)

    # 6. Calculate metrics
    followers = ig_info.get('followers_count', 0) or 0
    media_count = ig_info.get('media_count', 0)
    username = ig_info.get('username', '')

    # Engagement rate from recent posts
    engagement_rate = None
    posts_30d = 0
    posts_365d = 0
    last_post_date = None
    total_engagement = 0
    recent_posts_data = []

    now = datetime.now()
    # Media timestamps are normalised to naive UTC, so measure age against UTC.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    for media in recent_media:
        ts = media.get('timestamp', '')
        try:
            # The '+0000' offset form is only accepted by fromisoformat on
            # newer Python versions, so normalise it to '+00:00' first.
            parsed = datetime.fromisoformat(ts.replace('+0000', '+00:00').replace('Z', '+00:00'))
        except (ValueError, AttributeError, TypeError):
            continue
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc)
        media_date = parsed.replace(tzinfo=None)

        # Read the counters up front: they are needed for the recent-posts
        # preview below regardless of how old the media is.
        likes = media.get('like_count', 0) or 0
        comments = media.get('comments_count', 0) or 0

        days_ago = (now_utc - media_date).days
        if days_ago <= 365:
            posts_365d += 1
            total_engagement += likes + comments
            if days_ago <= 30:
                posts_30d += 1

        if last_post_date is None or media_date > last_post_date:
            last_post_date = media_date

        if len(recent_posts_data) < 5:
            caption = media.get('caption', '') or ''
            recent_posts_data.append({
                'date': media_date.strftime('%Y-%m-%d'),
                'type': media.get('media_type', 'UNKNOWN'),
                'likes': likes,
                'comments': comments,
                'caption': caption[:100],
                'permalink': media.get('permalink', ''),
            })

    if posts_365d > 0 and followers > 0:
        avg_engagement = total_engagement / posts_365d
        engagement_rate = round((avg_engagement / followers) * 100, 2)

    # Profile completeness: 20 points for each satisfied criterion
    completeness = 20 * sum(1 for satisfied in (
        ig_info.get('biography'),
        ig_info.get('website'),
        ig_info.get('profile_picture_url'),
        followers > 10,
        posts_30d > 0,
    ) if satisfied)

    # 7. Upsert CompanySocialMedia record
    csm = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'instagram',
    ).first()

    if csm is None:
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='instagram',
            url=f'https://instagram.com/{username}' if username else 'https://instagram.com/',
        )
        db.add(csm)

    if username:
        csm.url = f'https://instagram.com/{username}'
    csm.source = 'instagram_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = ig_info.get('name', '') or username
    if followers > 0:
        csm.followers_count = followers
    csm.has_bio = bool(ig_info.get('biography'))
    csm.profile_description = (ig_info.get('biography') or '')[:500]
    csm.has_profile_photo = bool(ig_info.get('profile_picture_url'))
    csm.engagement_rate = engagement_rate
    csm.profile_completeness_score = completeness
    csm.posts_count_30d = posts_30d
    csm.posts_count_365d = posts_365d
    if last_post_date:
        csm.last_post_date = last_post_date
    csm.posting_frequency_score = min(10, posts_30d) if posts_30d > 0 else 0
    csm.verified_at = now
    csm.last_checked_at = now

    # Extra data in content_types JSONB
    extra = dict(csm.content_types or {})
    extra['ig_account_id'] = ig_account_id
    extra['media_count'] = media_count
    extra['follows_count'] = ig_info.get('follows_count', 0)
    if ig_info.get('website'):
        extra['website'] = ig_info['website']
    if ig_info.get('profile_picture_url'):
        extra['profile_picture_url'] = ig_info['profile_picture_url']
    if recent_posts_data:
        extra['recent_posts'] = recent_posts_data
    # Media type breakdown from recent posts
    media_types = {}
    for media in recent_media:
        mt = media.get('media_type', 'UNKNOWN')
        media_types[mt] = media_types.get(mt, 0) + 1
    if media_types:
        extra['media_types'] = media_types
    extra['total_likes'] = sum(m.get('like_count', 0) or 0 for m in recent_media)
    extra['total_comments'] = sum(m.get('comments_count', 0) or 0 for m in recent_media)
    # Insights
    for key in ('ig_impressions_total', 'ig_reach_total', 'ig_follower_count_total'):
        if ig_insights.get(key):
            extra[key] = ig_insights[key]
    csm.content_types = extra

    # Followers history (at most one entry per day)
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
    csm.followers_history = history

    # Save Instagram config (so enrichment knows it's API-managed)
    ig_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'instagram',
        SocialMediaConfig.company_id == company_id,
    ).first()
    if not ig_config:
        ig_config = SocialMediaConfig(platform='instagram', company_id=company_id)
        db.add(ig_config)
    ig_config.page_id = ig_account_id
    ig_config.page_name = username or ig_info.get('name', '')
    ig_config.access_token = token
    ig_config.is_active = True

    try:
        db.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        db.rollback()
        raise

    logger.info(f"IG sync OK for company {company_id}: @{username}, "
                f"{followers} followers, {media_count} posts, engagement={engagement_rate}")

    return {
        'success': True,
        'data': {
            'username': username,
            'followers_count': followers,
            'media_count': media_count,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'instagram_api',
        }
    }
|