nordabiz/scripts/seo_audit.py
Maciej Pienczyn 05a09812a5
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(seo-audit): Move load_dotenv before module-level env reads
DATABASE_URL and PAGESPEED_API_KEY are read at module level (import
time), so load_dotenv must run before third-party imports that
reference these variables.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 15:27:43 +01:00

1811 lines
76 KiB
Python

#!/usr/bin/env python3
"""
SEO Audit Script for Norda Biznes
=================================
Performs comprehensive SEO audit of company websites using:
- Google PageSpeed Insights API (performance, accessibility, SEO scores)
- On-page SEO analysis (meta tags, headings, images, links, structured data)
- Technical SEO checks (robots.txt, sitemap, canonical, indexability)
Designed to run in batches with rate limiting for API quota management.
Usage:
python seo_audit.py --company-id 26
python seo_audit.py --batch 1-10
python seo_audit.py --all
python seo_audit.py --company-id 26 --dry-run
Exit codes:
0 - All audits completed successfully
1 - Argument error or invalid input
2 - Partial failures (some audits failed)
3 - All audits failed
4 - Database connection error
5 - API quota exceeded
Author: Claude Code
Date: 2026-01-08
"""
import os
import sys
import re
import json
import ssl
import socket
import argparse
import logging
import time as time_module
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Tuple
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '.env'))
import requests
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
# Import SEO analysis components
from pagespeed_client import (
GooglePageSpeedClient,
PageSpeedResult,
PageSpeedAPIError,
QuotaExceededError,
Strategy,
)
from seo_analyzer import (
OnPageSEOAnalyzer,
OnPageSEOResult,
TechnicalSEOChecker,
TechnicalSEOResult,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Exit codes
EXIT_SUCCESS = 0
EXIT_ARGUMENT_ERROR = 1
EXIT_PARTIAL_FAILURES = 2
EXIT_ALL_FAILED = 3
EXIT_DATABASE_ERROR = 4
EXIT_QUOTA_EXCEEDED = 5
# Database configuration
# WARNING: The fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
'DATABASE_URL',
'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)
# Request configuration
REQUEST_TIMEOUT = 30
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 NordaBiznes-SEO-Auditor/1.0'
# SEO Audit version for tracking
SEO_AUDIT_VERSION = '1.0.0'
class LocalSEOAnalyzer:
"""Analyzes Local SEO factors for business websites."""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def analyze(self, html_content: str, url: str, company_data: Dict = None) -> Dict[str, Any]:
"""Run all local SEO checks on HTML content."""
result = {
'local_seo_score': 0,
'has_local_business_schema': False,
'local_business_schema_fields': {},
'nap_on_website': {},
'has_google_maps_embed': False,
'has_local_keywords': False,
'local_keywords_found': [],
}
soup = BeautifulSoup(html_content, 'html.parser')
# Check LocalBusiness schema
schema_result = self._check_local_business_schema(html_content)
result.update(schema_result)
# Extract NAP from website
nap = self._extract_nap(soup, html_content)
result['nap_on_website'] = nap
# Check Google Maps embed
result['has_google_maps_embed'] = self._check_google_maps(html_content)
# Check local keywords
city = (company_data or {}).get('address_city', 'Wejherowo')
keywords = self._find_local_keywords(soup, html_content, city)
result['has_local_keywords'] = len(keywords) > 0
result['local_keywords_found'] = keywords[:20]
# Calculate local SEO score
result['local_seo_score'] = self._calculate_local_score(result)
return result
def _check_local_business_schema(self, html: str) -> Dict[str, Any]:
"""Check for Schema.org LocalBusiness structured data."""
import json as json_mod
result = {
'has_local_business_schema': False,
'local_business_schema_fields': {},
}
# Find JSON-LD blocks
ld_pattern = re.compile(r'<script[^>]*type=["\']application/ld\+json["\'][^>]*>(.*?)</script>', re.DOTALL | re.IGNORECASE)
matches = ld_pattern.findall(html)
local_types = ['LocalBusiness', 'Organization', 'Store', 'Restaurant',
'ProfessionalService', 'AutoRepair', 'HealthAndBeautyBusiness',
'LodgingBusiness', 'FoodEstablishment', 'FinancialService']
for match in matches:
try:
data = json_mod.loads(match.strip())
items = [data] if isinstance(data, dict) else data if isinstance(data, list) else []
for item in items:
item_type = item.get('@type', '')
if isinstance(item_type, list):
item_type = item_type[0] if item_type else ''
if item_type in local_types:
result['has_local_business_schema'] = True
# Check which fields are present
important_fields = ['name', 'address', 'telephone', 'email',
'url', 'openingHours', 'openingHoursSpecification',
'geo', 'image', 'description', 'priceRange',
'areaServed', 'aggregateRating']
for field in important_fields:
result['local_business_schema_fields'][field] = field in item and bool(item[field])
break
except (json_mod.JSONDecodeError, TypeError):
continue
return result
def _extract_nap(self, soup, html: str) -> Dict[str, Any]:
"""Extract Name, Address, Phone from website HTML."""
nap = {'name': None, 'address': None, 'phone': None}
text = soup.get_text(separator=' ')
# Phone patterns (Polish format)
phone_patterns = [
r'(?:tel\.?|telefon|phone|zadzwoń)[:\s]*([+]?\d[\d\s\-]{7,15})',
r'(?:href="tel:)([+]?\d[\d\-]{7,15})"',
r'(\+48[\s\-]?\d{3}[\s\-]?\d{3}[\s\-]?\d{3})',
r'(\d{2}[\s\-]\d{3}[\s\-]\d{2}[\s\-]\d{2})',
]
for pattern in phone_patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
phone = re.sub(r'[\s\-]', '', match.group(1))
if len(phone) >= 9:
nap['phone'] = match.group(1).strip()
break
# Address patterns (Polish)
address_patterns = [
r'(?:ul\.?|ulica)\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144\s]+\s+\d+[a-zA-Z]?(?:/\d+)?(?:,?\s+\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+)?',
r'\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+',
]
for pattern in address_patterns:
match = re.search(pattern, text)
if match:
nap['address'] = match.group(0).strip()[:200]
break
# Business name from structured data or og:site_name
og_site = soup.find('meta', property='og:site_name')
if og_site and og_site.get('content'):
nap['name'] = og_site['content'].strip()[:200]
return nap
def _check_google_maps(self, html: str) -> bool:
"""Check if page has embedded Google Maps."""
maps_patterns = [
r'maps\.googleapis\.com',
r'maps\.google\.com/maps',
r'google\.com/maps/embed',
r'<iframe[^>]*google[^>]*maps[^>]*>',
]
return any(re.search(p, html, re.IGNORECASE) for p in maps_patterns)
def _find_local_keywords(self, soup, html: str, city: str) -> List[str]:
"""Find local keywords in page content (service + city patterns)."""
keywords_found = []
text = soup.get_text(separator=' ').lower()
# Common service keywords for Polish businesses
service_keywords = [
'hydraulik', 'elektryk', 'mechanik', 'fryzjer', 'dentysta',
'prawnik', 'adwokat', 'księgowy', 'architekt', 'fotograf',
'restauracja', 'hotel', 'sklep', 'serwis', 'naprawa',
'instalacje', 'remonty', 'transport', 'catering',
'szkolenia', 'kursy', 'gabinet', 'klinika', 'studio',
]
city_lower = city.lower() if city else 'wejherowo'
nearby_cities = ['wejherowo', 'rumia', 'reda', 'gdynia', 'gdańsk', 'sopot', 'puck', 'luzino']
for keyword in service_keywords:
for c in nearby_cities:
phrase = f'{keyword} {c}'
if phrase in text:
keywords_found.append(phrase)
# Also check meta title and description
title = (soup.title.string if soup.title else '').lower()
meta_desc = ''
desc_tag = soup.find('meta', {'name': 'description'})
if desc_tag:
meta_desc = (desc_tag.get('content', '') or '').lower()
if city_lower in title:
keywords_found.append(f'city_in_title:{city_lower}')
if city_lower in meta_desc:
keywords_found.append(f'city_in_description:{city_lower}')
return list(set(keywords_found))
def _calculate_local_score(self, data: Dict) -> int:
"""Calculate Local SEO score 0-100."""
score = 0
if data.get('has_local_business_schema'):
score += 25
# Bonus for complete schema
fields = data.get('local_business_schema_fields', {})
filled = sum(1 for v in fields.values() if v)
total = len(fields)
if total > 0:
score += int(10 * (filled / total))
nap = data.get('nap_on_website', {})
if nap.get('name'): score += 10
if nap.get('address'): score += 10
if nap.get('phone'): score += 10
if data.get('has_google_maps_embed'): score += 15
if data.get('has_local_keywords'): score += 15
# Bonus for multiple local keywords
kw_count = len(data.get('local_keywords_found', []))
if kw_count >= 5: score += 5
return min(score, 100)
class CitationChecker:
"""Checks company presence in Polish local business directories."""
# Polish business directories to check
DIRECTORIES = [
{'name': 'panoramafirm.pl', 'url': 'https://panoramafirm.pl', 'search_domain': 'panoramafirm.pl'},
{'name': 'pkt.pl', 'url': 'https://pkt.pl', 'search_domain': 'pkt.pl'},
{'name': 'aleo.com', 'url': 'https://aleo.com', 'search_domain': 'aleo.com'},
{'name': 'firmy.net', 'url': 'https://firmy.net', 'search_domain': 'firmy.net'},
{'name': 'zumi.pl', 'url': 'https://zumi.pl', 'search_domain': 'zumi.pl'},
{'name': 'gowork.pl', 'url': 'https://gowork.pl', 'search_domain': 'gowork.pl'},
{'name': 'oferteo.pl', 'url': 'https://oferteo.pl', 'search_domain': 'oferteo.pl'},
{'name': 'google.com/maps', 'url': 'https://google.com/maps', 'search_domain': 'google.com/maps'},
{'name': 'facebook.com', 'url': 'https://facebook.com', 'search_domain': 'facebook.com'},
{'name': 'yelp.com', 'url': 'https://yelp.com', 'search_domain': 'yelp.com'},
]
def __init__(self):
self.brave_api_key = os.getenv('BRAVE_API_KEY')
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def check_citations(self, company_name: str, city: str = 'Wejherowo') -> List[Dict[str, Any]]:
"""Check if company is listed in directories."""
results = []
if not self.brave_api_key:
logger.warning("BRAVE_API_KEY not set, citation check skipped")
return results
for directory in self.DIRECTORIES:
try:
citation = self._check_single_directory(company_name, city, directory)
results.append(citation)
# Rate limit
time_module.sleep(0.5)
except Exception as e:
logger.warning(f"Citation check failed for {directory['name']}: {e}")
results.append({
'directory_name': directory['name'],
'directory_url': directory['url'],
'status': 'error',
'listing_url': None,
})
return results
def _check_single_directory(self, company_name: str, city: str, directory: Dict) -> Dict:
"""Check one directory using Brave Search."""
query = f'"{company_name}" site:{directory["search_domain"]}'
try:
resp = self.session.get(
'https://api.search.brave.com/res/v1/web/search',
params={'q': query, 'count': 3},
headers={'X-Subscription-Token': self.brave_api_key},
timeout=10
)
resp.raise_for_status()
data = resp.json()
results = data.get('web', {}).get('results', [])
if results:
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': results[0].get('url'),
'status': 'found',
}
else:
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': None,
'status': 'not_found',
}
except Exception as e:
logger.debug(f"Brave search for {directory['name']}: {e}")
return {
'directory_name': directory['name'],
'directory_url': directory['url'],
'listing_url': None,
'status': 'error',
}
class ContentFreshnessChecker:
"""Checks content freshness of a website."""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def check_freshness(self, url: str, html_content: str = None) -> Dict[str, Any]:
"""Check content freshness indicators."""
result = {
'last_content_update': None,
'content_freshness_score': 0,
}
# Check Last-Modified header
try:
resp = self.session.head(url, timeout=10, allow_redirects=True)
last_modified = resp.headers.get('Last-Modified')
if last_modified:
from email.utils import parsedate_to_datetime
try:
result['last_content_update'] = parsedate_to_datetime(last_modified)
except Exception:
pass
except Exception:
pass
# Check dates in HTML content
if html_content:
soup = BeautifulSoup(html_content, 'html.parser')
# Look for date patterns in the page
date_patterns = [
r'20\d{2}[-./]\d{1,2}[-./]\d{1,2}',
r'\d{1,2}[-./]\d{1,2}[-./]20\d{2}',
]
text = soup.get_text()
latest_date = None
for pattern in date_patterns:
matches = re.findall(pattern, text)
for m in matches:
try:
# Try parsing various formats
for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%d.%m.%Y', '%d-%m-%Y', '%d/%m/%Y']:
try:
d = datetime.strptime(m, fmt)
if d.year >= 2020 and d <= datetime.now():
if latest_date is None or d > latest_date:
latest_date = d
break
except ValueError:
continue
except Exception:
continue
if latest_date and (result['last_content_update'] is None or latest_date > result['last_content_update']):
result['last_content_update'] = latest_date
# Calculate freshness score
if result['last_content_update']:
days_old = (datetime.now() - result['last_content_update']).days
if days_old <= 30:
result['content_freshness_score'] = 100
elif days_old <= 90:
result['content_freshness_score'] = 80
elif days_old <= 180:
result['content_freshness_score'] = 60
elif days_old <= 365:
result['content_freshness_score'] = 40
else:
result['content_freshness_score'] = 20
else:
result['content_freshness_score'] = 10 # Unknown = low score
return result
class SEOAuditor:
"""
Main SEO auditor class that coordinates website SEO auditing.
Follows the same pattern as SocialMediaAuditor from social_media_audit.py.
Orchestrates PageSpeed API, on-page analysis, and technical SEO checks.
"""
def __init__(self, database_url: str = DATABASE_URL):
"""
Initialize SEO Auditor.
Args:
database_url: Database connection string.
"""
self.engine = create_engine(database_url)
self.Session = sessionmaker(bind=self.engine)
# Initialize analysis components
self.pagespeed_client = GooglePageSpeedClient()
self.onpage_analyzer = OnPageSEOAnalyzer()
self.technical_checker = TechnicalSEOChecker()
self.local_seo_analyzer = LocalSEOAnalyzer()
self.citation_checker = CitationChecker()
self.freshness_checker = ContentFreshnessChecker()
# HTTP session for fetching pages
self.session = requests.Session()
self.session.headers.update({'User-Agent': USER_AGENT})
def get_companies(self, company_ids: Optional[List[int]] = None,
batch_start: Optional[int] = None,
batch_end: Optional[int] = None) -> List[Dict]:
"""
Fetch companies from database.
Args:
company_ids: List of specific company IDs to fetch.
batch_start: Start index for batch processing (1-indexed).
batch_end: End index for batch processing (1-indexed).
Returns:
List of company dicts with id, name, slug, website.
"""
with self.Session() as session:
if company_ids:
# Use IN clause for SQLite/PostgreSQL compatibility
placeholders = ', '.join([f':id_{i}' for i in range(len(company_ids))])
query = text(f"""
SELECT id, name, slug, website, address_city
FROM companies
WHERE id IN ({placeholders})
ORDER BY id
""")
params = {f'id_{i}': cid for i, cid in enumerate(company_ids)}
result = session.execute(query, params)
elif batch_start is not None and batch_end is not None:
query = text("""
SELECT id, name, slug, website, address_city
FROM companies
ORDER BY id
OFFSET :offset LIMIT :limit
""")
result = session.execute(query, {
'offset': batch_start - 1,
'limit': batch_end - batch_start + 1
})
else:
query = text("""
SELECT id, name, slug, website, address_city
FROM companies
ORDER BY id
""")
result = session.execute(query)
return [dict(row._mapping) for row in result]
def audit_company(self, company: Dict) -> Dict[str, Any]:
"""
Perform full SEO audit for a single company.
Args:
company: Company dict with id, name, slug, website.
Returns:
Comprehensive SEO audit result dict.
"""
logger.info(f"Auditing SEO for: {company['name']} (ID: {company['id']})")
result = {
'company_id': company['id'],
'company_name': company['name'],
'company_slug': company['slug'],
'audit_date': datetime.now(),
'audit_version': SEO_AUDIT_VERSION,
'website_url': company.get('website'),
'pagespeed': None,
'onpage': None,
'technical': None,
'scores': {
'pagespeed_seo': None,
'pagespeed_performance': None,
'pagespeed_accessibility': None,
'pagespeed_best_practices': None,
'overall_seo': None,
},
'errors': [],
}
website_url = company.get('website')
# Check if company has a website
if not website_url:
result['errors'].append('No website URL configured')
logger.warning(f" Company {company['id']} has no website URL")
return result
# Normalize URL
if not website_url.startswith(('http://', 'https://')):
website_url = 'https://' + website_url
result['website_url'] = website_url
# 1. Fetch page HTML for on-page analysis
html_content = None
final_url = website_url
http_status = None
load_time_ms = None
try:
logger.info(f" Fetching page: {website_url}")
start_time = time_module.time()
response = self.session.get(
website_url,
timeout=REQUEST_TIMEOUT,
allow_redirects=True
)
load_time_ms = int((time_module.time() - start_time) * 1000)
http_status = response.status_code
final_url = response.url
if response.status_code == 200:
# Fix encoding: requests defaults to ISO-8859-1 when charset missing
if response.encoding and response.encoding.lower() == 'iso-8859-1':
response.encoding = response.apparent_encoding
html_content = response.text
logger.info(f" Page fetched successfully ({load_time_ms}ms)")
else:
result['errors'].append(f'HTTP {response.status_code}')
logger.warning(f" HTTP {response.status_code} for {website_url}")
except requests.exceptions.SSLError as e:
result['errors'].append(f'SSL Error: {str(e)[:100]}')
logger.warning(f" SSL error for {website_url}: {e}")
# Try HTTP fallback
try:
http_url = website_url.replace('https://', 'http://')
response = self.session.get(http_url, timeout=REQUEST_TIMEOUT)
http_status = response.status_code
final_url = response.url
if response.status_code == 200:
if response.encoding and response.encoding.lower() == 'iso-8859-1':
response.encoding = response.apparent_encoding
html_content = response.text
except Exception as e2:
result['errors'].append(f'HTTP fallback failed: {str(e2)[:50]}')
except requests.exceptions.Timeout:
result['errors'].append(f'Timeout after {REQUEST_TIMEOUT}s')
logger.warning(f" Timeout for {website_url}")
except requests.exceptions.ConnectionError as e:
result['errors'].append(f'Connection error: {str(e)[:100]}')
logger.warning(f" Connection error for {website_url}")
except requests.exceptions.RequestException as e:
result['errors'].append(f'Request error: {str(e)[:100]}')
logger.warning(f" Request error for {website_url}: {e}")
# Store HTTP info
result['http_status'] = http_status
result['load_time_ms'] = load_time_ms
result['final_url'] = final_url
# 2. On-page SEO analysis (if we have HTML)
if html_content:
try:
logger.info(" Running on-page SEO analysis...")
onpage_result = self.onpage_analyzer.analyze_html(
html_content,
base_url=final_url
)
result['onpage'] = onpage_result.to_dict()
logger.info(f" On-page analysis complete")
except Exception as e:
result['errors'].append(f'On-page analysis failed: {str(e)[:100]}')
logger.error(f" On-page analysis error: {e}")
# 3. Technical SEO checks (robots.txt, sitemap, etc.)
try:
logger.info(" Running technical SEO checks...")
technical_result = self.technical_checker.check_url(final_url)
result['technical'] = technical_result.to_dict()
logger.info(f" Technical checks complete")
except Exception as e:
result['errors'].append(f'Technical checks failed: {str(e)[:100]}')
logger.error(f" Technical checks error: {e}")
# 4. PageSpeed Insights API (if quota available)
try:
remaining_quota = self.pagespeed_client.get_remaining_quota()
if remaining_quota > 0:
logger.info(f" Running PageSpeed Insights (quota: {remaining_quota})...")
pagespeed_result = self.pagespeed_client.analyze_url(
final_url,
strategy=Strategy.MOBILE
)
result['pagespeed'] = pagespeed_result.to_dict()
# Extract scores
result['scores']['pagespeed_seo'] = pagespeed_result.scores.seo
result['scores']['pagespeed_performance'] = pagespeed_result.scores.performance
result['scores']['pagespeed_accessibility'] = pagespeed_result.scores.accessibility
result['scores']['pagespeed_best_practices'] = pagespeed_result.scores.best_practices
logger.info(f" PageSpeed complete - SEO: {pagespeed_result.scores.seo}, "
f"Perf: {pagespeed_result.scores.performance}")
else:
result['errors'].append('PageSpeed API quota exceeded')
logger.warning(" PageSpeed quota exceeded, skipping")
except QuotaExceededError:
result['errors'].append('PageSpeed API quota exceeded')
logger.warning(" PageSpeed quota exceeded")
except PageSpeedAPIError as e:
result['errors'].append(f'PageSpeed API error: {str(e)[:100]}')
logger.error(f" PageSpeed error: {e}")
except Exception as e:
result['errors'].append(f'PageSpeed unexpected error: {str(e)[:100]}')
logger.error(f" PageSpeed unexpected error: {e}")
# 6. Local SEO analysis
if html_content:
try:
logger.info(" Running Local SEO analysis...")
local_seo = self.local_seo_analyzer.analyze(html_content, final_url, company)
result['local_seo'] = local_seo
logger.info(f" Local SEO score: {local_seo.get('local_seo_score', 0)}")
except Exception as e:
result['errors'].append(f'Local SEO analysis failed: {str(e)[:100]}')
logger.error(f" Local SEO error: {e}")
# 7. Citation check
try:
city = company.get('address_city', 'Wejherowo')
logger.info(f" Checking citations for '{company['name']}' in {city}...")
citations = self.citation_checker.check_citations(company['name'], city)
result['citations'] = citations
found_count = sum(1 for c in citations if c.get('status') == 'found')
logger.info(f" Citations found: {found_count}/{len(citations)}")
except Exception as e:
result['errors'].append(f'Citation check failed: {str(e)[:100]}')
logger.error(f" Citation check error: {e}")
# 8. Content freshness
try:
logger.info(" Checking content freshness...")
freshness = self.freshness_checker.check_freshness(final_url, html_content)
result['freshness'] = freshness
logger.info(f" Freshness score: {freshness.get('content_freshness_score', 0)}")
except Exception as e:
result['errors'].append(f'Freshness check failed: {str(e)[:100]}')
# 5. Calculate overall SEO score
result['scores']['overall_seo'] = self._calculate_overall_score(result)
return result
def _calculate_overall_score(self, result: Dict[str, Any]) -> Optional[int]:
"""
Calculate an overall SEO score based on all available metrics.
Args:
result: Full audit result dict.
Returns:
Overall SEO score 0-100, or None if insufficient data.
"""
scores = []
weights = []
# PageSpeed SEO score (weight: 3)
if result.get('scores', {}).get('pagespeed_seo') is not None:
scores.append(result['scores']['pagespeed_seo'])
weights.append(3)
# PageSpeed Performance (weight: 2)
if result.get('scores', {}).get('pagespeed_performance') is not None:
scores.append(result['scores']['pagespeed_performance'])
weights.append(2)
# On-page factors score (calculated from analysis)
onpage = result.get('onpage')
if onpage:
onpage_score = self._calculate_onpage_score(onpage)
if onpage_score is not None:
scores.append(onpage_score)
weights.append(2)
# Technical SEO score
technical = result.get('technical')
if technical:
technical_score = self._calculate_technical_score(technical)
if technical_score is not None:
scores.append(technical_score)
weights.append(2)
# Calculate weighted average
if scores and weights:
weighted_sum = sum(s * w for s, w in zip(scores, weights))
total_weight = sum(weights)
return int(round(weighted_sum / total_weight))
return None
def _calculate_onpage_score(self, onpage: Dict[str, Any]) -> Optional[int]:
"""Calculate on-page SEO score from analysis results."""
score = 100
deductions = 0
# Meta tags checks
meta = onpage.get('meta_tags', {})
if not meta.get('title'):
deductions += 15
elif meta.get('title_length', 0) < 30 or meta.get('title_length', 0) > 70:
deductions += 5
if not meta.get('description'):
deductions += 10
elif meta.get('description_length', 0) < 120 or meta.get('description_length', 0) > 160:
deductions += 5
if not meta.get('canonical_url'):
deductions += 5
# Headings check
headings = onpage.get('headings', {})
if headings.get('h1_count', 0) == 0:
deductions += 10
elif headings.get('h1_count', 0) > 1:
deductions += 5
if not headings.get('has_proper_hierarchy', True):
deductions += 5
# Images check
images = onpage.get('images', {})
total_images = images.get('total_images', 0)
images_without_alt = images.get('images_without_alt', 0)
if total_images > 0 and images_without_alt > 0:
alt_ratio = images_without_alt / total_images
if alt_ratio > 0.5:
deductions += 10
elif alt_ratio > 0.2:
deductions += 5
# Structured data check
structured = onpage.get('structured_data', {})
if not structured.get('has_structured_data', False):
deductions += 5
# Open Graph check
og = onpage.get('open_graph', {})
if not og.get('og_title'):
deductions += 3
return max(0, score - deductions)
def _calculate_technical_score(self, technical: Dict[str, Any]) -> Optional[int]:
"""Calculate technical SEO score from check results."""
score = 100
deductions = 0
# Robots.txt check
robots = technical.get('robots_txt', {})
if not robots.get('exists', False):
deductions += 10
elif robots.get('blocks_googlebot', False):
deductions += 20
# Sitemap check
sitemap = technical.get('sitemap', {})
if not sitemap.get('exists', False):
deductions += 10
elif not sitemap.get('is_valid_xml', False):
deductions += 5
# Redirect chain check
redirects = technical.get('redirect_chain', {})
chain_length = redirects.get('chain_length', 0)
if chain_length > 3:
deductions += 10
elif chain_length > 1:
deductions += 5
if redirects.get('has_redirect_loop', False):
deductions += 20
# Indexability check
indexability = technical.get('indexability', {})
if not indexability.get('is_indexable', True):
deductions += 15
# Canonical check
canonical = technical.get('canonical', {})
if canonical.get('has_canonical', False):
if canonical.get('points_to_different_domain', False):
deductions += 10
return max(0, score - deductions)
def save_audit_result(self, result: Dict) -> bool:
"""
Save audit result to database.
Uses ON CONFLICT DO UPDATE for idempotent upserts.
Args:
result: Full audit result dict.
Returns:
True if save was successful, False otherwise.
"""
try:
with self.Session() as session:
company_id = result['company_id']
# Extract values from result (use 'or {}' to handle None values)
onpage = result.get('onpage') or {}
technical = result.get('technical') or {}
pagespeed = result.get('pagespeed') or {}
meta_tags = onpage.get('meta_tags') or {}
headings = onpage.get('headings') or {}
images = onpage.get('images') or {}
links = onpage.get('links') or {}
structured_data = onpage.get('structured_data') or {}
og = onpage.get('open_graph') or {}
tc = onpage.get('twitter_card') or {}
robots = technical.get('robots_txt') or {}
sitemap = technical.get('sitemap') or {}
canonical = technical.get('canonical') or {}
indexability = technical.get('indexability') or {}
cwv = pagespeed.get('core_web_vitals') or {}
ps_scores = pagespeed.get('scores') or {}
# Upsert query for company_website_analysis
# Uses ON CONFLICT DO UPDATE for idempotent upserts
upsert_query = text("""
INSERT INTO company_website_analysis (
company_id, analyzed_at, website_url, final_url,
http_status_code, load_time_ms,
-- PageSpeed Insights
pagespeed_seo_score, pagespeed_performance_score,
pagespeed_accessibility_score, pagespeed_best_practices_score,
pagespeed_audits,
-- On-page SEO
meta_title, meta_description, meta_keywords,
h1_count, h2_count, h3_count, h1_text,
total_images, images_without_alt, images_with_alt,
internal_links_count, external_links_count, broken_links_count,
has_structured_data, structured_data_types, structured_data_json,
-- Technical SEO
has_canonical, canonical_url, is_indexable, noindex_reason,
has_sitemap, has_robots_txt,
viewport_configured, is_mobile_friendly,
-- SSL
has_ssl, ssl_expires_at, ssl_issuer,
-- Core Web Vitals
largest_contentful_paint_ms, interaction_to_next_paint_ms, cumulative_layout_shift,
-- Open Graph
has_og_tags, og_title, og_description, og_image,
has_twitter_cards,
-- Language & International
html_lang, has_hreflang,
-- Word count
word_count_homepage,
-- SEO Audit metadata
seo_audit_version, seo_audited_at, seo_audit_errors,
seo_overall_score, seo_health_score, seo_issues,
-- Local SEO
local_seo_score, has_local_business_schema, local_business_schema_fields,
nap_on_website, has_google_maps_embed, has_local_keywords, local_keywords_found,
-- Citations
citations_found, citations_count,
-- Content freshness
content_freshness_score, last_content_update
) VALUES (
:company_id, :analyzed_at, :website_url, :final_url,
:http_status_code, :load_time_ms,
:pagespeed_seo_score, :pagespeed_performance_score,
:pagespeed_accessibility_score, :pagespeed_best_practices_score,
:pagespeed_audits,
:meta_title, :meta_description, :meta_keywords,
:h1_count, :h2_count, :h3_count, :h1_text,
:total_images, :images_without_alt, :images_with_alt,
:internal_links_count, :external_links_count, :broken_links_count,
:has_structured_data, :structured_data_types, :structured_data_json,
:has_canonical, :canonical_url, :is_indexable, :noindex_reason,
:has_sitemap, :has_robots_txt,
:viewport_configured, :is_mobile_friendly,
:has_ssl, :ssl_expires_at, :ssl_issuer,
:largest_contentful_paint_ms, :interaction_to_next_paint_ms, :cumulative_layout_shift,
:has_og_tags, :og_title, :og_description, :og_image,
:has_twitter_cards,
:html_lang, :has_hreflang,
:word_count_homepage,
:seo_audit_version, :seo_audited_at, :seo_audit_errors,
:seo_overall_score, :seo_health_score, :seo_issues,
:local_seo_score, :has_local_business_schema, :local_business_schema_fields,
:nap_on_website, :has_google_maps_embed, :has_local_keywords, :local_keywords_found,
:citations_found, :citations_count,
:content_freshness_score, :last_content_update
)
ON CONFLICT (company_id) DO UPDATE SET
analyzed_at = EXCLUDED.analyzed_at,
website_url = EXCLUDED.website_url,
final_url = EXCLUDED.final_url,
http_status_code = EXCLUDED.http_status_code,
load_time_ms = EXCLUDED.load_time_ms,
pagespeed_seo_score = EXCLUDED.pagespeed_seo_score,
pagespeed_performance_score = EXCLUDED.pagespeed_performance_score,
pagespeed_accessibility_score = EXCLUDED.pagespeed_accessibility_score,
pagespeed_best_practices_score = EXCLUDED.pagespeed_best_practices_score,
pagespeed_audits = EXCLUDED.pagespeed_audits,
meta_title = EXCLUDED.meta_title,
meta_description = EXCLUDED.meta_description,
meta_keywords = EXCLUDED.meta_keywords,
h1_count = EXCLUDED.h1_count,
h2_count = EXCLUDED.h2_count,
h3_count = EXCLUDED.h3_count,
h1_text = EXCLUDED.h1_text,
total_images = EXCLUDED.total_images,
images_without_alt = EXCLUDED.images_without_alt,
images_with_alt = EXCLUDED.images_with_alt,
internal_links_count = EXCLUDED.internal_links_count,
external_links_count = EXCLUDED.external_links_count,
broken_links_count = EXCLUDED.broken_links_count,
has_structured_data = EXCLUDED.has_structured_data,
structured_data_types = EXCLUDED.structured_data_types,
structured_data_json = EXCLUDED.structured_data_json,
has_canonical = EXCLUDED.has_canonical,
canonical_url = EXCLUDED.canonical_url,
is_indexable = EXCLUDED.is_indexable,
noindex_reason = EXCLUDED.noindex_reason,
has_sitemap = EXCLUDED.has_sitemap,
has_robots_txt = EXCLUDED.has_robots_txt,
viewport_configured = EXCLUDED.viewport_configured,
is_mobile_friendly = EXCLUDED.is_mobile_friendly,
has_ssl = EXCLUDED.has_ssl,
ssl_expires_at = EXCLUDED.ssl_expires_at,
ssl_issuer = EXCLUDED.ssl_issuer,
largest_contentful_paint_ms = EXCLUDED.largest_contentful_paint_ms,
interaction_to_next_paint_ms = EXCLUDED.interaction_to_next_paint_ms,
cumulative_layout_shift = EXCLUDED.cumulative_layout_shift,
has_og_tags = EXCLUDED.has_og_tags,
og_title = EXCLUDED.og_title,
og_description = EXCLUDED.og_description,
og_image = EXCLUDED.og_image,
has_twitter_cards = EXCLUDED.has_twitter_cards,
html_lang = EXCLUDED.html_lang,
has_hreflang = EXCLUDED.has_hreflang,
word_count_homepage = EXCLUDED.word_count_homepage,
seo_audit_version = EXCLUDED.seo_audit_version,
seo_audited_at = EXCLUDED.seo_audited_at,
seo_audit_errors = EXCLUDED.seo_audit_errors,
seo_overall_score = EXCLUDED.seo_overall_score,
seo_health_score = EXCLUDED.seo_health_score,
seo_issues = EXCLUDED.seo_issues,
local_seo_score = EXCLUDED.local_seo_score,
has_local_business_schema = EXCLUDED.has_local_business_schema,
local_business_schema_fields = EXCLUDED.local_business_schema_fields,
nap_on_website = EXCLUDED.nap_on_website,
has_google_maps_embed = EXCLUDED.has_google_maps_embed,
has_local_keywords = EXCLUDED.has_local_keywords,
local_keywords_found = EXCLUDED.local_keywords_found,
citations_found = EXCLUDED.citations_found,
citations_count = EXCLUDED.citations_count,
content_freshness_score = EXCLUDED.content_freshness_score,
last_content_update = EXCLUDED.last_content_update
""")
# Check SSL certificate
ssl_info = {'has_ssl': False, 'ssl_expires_at': None, 'ssl_issuer': None}
website_url = result.get('website_url', '')
try:
from urllib.parse import urlparse
parsed = urlparse(website_url or result.get('final_url', ''))
domain = parsed.hostname
if domain:
ctx = ssl.create_default_context()
with socket.create_connection((domain, 443), timeout=10) as sock:
with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
cert = ssock.getpeercert()
ssl_info['has_ssl'] = True
not_after = cert.get('notAfter')
if not_after:
ssl_info['ssl_expires_at'] = datetime.strptime(
not_after, '%b %d %H:%M:%S %Y %Z'
).date()
issuer = cert.get('issuer')
if issuer:
issuer_dict = {}
for item in issuer:
for key, value in item:
issuer_dict[key] = value
ssl_info['ssl_issuer'] = (
issuer_dict.get('organizationName') or issuer_dict.get('commonName') or ''
)[:100]
except Exception:
pass # SSL check failed — has_ssl stays False
# Build issues list from errors
issues = []
for error in result.get('errors', []):
issues.append({
'severity': 'error',
'message': error,
})
# Get first H1 text
h1_texts = headings.get('h1_texts', [])
h1_text = h1_texts[0] if h1_texts else None
session.execute(upsert_query, {
'company_id': company_id,
'analyzed_at': result['audit_date'],
'website_url': result.get('website_url'),
'final_url': result.get('final_url'),
'http_status_code': result.get('http_status'),
'load_time_ms': result.get('load_time_ms'),
# PageSpeed scores
'pagespeed_seo_score': ps_scores.get('seo'),
'pagespeed_performance_score': ps_scores.get('performance'),
'pagespeed_accessibility_score': ps_scores.get('accessibility'),
'pagespeed_best_practices_score': ps_scores.get('best_practices'),
'pagespeed_audits': json.dumps(pagespeed.get('audits', {})) if pagespeed else None,
# On-page SEO
'meta_title': meta_tags.get('title', '')[:500] if meta_tags.get('title') else None,
'meta_description': meta_tags.get('description'),
'meta_keywords': meta_tags.get('keywords'),
'h1_count': headings.get('h1_count'),
'h2_count': headings.get('h2_count'),
'h3_count': headings.get('h3_count'),
'h1_text': h1_text[:500] if h1_text else None,
'total_images': images.get('total_images'),
'images_without_alt': images.get('images_without_alt'),
'images_with_alt': images.get('images_with_alt'),
'internal_links_count': links.get('internal_links'),
'external_links_count': links.get('external_links'),
'broken_links_count': links.get('broken_links'), # May be None if not checked
'has_structured_data': structured_data.get('has_structured_data', False),
'structured_data_types': structured_data.get('all_types', []),
'structured_data_json': json.dumps(structured_data.get('json_ld_data', [])) if structured_data.get('json_ld_data') else None,
# Technical SEO
'has_canonical': canonical.get('has_canonical', False),
'canonical_url': canonical.get('canonical_url', '')[:500] if canonical.get('canonical_url') else None,
'is_indexable': indexability.get('is_indexable', True),
'noindex_reason': indexability.get('noindex_source'),
'has_sitemap': sitemap.get('exists', False),
'has_robots_txt': robots.get('exists', False),
# Viewport and mobile-friendliness derived from meta_tags
'viewport_configured': bool(meta_tags.get('viewport')),
'is_mobile_friendly': 'width=device-width' in (meta_tags.get('viewport') or '').lower(),
# SSL
'has_ssl': ssl_info['has_ssl'],
'ssl_expires_at': ssl_info['ssl_expires_at'],
'ssl_issuer': ssl_info['ssl_issuer'],
# Core Web Vitals
'largest_contentful_paint_ms': cwv.get('lcp_ms'),
'interaction_to_next_paint_ms': cwv.get('inp_ms'),
'cumulative_layout_shift': cwv.get('cls'),
# Open Graph
'has_og_tags': bool(og.get('og_title')),
'og_title': og.get('og_title', '')[:500] if og.get('og_title') else None,
'og_description': og.get('og_description'),
'og_image': og.get('og_image', '')[:500] if og.get('og_image') else None,
'has_twitter_cards': bool(tc.get('card_type')),
# Language & International
'html_lang': onpage.get('lang_attribute', '')[:10] if onpage.get('lang_attribute') else None,
'has_hreflang': onpage.get('has_hreflang', False), # Detected by analyzer if present
# Word count
'word_count_homepage': onpage.get('word_count'),
# Audit metadata
'seo_audit_version': result.get('audit_version'),
'seo_audited_at': result['audit_date'],
'seo_audit_errors': result.get('errors', []),
'seo_overall_score': result.get('scores', {}).get('overall_seo'),
'seo_health_score': self._calculate_onpage_score(onpage) if onpage else None,
'seo_issues': json.dumps(issues) if issues else None,
# Local SEO
'local_seo_score': (result.get('local_seo') or {}).get('local_seo_score'),
'has_local_business_schema': (result.get('local_seo') or {}).get('has_local_business_schema', False),
'local_business_schema_fields': json.dumps((result.get('local_seo') or {}).get('local_business_schema_fields', {})),
'nap_on_website': json.dumps((result.get('local_seo') or {}).get('nap_on_website', {})),
'has_google_maps_embed': (result.get('local_seo') or {}).get('has_google_maps_embed', False),
'has_local_keywords': (result.get('local_seo') or {}).get('has_local_keywords', False),
'local_keywords_found': json.dumps((result.get('local_seo') or {}).get('local_keywords_found', [])),
# Citations
'citations_found': json.dumps(result.get('citations', [])),
'citations_count': sum(1 for c in result.get('citations', []) if c.get('status') == 'found'),
# Freshness
'content_freshness_score': (result.get('freshness') or {}).get('content_freshness_score'),
'last_content_update': (result.get('freshness') or {}).get('last_content_update'),
})
# Save individual citations
for citation in result.get('citations', []):
if citation.get('directory_name'):
citation_upsert = text("""
INSERT INTO company_citations (
company_id, directory_name, directory_url, listing_url,
status, checked_at
) VALUES (
:company_id, :directory_name, :directory_url, :listing_url,
:status, NOW()
)
ON CONFLICT (company_id, directory_name) DO UPDATE SET
listing_url = EXCLUDED.listing_url,
status = EXCLUDED.status,
checked_at = NOW()
""")
session.execute(citation_upsert, {
'company_id': company_id,
'directory_name': citation['directory_name'],
'directory_url': citation.get('directory_url'),
'listing_url': citation.get('listing_url'),
'status': citation.get('status', 'unknown'),
})
session.commit()
logger.info(f" Saved SEO audit for company {company_id}")
return True
except Exception as e:
logger.error(f"Failed to save audit result for company {result.get('company_id')}: {e}")
return False
def run_audit(self, company_ids: Optional[List[int]] = None,
batch_start: Optional[int] = None,
batch_end: Optional[int] = None,
dry_run: bool = False) -> Dict[str, Any]:
"""
Run SEO audit for specified companies.
Args:
company_ids: List of specific company IDs to audit.
batch_start: Start index for batch processing.
batch_end: End index for batch processing.
dry_run: If True, print results without saving to database.
Returns:
Summary dict with success/failed counts and results.
"""
start_time = time_module.time()
companies = self.get_companies(company_ids, batch_start, batch_end)
if not companies:
logger.warning("No companies found matching the specified criteria")
return {
'total': 0,
'success': 0,
'failed': 0,
'skipped': 0,
'no_website': 0,
'unavailable': 0,
'timeout': 0,
'quota_remaining': self.pagespeed_client.get_remaining_quota(),
'duration_seconds': 0,
'results': [],
}
summary = {
'total': len(companies),
'success': 0,
'failed': 0,
'skipped': 0,
'no_website': 0, # Companies without website URL
'unavailable': 0, # Websites that returned 4xx/5xx
'timeout': 0, # Websites that timed out
'ssl_errors': 0, # SSL certificate issues
'connection_errors': 0, # Connection refused/DNS errors
'quota_exceeded': False,
'quota_remaining': self.pagespeed_client.get_remaining_quota(),
'quota_start': self.pagespeed_client.get_remaining_quota(),
'results': [],
}
logger.info("=" * 60)
logger.info(f"SEO AUDIT STARTING")
logger.info("=" * 60)
logger.info(f"Companies to audit: {len(companies)}")
logger.info(f"Mode: {'DRY RUN (no database writes)' if dry_run else 'LIVE'}")
logger.info(f"PageSpeed API quota remaining: {summary['quota_remaining']}")
logger.info("=" * 60)
for i, company in enumerate(companies, 1):
# Progress estimation
elapsed = time_module.time() - start_time
if i > 1:
avg_time_per_company = elapsed / (i - 1)
remaining_companies = len(companies) - i + 1
eta_seconds = avg_time_per_company * remaining_companies
eta_str = str(timedelta(seconds=int(eta_seconds)))
else:
eta_str = "calculating..."
logger.info("")
logger.info(f"[{i}/{len(companies)}] {company['name']} (ID: {company['id']}) - ETA: {eta_str}")
# Check for quota before proceeding
current_quota = self.pagespeed_client.get_remaining_quota()
if current_quota <= 0:
logger.warning(f" PageSpeed quota exhausted, skipping PageSpeed analysis")
summary['quota_exceeded'] = True
try:
result = self.audit_company(company)
# Categorize the result based on errors
result_status = self._categorize_result(result)
if result_status == 'no_website':
summary['no_website'] += 1
summary['skipped'] += 1
logger.info(f" → SKIPPED: No website URL configured")
elif result_status == 'unavailable':
summary['unavailable'] += 1
summary['failed'] += 1
logger.warning(f" → UNAVAILABLE: HTTP {result.get('http_status')}")
elif result_status == 'timeout':
summary['timeout'] += 1
summary['failed'] += 1
logger.warning(f" → TIMEOUT: Website did not respond")
elif result_status == 'ssl_error':
summary['ssl_errors'] += 1
# Still count as success if we got data via HTTP fallback
if result.get('onpage'):
summary['success'] += 1
logger.info(f" → SUCCESS (with SSL warning)")
else:
summary['failed'] += 1
logger.warning(f" → FAILED: SSL error, no fallback data")
elif result_status == 'connection_error':
summary['connection_errors'] += 1
summary['failed'] += 1
logger.warning(f" → FAILED: Connection error")
else:
summary['success'] += 1
score = result.get('scores', {}).get('overall_seo')
logger.info(f" → SUCCESS: Overall SEO score: {score}")
# Save to database or print in dry-run mode
if not dry_run:
if result_status not in ('no_website',):
if self.save_audit_result(result):
logger.debug(f" Saved to database")
else:
logger.error(f" Failed to save to database")
else:
self._print_dry_run_result(company, result)
# Build result entry
summary['results'].append({
'company_id': company['id'],
'company_name': company['name'],
'status': result_status,
'overall_score': result.get('scores', {}).get('overall_seo'),
'pagespeed_seo': result.get('scores', {}).get('pagespeed_seo'),
'http_status': result.get('http_status'),
'load_time_ms': result.get('load_time_ms'),
'errors_count': len(result.get('errors', [])),
'errors': result.get('errors', []),
})
except QuotaExceededError:
logger.error(f" PageSpeed API quota exceeded!")
summary['quota_exceeded'] = True
summary['skipped'] += 1
summary['results'].append({
'company_id': company['id'],
'company_name': company['name'],
'status': 'quota_exceeded',
'error': 'PageSpeed API quota exceeded',
})
except Exception as e:
logger.error(f" Unexpected error: {e}")
summary['failed'] += 1
summary['results'].append({
'company_id': company['id'],
'company_name': company['name'],
'status': 'error',
'error': str(e),
})
# Final summary
summary['quota_remaining'] = self.pagespeed_client.get_remaining_quota()
summary['quota_used'] = summary['quota_start'] - summary['quota_remaining']
summary['duration_seconds'] = int(time_module.time() - start_time)
return summary
def _categorize_result(self, result: Dict[str, Any]) -> str:
"""
Categorize audit result based on errors encountered.
Returns one of: 'success', 'no_website', 'unavailable', 'timeout',
'ssl_error', 'connection_error', 'error'
"""
errors = result.get('errors', [])
error_text = ' '.join(errors).lower()
# No website URL
if 'no website url' in error_text:
return 'no_website'
# Timeout
if 'timeout' in error_text:
return 'timeout'
# Connection errors
if 'connection error' in error_text or 'connection refused' in error_text:
return 'connection_error'
# SSL errors (without successful fallback)
if 'ssl error' in error_text:
return 'ssl_error'
# HTTP errors (4xx, 5xx)
http_status = result.get('http_status')
if http_status and http_status >= 400:
return 'unavailable'
# If we have errors but also have data, it's partial success
if errors and not result.get('onpage') and not result.get('technical'):
return 'error'
return 'success'
def _print_dry_run_result(self, company: Dict, result: Dict[str, Any]) -> None:
"""Print formatted result in dry-run mode."""
print("\n" + "-" * 60)
print(f"Company: {company['name']} (ID: {company['id']})")
print(f"Website: {result.get('website_url') or 'Not configured'}")
if result.get('http_status'):
print(f"HTTP Status: {result.get('http_status')}")
if result.get('load_time_ms'):
print(f"Load Time: {result.get('load_time_ms')}ms")
if result.get('final_url') and result.get('final_url') != result.get('website_url'):
print(f"Final URL (after redirects): {result.get('final_url')}")
scores = result.get('scores', {})
if any(scores.values()):
print(f"\nScores:")
if scores.get('overall_seo') is not None:
print(f" Overall SEO: {scores.get('overall_seo')}")
if scores.get('pagespeed_seo') is not None:
print(f" PageSpeed SEO: {scores.get('pagespeed_seo')}")
if scores.get('pagespeed_performance') is not None:
print(f" PageSpeed Performance: {scores.get('pagespeed_performance')}")
if scores.get('pagespeed_accessibility') is not None:
print(f" PageSpeed Accessibility: {scores.get('pagespeed_accessibility')}")
if scores.get('pagespeed_best_practices') is not None:
print(f" PageSpeed Best Practices: {scores.get('pagespeed_best_practices')}")
# On-page summary
onpage = result.get('onpage', {})
if onpage:
print(f"\nOn-Page SEO:")
meta = onpage.get('meta_tags', {})
if meta.get('title'):
print(f" Title: {meta.get('title')[:60]}...")
headings = onpage.get('headings', {})
print(f" H1 count: {headings.get('h1_count', 0)}")
images = onpage.get('images', {})
if images.get('total_images'):
print(f" Images: {images.get('total_images')} total, {images.get('images_without_alt', 0)} missing alt")
structured = onpage.get('structured_data', {})
print(f" Structured Data: {'Yes' if structured.get('has_structured_data') else 'No'}")
# Technical SEO summary
technical = result.get('technical', {})
if technical:
print(f"\nTechnical SEO:")
robots = technical.get('robots_txt', {})
print(f" robots.txt: {'Yes' if robots.get('exists') else 'No'}")
sitemap = technical.get('sitemap', {})
print(f" sitemap.xml: {'Yes' if sitemap.get('exists') else 'No'}")
indexability = technical.get('indexability', {})
print(f" Indexable: {'Yes' if indexability.get('is_indexable', True) else 'No'}")
if result.get('errors'):
print(f"\nIssues ({len(result['errors'])}):")
for err in result['errors'][:5]: # Show first 5 errors
print(f"{err}")
if len(result['errors']) > 5:
print(f" ... and {len(result['errors']) - 5} more")
print("-" * 60)
def parse_batch_argument(batch_str: str) -> Tuple[int, int]:
"""
Parse batch argument in format 'START-END'.
Args:
batch_str: String like '1-10' or '5-20'
Returns:
Tuple of (start, end) integers
Raises:
ValueError: If format is invalid
"""
if '-' not in batch_str:
raise ValueError(f"Invalid batch format '{batch_str}'. Use START-END (e.g., 1-10)")
parts = batch_str.split('-')
if len(parts) != 2:
raise ValueError(f"Invalid batch format '{batch_str}'. Use START-END (e.g., 1-10)")
try:
start = int(parts[0].strip())
end = int(parts[1].strip())
except ValueError:
raise ValueError(f"Invalid batch values '{batch_str}'. START and END must be numbers")
if start < 1:
raise ValueError(f"Invalid batch start '{start}'. Must be >= 1")
if end < start:
raise ValueError(f"Invalid batch range '{start}-{end}'. END must be >= START")
return start, end
def print_summary(summary: Dict[str, Any], dry_run: bool = False) -> None:
"""Print formatted audit summary."""
duration = summary.get('duration_seconds', 0)
duration_str = str(timedelta(seconds=duration))
print("\n")
print("=" * 70)
print(" SEO AUDIT COMPLETE")
print("=" * 70)
print("")
print(f" Mode: {'DRY RUN' if dry_run else 'LIVE'}")
print(f" Duration: {duration_str}")
print("")
print("-" * 70)
print(" RESULTS BREAKDOWN")
print("-" * 70)
print(f" Total companies: {summary['total']}")
print(f" ✓ Successful: {summary['success']}")
print(f" ✗ Failed: {summary['failed']}")
print(f" ○ Skipped: {summary['skipped']}")
print("")
# Edge case breakdown
if summary.get('no_website', 0) > 0:
print(f" - No website: {summary['no_website']}")
if summary.get('unavailable', 0) > 0:
print(f" - Unavailable: {summary['unavailable']}")
if summary.get('timeout', 0) > 0:
print(f" - Timeout: {summary['timeout']}")
if summary.get('ssl_errors', 0) > 0:
print(f" - SSL errors: {summary['ssl_errors']}")
if summary.get('connection_errors', 0) > 0:
print(f" - Connection errors: {summary['connection_errors']}")
print("")
print("-" * 70)
print(" PAGESPEED API QUOTA")
print("-" * 70)
print(f" Quota at start: {summary.get('quota_start', 'N/A')}")
print(f" Quota used: {summary.get('quota_used', 'N/A')}")
print(f" Quota remaining: {summary.get('quota_remaining', 'N/A')}")
if summary.get('quota_exceeded'):
print(" ⚠ WARNING: Quota was exceeded during this run!")
# Score distribution
results = summary.get('results', [])
scores = [r.get('overall_score') for r in results if r.get('overall_score') is not None]
if scores:
avg_score = sum(scores) / len(scores)
print("")
print("-" * 70)
print(" SEO SCORE DISTRIBUTION")
print("-" * 70)
print(f" Companies with scores: {len(scores)}")
print(f" Average SEO score: {avg_score:.1f}")
print(f" Highest score: {max(scores)}")
print(f" Lowest score: {min(scores)}")
print("")
# Score ranges with visual bars
excellent = sum(1 for s in scores if s >= 90)
good = sum(1 for s in scores if 70 <= s < 90)
fair = sum(1 for s in scores if 50 <= s < 70)
poor = sum(1 for s in scores if s < 50)
max_bar = 30
total = len(scores)
def bar(count, total, max_bar=30):
if total == 0:
return ""
width = int((count / total) * max_bar)
return "" * width + "" * (max_bar - width)
print(f" Excellent (90-100): {excellent:3d} {bar(excellent, total)}")
print(f" Good (70-89): {good:3d} {bar(good, total)}")
print(f" Fair (50-69): {fair:3d} {bar(fair, total)}")
print(f" Poor (<50): {poor:3d} {bar(poor, total)}")
# List failed companies
failed_results = [r for r in results if r.get('status') in ('unavailable', 'timeout', 'connection_error', 'error')]
if failed_results:
print("")
print("-" * 70)
print(" FAILED AUDITS")
print("-" * 70)
for r in failed_results[:10]: # Show first 10
status_icon = {
'unavailable': '🔴',
'timeout': '',
'connection_error': '🔌',
'error': '',
}.get(r['status'], '?')
errors = r.get('errors', [])
error_msg = errors[0][:50] if errors else r.get('status', 'Unknown')
print(f" {status_icon} {r['company_name'][:30]:<30} - {error_msg}")
if len(failed_results) > 10:
print(f" ... and {len(failed_results) - 10} more")
print("")
print("=" * 70)
def main():
"""Main entry point for CLI usage."""
parser = argparse.ArgumentParser(
description='SEO Audit for Norda Biznes member websites',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python seo_audit.py --company-id 26 # Audit single company
python seo_audit.py --batch 1-10 # Audit companies 1-10
python seo_audit.py --all # Audit all companies
python seo_audit.py --company-id 26 --dry-run # Test without saving
python seo_audit.py --all --json > report.json # Export to JSON
Exit codes:
0 - All audits completed successfully
1 - Argument error or invalid input
2 - Partial failures (some audits failed)
3 - All audits failed
4 - Database connection error
5 - API quota exceeded
"""
)
# Selection arguments (mutually exclusive in practice)
selection = parser.add_argument_group('Company Selection (choose one)')
selection.add_argument('--company-id', type=int, metavar='ID',
help='Audit single company by ID')
selection.add_argument('--company-ids', type=str, metavar='IDS',
help='Audit multiple companies by IDs (comma-separated, e.g., 1,5,10)')
selection.add_argument('--batch', type=str, metavar='RANGE',
help='Audit batch of companies by row offset (e.g., 1-10)')
selection.add_argument('--all', action='store_true',
help='Audit all companies')
# Options
options = parser.add_argument_group('Options')
options.add_argument('--dry-run', action='store_true',
help='Print results without saving to database')
options.add_argument('--verbose', '-v', action='store_true',
help='Enable verbose/debug output')
options.add_argument('--quiet', '-q', action='store_true',
help='Suppress progress output (only show summary)')
options.add_argument('--json', action='store_true',
help='Output results as JSON (for scripting)')
options.add_argument('--database-url', type=str, metavar='URL',
help='Database connection URL (overrides DATABASE_URL env var)')
args = parser.parse_args()
# Configure logging level
if args.quiet:
logging.getLogger().setLevel(logging.WARNING)
elif args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Validate that at least one selection method is provided
selection_count = sum([
args.company_id is not None,
args.company_ids is not None,
args.batch is not None,
args.all
])
if selection_count == 0:
parser.print_help()
print("\n❌ Error: Please specify one of --company-id, --company-ids, --batch, or --all")
sys.exit(EXIT_ARGUMENT_ERROR)
if selection_count > 1:
print("❌ Error: Please specify only one selection method (--company-id, --company-ids, --batch, or --all)")
sys.exit(EXIT_ARGUMENT_ERROR)
# Parse batch argument if provided
batch_start, batch_end = None, None
if args.batch:
try:
batch_start, batch_end = parse_batch_argument(args.batch)
except ValueError as e:
print(f"❌ Error: {e}")
sys.exit(EXIT_ARGUMENT_ERROR)
# Parse company IDs if provided
company_ids = None
if args.company_id:
company_ids = [args.company_id]
elif args.company_ids:
try:
company_ids = [int(x.strip()) for x in args.company_ids.split(',')]
if not company_ids:
raise ValueError("Empty list")
except ValueError:
print(f"❌ Error: Invalid --company-ids format. Use comma-separated integers (e.g., 1,5,10)")
sys.exit(EXIT_ARGUMENT_ERROR)
# Determine database URL
database_url = args.database_url or DATABASE_URL
# Initialize auditor
try:
auditor = SEOAuditor(database_url=database_url)
except SQLAlchemyError as e:
logger.error(f"Failed to connect to database: {e}")
print(f"❌ Error: Database connection failed: {e}")
sys.exit(EXIT_DATABASE_ERROR)
except Exception as e:
logger.error(f"Failed to initialize auditor: {e}")
print(f"❌ Error: Failed to initialize SEO auditor: {e}")
sys.exit(EXIT_DATABASE_ERROR)
# Run audit
try:
summary = auditor.run_audit(
company_ids=company_ids,
batch_start=batch_start,
batch_end=batch_end,
dry_run=args.dry_run
)
except QuotaExceededError:
logger.error("PageSpeed API quota exceeded")
print("❌ Error: PageSpeed API quota exceeded. Try again tomorrow.")
sys.exit(EXIT_QUOTA_EXCEEDED)
except SQLAlchemyError as e:
logger.error(f"Database error during audit: {e}")
print(f"❌ Error: Database error: {e}")
sys.exit(EXIT_DATABASE_ERROR)
except Exception as e:
logger.error(f"Unexpected error during audit: {e}")
print(f"❌ Error: Unexpected error: {e}")
sys.exit(EXIT_ALL_FAILED)
# Output results
if args.json:
print(json.dumps(summary, default=str, indent=2))
else:
print_summary(summary, dry_run=args.dry_run)
# Determine exit code
if summary['total'] == 0:
logger.warning("No companies found to audit")
sys.exit(EXIT_ARGUMENT_ERROR)
elif summary.get('quota_exceeded'):
sys.exit(EXIT_QUOTA_EXCEEDED)
elif summary['failed'] == summary['total'] - summary['skipped']:
sys.exit(EXIT_ALL_FAILED)
elif summary['failed'] > 0:
sys.exit(EXIT_PARTIAL_FAILURES)
else:
sys.exit(EXIT_SUCCESS)
if __name__ == '__main__':
main()