nordabiz/scripts/social_media_audit.py
Maciej Pienczyn 39cd257f4e Fix YouTube detection overwriting valid matches
- Add 'channel', 'c', 'user', '@' etc. to YouTube exclusion list
- Add 'bold_themes', 'boldthemes' to Twitter/Facebook exclusions (theme creators)
- Fix pattern matching loop to stop after first valid match per platform
- Prevents fallback pattern from overwriting correct channel ID with 'channel'

Fixes issue where youtube.com/channel/ID was being overwritten with
youtube.com/channel/channel by the second fallback pattern.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 05:36:06 +01:00

1223 lines
50 KiB
Python

#!/usr/bin/env python3
"""
Social Media & Website Audit Script for Norda Biznes
=====================================================
Performs comprehensive audit of company websites and social media presence.
Designed to run with multiple parallel workers.
Features:
- Website analysis (SSL, hosting, author, responsiveness)
- Social media discovery (FB, IG, TikTok, YouTube, LinkedIn)
- Google Reviews scraping via Brave Search
- Parallel execution support
Usage:
python social_media_audit.py --company-id 26
python social_media_audit.py --batch 1-10
python social_media_audit.py --all
Author: Claude Code
Date: 2025-12-29
"""
import os
import sys
import json
import re
import ssl
import socket
import argparse
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple, Any
from urllib.parse import urlparse
import time
from pathlib import Path
# Load environment variables from the project root .env (if present) so
# DATABASE_URL and API keys can be configured without exporting them.
try:
    from dotenv import load_dotenv
    # scripts/ lives directly under the project root.
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    env_path = project_root / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        # NOTE: deliberately no logging here. Logging is configured further
        # down; calling logging.info() on the root logger this early would
        # attach a default handler and turn the later logging.basicConfig()
        # into a silent no-op -- and the message would be dropped anyway,
        # since the root level is still WARNING at this point.
except ImportError:
    pass  # python-dotenv not installed; rely on system environment variables
import requests
from bs4 import BeautifulSoup
import whois
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# Configure logging.
# force=True (Python 3.8+) removes any handlers that were attached to the
# root logger earlier during import; without it, basicConfig() is a silent
# no-op when a handler already exists and the level/format below would
# never take effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)
# Database configuration
# NOTE(review): this default embeds credentials in source control; prefer
# requiring DATABASE_URL via the environment/.env and treating this
# fallback as local-development-only.
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:NordaBiz2025Secure@127.0.0.1:5432/nordabiz'
)
# Request configuration
REQUEST_TIMEOUT = 15  # seconds, applied to every outbound HTTP request
# Desktop Chrome UA string; some sites serve reduced markup to unknown bots.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
# Known Polish hosting providers (IP ranges and identifiers).
# Each provider maps to a list of markers that are either IPv4 prefixes
# (matched with str.startswith against the resolved IP) or lowercase name
# fragments (matched as substrings of the domain / reverse-DNS name).
# NOTE(review): very short prefixes like '34.', '35.', '52.', '54.' are
# broad and can misattribute providers -- confirm acceptable for this audit.
HOSTING_PROVIDERS = {
    'nazwa.pl': ['nazwa.pl', '185.252.', '91.227.'],
    'home.pl': ['home.pl', '212.85.', '195.26.'],
    'OVH': ['ovh.', '51.38.', '51.68.', '51.75.', '51.77.', '51.83.', '51.89.', '51.91.', '54.36.', '54.37.', '54.38.', '135.125.', '141.94.', '141.95.', '142.4.', '144.217.', '145.239.', '147.135.', '149.202.', '151.80.', '158.69.', '164.132.', '167.114.', '176.31.', '178.32.', '185.15.', '188.165.', '192.95.', '193.70.', '194.182.', '195.154.', '198.27.', '198.50.', '198.100.', '213.186.', '213.251.', '217.182.'],
    'cyber_Folks': ['cyberfolks', 'cf.', '77.55.'],
    'Zenbox': ['zenbox', '195.181.'],
    'Linuxpl': ['linuxpl', '91.200.'],
    'Hekko': ['hekko', 'hekko.pl'],
    'Smarthost': ['smarthost'],
    'AZ.pl': ['az.pl', 'aznetwork'],
    'Aftermarket': ['aftermarket', 'aftermarket.pl'],
    'Cloudflare': ['cloudflare', '104.16.', '104.17.', '104.18.', '104.19.', '104.20.', '104.21.', '104.22.', '104.23.', '104.24.', '172.67.'],
    'Google Cloud': ['google', '34.', '35.'],
    'AWS': ['amazon', 'aws', '52.', '54.'],
    'Vercel': ['vercel', '76.76.21.'],
    'Netlify': ['netlify'],
}
# Social media patterns.
# For each platform the regexes are tried IN ORDER and the first capture not
# on the exclusion list wins (see WebsiteAuditor), so more specific patterns
# must precede loose fallbacks.
SOCIAL_MEDIA_PATTERNS = {
    'facebook': [
        r'(?:https?://)?(?:www\.)?facebook\.com/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?fb\.com/([^/?\s"\'<>]+)',
    ],
    'instagram': [
        r'(?:https?://)?(?:www\.)?instagram\.com/([^/?\s"\'<>]+)',
    ],
    'youtube': [
        # Specific channel forms. Note the alternation requires a slash after
        # '@', so handle URLs like youtube.com/@name fall through to the
        # fallback below (its capture then starts with '@', which the URL
        # builder handles specially).
        r'(?:https?://)?(?:www\.)?youtube\.com/(?:channel|c|user|@)/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.)?youtube\.com/([^/?\s"\'<>]+)',
    ],
    'linkedin': [
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/company/([^/?\s"\'<>]+)',
        r'(?:https?://)?(?:www\.|pl\.)?linkedin\.com/in/([^/?\s"\'<>]+)',
    ],
    'tiktok': [
        r'(?:https?://)?(?:www\.)?tiktok\.com/@([^/?\s"\'<>]+)',
    ],
    'twitter': [
        r'(?:https?://)?(?:www\.)?(?:twitter|x)\.com/([^/?\s"\'<>]+)',
    ],
}
# False positives to exclude.
# Entries are compared with an EXACT (lowercased) match against the captured
# path segment -- they are not substrings -- so real handles that merely
# contain one of these words are still accepted. The YouTube list includes
# the path keywords ('channel', 'c', 'user', '@') so the loose fallback
# pattern cannot report a bare path keyword as a channel id; 'boldthemes' /
# 'bold_themes' filter out theme-vendor credit links.
SOCIAL_MEDIA_EXCLUDE = {
    'facebook': ['sharer', 'share', 'intent', 'plugins', 'dialog', 'sharer.php', 'login', 'pages', 'boldthemes'],
    'instagram': ['explore', 'accounts', 'p', 'reel'],
    'youtube': ['embed', 'watch', 'playlist', 'results', 'feed', 'channel', 'c', 'user', '@', 'about', 'featured', 'videos', 'shorts', 'streams', 'playlists', 'community', 'channels', 'store'],
    'linkedin': ['shareArticle', 'share', 'login'],
    'tiktok': ['embed', 'video'],
    'twitter': ['intent', 'share', 'widgets.js', 'widgets', 'tweet', 'platform.twitter.com', 'bold_themes', 'boldthemes'],
}
class WebsiteAuditor:
    """Audits website technical details and metadata.

    Runs SSL, hosting/DNS and HTML-level checks for a single URL and
    aggregates everything into one flat result dict (see audit_website).
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    @staticmethod
    def _merge_partial(result: Dict[str, Any], partial: Dict[str, Any]) -> None:
        """Merge a helper's partial result into the main result dict.

        Helpers may report their own 'errors' list; append those entries to
        the accumulated list instead of letting dict.update() clobber it
        (the previous plain update() silently dropped earlier errors).
        """
        result['errors'].extend(partial.pop('errors', []))
        result.update(partial)

    def audit_website(self, url: str) -> Dict[str, Any]:
        """
        Perform comprehensive website audit.

        Returns dict with:
        - http_status, load_time_ms
        - has_ssl, ssl_valid, ssl_expiry, ssl_issuer
        - hosting_provider, hosting_ip, server_software
        - site_author, site_generator
        - is_mobile_friendly, has_viewport_meta
        - last_modified_at
        - social_media_links (dict of platform -> url)
        - errors (list of human-readable failure descriptions)
        """
        result = {
            'url': url,
            'http_status': None,
            'load_time_ms': None,
            'has_ssl': False,
            'ssl_valid': False,
            'ssl_expiry': None,
            'ssl_issuer': None,
            'hosting_provider': None,
            'hosting_ip': None,
            'server_software': None,
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'last_modified_at': None,
            'social_media_links': {},
            'errors': [],
        }
        if not url:
            result['errors'].append('No URL provided')
            return result
        # Normalize URL: assume https when no scheme was given.
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        parsed = urlparse(url)
        # hostname strips any explicit port (and lowercases), so the SSL and
        # DNS checks also work for URLs like https://example.com:8080/.
        domain = parsed.hostname or parsed.netloc
        # 1. Check SSL certificate
        try:
            self._merge_partial(result, self._check_ssl(domain))
        except Exception as e:
            result['errors'].append(f'SSL check failed: {str(e)}')
        # 2. Resolve IP and detect hosting
        try:
            self._merge_partial(result, self._detect_hosting(domain))
        except Exception as e:
            result['errors'].append(f'Hosting detection failed: {str(e)}')
        # 3. Fetch page and analyze
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            result['load_time_ms'] = int((time.time() - start_time) * 1000)
            result['http_status'] = response.status_code
            # Redirects may change the scheme; trust the final URL.
            result['has_ssl'] = response.url.startswith('https://')
            # Server header
            result['server_software'] = response.headers.get('Server', '')[:100]
            # Last-Modified header
            last_mod = response.headers.get('Last-Modified')
            if last_mod:
                try:
                    result['last_modified_at'] = datetime.strptime(
                        last_mod, '%a, %d %b %Y %H:%M:%S %Z'
                    )
                except ValueError:
                    pass  # malformed header -- not worth surfacing as an error
            # Parse HTML
            if response.status_code == 200:
                self._merge_partial(result, self._parse_html(response.text))
        except requests.exceptions.SSLError as e:
            result['errors'].append(f'SSL Error: {str(e)}')
            result['ssl_valid'] = False
            # Try plain HTTP for sites with broken HTTPS.
            try:
                http_url = url.replace('https://', 'http://')
                response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
                result['http_status'] = response.status_code
                result['has_ssl'] = False
                if response.status_code == 200:
                    self._merge_partial(result, self._parse_html(response.text))
            except Exception as e2:
                result['errors'].append(f'HTTP fallback failed: {str(e2)}')
        except requests.exceptions.RequestException as e:
            result['errors'].append(f'Request failed: {str(e)}')
        return result

    def _check_ssl(self, domain: str) -> Dict[str, Any]:
        """Check SSL certificate validity, expiry and issuer."""
        result = {'ssl_valid': False, 'ssl_expiry': None, 'ssl_issuer': None}
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    # A completed handshake with the default context means the
                    # chain validated and the hostname matched.
                    result['ssl_valid'] = True
                    # Parse expiry date
                    not_after = cert.get('notAfter')
                    if not_after:
                        result['ssl_expiry'] = datetime.strptime(
                            not_after, '%b %d %H:%M:%S %Y %Z'
                        ).date()
                    # Extract issuer (Certificate Authority)
                    issuer = cert.get('issuer')
                    if issuer:
                        # issuer is a tuple of RDN tuples like
                        # ((('organizationName', "Let's Encrypt"),),)
                        issuer_dict = {}
                        for item in issuer:
                            for key, value in item:
                                issuer_dict[key] = value
                        # Prefer Organization name, fall back to Common Name.
                        issuer_name = issuer_dict.get('organizationName') or issuer_dict.get('commonName')
                        if issuer_name:
                            result['ssl_issuer'] = issuer_name[:100]  # limit length
        except Exception:
            # Connection refused, handshake failure, expired/invalid cert, ...
            result['ssl_valid'] = False
        return result

    def _detect_hosting(self, domain: str) -> Dict[str, Any]:
        """Detect hosting provider from IP prefix, reverse DNS and WHOIS."""
        result = {'hosting_provider': None, 'hosting_ip': None}
        try:
            ip = socket.gethostbyname(domain)
            result['hosting_ip'] = ip
            # Check against known hosting IP prefixes / name fragments.
            for provider, patterns in HOSTING_PROVIDERS.items():
                for pattern in patterns:
                    if ip.startswith(pattern) or pattern in domain.lower():
                        result['hosting_provider'] = provider
                        return result
            # Try reverse DNS.
            try:
                reverse = socket.gethostbyaddr(ip)[0]
                for provider, patterns in HOSTING_PROVIDERS.items():
                    for pattern in patterns:
                        if pattern in reverse.lower():
                            result['hosting_provider'] = provider
                            return result
            except OSError:
                pass  # no PTR record or lookup failure -- best-effort only
            # Try WHOIS for the registrar as a last resort.
            try:
                w = whois.whois(domain)
                if w.registrar:
                    result['domain_registrar'] = str(w.registrar)[:100]
            except Exception:
                pass  # WHOIS is flaky; treat as best-effort
        except Exception as e:
            result['errors'] = [f'Hosting detection: {str(e)}']
        return result

    def _parse_html(self, html: str) -> Dict[str, Any]:
        """Parse HTML for metadata, author attribution and social links."""
        result = {
            'site_author': None,
            'site_generator': None,
            'is_mobile_friendly': False,
            'has_viewport_meta': False,
            'social_media_links': {},
        }
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Viewport meta is the mobile-friendliness indicator used here.
            viewport = soup.find('meta', attrs={'name': 'viewport'})
            if viewport:
                result['has_viewport_meta'] = True
                content = viewport.get('content', '')
                if 'width=device-width' in content:
                    result['is_mobile_friendly'] = True
            # Author meta
            author = soup.find('meta', attrs={'name': 'author'})
            if author:
                result['site_author'] = author.get('content', '')[:255]
            # Generator meta (CMS)
            generator = soup.find('meta', attrs={'name': 'generator'})
            if generator:
                result['site_generator'] = generator.get('content', '')[:100]
            # Fall back to heuristic author extraction when no meta tag.
            if not result['site_author']:
                result['site_author'] = self._find_author(soup, html)
            # Extract social media links
            result['social_media_links'] = self._extract_social_links(html)
        except Exception as e:
            result['errors'] = [f'HTML parsing: {str(e)}']
        return result

    def _find_author(self, soup: BeautifulSoup, html: str) -> Optional[str]:
        """Heuristically extract the site author / web-agency credit.

        Checks, in order: HTML comments, footer credit phrases, footer links
        that look like agencies, then Polish credit phrases anywhere on the
        page. Returns a cleaned-up name or None.
        """
        author_found = None
        # 1. HTML comments often carry "designed by ..." credits.
        comment_patterns = [
            r'(?:created by|designed by|developed by|made by|author)[:\s]+([^\n<>]+)',
            r'(?:agencja|agency|studio)[:\s]+([^\n<>]+)',
        ]
        for comment in re.findall(r'<!--(.+?)-->', html, re.DOTALL):
            for pattern in comment_patterns:
                match = re.search(pattern, comment, re.IGNORECASE)
                if match:
                    author_found = match.group(1).strip()
                    break
            if author_found:
                break
        # 2. Footer credit text ("wykonanie: ...", "powered by ...").
        if not author_found:
            footer = soup.find('footer')
            if footer:
                footer_text = footer.get_text(separator=' ')
                footer_patterns = [
                    r'(?:wykonanie|realizacja|created by|designed by|made by|developed by)[:\s]+([^|<>\n©]+)',
                    r'(?:projekt|design|strona)[:\s]+([^|<>\n©]+)',
                    r'(?:powered by|built with)[:\s]+([^|<>\n©]+)',
                    r'(?:agencja|agency|studio)[:\s]+([^|<>\n©]+)',
                ]
                for pattern in footer_patterns:
                    match = re.search(pattern, footer_text, re.IGNORECASE)
                    if match:
                        author_found = match.group(1).strip()
                        break
                # 3. Footer links pointing at agency/studio domains.
                if not author_found:
                    agency_domains = ['.pl', '.com', '.eu']
                    agency_keywords = ['studio', 'agencja', 'agency', 'design', 'web', 'digital', 'media', 'creative']
                    for link in footer.find_all('a', href=True):
                        href = link.get('href', '')
                        link_text = link.get_text().strip()
                        # Check if the link looks like an agency credit.
                        if any(kw in href.lower() or kw in link_text.lower() for kw in agency_keywords):
                            if any(dom in href for dom in agency_domains) and 'facebook' not in href and 'instagram' not in href:
                                # Use the link text as the author name.
                                if link_text and len(link_text) > 2 and len(link_text) < 50:
                                    author_found = link_text
                                    break
        # 4. Polish credit phrases anywhere on the page.
        if not author_found:
            page_text = soup.get_text(separator=' ')
            page_patterns = [
                r'(?:stronę wykonała?|witrynę wykonała?|stronę stworzył[ao]?)[:\s]+([^|<>\n©.]+)',
                r'(?:copyright|©).*?(?:by|przez)[:\s]+([^|<>\n©.]+)',
            ]
            for pattern in page_patterns:
                match = re.search(pattern, page_text, re.IGNORECASE)
                if match:
                    author_found = match.group(1).strip()
                    break
        # Clean up: trim dash/colon decorations, collapse whitespace, and
        # reject values that are too short or suspiciously long.
        if author_found:
            author_found = re.sub(r'^[\s\-–—:]+', '', author_found)
            author_found = re.sub(r'[\s\-–—:]+$', '', author_found)
            author_found = re.sub(r'\s+', ' ', author_found)
            if 2 < len(author_found) < 100:
                return author_found[:255]
        return None

    def _extract_social_links(self, html: str) -> Dict[str, str]:
        """Extract the first valid profile URL per social platform.

        Patterns are tried in order and the first non-excluded capture wins;
        remaining patterns are then skipped so a loose fallback pattern can
        never overwrite a precise earlier match (e.g. replacing a real
        YouTube channel id with the literal word 'channel').
        """
        links: Dict[str, str] = {}
        for platform, patterns in SOCIAL_MEDIA_PATTERNS.items():
            excludes = SOCIAL_MEDIA_EXCLUDE.get(platform, [])
            for pattern in patterns:
                if platform in links:
                    break  # resolved by an earlier, more specific pattern
                for match in re.findall(pattern, html, re.IGNORECASE):
                    # Exclusions are exact (lowercased) matches to avoid
                    # rejecting handles that merely contain a keyword.
                    if match.lower() in excludes:
                        continue
                    if platform == 'facebook':
                        links[platform] = f'https://facebook.com/{match}'
                    elif platform == 'instagram':
                        links[platform] = f'https://instagram.com/{match}'
                    elif platform == 'youtube':
                        # Fallback-pattern captures for handle URLs keep
                        # their leading '@'; everything else is a channel id.
                        if match.startswith('@'):
                            links[platform] = f'https://youtube.com/{match}'
                        else:
                            links[platform] = f'https://youtube.com/channel/{match}'
                    elif platform == 'linkedin':
                        # NOTE(review): /in/ (personal) matches are also
                        # emitted as /company/ URLs -- confirm intended.
                        links[platform] = f'https://linkedin.com/company/{match}'
                    elif platform == 'tiktok':
                        links[platform] = f'https://tiktok.com/@{match}'
                    elif platform == 'twitter':
                        links[platform] = f'https://twitter.com/{match}'
                    else:
                        continue
                    break  # first valid match wins for this platform
        return links
class GooglePlacesSearcher:
    """Search for Google Business profiles using the Google Places API.

    Two-step flow: find_place() resolves a free-text query to a place_id,
    then get_place_details() fetches rating/reviews/hours for that id.
    """
    # Google Places API endpoints (legacy Places API, not Places API "New").
    FIND_PLACE_URL = 'https://maps.googleapis.com/maps/api/place/findplacefromtext/json'
    PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize GooglePlacesSearcher.

        Args:
            api_key: Google Places API key. Falls back to the
                GOOGLE_PLACES_API_KEY environment variable.
        """
        self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def find_place(self, company_name: str, city: str = 'Wejherowo') -> Optional[str]:
        """
        Find a place by company name and city.

        Uses the Google Places findplacefromtext API to search for a business
        and returns its place_id if found.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            place_id string if found, None otherwise (including on API
            errors and missing API key -- failures are logged, not raised).
        """
        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return None
        try:
            # Construct search query with company name and city.
            search_query = f'{company_name} {city}'
            params = {
                'input': search_query,
                'inputtype': 'textquery',
                'fields': 'place_id,name,formatted_address',
                'language': 'pl',
                'key': self.api_key,
            }
            response = self.session.get(
                self.FIND_PLACE_URL,
                params=params,
                timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
            if data.get('status') == 'OK' and data.get('candidates'):
                # Take the first (best-ranked) candidate.
                candidate = data['candidates'][0]
                place_id = candidate.get('place_id')
                logger.info(
                    f"Found place for '{company_name}': {candidate.get('name')} "
                    f"at {candidate.get('formatted_address')}"
                )
                return place_id
            elif data.get('status') == 'ZERO_RESULTS':
                logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
                return None
            else:
                # Covers OVER_QUERY_LIMIT, REQUEST_DENIED, INVALID_REQUEST, ...
                logger.warning(
                    f"Google Places API returned status: {data.get('status')} "
                    f"for '{company_name}'"
                )
                return None
        except requests.exceptions.Timeout:
            logger.error(f"Timeout searching for '{company_name}' on Google Places")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error searching for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.error(f"Error finding place for '{company_name}': {e}")
            return None

    def get_place_details(self, place_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a place.

        Retrieves rating, review count, opening hours, and other business
        details from the Google Places API. Never raises; failures are
        logged and reflected as None values in the returned dict.

        Args:
            place_id: Google Place ID returned from find_place().

        Returns:
            Dict containing:
            - google_rating: float rating (1.0-5.0) or None
            - google_reviews_count: integer review count or None
            - google_photos_count: number of photo references or None
            - opening_hours: dict with weekday_text/open_now/periods, or None
            - business_status: e.g. 'OPERATIONAL', 'CLOSED_TEMPORARILY', or None
            - formatted_phone: phone number string or None
            - website: website URL or None
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'google_photos_count': None,
            'opening_hours': None,
            'business_status': None,
            'formatted_phone': None,
            'website': None,
        }
        if not self.api_key:
            logger.warning('Google Places API key not configured')
            return result
        if not place_id:
            return result
        try:
            # Request only the fields we need for the audit (billing is
            # field-based for Place Details).
            fields = [
                'rating',
                'user_ratings_total',
                'opening_hours',
                'business_status',
                'formatted_phone_number',
                'website',
                'name',
                'photos',
            ]
            params = {
                'place_id': place_id,
                'fields': ','.join(fields),
                'language': 'pl',
                'key': self.api_key,
            }
            response = self.session.get(
                self.PLACE_DETAILS_URL,
                params=params,
                timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
            if data.get('status') == 'OK' and data.get('result'):
                place = data['result']
                # Extract rating
                if 'rating' in place:
                    result['google_rating'] = round(float(place['rating']), 1)
                # Extract review count
                if 'user_ratings_total' in place:
                    result['google_reviews_count'] = int(place['user_ratings_total'])
                # Extract opening hours
                if 'opening_hours' in place:
                    hours = place['opening_hours']
                    result['opening_hours'] = {
                        'weekday_text': hours.get('weekday_text', []),
                        'open_now': hours.get('open_now'),
                        'periods': hours.get('periods', []),
                    }
                # Extract business status
                if 'business_status' in place:
                    result['business_status'] = place['business_status']
                # Extract phone
                if 'formatted_phone_number' in place:
                    result['formatted_phone'] = place['formatted_phone_number']
                # Extract website
                if 'website' in place:
                    result['website'] = place['website']
                # Extract photos count
                if 'photos' in place:
                    result['google_photos_count'] = len(place['photos'])
                logger.info(
                    f"Retrieved details for {place.get('name')}: "
                    f"rating={result['google_rating']}, "
                    f"reviews={result['google_reviews_count']}, "
                    f"photos={result['google_photos_count']}"
                )
            else:
                logger.warning(
                    f"Google Places API returned status: {data.get('status')} "
                    f"for place_id: {place_id}"
                )
        except requests.exceptions.Timeout:
            logger.error(f"Timeout getting details for place_id: {place_id}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error getting place details: {e}")
        except Exception as e:
            logger.error(f"Error getting place details for {place_id}: {e}")
        return result
class BraveSearcher:
    """Search for social media profiles and Google reviews using Brave Search.

    NOTE: direct social-media search (_search_brave) is still a stub that
    always returns None; only the reviews fallback is implemented.
    """

    def __init__(self, api_key: Optional[str] = None):
        # Falls back to BRAVE_API_KEY from the environment.
        self.api_key = api_key or os.getenv('BRAVE_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def search_social_media(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, str]:
        """
        Search for company social media profiles.

        Returns dict of platform -> url. Currently always empty because
        _search_brave is a placeholder (see below).
        """
        results = {}
        # Platform-specific query phrasing ('kanał' = Polish for channel).
        platforms = [
            ('facebook', f'{company_name} {city} facebook'),
            ('instagram', f'{company_name} instagram'),
            ('tiktok', f'{company_name} tiktok'),
            ('youtube', f'{company_name} youtube kanał'),
            ('linkedin', f'{company_name} linkedin'),
        ]
        for platform, query in platforms:
            try:
                url = self._search_brave(query, platform)
                if url:
                    results[platform] = url
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                logger.warning(f'Brave search failed for {platform}: {e}')
        return results

    def search_google_reviews(self, company_name: str, city: str = 'Wejherowo') -> Dict[str, Any]:
        """
        Search for Google reviews, preferring the Google Places API.

        Uses GooglePlacesSearcher when GOOGLE_PLACES_API_KEY is set;
        otherwise falls back to parsing Brave Search snippets.

        Args:
            company_name: Name of the company to search for.
            city: City to narrow down the search (default: Wejherowo).

        Returns:
            Dict containing:
            - google_rating: float rating (1.0-5.0) or None
            - google_reviews_count: integer review count or None
            - opening_hours: dict with weekday_text and open_now, or None
            - business_status: e.g. 'OPERATIONAL', or None
        """
        result = {
            'google_rating': None,
            'google_reviews_count': None,
            'opening_hours': None,
            'business_status': None,
        }
        try:
            # Use Google Places API for accurate data when configured.
            google_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
            if google_api_key:
                places_searcher = GooglePlacesSearcher(api_key=google_api_key)
                # Step 1: Find the place by company name and city.
                place_id = places_searcher.find_place(company_name, city)
                if place_id:
                    # Step 2: Get detailed information including reviews.
                    details = places_searcher.get_place_details(place_id)
                    result['google_rating'] = details.get('google_rating')
                    result['google_reviews_count'] = details.get('google_reviews_count')
                    result['opening_hours'] = details.get('opening_hours')
                    result['business_status'] = details.get('business_status')
                    logger.info(
                        f"Google reviews for '{company_name}': "
                        f"rating={result['google_rating']}, "
                        f"reviews={result['google_reviews_count']}, "
                        f"status={result['business_status']}"
                    )
                else:
                    logger.info(f"No Google Business Profile found for '{company_name}' in {city}")
            else:
                # Fallback: Try Brave Search API if available.
                if self.api_key:
                    brave_result = self._search_brave_for_reviews(company_name, city)
                    if brave_result:
                        result.update(brave_result)
                else:
                    logger.warning(
                        'Neither GOOGLE_PLACES_API_KEY nor BRAVE_API_KEY configured. '
                        'Cannot retrieve Google reviews data.'
                    )
        except Exception as e:
            logger.warning(f'Google reviews search failed for {company_name}: {e}')
        return result

    def _search_brave_for_reviews(self, company_name: str, city: str) -> Optional[Dict[str, Any]]:
        """
        Fallback method to search for Google reviews via the Brave Search API.

        Parses web-search snippets to extract a rating and review count from
        Google Business fragments that appear in result titles/descriptions.

        Args:
            company_name: Name of the company.
            city: City for location context.

        Returns:
            Dict with google_rating and google_reviews_count, or None if
            nothing matched (also on timeout / request errors).
        """
        if not self.api_key:
            return None
        try:
            query = f'{company_name} {city} opinie google'
            # Brave Web Search API endpoint
            url = 'https://api.search.brave.com/res/v1/web/search'
            headers = {
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip',
                'X-Subscription-Token': self.api_key,
            }
            params = {
                'q': query,
                'count': 10,
                'country': 'pl',
                'search_lang': 'pl',
                'ui_lang': 'pl-PL',
            }
            response = self.session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            # Scan each search hit's title+description for rating patterns.
            # Google snippets often look like "4,5 (123 opinii)" (Polish) or
            # "Rating: 4.5 · 123 reviews".
            for result in data.get('web', {}).get('results', []):  # noqa: hit dict, not the method's return value
                snippet = result.get('description', '') + ' ' + result.get('title', '')
                rating_patterns = [
                    r'(\d+[,\.]\d)\s*[·\(]\s*(\d+)\s*(?:opinii|recenzji|reviews)',
                    r'ocena[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:opinii|recenzji)',
                    r'rating[:\s]+(\d+[,\.]\d).*?(\d+)\s*(?:reviews|opinii)',
                ]
                for pattern in rating_patterns:
                    match = re.search(pattern, snippet, re.IGNORECASE)
                    if match:
                        # Normalize Polish decimal comma before float().
                        rating_str = match.group(1).replace(',', '.')
                        reviews_str = match.group(2)
                        return {
                            'google_rating': round(float(rating_str), 1),
                            'google_reviews_count': int(reviews_str),
                        }
            logger.info(f"No Google reviews data found in Brave results for '{company_name}'")
            return None
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout searching Brave for '{company_name}' reviews")
            return None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Brave API request failed for '{company_name}': {e}")
            return None
        except Exception as e:
            logger.warning(f"Error parsing Brave results for '{company_name}': {e}")
            return None

    def _search_brave(self, query: str, platform: str) -> Optional[str]:
        """
        Perform a Brave search and extract the relevant profile URL.

        Note: This is a placeholder -- the actual implementation would call
        the Brave Search API. Currently always returns None.
        """
        # Placeholder for Brave Search API integration.
        return None
class SocialMediaAuditor:
"""Main auditor class that coordinates website and social media auditing."""
    def __init__(self, database_url: str = DATABASE_URL):
        """Wire up the DB engine, website auditor and search backends.

        Prefers the Google Places API for review data when
        GOOGLE_PLACES_API_KEY is set; otherwise audit_company falls back to
        Brave Search via BraveSearcher.

        Args:
            database_url: SQLAlchemy connection URL (defaults to the
                module-level DATABASE_URL).
        """
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self.website_auditor = WebsiteAuditor()
        self.brave_searcher = BraveSearcher()
        # Initialize Google Places searcher only if an API key is available.
        google_places_api_key = os.getenv('GOOGLE_PLACES_API_KEY')
        if google_places_api_key:
            self.google_places_searcher = GooglePlacesSearcher(api_key=google_places_api_key)
            logger.info('Google Places API key found - using Places API for reviews')
        else:
            self.google_places_searcher = None
            logger.info('GOOGLE_PLACES_API_KEY not set - falling back to Brave Search for reviews')
    def get_companies(self, company_ids: Optional[List[int]] = None,
                      batch_start: Optional[int] = None,
                      batch_end: Optional[int] = None) -> List[Dict]:
        """Fetch companies from the database.

        Selection modes (first that applies):
          - company_ids: exact list of primary keys
          - batch_start/batch_end: 1-based inclusive positional range over
            companies ordered by id (OFFSET batch_start-1, LIMIT span)
          - neither: all companies

        Returns:
            List of plain dicts with keys id, name, slug, website,
            address_city.
        """
        with self.Session() as session:
            if company_ids:
                query = text("""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    WHERE id = ANY(:ids)
                    ORDER BY id
                """)
                result = session.execute(query, {'ids': company_ids})
            elif batch_start is not None and batch_end is not None:
                query = text("""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    ORDER BY id
                    OFFSET :offset LIMIT :limit
                """)
                result = session.execute(query, {
                    'offset': batch_start - 1,
                    'limit': batch_end - batch_start + 1
                })
            else:
                query = text("""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    ORDER BY id
                """)
                result = session.execute(query)
            # Row._mapping gives a dict-like view of each row (SQLAlchemy 1.4+).
            return [dict(row._mapping) for row in result]
def get_company_id_by_slug(self, slug: str) -> Optional[int]:
"""Get company ID by slug."""
with self.Session() as session:
query = text("""
SELECT id FROM companies WHERE slug = :slug
""")
result = session.execute(query, {'slug': slug})
row = result.fetchone()
if row:
return row[0]
return None
    def audit_company(self, company: Dict) -> Dict[str, Any]:
        """
        Perform a full audit for a single company.

        Combines website analysis, social-media discovery (website scrape
        merged with Brave search, website wins on conflict) and Google
        review data into one result dict.

        Args:
            company: Row dict with at least 'id' and 'name'; optionally
                'website' and 'address_city'.

        Returns:
            Dict with keys: company_id, company_name, audit_date, website,
            social_media, google_reviews, errors.
        """
        logger.info(f"Auditing company: {company['name']} (ID: {company['id']})")
        logger.info(f"Company website: {company.get('website', 'NOT SET')}")
        result = {
            'company_id': company['id'],
            'company_name': company['name'],
            'audit_date': datetime.now(),
            'website': {},
            'social_media': {},
            'google_reviews': {},
            'errors': [],
        }
        # 1. Website audit
        if company.get('website'):
            try:
                logger.info(f"Starting website audit for: {company['website']}")
                result['website'] = self.website_auditor.audit_website(company['website'])
                logger.info(f"Website audit completed. HTTP status: {result['website'].get('http_status')}")
            except Exception as e:
                logger.error(f"Website audit failed: {str(e)}")
                result['errors'].append(f'Website audit failed: {str(e)}')
        else:
            logger.warning(f"No website URL for company {company['name']}")
            result['website'] = {'errors': ['No website URL']}
        # 2. Social media discovered on the company's own website
        website_social = result['website'].get('social_media_links', {})
        if website_social:
            logger.info(f"Social media found on website: {list(website_social.keys())}")
        else:
            logger.info("No social media links found on website")
        # 3. Search for additional social media via Brave
        # NOTE(review): dict.get returns None (not the default) when the key
        # exists with a NULL value -- confirm address_city is never NULL.
        city = company.get('address_city', 'Wejherowo')
        try:
            logger.info(f"Searching Brave for social media: {company['name']} in {city}")
            brave_social = self.brave_searcher.search_social_media(company['name'], city)
            if brave_social:
                logger.info(f"Brave search found: {list(brave_social.keys())}")
            else:
                logger.info("Brave search found no additional social media")
            # Merge; website-discovered links take precedence.
            for platform, url in brave_social.items():
                if platform not in website_social:
                    website_social[platform] = url
                    logger.info(f"Added {platform} from Brave search: {url}")
        except Exception as e:
            logger.warning(f"Brave search failed: {str(e)}")
            result['errors'].append(f'Brave search failed: {str(e)}')
        result['social_media'] = website_social
        logger.info(f"Total social media profiles found: {len(website_social)} - {list(website_social.keys())}")
        # 4. Google reviews search - prefer Google Places API if available
        try:
            if self.google_places_searcher:
                # Use Google Places API directly for accurate data.
                place_id = self.google_places_searcher.find_place(company['name'], city)
                if place_id:
                    details = self.google_places_searcher.get_place_details(place_id)
                    result['google_reviews'] = {
                        'google_rating': details.get('google_rating'),
                        'google_reviews_count': details.get('google_reviews_count'),
                        'google_opening_hours': details.get('opening_hours'),
                        'google_photos_count': details.get('google_photos_count'),
                        'business_status': details.get('business_status'),
                    }
                else:
                    # No profile found: emit the full key set with None values
                    # so downstream persistence sees a consistent shape.
                    result['google_reviews'] = {
                        'google_rating': None,
                        'google_reviews_count': None,
                        'google_opening_hours': None,
                        'google_photos_count': None,
                        'business_status': None,
                    }
            else:
                # Fallback to Brave Search snippets.
                result['google_reviews'] = self.brave_searcher.search_google_reviews(
                    company['name'], city
                )
        except Exception as e:
            result['errors'].append(f'Google reviews search failed: {str(e)}')
        return result
def save_audit_result(self, result: Dict) -> bool:
"""Save audit result to database."""
try:
with self.Session() as session:
company_id = result['company_id']
website = result.get('website', {})
# Update or insert website analysis
upsert_website = text("""
INSERT INTO company_website_analysis (
company_id, analyzed_at, website_url, http_status_code,
load_time_ms, has_ssl, ssl_expires_at, ssl_issuer, is_responsive,
is_mobile_friendly, has_viewport_meta, last_modified_at,
hosting_provider, hosting_ip, server_software, site_author,
cms_detected, google_rating, google_reviews_count,
google_opening_hours, google_photos_count,
audit_source, audit_version
) VALUES (
:company_id, :analyzed_at, :website_url, :http_status_code,
:load_time_ms, :has_ssl, :ssl_expires_at, :ssl_issuer, :is_responsive,
:is_mobile_friendly, :has_viewport_meta, :last_modified_at,
:hosting_provider, :hosting_ip, :server_software, :site_author,
:cms_detected, :google_rating, :google_reviews_count,
:google_opening_hours, :google_photos_count,
:audit_source, :audit_version
)
ON CONFLICT (company_id) DO UPDATE SET
analyzed_at = EXCLUDED.analyzed_at,
http_status_code = EXCLUDED.http_status_code,
load_time_ms = EXCLUDED.load_time_ms,
has_ssl = EXCLUDED.has_ssl,
ssl_expires_at = EXCLUDED.ssl_expires_at,
ssl_issuer = EXCLUDED.ssl_issuer,
is_mobile_friendly = EXCLUDED.is_mobile_friendly,
has_viewport_meta = EXCLUDED.has_viewport_meta,
last_modified_at = EXCLUDED.last_modified_at,
hosting_provider = EXCLUDED.hosting_provider,
hosting_ip = EXCLUDED.hosting_ip,
server_software = EXCLUDED.server_software,
site_author = EXCLUDED.site_author,
cms_detected = EXCLUDED.cms_detected,
google_rating = EXCLUDED.google_rating,
google_reviews_count = EXCLUDED.google_reviews_count,
google_opening_hours = EXCLUDED.google_opening_hours,
google_photos_count = EXCLUDED.google_photos_count,
audit_source = EXCLUDED.audit_source,
audit_version = EXCLUDED.audit_version
""")
google_reviews = result.get('google_reviews', {})
# Convert opening_hours dict to JSON string for JSONB column
opening_hours = google_reviews.get('google_opening_hours')
opening_hours_json = json.dumps(opening_hours) if opening_hours else None
session.execute(upsert_website, {
'company_id': company_id,
'analyzed_at': result['audit_date'],
'website_url': website.get('url'),
'http_status_code': website.get('http_status'),
'load_time_ms': website.get('load_time_ms'),
'has_ssl': website.get('has_ssl', False),
'ssl_expires_at': website.get('ssl_expiry'),
'ssl_issuer': website.get('ssl_issuer'),
'is_responsive': website.get('is_mobile_friendly', False),
'is_mobile_friendly': website.get('is_mobile_friendly', False),
'has_viewport_meta': website.get('has_viewport_meta', False),
'last_modified_at': website.get('last_modified_at'),
'hosting_provider': website.get('hosting_provider'),
'hosting_ip': website.get('hosting_ip'),
'server_software': website.get('server_software'),
'site_author': website.get('site_author'),
'cms_detected': website.get('site_generator'),
'google_rating': google_reviews.get('google_rating'),
'google_reviews_count': google_reviews.get('google_reviews_count'),
'google_opening_hours': opening_hours_json,
'google_photos_count': google_reviews.get('google_photos_count'),
'audit_source': 'automated',
'audit_version': '1.0',
})
# Save social media
for platform, url in result.get('social_media', {}).items():
upsert_social = text("""
INSERT INTO company_social_media (
company_id, platform, url, verified_at, source, is_valid
) VALUES (
:company_id, :platform, :url, :verified_at, :source, :is_valid
)
ON CONFLICT (company_id, platform, url) DO UPDATE SET
verified_at = EXCLUDED.verified_at,
source = EXCLUDED.source,
is_valid = EXCLUDED.is_valid
""")
session.execute(upsert_social, {
'company_id': company_id,
'platform': platform,
'url': url,
'verified_at': result['audit_date'],
'source': 'website_scrape',
'is_valid': True,
})
session.commit()
logger.info(f"Saved audit for company {company_id}")
return True
except Exception as e:
logger.error(f"Failed to save audit result: {e}")
return False
def run_audit(self, company_ids: Optional[List[int]] = None,
batch_start: Optional[int] = None,
batch_end: Optional[int] = None,
dry_run: bool = False) -> Dict[str, Any]:
"""
Run audit for specified companies.
Returns summary of audit results.
"""
companies = self.get_companies(company_ids, batch_start, batch_end)
summary = {
'total': len(companies),
'success': 0,
'failed': 0,
'results': [],
}
for company in companies:
try:
result = self.audit_company(company)
if not dry_run:
if self.save_audit_result(result):
summary['success'] += 1
else:
summary['failed'] += 1
else:
summary['success'] += 1
print(json.dumps(result, default=str, indent=2))
summary['results'].append({
'company_id': company['id'],
'company_name': company['name'],
'status': 'success',
'social_media_found': len(result.get('social_media', {})),
})
except Exception as e:
logger.error(f"Audit failed for company {company['id']}: {e}")
summary['failed'] += 1
summary['results'].append({
'company_id': company['id'],
'company_name': company['name'],
'status': 'failed',
'error': str(e),
})
return summary
def main():
    """CLI entry point: parse arguments, run the audit, print a summary.

    Exits with status 1 when no selection flag is given, when a slug lookup
    fails, or when ``--batch`` is malformed.
    """
    parser = argparse.ArgumentParser(description='Social Media & Website Audit')
    parser.add_argument('--company-id', type=int, help='Audit single company by ID')
    parser.add_argument('--company-slug', type=str, help='Audit single company by slug')
    parser.add_argument('--batch', type=str, help='Audit batch of companies (e.g., 1-10)')
    parser.add_argument('--all', action='store_true', help='Audit all companies')
    parser.add_argument('--dry-run', action='store_true', help='Print results without saving')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    auditor = SocialMediaAuditor()
    # BUGFIX: use `is not None` so an ID of 0 is not mistaken for "flag absent".
    if args.company_id is not None:
        summary = auditor.run_audit(company_ids=[args.company_id], dry_run=args.dry_run)
    elif args.company_slug:
        # Look up company ID by slug
        company_id = auditor.get_company_id_by_slug(args.company_slug)
        if company_id:
            summary = auditor.run_audit(company_ids=[company_id], dry_run=args.dry_run)
        else:
            print(f"Error: Company with slug '{args.company_slug}' not found")
            sys.exit(1)
    elif args.batch:
        # BUGFIX: a malformed --batch value (e.g. "abc" or "1:10") used to
        # crash with a raw ValueError traceback; report a clean error instead.
        try:
            start, end = map(int, args.batch.split('-'))
        except ValueError:
            print(f"Error: --batch expects START-END (e.g. 1-10), got '{args.batch}'")
            sys.exit(1)
        summary = auditor.run_audit(batch_start=start, batch_end=end, dry_run=args.dry_run)
    elif args.all:
        summary = auditor.run_audit(dry_run=args.dry_run)
    else:
        parser.print_help()
        sys.exit(1)
    print("\n" + "=" * 60)
    print("AUDIT SUMMARY")  # plain string; the original f-string had no placeholders
    print("=" * 60)
    print(f"Total companies: {summary['total']}")
    print(f"Successful: {summary['success']}")
    print(f"Failed: {summary['failed']}")
    print("=" * 60)
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()