Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Google replaced First Input Delay (FID) with Interaction to Next Paint (INP) as a Core Web Vital in March 2024. This renames the DB column from first_input_delay_ms to interaction_to_next_paint_ms, updates the PageSpeed client to prefer the INP audit key, and fixes all references across routes, services, scripts, and report generators. Updated INP thresholds: good ≤200ms, needs improvement ≤500ms. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
742 lines
25 KiB
Python
742 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
Google PageSpeed Insights API Client
=====================================

Client for interacting with Google PageSpeed Insights API with built-in:
- Rate limiting (25,000 requests/day free tier)
- Exponential backoff retry logic
- Comprehensive error handling

Usage:
    from pagespeed_client import GooglePageSpeedClient

    client = GooglePageSpeedClient()
    result = client.analyze_url('https://example.com')

Author: Claude Code
Date: 2026-01-08
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
from dataclasses import dataclass, field, asdict
|
|
from enum import Enum
|
|
|
|
import requests
|
|
|
|
# ---------------------------------------------------------------------------
# Logging: root logger is configured at import time; this module logs through
# its own named logger.
# ---------------------------------------------------------------------------
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# API endpoint and credentials (key is read from the environment; empty
# string when the variable is not set).
# ---------------------------------------------------------------------------
PAGESPEED_API_URL = 'https://www.googleapis.com/pagespeedonline/v5/runPagespeed'
PAGESPEED_API_KEY = os.environ.get('GOOGLE_PAGESPEED_API_KEY', '')

# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------
DAILY_QUOTA_LIMIT = 25000     # requests per day (free tier limit)
REQUESTS_PER_MINUTE = 60      # conservative ceiling to avoid bursts
MIN_REQUEST_INTERVAL = 1.0    # minimum seconds between two requests

# ---------------------------------------------------------------------------
# Retry / exponential backoff
# ---------------------------------------------------------------------------
MAX_RETRIES = 3               # retries after the first attempt
INITIAL_BACKOFF = 1.0         # first backoff delay, in seconds
MAX_BACKOFF = 60.0            # upper bound for the backoff delay, in seconds
BACKOFF_MULTIPLIER = 2.0      # growth factor applied after every retry

# ---------------------------------------------------------------------------
# HTTP request settings
# ---------------------------------------------------------------------------
REQUEST_TIMEOUT = 60          # seconds; PageSpeed analysis can take a while
USER_AGENT = 'NordaBiznes-SEO-Auditor/1.0'
|
|
|
|
|
|
class Strategy(Enum):
    """Device type a PageSpeed analysis is run for.

    The member value is the string sent as the ``strategy`` request
    parameter.
    """

    MOBILE = 'mobile'
    DESKTOP = 'desktop'
|
|
|
|
|
|
class Category(Enum):
    """Lighthouse audit categories that can be requested from PageSpeed.

    The member value is the string sent in the ``category`` request
    parameter and used as the key in the response's ``categories`` map.
    """

    PERFORMANCE = 'performance'
    ACCESSIBILITY = 'accessibility'
    BEST_PRACTICES = 'best-practices'
    SEO = 'seo'
|
|
|
|
|
|
@dataclass
class PageSpeedScore:
    """Lighthouse category scores of a single analysis run.

    Every field holds an integer score, or ``None`` when the category was
    not returned.
    """

    performance: Optional[int] = None
    accessibility: Optional[int] = None
    best_practices: Optional[int] = None
    seo: Optional[int] = None

    def to_dict(self) -> Dict[str, Optional[int]]:
        """Return the scores as a plain dict keyed by field name."""
        score_map = asdict(self)
        return score_map
|
|
|
|
|
|
@dataclass
class CoreWebVitals:
    """Web-vitals style timing metrics taken from a PageSpeed result.

    All fields default to ``None`` (metric not available).
    """

    # Largest Contentful Paint, milliseconds
    lcp_ms: Optional[int] = None
    # Interaction to Next Paint, milliseconds (replaced FID in March 2024)
    inp_ms: Optional[int] = None
    # Cumulative Layout Shift (unitless)
    cls: Optional[float] = None
    # First Contentful Paint, milliseconds
    fcp_ms: Optional[int] = None
    # Time to First Byte, milliseconds
    ttfb_ms: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the metrics as a plain dict keyed by field name."""
        metric_map = asdict(self)
        return metric_map
|
|
|
|
|
|
@dataclass
class PageSpeedResult:
    """Everything the client keeps from one PageSpeed analysis."""

    # URL as requested by the caller
    url: str
    # URL reported by Lighthouse (falls back to ``url`` when not reported)
    final_url: str
    # Strategy value used for the run ('mobile' or 'desktop')
    strategy: str
    # Moment the result object was created
    analyzed_at: datetime
    # Lighthouse category scores
    scores: PageSpeedScore
    # Core Web Vitals metrics
    core_web_vitals: CoreWebVitals
    # Selected audit details, grouped by section
    audits: Dict[str, Any] = field(default_factory=dict)
    lighthouse_version: Optional[str] = None
    fetch_time_ms: Optional[int] = None
    # Error text when the analysis failed, otherwise None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict representation of the result.

        ``analyzed_at`` is rendered in ISO 8601 format (``None`` when it is
        unset); ``scores`` and ``core_web_vitals`` are expanded through their
        own ``to_dict`` methods.
        """
        if self.analyzed_at:
            analyzed_at_text = self.analyzed_at.isoformat()
        else:
            analyzed_at_text = None

        return {
            'url': self.url,
            'final_url': self.final_url,
            'strategy': self.strategy,
            'analyzed_at': analyzed_at_text,
            'scores': self.scores.to_dict(),
            'core_web_vitals': self.core_web_vitals.to_dict(),
            'audits': self.audits,
            'lighthouse_version': self.lighthouse_version,
            'fetch_time_ms': self.fetch_time_ms,
            'error': self.error,
        }
|
|
|
|
|
|
class RateLimiter:
    """
    Simple rate limiter with daily quota tracking.

    Enforces a minimum interval between consecutive requests and counts the
    requests made per calendar day (local date). The counter is persisted to
    a JSON file to track usage across script runs.

    Attributes:
        daily_limit: Maximum number of requests allowed per day.
        min_interval: Minimum number of seconds between two requests.
        last_request_time: ``time.time()`` of the last recorded request, or
            ``None`` if none was recorded by this instance.
        quota_file: Path of the JSON file holding the persisted counter.
        today: ISO date the counter currently refers to.
        requests_today: Number of requests recorded for ``today``.
    """

    def __init__(self, daily_limit: int = DAILY_QUOTA_LIMIT,
                 min_interval: float = MIN_REQUEST_INTERVAL,
                 quota_file: Optional[str] = None):
        """
        Create a rate limiter and load any persisted usage.

        Args:
            daily_limit: Maximum number of requests per day.
            min_interval: Minimum seconds between two requests.
            quota_file: Path of the quota persistence file. Defaults to
                ``.pagespeed_quota.json`` next to this module.
        """
        self.daily_limit = daily_limit
        self.min_interval = min_interval
        self.last_request_time: Optional[float] = None

        # Quota persistence file
        if quota_file:
            self.quota_file = Path(quota_file)
        else:
            # Default to scripts directory
            self.quota_file = Path(__file__).parent / '.pagespeed_quota.json'

        self._load_quota()

    def _load_quota(self) -> None:
        """
        Load quota usage from persistent storage.

        The counter is taken from the file only when the file is readable,
        holds a JSON object dated today, and its ``requests`` entry is a
        non-negative integer. In every other case the counter starts at 0 and
        the file is rewritten.
        """
        self.today = date.today().isoformat()
        self.requests_today = 0

        if not self.quota_file.exists():
            self._save_quota()
            return

        try:
            with open(self.quota_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers malformed JSON and undecodable bytes.
            logger.warning(f"Failed to load quota file: {e}")
            self._save_quota()
            return

        if not isinstance(data, dict):
            # Valid JSON but not the object this class writes.
            logger.warning("Failed to load quota file: unexpected content")
            self._save_quota()
            return

        if data.get('date') != self.today:
            # New day, reset counter
            self._save_quota()
            return

        stored = data.get('requests', 0)
        if isinstance(stored, int) and not isinstance(stored, bool) and stored >= 0:
            self.requests_today = stored
        else:
            # A corrupt counter would break the quota comparisons later on.
            logger.warning(f"Failed to load quota file: invalid request count {stored!r}")
            self._save_quota()

    def _save_quota(self) -> None:
        """Persist quota usage to file; failures are logged, not raised."""
        try:
            with open(self.quota_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'date': self.today,
                    'requests': self.requests_today,
                    'limit': self.daily_limit,
                }, f)
        except OSError as e:
            logger.warning(f"Failed to save quota file: {e}")

    def _roll_over_if_new_day(self) -> None:
        """
        Reset the in-memory counter when the calendar day has changed.

        Without this, a long-running process that exhausted its quota would
        keep being refused after midnight, because the counter would only be
        reset by recording a request that is never allowed to happen.
        """
        current_day = date.today().isoformat()
        if current_day != self.today:
            self.today = current_day
            self.requests_today = 0

    def can_make_request(self) -> bool:
        """Return True if today's quota still allows another request."""
        self._roll_over_if_new_day()
        return self.requests_today < self.daily_limit

    def wait_if_needed(self) -> None:
        """Sleep as long as necessary to respect the minimum interval."""
        if self.last_request_time is not None:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)

    def record_request(self) -> None:
        """Record that a request was made and persist the new counter."""
        self.last_request_time = time.time()

        # Reset the counter first if it's a new day, so this request is the
        # first one counted for the new date.
        self._roll_over_if_new_day()
        self.requests_today += 1

        self._save_quota()
        logger.debug(f"Quota: {self.requests_today}/{self.daily_limit} requests today")

    def get_remaining_quota(self) -> int:
        """Get remaining requests for today (never negative)."""
        self._roll_over_if_new_day()
        return max(0, self.daily_limit - self.requests_today)

    def get_usage_stats(self) -> Dict[str, Any]:
        """
        Get current usage statistics.

        Returns:
            Dict with ``date``, ``requests_today``, ``daily_limit``,
            ``remaining`` and ``usage_percent`` (one decimal place). With a
            non-positive ``daily_limit`` the quota is reported as fully used
            (100.0) instead of dividing by zero.
        """
        self._roll_over_if_new_day()

        if self.daily_limit > 0:
            usage_percent = round(self.requests_today / self.daily_limit * 100, 1)
        else:
            usage_percent = 100.0

        return {
            'date': self.today,
            'requests_today': self.requests_today,
            'daily_limit': self.daily_limit,
            'remaining': self.get_remaining_quota(),
            'usage_percent': usage_percent,
        }
|
|
}
|
|
|
|
|
|
class PageSpeedAPIError(Exception):
    """Root of the exception hierarchy raised by the PageSpeed client."""


class QuotaExceededError(PageSpeedAPIError):
    """The locally tracked daily request quota has been used up."""


class RateLimitError(PageSpeedAPIError):
    """The API kept answering 429 Too Many Requests."""
|
|
|
|
|
|
class GooglePageSpeedClient:
    """
    Client for Google PageSpeed Insights API.

    Features:
    - Rate limiting with daily quota tracking
    - Exponential backoff retry for transient errors
    - Comprehensive error handling
    - Support for both mobile and desktop analysis

    Usage:
        client = GooglePageSpeedClient()

        # Analyze a single URL
        result = client.analyze_url('https://example.com')

        # Analyze with both mobile and desktop
        results = client.analyze_url_both_strategies('https://example.com')

        # Check quota before batch processing
        if client.get_remaining_quota() >= 80:
            # Process all 80 companies
            pass
    """

    # Audits copied into the SEO summary: (section, audit ids, fields kept).
    # The order of the sections is the order of the keys in the summary.
    _SEO_AUDIT_GROUPS = (
        (
            'meta',
            ('document-title', 'meta-description', 'viewport',
             'hreflang', 'canonical', 'robots-txt'),
            ('score', 'title', 'description'),
        ),
        (
            'crawlability',
            ('is-crawlable', 'http-status-code', 'link-text', 'crawlable-anchors'),
            ('score', 'title'),
        ),
        (
            'content',
            ('image-alt', 'structured-data', 'font-size', 'tap-targets'),
            ('score', 'title'),
        ),
        (
            'mobile',
            ('viewport', 'content-width'),
            ('score', 'title'),
        ),
        (
            # Performance (affects SEO)
            'performance',
            ('speed-index', 'interactive', 'total-blocking-time'),
            ('score', 'numericValue', 'displayValue'),
        ),
    )

    def __init__(self, api_key: Optional[str] = None,
                 rate_limiter: Optional[RateLimiter] = None):
        """
        Initialize PageSpeed client.

        Args:
            api_key: Google PageSpeed API key. If not provided, uses
                GOOGLE_PAGESPEED_API_KEY environment variable.
            rate_limiter: Optional custom rate limiter instance.
        """
        self.api_key = api_key or PAGESPEED_API_KEY
        if not self.api_key:
            logger.warning(
                "No API key provided. PageSpeed API will work but with "
                "stricter rate limits. Set GOOGLE_PAGESPEED_API_KEY env var."
            )

        self.rate_limiter = rate_limiter or RateLimiter()
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def analyze_url(self, url: str,
                    strategy: Strategy = Strategy.MOBILE,
                    categories: Optional[List[Category]] = None) -> PageSpeedResult:
        """
        Analyze a URL using PageSpeed Insights API.

        Args:
            url: The URL to analyze.
            strategy: Device strategy (mobile or desktop).
            categories: List of categories to analyze. Defaults to all.

        Returns:
            PageSpeedResult with scores and audit details.

        Raises:
            QuotaExceededError: If daily quota is exhausted.
            PageSpeedAPIError: For other API errors.
        """
        # Check quota before making request
        if not self.rate_limiter.can_make_request():
            raise QuotaExceededError(
                f"Daily quota of {self.rate_limiter.daily_limit} requests exceeded. "
                f"Try again tomorrow or use a different API key."
            )

        # Default to all categories
        if categories is None:
            categories = list(Category)

        # Build request parameters
        params = {
            'url': url,
            'strategy': strategy.value,
            'category': [cat.value for cat in categories],
        }

        if self.api_key:
            params['key'] = self.api_key

        # Wait for rate limit
        self.rate_limiter.wait_if_needed()

        # Make request with retry logic
        response = self._make_request_with_retry(params)

        # Record successful request (failed calls raise before this line)
        self.rate_limiter.record_request()

        # Parse response
        return self._parse_response(response, url, strategy)

    def analyze_url_both_strategies(self, url: str,
                                    categories: Optional[List[Category]] = None
                                    ) -> Dict[str, PageSpeedResult]:
        """
        Analyze URL for both mobile and desktop strategies.

        A failure for one strategy does not abort the other: the failed
        strategy gets a placeholder result with empty scores and the error
        text in ``error``.

        Args:
            url: The URL to analyze.
            categories: List of categories to analyze.

        Returns:
            Dict with 'mobile' and 'desktop' PageSpeedResult.
        """
        results = {}

        for strategy in [Strategy.MOBILE, Strategy.DESKTOP]:
            try:
                results[strategy.value] = self.analyze_url(url, strategy, categories)
            except PageSpeedAPIError as e:
                logger.error(f"Failed to analyze {url} ({strategy.value}): {e}")
                results[strategy.value] = PageSpeedResult(
                    url=url,
                    final_url=url,
                    strategy=strategy.value,
                    analyzed_at=datetime.now(),
                    scores=PageSpeedScore(),
                    core_web_vitals=CoreWebVitals(),
                    error=str(e),
                )

        return results

    def _make_request_with_retry(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make API request with exponential backoff retry.

        Retries on:
        - 429 Too Many Requests
        - 5xx Server Errors
        - Connection errors, timeouts and other request-level failures

        Client errors (4xx other than 429) and unreadable response bodies are
        not retried.

        Args:
            params: Request parameters.

        Returns:
            Parsed JSON response.

        Raises:
            RateLimitError: If the API still answers 429 on the last attempt.
            PageSpeedAPIError: If all retries fail, the API reports a client
                error, or the response body is not a JSON object.
        """
        last_error: Optional[Exception] = None
        backoff = INITIAL_BACKOFF

        for attempt in range(MAX_RETRIES + 1):
            can_retry = attempt < MAX_RETRIES
            logger.debug(f"API request attempt {attempt + 1}/{MAX_RETRIES + 1}")

            # Only the network call sits inside the try block, so that errors
            # raised while decoding the body are never mistaken for transport
            # failures and retried.
            try:
                response = self.session.get(
                    PAGESPEED_API_URL,
                    params=params,
                    timeout=REQUEST_TIMEOUT,
                )
            except requests.exceptions.Timeout:
                last_error = PageSpeedAPIError(
                    f"Request timed out after {REQUEST_TIMEOUT}s"
                )
                failure = "Request timeout"
            except requests.exceptions.ConnectionError as e:
                last_error = PageSpeedAPIError(f"Connection error: {e}")
                failure = "Connection error"
            except requests.exceptions.RequestException as e:
                last_error = PageSpeedAPIError(f"Request failed: {e}")
                failure = "Request error"
            else:
                status = response.status_code

                # Handle rate limiting (429)
                if status == 429:
                    if not can_retry:
                        raise RateLimitError(
                            f"Rate limited after {MAX_RETRIES + 1} attempts"
                        )
                    retry_after = self._retry_after_delay(response, backoff)
                    self._sleep_before_retry("Rate limited (429)", retry_after, attempt)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

                # Handle server errors (5xx)
                if status >= 500:
                    if not can_retry:
                        raise PageSpeedAPIError(
                            f"Server error {status} after "
                            f"{MAX_RETRIES + 1} attempts"
                        )
                    self._sleep_before_retry(f"Server error ({status})", backoff, attempt)
                    backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)
                    continue

                # Handle client errors (4xx except 429)
                if status >= 400:
                    error_message = self._extract_error_message(response)
                    raise PageSpeedAPIError(
                        f"API error {status}: {error_message}"
                    )

                # Success
                return self._decode_json_body(response)

            # Transport-level failure: back off and try again if allowed.
            if can_retry:
                self._sleep_before_retry(failure, backoff, attempt)
                backoff = min(backoff * BACKOFF_MULTIPLIER, MAX_BACKOFF)

        # All retries exhausted
        raise last_error or PageSpeedAPIError("Request failed after all retries")

    def _sleep_before_retry(self, reason: str, delay: float, attempt: int) -> None:
        """Log why a retry is needed, then sleep ``delay`` seconds."""
        logger.warning(
            f"{reason}. Retrying in {delay}s "
            f"(attempt {attempt + 1}/{MAX_RETRIES + 1})"
        )
        time.sleep(delay)

    def _retry_after_delay(self, response: Any, fallback: float) -> float:
        """
        Return the delay to use after a 429 response.

        Uses the ``Retry-After`` header when it holds a number of seconds.
        Falls back to ``fallback`` when the header is missing, is not a
        number (e.g. an HTTP date), is negative or is NaN - values that
        ``time.sleep`` would reject. The delay is capped at MAX_BACKOFF so a
        server-supplied value cannot block the caller for an unbounded time.
        """
        raw_value = response.headers.get('Retry-After')
        if raw_value is None:
            return fallback
        try:
            delay = float(raw_value)
        except (TypeError, ValueError):
            return fallback
        if not delay >= 0:  # also true for NaN
            return fallback
        return min(delay, MAX_BACKOFF)

    def _extract_error_message(self, response: Any) -> str:
        """
        Return the error message of a failed response.

        Prefers ``error.message`` from a JSON body and falls back to the raw
        response text when the body is not JSON or has another shape.
        """
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page from a proxy).
            return response.text

        error_data = payload.get('error') if isinstance(payload, dict) else None
        if isinstance(error_data, dict):
            return error_data.get('message', response.text)
        return response.text

    def _decode_json_body(self, response: Any) -> Dict[str, Any]:
        """
        Decode a successful response body.

        Raises:
            PageSpeedAPIError: If the body is not valid JSON or is not a JSON
                object.
        """
        try:
            payload = response.json()
        except ValueError as e:
            raise PageSpeedAPIError(f"Invalid JSON in API response: {e}") from e
        if not isinstance(payload, dict):
            raise PageSpeedAPIError(
                f"Unexpected API response type: {type(payload).__name__}"
            )
        return payload

    def _parse_response(self, data: Dict[str, Any],
                        original_url: str,
                        strategy: Strategy) -> PageSpeedResult:
        """
        Parse PageSpeed API response into structured result.

        Sections that are missing or null in the response are treated as
        empty, which yields ``None`` for the values derived from them.

        Args:
            data: Raw API response.
            original_url: The URL that was analyzed.
            strategy: The analysis strategy used.

        Returns:
            PageSpeedResult with parsed data.
        """
        lighthouse = data.get('lighthouseResult') or {}

        # Extract scores (0-1 float -> 0-100 int)
        categories = lighthouse.get('categories') or {}
        scores = PageSpeedScore(
            performance=self._extract_score(categories.get('performance')),
            accessibility=self._extract_score(categories.get('accessibility')),
            best_practices=self._extract_score(categories.get('best-practices')),
            seo=self._extract_score(categories.get('seo')),
        )

        # Extract Core Web Vitals
        audits = lighthouse.get('audits') or {}
        # Prefer the INP audit; use 'max-potential-fid' when it is absent.
        inp_audit = (audits.get('interaction-to-next-paint')
                     or audits.get('max-potential-fid'))
        core_web_vitals = CoreWebVitals(
            lcp_ms=self._extract_metric_ms(audits.get('largest-contentful-paint')),
            inp_ms=self._extract_metric_ms(inp_audit),
            cls=self._extract_cls(audits.get('cumulative-layout-shift')),
            fcp_ms=self._extract_metric_ms(audits.get('first-contentful-paint')),
            ttfb_ms=self._extract_metric_ms(audits.get('server-response-time')),
        )

        # Extract relevant audits for SEO
        seo_audits = self._extract_seo_audits(audits)

        # Get timing info
        timing = lighthouse.get('timing') or {}
        fetch_time = timing.get('total')

        return PageSpeedResult(
            url=original_url,
            final_url=lighthouse.get('finalUrl', original_url),
            strategy=strategy.value,
            analyzed_at=datetime.now(),
            scores=scores,
            core_web_vitals=core_web_vitals,
            audits=seo_audits,
            lighthouse_version=lighthouse.get('lighthouseVersion'),
            # "is not None" keeps a legitimate 0 instead of turning it into None
            fetch_time_ms=int(fetch_time) if fetch_time is not None else None,
        )

    def _extract_score(self, category_data: Optional[Dict]) -> Optional[int]:
        """Extract score from category data (0-1 float -> 0-100 int)."""
        if not category_data:
            return None
        score = category_data.get('score')
        if score is None:
            return None
        return int(round(score * 100))

    def _extract_metric_ms(self, audit_data: Optional[Dict]) -> Optional[int]:
        """Extract an audit's ``numericValue`` rounded to whole milliseconds."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is None:
            return None
        return int(round(value))

    def _extract_cls(self, audit_data: Optional[Dict]) -> Optional[float]:
        """Extract Cumulative Layout Shift value, rounded to 3 decimals."""
        if not audit_data:
            return None
        value = audit_data.get('numericValue')
        if value is None:
            return None
        return round(value, 3)

    def _extract_seo_audits(self, audits: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract SEO-relevant audits from Lighthouse results.

        Returns a dict with audit results organized by category ('meta',
        'crawlability', 'content', 'mobile', 'performance'). Every section is
        always present; it only contains the audits found in ``audits``, each
        reduced to the fields listed for its section in _SEO_AUDIT_GROUPS.
        """
        seo_audits: Dict[str, Any] = {}
        for section, audit_ids, kept_fields in self._SEO_AUDIT_GROUPS:
            seo_audits[section] = {
                audit_id: {key: audits[audit_id].get(key) for key in kept_fields}
                for audit_id in audit_ids
                if audit_id in audits
            }
        return seo_audits

    def get_remaining_quota(self) -> int:
        """Get remaining API requests for today."""
        return self.rate_limiter.get_remaining_quota()

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get API usage statistics."""
        return self.rate_limiter.get_usage_stats()
|
|
|
|
|
|
# Convenience function for simple usage
def analyze_url(url: str, strategy: str = 'mobile') -> Dict[str, Any]:
    """
    Convenience function to analyze a URL.

    Creates a default GooglePageSpeedClient for the call.

    Args:
        url: The URL to analyze.
        strategy: 'mobile' or 'desktop'. Matching ignores case and
            surrounding whitespace, and a Strategy member is accepted as
            well. Any other value is logged and analyzed as desktop, which
            keeps the historical fallback for unknown values.

    Returns:
        Dict with analysis results.
    """
    client = GooglePageSpeedClient()

    # Normalize before matching so that 'Mobile' or ' mobile' is not
    # silently analyzed as desktop.
    requested = strategy.strip().lower() if isinstance(strategy, str) else strategy
    try:
        strat = Strategy(requested)
    except ValueError:
        logger.warning(f"Unknown strategy {strategy!r}; falling back to desktop")
        strat = Strategy.DESKTOP

    result = client.analyze_url(url, strat)
    return result.to_dict()
|
|
|
|
|
|
if __name__ == '__main__':
    # Quick manual test: analyze the URL given on the command line with the
    # default (mobile) strategy and print a summary of the result.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python pagespeed_client.py <url>")
        print("Example: python pagespeed_client.py https://pixlab.pl")
        sys.exit(1)

    test_url = sys.argv[1]

    print(f"Analyzing: {test_url}")
    print("-" * 60)

    client = GooglePageSpeedClient()

    print(f"API Key: {'Set' if client.api_key else 'Not set (using public API)'}")
    print(f"Remaining quota: {client.get_remaining_quota()}")
    print("-" * 60)

    try:
        result = client.analyze_url(test_url)

        print(f"URL: {result.url}")
        print(f"Final URL: {result.final_url}")
        print(f"Strategy: {result.strategy}")
        print(f"Analyzed at: {result.analyzed_at}")
        print()
        print("Scores:")
        print(f" Performance: {result.scores.performance}")
        print(f" Accessibility: {result.scores.accessibility}")
        print(f" Best Practices: {result.scores.best_practices}")
        print(f" SEO: {result.scores.seo}")
        print()
        print("Core Web Vitals:")
        print(f" LCP: {result.core_web_vitals.lcp_ms}ms")
        # INP was collected but never shown in this summary.
        print(f" INP: {result.core_web_vitals.inp_ms}ms")
        print(f" FCP: {result.core_web_vitals.fcp_ms}ms")
        print(f" CLS: {result.core_web_vitals.cls}")
        print(f" TTFB: {result.core_web_vitals.ttfb_ms}ms")
        print()
        print(f"Lighthouse version: {result.lighthouse_version}")
        print(f"Fetch time: {result.fetch_time_ms}ms")
        print()
        print(f"Remaining quota: {client.get_remaining_quota()}")

    except QuotaExceededError as e:
        # Must be caught before its base class PageSpeedAPIError.
        print(f"ERROR: Quota exceeded - {e}")
        sys.exit(1)
    except PageSpeedAPIError as e:
        print(f"ERROR: API error - {e}")
        sys.exit(1)
|