"""
Facebook + Instagram Graph API Client
======================================

Uses OAuth 2.0 page tokens to access Facebook Page and Instagram Business data.

API docs: https://developers.facebook.com/docs/graph-api/
"""

import json
import logging
import mimetypes
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional
from urllib.parse import quote_plus

import requests

logger = logging.getLogger(__name__)


def _parse_graph_timestamp(raw) -> Optional[datetime]:
    """Parse a Graph API timestamp string into a naive UTC datetime.

    Accepts the ``+0000`` / ``-0500`` offset form as well as ``Z`` and
    ``+00:00``. The compact ``+0000`` form is normalised by hand because
    ``datetime.fromisoformat`` only accepts it from Python 3.11 onwards.

    Args:
        raw: Timestamp string as returned by the API (may be None/empty).

    Returns:
        Naive datetime expressed in UTC, or None when the value is missing
        or cannot be parsed.
    """
    if not raw or not isinstance(raw, str):
        return None
    text = raw.strip()
    if text.endswith('Z'):
        text = text[:-1] + '+00:00'
    elif re.search(r'[+-]\d{4}$', text):
        # "+0000" -> "+00:00"
        text = f"{text[:-2]}:{text[-2:]}"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _sum_numeric_values(values) -> float:
    """Sum the numeric 'value' entries of an insights 'values' list."""
    return sum(
        v.get('value', 0)
        for v in values
        if isinstance(v.get('value'), (int, float))
    )


def _summary_total(item: dict, field: str) -> int:
    """Return ``item[field].summary.total_count`` or 0 when absent."""
    return (item.get(field) or {}).get('summary', {}).get('total_count', 0)


def _share_count(item: dict) -> int:
    """Return ``item.shares.count`` or 0 when the post has no shares object."""
    return (item.get('shares') or {}).get('count', 0)


class FacebookGraphService:
    """Facebook + Instagram Graph API client."""

    BASE_URL = "https://graph.facebook.com/v21.0"
    DEFAULT_TIMEOUT = 15  # seconds, applied to every HTTP request

    def __init__(self, access_token: str) -> None:
        """Create a client bound to a single access token.

        Args:
            access_token: Page (or user) access token sent with every request.
        """
        self.access_token = access_token
        self.session = requests.Session()
        # requests ignores a ``timeout`` attribute set on the Session, so the
        # value is kept here and passed explicitly on every call.
        self.timeout = self.DEFAULT_TIMEOUT
        # Retained only for code that may read it; it has no effect in requests.
        self.session.timeout = self.timeout

    def _redact(self, text) -> str:
        """Return ``text`` as a string with the access token masked.

        Exception messages from requests contain the full request URL,
        including the ``access_token`` query parameter; this keeps the token
        out of log files.
        """
        text = str(text)
        token = self.access_token
        if token:
            text = text.replace(token, '***')
            encoded = quote_plus(str(token))
            if encoded != token:
                text = text.replace(encoded, '***')
        return text

    def _get(self, endpoint: str, params: Optional[dict] = None) -> Optional[Dict]:
        """Make authenticated GET request.

        Args:
            endpoint: Path relative to BASE_URL (no leading slash).
            params: Optional query parameters; the dict is not modified.

        Returns:
            Decoded JSON response, or None on any failure (logged).
        """
        # Copy so the caller's dict never receives the access token.
        query = dict(params or {})
        query['access_token'] = self.access_token
        try:
            resp = self.session.get(
                f"{self.BASE_URL}/{endpoint}", params=query, timeout=self.timeout
            )
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error("Facebook API %s failed: %s", endpoint, self._redact(e))
            return None

    def get_managed_pages(self) -> List[Dict]:
        """Get Facebook pages managed by the authenticated user."""
        data = self._get('me/accounts', {'fields': 'id,name,category,access_token,fan_count'})
        return data.get('data', []) if data else []

    def get_page_info(self, page_id: str) -> Optional[Dict]:
        """Get detailed page information.

        Args:
            page_id: Facebook Page ID

        Returns:
            Page fields dict, or None on failure
        """
        return self._get(page_id, {
            'fields': 'id,name,fan_count,category,category_list,link,about,description,website,phone,emails,'
                      'single_line_address,location,hours,followers_count,picture,cover,'
                      'rating_count,overall_star_rating,were_here_count,founded,mission,'
                      'talking_about_count,username,verification_status,is_verified,price_range'
        })

    def get_page_posts_stats(self, page_id: str) -> Dict:
        """Get post counts, engagement stats and last post date from page feed.

        Only the first feed page (up to 100 posts from the last 365 days) is
        inspected. All dates are handled as naive UTC so the 30-day cutoff
        does not depend on the server's local timezone.

        Args:
            page_id: Facebook Page ID

        Returns:
            Dict with posts_30d, posts_365d, last_post_date (naive UTC
            datetime or None), total_likes, total_comments, total_shares,
            post_types and recent_posts.
        """
        result = {
            'posts_30d': 0,
            'posts_365d': 0,
            'last_post_date': None,
            'total_likes': 0,
            'total_comments': 0,
            'total_shares': 0,
            'post_types': {},  # e.g. {'photo': 5, 'video': 2, 'link': 3}
            'recent_posts': [],  # first 5 posts returned, with title/type/date/engagement
        }

        now_utc = datetime.now(timezone.utc)
        since_365d = int((now_utc - timedelta(days=365)).timestamp())
        cutoff_30d = (now_utc - timedelta(days=30)).replace(tzinfo=None)

        data = self._get(f'{page_id}/feed', {
            'fields': 'created_time,message,shares,likes.summary(true),comments.summary(true),attachments{media_type}',
            'since': since_365d,
            'limit': 100,
        })
        if not data:
            return result

        for post in data.get('data', []):
            ct = post.get('created_time', '')
            if not ct:
                continue
            post_date = _parse_graph_timestamp(ct)
            if post_date is None:
                # Unparseable date: the post still exists within the window.
                result['posts_365d'] += 1
                continue

            result['posts_365d'] += 1
            if post_date >= cutoff_30d:
                result['posts_30d'] += 1
            if result['last_post_date'] is None or post_date > result['last_post_date']:
                result['last_post_date'] = post_date

            # Engagement
            likes = _summary_total(post, 'likes')
            comments = _summary_total(post, 'comments')
            shares = _share_count(post)
            result['total_likes'] += likes
            result['total_comments'] += comments
            result['total_shares'] += shares

            # Post types (from attachments.media_type since 'type' is deprecated)
            attachments = (post.get('attachments') or {}).get('data', [])
            ptype = attachments[0].get('media_type', 'status') if attachments else 'status'
            result['post_types'][ptype] = result['post_types'].get(ptype, 0) + 1

            # Recent posts (first 5 in feed order)
            if len(result['recent_posts']) < 5:
                msg = post.get('message', '') or ''
                title = (msg[:80] + '...') if len(msg) > 80 else msg
                result['recent_posts'].append({
                    'date': post_date.strftime('%Y-%m-%d'),
                    'title': title,
                    'type': ptype,
                    'likes': likes,
                    'comments': comments,
                    'shares': shares,
                })

        return result

    def get_page_insights(self, page_id: str, days: int = 28) -> Dict:
        """Get page insights (views, engagements, reactions).

        Fetches metrics individually to handle deprecated/unavailable ones
        gracefully. Requires page access token, not user access token.

        Args:
            page_id: Facebook Page ID
            days: Size of the look-back window in days

        Returns:
            Dict mapping metric name to the sum of its daily values; metrics
            that fail or return no values are omitted.
        """
        now = datetime.now()
        since = int((now - timedelta(days=days)).timestamp())
        until = int(now.timestamp())

        # Meta deprecated impressions/views metrics on 2026-06-30.
        # Migrated to *_media_view family. Old keys kept temporarily for
        # transition (Meta serves both until cutoff).
        all_metrics = [
            'page_views_total',
            'page_post_engagements',
            'page_actions_post_reactions_total',
            'page_media_view',
            'page_total_media_view_unique',
            'page_daily_follows',
            'page_daily_unfollows',
            'page_fans',
            'page_fan_adds',
            'page_fan_removes',
            # Deprecated 2026-06-30 — keep until migration verified end-to-end:
            'page_posts_impressions',
            'page_video_views',
            'page_impressions',
            'page_engaged_users',
        ]

        result = {}
        for metric_name in all_metrics:
            data = self._get(f'{page_id}/insights', {
                'metric': metric_name,
                'period': 'day',
                'since': since,
                'until': until,
            })
            if not data:
                continue
            for metric in data.get('data', []):
                values = metric.get('values', [])
                if values:
                    result[metric.get('name', '')] = _sum_numeric_values(values)
        return result

    def get_instagram_account(self, page_id: str) -> Optional[str]:
        """Get linked Instagram Business account ID from a Facebook Page."""
        data = self._get(page_id, {'fields': 'instagram_business_account'})
        if data and 'instagram_business_account' in data:
            return data['instagram_business_account'].get('id')
        return None

    def get_ig_account_info(self, ig_account_id: str) -> Dict:
        """Get Instagram Business account profile info.

        Returns username, name, bio, followers, following, media count,
        profile pic, website. Returns an empty dict on failure.
        """
        data = self._get(ig_account_id, {
            'fields': (
                'username,name,biography,followers_count,follows_count,'
                'media_count,profile_picture_url,website,ig_id'
            )
        })
        return data or {}

    def get_ig_media_insights(self, ig_account_id: str, days: int = 28) -> Dict:
        """Get Instagram account insights.

        Returns followers_count, media_count, username and, for every insight
        metric that could be fetched, an ``ig_<metric>_total`` sum.
        """
        result = {}

        # Basic account info
        account_data = self._get(ig_account_id, {
            'fields': 'followers_count,media_count,username,biography'
        })
        if account_data:
            result['followers_count'] = account_data.get('followers_count', 0)
            result['media_count'] = account_data.get('media_count', 0)
            result['username'] = account_data.get('username', '')

        # Account insights (reach, impressions) — fetch individually for robustness
        now = datetime.now()
        since = int((now - timedelta(days=days)).timestamp())
        until = int(now.timestamp())
        for metric_name in ('impressions', 'reach', 'follower_count'):
            try:
                insights_data = self._get(f'{ig_account_id}/insights', {
                    'metric': metric_name,
                    'period': 'day',
                    'since': since,
                    'until': until,
                })
                if not insights_data:
                    continue
                for metric in insights_data.get('data', []):
                    values = metric.get('values', [])
                    if values:
                        name = metric.get('name', '')
                        result[f'ig_{name}_total'] = _sum_numeric_values(values)
            except Exception as e:
                # Best-effort: a malformed payload for one metric must not
                # abort the remaining ones.
                logger.debug(f"IG insight {metric_name} failed: {e}")

        return result

    def get_ig_recent_media(self, ig_account_id: str, limit: int = 25) -> list:
        """Get recent media from Instagram Business account.

        Returns list of media with engagement data (empty list on failure).
        """
        data = self._get(f'{ig_account_id}/media', {
            'fields': 'id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count',
            'limit': limit,
        })
        if not data:
            return []
        return data.get('data', [])

    # ============================================================
    # PUBLISHING METHODS (Social Publisher)
    # ============================================================

    def _post(self, endpoint: str, data: Optional[dict] = None,
              files: Optional[dict] = None) -> Optional[Dict]:
        """Make authenticated POST request.

        Args:
            endpoint: Path relative to BASE_URL (no leading slash).
            data: Optional form fields.
            files: Optional multipart file mapping as accepted by requests.

        Returns:
            Decoded JSON response, or None on any failure (logged).
        """
        data = data or {}
        params = {'access_token': self.access_token}
        try:
            # files=None is equivalent to omitting the argument.
            resp = self.session.post(
                f"{self.BASE_URL}/{endpoint}",
                params=params, data=data, files=files, timeout=self.timeout,
            )
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.HTTPError as e:
            error_data = None
            try:
                error_data = e.response.json()
            except Exception:
                # Error body is not JSON; log without it.
                pass
            logger.error(
                "Facebook API POST %s failed: %s, response: %s",
                endpoint, self._redact(e), self._redact(error_data),
            )
            return None
        except Exception as e:
            logger.error("Facebook API POST %s failed: %s", endpoint, self._redact(e))
            return None

    def _delete(self, endpoint: str) -> Optional[Dict]:
        """Make authenticated DELETE request.

        Returns:
            Decoded JSON response, or None on any failure (logged).
        """
        params = {'access_token': self.access_token}
        try:
            resp = self.session.delete(
                f"{self.BASE_URL}/{endpoint}", params=params, timeout=self.timeout
            )
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error("Facebook API DELETE %s failed: %s", endpoint, self._redact(e))
            return None

    def upload_photo_unpublished(self, page_id: str, image_path: str) -> Optional[str]:
        """Upload photo as unpublished to get photo_id for feed attachment.

        Two-step process:
        1. Upload photo with published=false -> get photo_id
        2. Use photo_id in create_post() attached_media

        Args:
            page_id: Facebook Page ID
            image_path: Local path to image file

        Returns:
            Photo ID (media_fbid) or None on failure
        """
        filename = os.path.basename(image_path)
        # Derive the content type from the file extension; PNG stays the
        # fallback for unknown extensions.
        mime_type = mimetypes.guess_type(filename)[0] or 'image/png'
        try:
            with open(image_path, 'rb') as f:
                files = {'source': (filename, f, mime_type)}
                result = self._post(f'{page_id}/photos', data={'published': 'false'}, files=files)
        except FileNotFoundError:
            logger.error(f"Image file not found: {image_path}")
            return None
        except OSError as e:
            logger.error(f"Image file could not be read: {image_path} ({e})")
            return None

        if result and 'id' in result:
            logger.info(f"Uploaded unpublished photo: {result['id']}")
            return result['id']
        logger.error(f"Photo upload failed: {result}")
        return None

    def create_post(self, page_id: str, message: str, image_path: str = None,
                    published: bool = True, scheduled_time: int = None) -> Optional[Dict]:
        """Create a post on Facebook Page feed.

        For posts with images, uses two-step process:
        1. Upload photo as unpublished -> photo_id
        2. Create feed post with attached_media[0]

        Args:
            page_id: Facebook Page ID
            message: Post text content
            image_path: Optional local path to image
            published: If False, creates draft (visible only to page admins)
            scheduled_time: Unix timestamp for scheduled publishing (min 10 min ahead)

        Returns:
            API response dict with 'id' key, or None on failure
        """
        data = {'message': message}

        # Handle image upload
        if image_path:
            photo_id = self.upload_photo_unpublished(page_id, image_path)
            if photo_id:
                data['attached_media[0]'] = json.dumps({'media_fbid': photo_id})
            else:
                logger.warning("Image upload failed, posting without image")

        # Handle scheduling (a scheduled post is always created unpublished)
        if scheduled_time:
            data['published'] = 'false'
            data['scheduled_publish_time'] = str(scheduled_time)
        elif not published:
            data['published'] = 'false'

        result = self._post(f'{page_id}/feed', data=data)
        if result and 'id' in result:
            logger.info(f"Created post: {result['id']} (published={published})")
        return result

    def publish_draft(self, post_id: str) -> Optional[Dict]:
        """Publish an unpublished (draft) post.

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)

        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'true'})

    def unpublish_post(self, post_id: str) -> Optional[Dict]:
        """Unpublish a public post (make it draft/hidden).

        Args:
            post_id: Facebook post ID (format: PAGE_ID_POST_ID)

        Returns:
            API response or None on failure
        """
        return self._post(post_id, data={'is_published': 'false'})

    def get_post_engagement(self, post_id: str) -> Optional[Dict]:
        """Get engagement metrics for a published post.

        Args:
            post_id: Facebook post ID

        Returns:
            Dict with likes, comments, shares, reactions_total or None
        """
        fields = 'id,created_time,message,likes.summary(true),comments.summary(true),shares,reactions.summary(true).limit(0)'
        result = self._get(post_id, {'fields': fields})
        if not result:
            return None
        return {
            'post_id': result.get('id'),
            'created_time': result.get('created_time'),
            'likes': _summary_total(result, 'likes'),
            'comments': _summary_total(result, 'comments'),
            'shares': _share_count(result),
            'reactions_total': _summary_total(result, 'reactions'),
        }

    def get_page_posts(self, page_id: str, limit: int = 10, after: str = None) -> Optional[Dict]:
        """Get recent posts from a Facebook Page with engagement metrics.

        Args:
            page_id: Facebook Page ID
            limit: Number of posts to fetch (max 100)
            after: Pagination cursor for next page

        Returns:
            Dict with 'posts' list and 'next_cursor' (or None), or None on failure
        """
        fields = (
            'id,message,created_time,full_picture,permalink_url,status_type,'
            'likes.summary(true).limit(0),comments.summary(true).limit(0),'
            'shares,reactions.summary(true).limit(0)'
        )
        params = {'fields': fields, 'limit': limit}
        if after:
            params['after'] = after

        result = self._get(f'{page_id}/posts', params)
        if not result:
            return None

        posts = [
            {
                'id': item.get('id'),
                'message': item.get('message', ''),
                'created_time': item.get('created_time'),
                'full_picture': item.get('full_picture'),
                'permalink_url': item.get('permalink_url'),
                'status_type': item.get('status_type', ''),
                'likes': _summary_total(item, 'likes'),
                'comments': _summary_total(item, 'comments'),
                'shares': _share_count(item),
                'reactions_total': _summary_total(item, 'reactions'),
            }
            for item in result.get('data', [])
        ]

        # Extract next page cursor (only when the API reports a next page)
        next_cursor = None
        paging = result.get('paging', {})
        if paging.get('next'):
            next_cursor = paging.get('cursors', {}).get('after')

        return {'posts': posts, 'next_cursor': next_cursor}

    def get_post_insights_metrics(self, post_id: str) -> Optional[Dict]:
        """Get detailed insights metrics for a specific post.

        Note: Only available for posts on Pages with 100+ fans.

        Args:
            post_id: Facebook post ID

        Returns:
            Dict with impressions, reach, engaged_users, clicks or None
        """
        # post_impressions/post_impressions_unique deprecated 2026-06-30.
        # Replaced by post_media_view / post_total_media_view_unique.
        # Both old and new requested during transition window.
        metrics = ('post_media_view,post_total_media_view_unique,'
                   'post_impressions,post_impressions_unique,'
                   'post_engaged_users,post_clicks')
        result = self._get(f'{post_id}/insights', {'metric': metrics})
        if not result:
            return None

        raw = {}
        for metric in result.get('data', []):
            values = metric.get('values', [])
            if values:
                raw[metric.get('name', '')] = values[0].get('value', 0)

        # For each output key the first available source wins, so the new
        # metric family is preferred over the deprecated one regardless of
        # the order in which the API returns them.
        sources = (
            ('impressions', ('post_media_view', 'post_impressions')),
            ('reach', ('post_total_media_view_unique', 'post_impressions_unique')),
            ('engaged_users', ('post_engaged_users',)),
            ('clicks', ('post_clicks',)),
        )
        insights = {}
        for key, candidates in sources:
            for name in candidates:
                if name in raw:
                    insights[key] = raw[name]
                    break
        return insights if insights else None

    def delete_post(self, post_id: str) -> bool:
        """Delete a post (published or unpublished).

        Args:
            post_id: Facebook post ID

        Returns:
            True if deleted successfully
        """
        result = self._delete(post_id)
        return bool(result and result.get('success'))


# ============================================================
# SYNC: Facebook Page → CompanySocialMedia
# ============================================================

def sync_facebook_to_social_media(db, company_id: int) -> dict:
    """Fetch Facebook page stats via Graph API and upsert into CompanySocialMedia.

    Requires an active OAuth token and SocialMediaConfig for the company.
    Called automatically after page selection and manually via sync endpoint.

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for

    Returns:
        dict with 'success' bool and either 'data' or 'error'. The reported
        'engagement_rate' is the value computed (and stored) during this
        sync, or None when it could not be computed.
    """
    from oauth_service import OAuthService
    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get page config
    config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 (SQLAlchemy expression)
    ).first()
    if not config or not config.page_id:
        return {'success': False, 'error': 'no_page', 'message': 'Brak skonfigurowanej strony Facebook'}
    page_id = config.page_id

    # 2. Use Page Access Token (from config) — required for feed/insights.
    #    Fall back to User Access Token from OAuth if page token unavailable.
    token = config.access_token
    if not token:
        oauth = OAuthService()
        token = oauth.get_valid_token(db, company_id, 'meta', 'facebook')
    if not token:
        return {'success': False, 'error': 'no_token', 'message': 'Brak aktywnego tokenu Facebook'}

    # 3. Fetch page info from Graph API
    fb = FacebookGraphService(token)
    page_info = fb.get_page_info(page_id)
    if not page_info:
        return {'success': False, 'error': 'api_failed', 'message': 'Nie udało się pobrać danych strony z Facebook API'}

    # 4. Fetch page insights (best-effort, may be empty)
    insights = fb.get_page_insights(page_id, 28)

    # 4b. Fetch post stats (best-effort)
    post_stats = fb.get_page_posts_stats(page_id)

    # 5. Calculate metrics
    followers = page_info.get('followers_count') or page_info.get('fan_count') or 0
    posts_365d = post_stats.get('posts_365d', 0)

    # Engagement rate: prefer per-post calculation over insights
    engagement_rate = None
    if posts_365d > 0 and followers > 0:
        total_engagement = (
            post_stats['total_likes'] + post_stats['total_comments'] + post_stats['total_shares']
        )
        avg_engagement_per_post = total_engagement / posts_365d
        engagement_rate = round((avg_engagement_per_post / followers) * 100, 2)
    else:
        engaged_users = insights.get('page_engaged_users', 0)
        if followers > 0 and engaged_users > 0:
            engagement_rate = round((engaged_users / followers) * 100, 2)

    # Profile completeness: 20 points each for 5 key fields
    has_profile_photo = bool((page_info.get('picture') or {}).get('data', {}).get('url'))
    completeness = 20 * sum(
        1 for present in (
            page_info.get('about'),
            page_info.get('website'),
            page_info.get('phone'),
            page_info.get('single_line_address'),
            has_profile_photo,
        ) if present
    )

    # URL: prefer API vanity link, then existing URL, then numeric fallback
    # Don't replace a vanity URL with a numeric one
    # (e.g. facebook.com/inpipl → facebook.com/123456)
    api_link = page_info.get('link')
    if api_link and re.search(r'facebook\.com/\d+/?$', api_link):
        api_link = None  # Numeric URL, not useful as replacement

    # 6. Upsert CompanySocialMedia record
    # Look for existing Facebook record for this company (any URL)
    existing = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'facebook',
    ).first()

    now = datetime.now()
    if existing:
        csm = existing
        # Only update URL if API returned a proper link (not numeric fallback);
        # otherwise keep the existing URL.
        if api_link:
            csm.url = api_link
    else:
        page_url = api_link or f"https://facebook.com/{page_id}"
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='facebook',
            url=page_url,
        )
        db.add(csm)

    csm.source = 'facebook_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = page_info.get('name', '')
    csm.followers_count = followers if followers > 0 else csm.followers_count
    # Prefer 'about' (short), fallback to 'description' (long)
    bio_text = page_info.get('about') or page_info.get('description') or ''
    csm.has_bio = bool(bio_text)
    csm.profile_description = bio_text[:500]

    # Keep the previously stored rate when nothing could be computed now.
    if engagement_rate is not None:
        csm.engagement_rate = engagement_rate

    csm.profile_completeness_score = completeness
    csm.has_profile_photo = has_profile_photo
    csm.has_cover_photo = bool((page_info.get('cover') or {}).get('source'))
    csm.posts_count_30d = post_stats.get('posts_30d', 0)
    csm.posts_count_365d = posts_365d
    if post_stats.get('last_post_date'):
        csm.last_post_date = post_stats['last_post_date']

    # Posting frequency score: 0-10 based on posts per month
    p30 = post_stats.get('posts_30d', 0)
    csm.posting_frequency_score = min(10, p30) if p30 > 0 else 0

    csm.verified_at = now
    csm.last_checked_at = now

    # Store all API fields in content_types JSONB (no migration needed).
    # A fresh dict is assigned so the ORM detects the change.
    extra = dict(csm.content_types or {})

    # Page info fields copied verbatim when truthy
    for key in ('category', 'website', 'phone', 'founded', 'mission'):
        if page_info.get(key):
            extra[key] = page_info[key]
    if page_info.get('single_line_address'):
        extra['address'] = page_info['single_line_address']
    if page_info.get('description'):
        extra['description'] = page_info['description'][:500]
    for key in ('emails', 'hours'):
        if page_info.get(key):
            extra[key] = page_info[key]
    if page_info.get('location'):
        loc = page_info['location']
        extra['location'] = {
            k: loc[k]
            for k in ('city', 'country', 'latitude', 'longitude', 'street', 'zip')
            if k in loc
        }
    for key in ('rating_count', 'overall_star_rating', 'were_here_count',
                # Additional page info fields
                'talking_about_count', 'username', 'verification_status'):
        if page_info.get(key):
            extra[key] = page_info[key]
    # is_verified is stored even when False
    if page_info.get('is_verified') is not None:
        extra['is_verified'] = page_info['is_verified']
    if page_info.get('price_range'):
        extra['price_range'] = page_info['price_range']
    if page_info.get('category_list'):
        extra['category_list'] = [c.get('name') for c in page_info['category_list'] if c.get('name')]

    # Post engagement stats
    extra['total_likes'] = post_stats.get('total_likes', 0)
    extra['total_comments'] = post_stats.get('total_comments', 0)
    extra['total_shares'] = post_stats.get('total_shares', 0)
    if post_stats.get('post_types'):
        extra['post_types'] = post_stats['post_types']
    if post_stats.get('recent_posts'):
        extra['recent_posts'] = post_stats['recent_posts']

    # Insights
    for key in ('page_media_view', 'page_total_media_view_unique',
                'page_views_total', 'page_post_engagements',
                'page_fan_adds', 'page_fan_removes',
                'page_actions_post_reactions_total',
                'page_daily_follows', 'page_daily_unfollows',
                # Deprecated 2026-06-30, kept for backward compat:
                'page_impressions', 'page_engaged_users',
                'page_posts_impressions', 'page_video_views'):
        if insights.get(key):
            extra[f'insights_{key}'] = insights[key]

    csm.content_types = extra

    # Append to followers_history
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        # Don't duplicate entries for the same day
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
            csm.followers_history = history

    db.commit()

    logger.info(f"FB sync OK for company {company_id}: {followers} followers, engagement={engagement_rate}")

    return {
        'success': True,
        'data': {
            'page_name': csm.page_name,
            'followers_count': followers,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'facebook_api',
        }
    }


def sync_instagram_to_social_media(db, company_id: int) -> dict:
    """Fetch Instagram stats via Graph API and upsert into CompanySocialMedia.

    Uses the Facebook Page token to access the linked Instagram Business account.
    Requires:
    - Active Facebook OAuth with instagram_basic permission
    - Instagram Business/Creator account linked to a Facebook Page

    Args:
        db: SQLAlchemy session
        company_id: Company ID to sync data for

    Returns:
        dict with 'success' bool and either 'data' or 'error'
    """
    from database import SocialMediaConfig, CompanySocialMedia

    # 1. Get Facebook page config (Instagram uses the same Page token)
    fb_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'facebook',
        SocialMediaConfig.company_id == company_id,
        SocialMediaConfig.is_active == True,  # noqa: E712 (SQLAlchemy expression)
    ).first()
    if not fb_config or not fb_config.page_id or not fb_config.access_token:
        return {'success': False, 'error': 'no_fb_config',
                'message': 'Brak skonfigurowanej strony Facebook (wymagana do Instagram API)'}

    token = fb_config.access_token
    fb = FacebookGraphService(token)

    # 2. Get linked Instagram Business account ID
    ig_account_id = fb.get_instagram_account(fb_config.page_id)
    if not ig_account_id:
        return {'success': False, 'error': 'no_ig_linked',
                'message': 'Brak powiązanego konta Instagram Business ze stroną Facebook'}

    # 3. Fetch Instagram profile data
    ig_info = fb.get_ig_account_info(ig_account_id)
    if not ig_info:
        return {'success': False, 'error': 'api_failed',
                'message': 'Nie udało się pobrać danych profilu Instagram'}

    # 4. Fetch insights (best-effort)
    ig_insights = fb.get_ig_media_insights(ig_account_id, 28)

    # 5. Fetch recent media for engagement calculation
    recent_media = fb.get_ig_recent_media(ig_account_id, 25)

    # 6. Calculate metrics
    followers = ig_info.get('followers_count') or 0
    media_count = ig_info.get('media_count') or 0
    username = ig_info.get('username', '')

    # Engagement rate from recent posts
    engagement_rate = None
    posts_30d = 0
    posts_365d = 0
    last_post_date = None
    total_engagement = 0
    recent_posts_data = []

    now = datetime.now()  # local time, used for DB timestamps and history
    # Media timestamps are normalised to naive UTC, so ages are computed
    # against a naive UTC "now" rather than local time.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    for media in recent_media:
        media_date = _parse_graph_timestamp(media.get('timestamp'))
        if media_date is None:
            continue

        # Bound unconditionally: both are also needed for recent_posts below.
        likes = media.get('like_count') or 0
        comments = media.get('comments_count') or 0

        days_ago = (now_utc - media_date).days
        if days_ago <= 365:
            posts_365d += 1
            total_engagement += likes + comments
        if days_ago <= 30:
            posts_30d += 1
        if last_post_date is None or media_date > last_post_date:
            last_post_date = media_date

        if len(recent_posts_data) < 5:
            caption = media.get('caption', '') or ''
            recent_posts_data.append({
                'date': media_date.strftime('%Y-%m-%d'),
                'type': media.get('media_type', 'UNKNOWN'),
                'likes': likes,
                'comments': comments,
                'caption': caption[:100],
                'permalink': media.get('permalink', ''),
            })

    if posts_365d > 0 and followers > 0:
        avg_engagement = total_engagement / posts_365d
        engagement_rate = round((avg_engagement / followers) * 100, 2)

    # Profile completeness: 20 points per satisfied criterion
    completeness = 20 * sum(
        1 for present in (
            ig_info.get('biography'),
            ig_info.get('website'),
            ig_info.get('profile_picture_url'),
            followers > 10,
            posts_30d > 0,
        ) if present
    )

    # 7. Upsert CompanySocialMedia record
    existing = db.query(CompanySocialMedia).filter(
        CompanySocialMedia.company_id == company_id,
        CompanySocialMedia.platform == 'instagram',
    ).first()

    if existing:
        csm = existing
    else:
        csm = CompanySocialMedia(
            company_id=company_id,
            platform='instagram',
            url=f'https://instagram.com/{username}' if username else 'https://instagram.com/',
        )
        db.add(csm)

    if username:
        csm.url = f'https://instagram.com/{username}'
    csm.source = 'instagram_api'
    csm.is_valid = True
    csm.check_status = 'ok'
    csm.page_name = ig_info.get('name', '') or username
    csm.followers_count = followers if followers > 0 else csm.followers_count
    csm.has_bio = bool(ig_info.get('biography'))
    csm.profile_description = (ig_info.get('biography') or '')[:500]
    csm.has_profile_photo = bool(ig_info.get('profile_picture_url'))
    csm.engagement_rate = engagement_rate
    csm.profile_completeness_score = completeness
    csm.posts_count_30d = posts_30d
    csm.posts_count_365d = posts_365d
    if last_post_date:
        csm.last_post_date = last_post_date
    csm.posting_frequency_score = min(10, posts_30d) if posts_30d > 0 else 0
    csm.verified_at = now
    csm.last_checked_at = now

    # Extra data in content_types JSONB (fresh dict so the ORM sees the change)
    extra = dict(csm.content_types or {})
    extra['ig_account_id'] = ig_account_id
    extra['media_count'] = media_count
    extra['follows_count'] = ig_info.get('follows_count', 0)
    if ig_info.get('website'):
        extra['website'] = ig_info['website']
    if ig_info.get('profile_picture_url'):
        extra['profile_picture_url'] = ig_info['profile_picture_url']
    if recent_posts_data:
        extra['recent_posts'] = recent_posts_data

    # Media type breakdown from recent posts
    media_types = {}
    for media in recent_media:
        mt = media.get('media_type', 'UNKNOWN')
        media_types[mt] = media_types.get(mt, 0) + 1
    if media_types:
        extra['media_types'] = media_types
    extra['total_likes'] = sum((m.get('like_count') or 0) for m in recent_media)
    extra['total_comments'] = sum((m.get('comments_count') or 0) for m in recent_media)

    # Insights
    for key in ('ig_impressions_total', 'ig_reach_total', 'ig_follower_count_total'):
        if ig_insights.get(key):
            extra[key] = ig_insights[key]

    csm.content_types = extra

    # Followers history (at most one entry per day)
    history = list(csm.followers_history or [])
    if followers > 0:
        today_str = now.strftime('%Y-%m-%d')
        if not history or history[-1].get('date') != today_str:
            history.append({'date': today_str, 'count': followers})
            csm.followers_history = history

    # Save Instagram config (so enrichment knows it's API-managed)
    ig_config = db.query(SocialMediaConfig).filter(
        SocialMediaConfig.platform == 'instagram',
        SocialMediaConfig.company_id == company_id,
    ).first()
    if not ig_config:
        ig_config = SocialMediaConfig(platform='instagram', company_id=company_id)
        db.add(ig_config)
    ig_config.page_id = ig_account_id
    ig_config.page_name = username or ig_info.get('name', '')
    ig_config.access_token = token
    ig_config.is_active = True

    db.commit()

    logger.info(f"IG sync OK for company {company_id}: @{username}, "
                f"{followers} followers, {media_count} posts, engagement={engagement_rate}")

    return {
        'success': True,
        'data': {
            'username': username,
            'followers_count': followers,
            'media_count': media_count,
            'engagement_rate': engagement_rate,
            'profile_completeness_score': completeness,
            'source': 'instagram_api',
        }
    }