nordabiz/scripts/pagespeed_client.py
Maciej Pienczyn 3c67968505
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: resolve pagespeed quota permission error and add lxml parser
Move quota file from scripts/ to /tmp/ (writable by gunicorn process).
Add lxml to requirements for faster, more reliable HTML parsing in SEO audits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 20:49:21 +01:00

742 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Google PageSpeed Insights API Client
=====================================
Client for interacting with Google PageSpeed Insights API with built-in:
- Rate limiting (25,000 requests/day free tier)
- Exponential backoff retry logic
- Comprehensive error handling
Usage:
from pagespeed_client import GooglePageSpeedClient
client = GooglePageSpeedClient()
result = client.analyze_url('https://example.com')
Author: Claude Code
Date: 2026-01-08
"""
import os
import json
import time
import logging
from datetime import datetime, date
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field, asdict
from enum import Enum
import requests
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# API Configuration
PAGESPEED_API_URL = 'https://www.googleapis.com/pagespeedonline/v5/runPagespeed'
# Key is read from the environment; empty string means "no key" (the client
# warns and falls back to the unauthenticated public API in that case).
PAGESPEED_API_KEY = os.getenv('GOOGLE_PAGESPEED_API_KEY', '')

# Rate limiting configuration
DAILY_QUOTA_LIMIT = 25000  # Free tier limit
REQUESTS_PER_MINUTE = 60  # Conservative limit to avoid bursts
MIN_REQUEST_INTERVAL = 1.0  # Minimum seconds between requests

# Retry configuration (exponential backoff, see GooglePageSpeedClient)
MAX_RETRIES = 3
INITIAL_BACKOFF = 1.0  # Initial backoff in seconds
MAX_BACKOFF = 60.0  # Maximum backoff in seconds
BACKOFF_MULTIPLIER = 2.0  # Backoff doubles after each failed attempt

# Request configuration
REQUEST_TIMEOUT = 60  # PageSpeed analysis can take a while
USER_AGENT = 'NordaBiznes-SEO-Auditor/1.0'
class Strategy(Enum):
    """Device type the PageSpeed API should emulate during analysis."""

    MOBILE = 'mobile'
    DESKTOP = 'desktop'
class Category(Enum):
    """Lighthouse audit categories the API can be asked to score."""

    PERFORMANCE = 'performance'
    ACCESSIBILITY = 'accessibility'
    BEST_PRACTICES = 'best-practices'
    SEO = 'seo'
@dataclass
class PageSpeedScore:
    """Lighthouse category scores on a 0-100 scale; None means not analyzed."""

    performance: Optional[int] = None
    accessibility: Optional[int] = None
    best_practices: Optional[int] = None
    seo: Optional[int] = None

    def to_dict(self) -> Dict[str, Optional[int]]:
        """Serialize all four scores to a plain dict."""
        return asdict(self)
@dataclass
class CoreWebVitals:
    """Core Web Vitals metrics extracted from a PageSpeed response."""

    lcp_ms: Optional[int] = None  # Largest Contentful Paint
    inp_ms: Optional[int] = None  # Interaction to Next Paint (replaced FID March 2024)
    cls: Optional[float] = None  # Cumulative Layout Shift
    fcp_ms: Optional[int] = None  # First Contentful Paint
    ttfb_ms: Optional[int] = None  # Time to First Byte

    def to_dict(self) -> Dict[str, Any]:
        """Serialize all metrics to a plain dict."""
        return asdict(self)
@dataclass
class PageSpeedResult:
    """Complete PageSpeed analysis result for one URL / strategy pair."""

    url: str
    final_url: str
    strategy: str
    analyzed_at: datetime
    scores: PageSpeedScore
    core_web_vitals: CoreWebVitals
    audits: Dict[str, Any] = field(default_factory=dict)
    lighthouse_version: Optional[str] = None
    fetch_time_ms: Optional[int] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the result (including nested score/vitals objects) to a dict."""
        when = self.analyzed_at
        return {
            'url': self.url,
            'final_url': self.final_url,
            'strategy': self.strategy,
            'analyzed_at': when.isoformat() if when else None,
            'scores': self.scores.to_dict(),
            'core_web_vitals': self.core_web_vitals.to_dict(),
            'audits': self.audits,
            'lighthouse_version': self.lighthouse_version,
            'fetch_time_ms': self.fetch_time_ms,
            'error': self.error,
        }
class RateLimiter:
    """
    Simple rate limiter with daily quota tracking.

    Persists quota usage to a JSON file so usage is tracked across
    script runs (and across processes sharing the same quota file).
    """

    def __init__(self, daily_limit: int = DAILY_QUOTA_LIMIT,
                 min_interval: float = MIN_REQUEST_INTERVAL,
                 quota_file: Optional[str] = None):
        """
        Args:
            daily_limit: Maximum requests allowed per calendar day.
            min_interval: Minimum seconds between consecutive requests.
            quota_file: Path for quota persistence. Defaults to a file in
                /tmp (writable by any user; resets on reboot, which is
                acceptable for a daily quota).
        """
        self.daily_limit = daily_limit
        self.min_interval = min_interval
        self.last_request_time: Optional[float] = None
        # Quota persistence file
        if quota_file:
            self.quota_file = Path(quota_file)
        else:
            # Default to /tmp (writable by any user, resets on reboot which is fine for daily quota)
            self.quota_file = Path('/tmp/.pagespeed_quota.json')
        self._load_quota()

    def _load_quota(self) -> None:
        """Load quota usage from persistent storage (best effort)."""
        self.today = date.today().isoformat()
        self.requests_today = 0
        if self.quota_file.exists():
            try:
                with open(self.quota_file, 'r') as f:
                    data = json.load(f)
                if data.get('date') == self.today:
                    self.requests_today = data.get('requests', 0)
                else:
                    # Stored data is from a previous day: reset on disk too.
                    self._save_quota()
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Failed to load quota file: {e}")
                self._save_quota()
        else:
            self._save_quota()

    def _save_quota(self) -> None:
        """Persist quota usage to file (best effort; warns on failure)."""
        try:
            with open(self.quota_file, 'w') as f:
                json.dump({
                    'date': self.today,
                    'requests': self.requests_today,
                    'limit': self.daily_limit,
                }, f)
        except IOError as e:
            logger.warning(f"Failed to save quota file: {e}")

    def _roll_over_if_new_day(self) -> None:
        """Reset the in-memory counter when the calendar day changes."""
        today = date.today().isoformat()
        if today != self.today:
            self.today = today
            self.requests_today = 0

    def can_make_request(self) -> bool:
        """Check if we can make another request without exceeding quota."""
        # Fix: refresh the date first so a long-running process is not
        # permanently blocked by yesterday's exhausted counter after midnight.
        self._roll_over_if_new_day()
        return self.requests_today < self.daily_limit

    def wait_if_needed(self) -> None:
        """Sleep just long enough to respect the minimum request interval."""
        if self.last_request_time is not None:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)

    def record_request(self) -> None:
        """Record that a request was made and persist the new count."""
        self.last_request_time = time.time()
        # Roll over first, then count this request as the first of the new
        # day (equivalent to the previous reset-to-1 behavior).
        self._roll_over_if_new_day()
        self.requests_today += 1
        self._save_quota()
        logger.debug(f"Quota: {self.requests_today}/{self.daily_limit} requests today")

    def get_remaining_quota(self) -> int:
        """Get remaining requests for today (never negative)."""
        return max(0, self.daily_limit - self.requests_today)

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get current usage statistics."""
        # Fix: guard against ZeroDivisionError when daily_limit == 0
        # (a zero limit means the quota is always fully used).
        if self.daily_limit > 0:
            usage_percent = round(self.requests_today / self.daily_limit * 100, 1)
        else:
            usage_percent = 100.0
        return {
            'date': self.today,
            'requests_today': self.requests_today,
            'daily_limit': self.daily_limit,
            'remaining': self.get_remaining_quota(),
            'usage_percent': usage_percent,
        }
class PageSpeedAPIError(Exception):
    """Base exception for PageSpeed API errors."""


class QuotaExceededError(PageSpeedAPIError):
    """Raised when daily quota is exceeded."""


class RateLimitError(PageSpeedAPIError):
    """Raised when API returns 429 Too Many Requests."""
class GooglePageSpeedClient:
    """
    Client for Google PageSpeed Insights API.

    Features:
    - Rate limiting with daily quota tracking
    - Exponential backoff retry for transient errors
    - Comprehensive error handling
    - Support for both mobile and desktop analysis

    Usage:
        client = GooglePageSpeedClient()

        # Analyze a single URL
        result = client.analyze_url('https://example.com')

        # Analyze with both mobile and desktop
        results = client.analyze_url_both_strategies('https://example.com')

        # Check quota before batch processing
        if client.get_remaining_quota() >= 80:
            # Process all 80 companies
            pass
    """

    def __init__(self, api_key: Optional[str] = None,
                 rate_limiter: Optional[RateLimiter] = None):
        """
        Initialize PageSpeed client.

        Args:
            api_key: Google PageSpeed API key. If not provided, uses
                GOOGLE_PAGESPEED_API_KEY environment variable.
            rate_limiter: Optional custom rate limiter instance.
        """
        self.api_key = api_key or PAGESPEED_API_KEY
        if not self.api_key:
            # The API still answers without a key, just with lower limits.
            logger.warning(
                "No API key provided. PageSpeed API will work but with "
                "stricter rate limits. Set GOOGLE_PAGESPEED_API_KEY env var."
            )
        self.rate_limiter = rate_limiter or RateLimiter()
        # One shared session gives us connection pooling across requests.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def analyze_url(self, url: str,
                    strategy: Strategy = Strategy.MOBILE,
                    categories: Optional[List[Category]] = None) -> PageSpeedResult:
        """
        Analyze a URL using PageSpeed Insights API.

        Args:
            url: The URL to analyze.
            strategy: Device strategy (mobile or desktop).
            categories: List of categories to analyze. Defaults to all.

        Returns:
            PageSpeedResult with scores and audit details.

        Raises:
            QuotaExceededError: If daily quota is exhausted.
            PageSpeedAPIError: For other API errors.
        """
        # Check quota before making request
        if not self.rate_limiter.can_make_request():
            raise QuotaExceededError(
                f"Daily quota of {self.rate_limiter.daily_limit} requests exceeded. "
                f"Try again tomorrow or use a different API key."
            )

        # Default to all categories
        if categories is None:
            categories = list(Category)

        # Build request parameters. requests encodes a list value as a
        # repeated query parameter, which is how the API expects multiple
        # categories.
        params = {
            'url': url,
            'strategy': strategy.value,
            'category': [cat.value for cat in categories],
        }
        if self.api_key:
            params['key'] = self.api_key

        # Wait for rate limit
        self.rate_limiter.wait_if_needed()

        # Make request with retry logic
        response = self._make_request_with_retry(params)

        # Record successful request
        self.rate_limiter.record_request()

        # Parse response
        return self._parse_response(response, url, strategy)

    def analyze_url_both_strategies(self, url: str,
                                    categories: Optional[List[Category]] = None
                                    ) -> Dict[str, PageSpeedResult]:
        """
        Analyze URL for both mobile and desktop strategies.

        A failure of one strategy does not abort the other: the failed
        strategy gets a placeholder result with `error` set.

        Args:
            url: The URL to analyze.
            categories: List of categories to analyze.

        Returns:
            Dict with 'mobile' and 'desktop' PageSpeedResult.
        """
        results = {}
        for strategy in [Strategy.MOBILE, Strategy.DESKTOP]:
            try:
                results[strategy.value] = self.analyze_url(url, strategy, categories)
            except PageSpeedAPIError as e:
                logger.error(f"Failed to analyze {url} ({strategy.value}): {e}")
                results[strategy.value] = PageSpeedResult(
                    url=url,
                    final_url=url,
                    strategy=strategy.value,
                    analyzed_at=datetime.now(),
                    scores=PageSpeedScore(),
                    core_web_vitals=CoreWebVitals(),
                    error=str(e),
                )
        return results

    def _make_request_with_retry(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make API request with exponential backoff retry.

        Retries on:
        - 429 Too Many Requests
        - 5xx Server Errors
        - Timeouts and connection errors

        Args:
            params: Request parameters.

        Returns:
            Parsed JSON response.

        Raises:
            PageSpeedAPIError: If all retries fail, or on a non-retryable
                4xx client error.
        """
        last_error: Optional[Exception] = None
        backoff = INITIAL_BACKOFF
        for attempt in range(MAX_RETRIES + 1):
            try:
                logger.debug(f"API request attempt {attempt + 1}/{MAX_RETRIES + 1}")
                response = self.session.get(
                    PAGESPEED_API_URL,
                    params=params,
                    timeout=REQUEST_TIMEOUT,
                )

                # Handle rate limiting (429): honor Retry-After when parseable.
                if response.status_code == 429:
                    retry_after = response.headers.get('Retry-After', backoff)
                    try:
                        retry_after = float(retry_after)
                    except ValueError:
                        # Retry-After may be an HTTP date; fall back to backoff.
                        retry_after = backoff
                    if attempt < MAX_RETRIES:
                        logger.warning(
                            f"Rate limited (429). Retrying in {retry_after}s "
                            f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                        )
                        time.sleep(retry_after)
                        backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                        continue
                    else:
                        raise RateLimitError(
                            f"Rate limited after {MAX_RETRIES + 1} attempts"
                        )

                # Handle server errors (5xx)
                if response.status_code >= 500:
                    if attempt < MAX_RETRIES:
                        logger.warning(
                            f"Server error ({response.status_code}). "
                            f"Retrying in {backoff}s "
                            f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                        )
                        time.sleep(backoff)
                        backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                        continue
                    else:
                        raise PageSpeedAPIError(
                            f"Server error {response.status_code} after "
                            f"{MAX_RETRIES + 1} attempts"
                        )

                # Handle client errors (4xx except 429) -- not retryable.
                if response.status_code >= 400:
                    # Fix: a non-JSON error body (e.g. an HTML error page)
                    # used to raise JSONDecodeError inside this try block and
                    # be pointlessly retried as a RequestException. Parse
                    # defensively and fall back to the raw response text.
                    try:
                        error_data = response.json().get('error', {})
                    except ValueError:
                        error_data = {}
                    error_message = error_data.get('message', response.text)
                    raise PageSpeedAPIError(
                        f"API error {response.status_code}: {error_message}"
                    )

                # Success
                return response.json()
            except requests.exceptions.Timeout:
                last_error = PageSpeedAPIError(
                    f"Request timed out after {REQUEST_TIMEOUT}s"
                )
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Request timeout. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue
            except requests.exceptions.ConnectionError as e:
                last_error = PageSpeedAPIError(f"Connection error: {e}")
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Connection error. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue
            except requests.exceptions.RequestException as e:
                last_error = PageSpeedAPIError(f"Request failed: {e}")
                if attempt < MAX_RETRIES:
                    logger.warning(
                        f"Request error. Retrying in {backoff}s "
                        f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
                    )
                    time.sleep(backoff)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

        # All retries exhausted
        raise last_error or PageSpeedAPIError("Request failed after all retries")

    def _parse_response(self, data: Dict[str, Any],
                        original_url: str,
                        strategy: Strategy) -> PageSpeedResult:
        """
        Parse PageSpeed API response into structured result.

        Args:
            data: Raw API response.
            original_url: The URL that was analyzed.
            strategy: The analysis strategy used.

        Returns:
            PageSpeedResult with parsed data.
        """
        lighthouse = data.get('lighthouseResult', {})

        # Extract scores (0-1 float -> 0-100 int)
        categories = lighthouse.get('categories', {})
        scores = PageSpeedScore(
            performance=self._extract_score(categories.get('performance')),
            accessibility=self._extract_score(categories.get('accessibility')),
            best_practices=self._extract_score(categories.get('best-practices')),
            seo=self._extract_score(categories.get('seo')),
        )

        # Extract Core Web Vitals. INP replaced FID in March 2024; fall back
        # to max-potential-fid for older Lighthouse payloads.
        audits = lighthouse.get('audits', {})
        core_web_vitals = CoreWebVitals(
            lcp_ms=self._extract_metric_ms(audits.get('largest-contentful-paint')),
            inp_ms=self._extract_metric_ms(
                audits.get('interaction-to-next-paint') or audits.get('max-potential-fid')),
            cls=self._extract_cls(audits.get('cumulative-layout-shift')),
            fcp_ms=self._extract_metric_ms(audits.get('first-contentful-paint')),
            ttfb_ms=self._extract_metric_ms(audits.get('server-response-time')),
        )

        # Extract relevant audits for SEO
        seo_audits = self._extract_seo_audits(audits)

        # Get timing info
        timing = lighthouse.get('timing', {})
        fetch_time = timing.get('total')

        return PageSpeedResult(
            url=original_url,
            final_url=lighthouse.get('finalUrl', original_url),
            strategy=strategy.value,
            analyzed_at=datetime.now(),
            scores=scores,
            core_web_vitals=core_web_vitals,
            audits=seo_audits,
            lighthouse_version=lighthouse.get('lighthouseVersion'),
            # Fix: the previous truthiness test mapped a legitimate 0 to None.
            fetch_time_ms=int(fetch_time) if fetch_time is not None else None,
        )

    def _extract_score(self, category_data: Optional[Dict]) -> Optional[int]:
        """Extract score from category data (0-1 float -> 0-100 int)."""
        if not category_data:
            return None
        score = category_data.get('score')
        if score is not None:
            return int(round(score * 100))
        return None

    def _extract_metric_ms(self, audit_data: Optional[Dict]) -> Optional[int]:
        """Extract metric value in milliseconds (rounded to int)."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is not None:
            return int(round(value))
        return None

    def _extract_cls(self, audit_data: Optional[Dict]) -> Optional[float]:
        """Extract Cumulative Layout Shift value (rounded to 3 decimals)."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is not None:
            return round(value, 3)
        return None

    def _extract_seo_audits(self, audits: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract SEO-relevant audits from Lighthouse results.

        Returns a dict with audit results organized by category.
        """
        # Table of (category, audit ids, fields copied from each audit).
        # Replaces five near-identical copy-pasted loops.
        groups = [
            ('meta',
             ['document-title', 'meta-description', 'viewport', 'hreflang',
              'canonical', 'robots-txt'],
             ['score', 'title', 'description']),
            ('crawlability',
             ['is-crawlable', 'http-status-code', 'link-text',
              'crawlable-anchors'],
             ['score', 'title']),
            ('content',
             ['image-alt', 'structured-data', 'font-size', 'tap-targets'],
             ['score', 'title']),
            ('mobile',
             ['viewport', 'content-width'],
             ['score', 'title']),
            ('performance',
             ['speed-index', 'interactive', 'total-blocking-time'],
             ['score', 'numericValue', 'displayValue']),
        ]
        seo_audits: Dict[str, Any] = {name: {} for name, _, _ in groups}
        for name, audit_ids, fields in groups:
            for audit_id in audit_ids:
                if audit_id in audits:
                    audit = audits[audit_id]
                    seo_audits[name][audit_id] = {f: audit.get(f) for f in fields}
        return seo_audits

    def get_remaining_quota(self) -> int:
        """Get remaining API requests for today."""
        return self.rate_limiter.get_remaining_quota()

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get API usage statistics."""
        return self.rate_limiter.get_usage_stats()
# Convenience function for simple usage
def analyze_url(url: str, strategy: str = 'mobile') -> Dict[str, Any]:
    """
    Convenience function to analyze a URL.

    Args:
        url: The URL to analyze.
        strategy: 'mobile' or 'desktop' (case-insensitive).

    Returns:
        Dict with analysis results.

    Raises:
        ValueError: If strategy is not 'mobile' or 'desktop'.
    """
    # Fix: validate explicitly instead of silently falling back to DESKTOP
    # for any unrecognized value (e.g. 'Mobile', 'tablet', typos).
    try:
        strat = Strategy(strategy.lower())
    except ValueError:
        raise ValueError(
            f"Invalid strategy {strategy!r}: expected 'mobile' or 'desktop'"
        ) from None
    client = GooglePageSpeedClient()
    result = client.analyze_url(url, strat)
    return result.to_dict()
if __name__ == '__main__':
    # Quick manual smoke test: analyze one URL given on the command line.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python pagespeed_client.py <url>")
        print("Example: python pagespeed_client.py https://pixlab.pl")
        sys.exit(1)

    target_url = sys.argv[1]
    print(f"Analyzing: {target_url}")
    print("-" * 60)

    ps_client = GooglePageSpeedClient()
    print(f"API Key: {'Set' if ps_client.api_key else 'Not set (using public API)'}")
    print(f"Remaining quota: {ps_client.get_remaining_quota()}")
    print("-" * 60)

    try:
        analysis = ps_client.analyze_url(target_url)
    except QuotaExceededError as e:
        print(f"ERROR: Quota exceeded - {e}")
        sys.exit(1)
    except PageSpeedAPIError as e:
        print(f"ERROR: API error - {e}")
        sys.exit(1)

    print(f"URL: {analysis.url}")
    print(f"Final URL: {analysis.final_url}")
    print(f"Strategy: {analysis.strategy}")
    print(f"Analyzed at: {analysis.analyzed_at}")
    print()
    print("Scores:")
    print(f" Performance: {analysis.scores.performance}")
    print(f" Accessibility: {analysis.scores.accessibility}")
    print(f" Best Practices: {analysis.scores.best_practices}")
    print(f" SEO: {analysis.scores.seo}")
    print()
    print("Core Web Vitals:")
    print(f" LCP: {analysis.core_web_vitals.lcp_ms}ms")
    print(f" FCP: {analysis.core_web_vitals.fcp_ms}ms")
    print(f" CLS: {analysis.core_web_vitals.cls}")
    print(f" TTFB: {analysis.core_web_vitals.ttfb_ms}ms")
    print()
    print(f"Lighthouse version: {analysis.lighthouse_version}")
    print(f"Fetch time: {analysis.fetch_time_ms}ms")
    print()
    print(f"Remaining quota: {ps_client.get_remaining_quota()}")