#!/usr/bin/env python3 """ SEO Audit Script for Norda Biznes ================================= Performs comprehensive SEO audit of company websites using: - Google PageSpeed Insights API (performance, accessibility, SEO scores) - On-page SEO analysis (meta tags, headings, images, links, structured data) - Technical SEO checks (robots.txt, sitemap, canonical, indexability) Designed to run in batches with rate limiting for API quota management. Usage: python seo_audit.py --company-id 26 python seo_audit.py --batch 1-10 python seo_audit.py --all python seo_audit.py --company-id 26 --dry-run Exit codes: 0 - All audits completed successfully 1 - Argument error or invalid input 2 - Partial failures (some audits failed) 3 - All audits failed 4 - Database connection error 5 - API quota exceeded Author: Maciej Pienczyn, InPi sp. z o.o. Date: 2026-01-08 """ import os import sys import re import json import ssl import socket import argparse import logging import time as time_module from datetime import datetime, timedelta from typing import Optional, Dict, List, Any, Tuple from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '.env')) import requests from bs4 import BeautifulSoup from sqlalchemy import create_engine, text from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import sessionmaker # Import SEO analysis components from pagespeed_client import ( GooglePageSpeedClient, PageSpeedResult, PageSpeedAPIError, QuotaExceededError, Strategy, ) from seo_analyzer import ( OnPageSEOAnalyzer, OnPageSEOResult, TechnicalSEOChecker, TechnicalSEOResult, ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Exit codes EXIT_SUCCESS = 0 EXIT_ARGUMENT_ERROR = 1 EXIT_PARTIAL_FAILURES = 2 EXIT_ALL_FAILED = 3 EXIT_DATABASE_ERROR = 4 EXIT_QUOTA_EXCEEDED = 5 # Database configuration # WARNING: The 
fallback DATABASE_URL uses a placeholder password.
# Production credentials MUST be set via the DATABASE_URL environment variable.
# NEVER commit real credentials to version control (CWE-798).
DATABASE_URL = os.getenv(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz'
)

# Request configuration
REQUEST_TIMEOUT = 30  # seconds, used for page fetches in SEOAuditor
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 NordaBiznes-SEO-Auditor/1.0'

# SEO Audit version for tracking
SEO_AUDIT_VERSION = '1.0.0'


class LocalSEOAnalyzer:
    """Analyzes Local SEO factors for business websites.

    All checks here are pure HTML/text analysis of content that the caller
    has already fetched; this class performs no network requests itself.
    """

    def __init__(self):
        # Session kept for interface parity with the other checkers;
        # none of the methods below actually issue requests through it.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def analyze(self, html_content: str, url: str, company_data: Optional[Dict] = None) -> Dict[str, Any]:
        """Run all local SEO checks on HTML content.

        Args:
            html_content: Raw HTML of the page to analyze.
            url: Page URL (accepted for API symmetry; not used by the checks).
            company_data: Optional company record; only 'address_city' is read.

        Returns:
            Dict with schema, NAP, maps-embed and local-keyword findings,
            plus 'local_seo_score' (0-100).
        """
        result = {
            'local_seo_score': 0,
            'has_local_business_schema': False,
            'local_business_schema_fields': {},
            'nap_on_website': {},
            'has_google_maps_embed': False,
            'has_local_keywords': False,
            'local_keywords_found': [],
        }
        soup = BeautifulSoup(html_content, 'html.parser')
        # Check LocalBusiness schema
        schema_result = self._check_local_business_schema(html_content)
        result.update(schema_result)
        # Extract NAP (Name/Address/Phone) from website
        nap = self._extract_nap(soup, html_content)
        result['nap_on_website'] = nap
        # Check Google Maps embed
        result['has_google_maps_embed'] = self._check_google_maps(html_content)
        # Check local keywords; default city when company record lacks one
        city = (company_data or {}).get('address_city', 'Wejherowo')
        keywords = self._find_local_keywords(soup, html_content, city)
        result['has_local_keywords'] = len(keywords) > 0
        # Cap stored keyword list at 20 entries
        result['local_keywords_found'] = keywords[:20]
        # Calculate local SEO score
        result['local_seo_score'] = self._calculate_local_score(result)
        return result

    def _check_local_business_schema(self, html: str) -> Dict[str, Any]:
        """Check for Schema.org LocalBusiness structured data in JSON-LD blocks."""
        import json as json_mod  # shadow-safe alias; top-level `json` also exists
        result = {
            'has_local_business_schema': False,
            'local_business_schema_fields': {},
        }
        # Find JSON-LD blocks
        # NOTE(review): this pattern looks truncated — a leading '<script' and a
        # trailing '</script>' appear to be missing, so it likely never matches
        # real pages as written. Verify against version control.
        ld_pattern = re.compile(r']*type=["\']application/ld\+json["\'][^>]*>(.*?)', re.DOTALL | re.IGNORECASE)
        matches = ld_pattern.findall(html)
        # Schema.org types treated as "local business" indicators
        local_types = ['LocalBusiness', 'Organization', 'Store', 'Restaurant',
                       'ProfessionalService', 'AutoRepair', 'HealthAndBeautyBusiness',
                       'LodgingBusiness', 'FoodEstablishment', 'FinancialService']
        for match in matches:
            try:
                data = json_mod.loads(match.strip())
                # JSON-LD may be a single object or an array of objects
                items = [data] if isinstance(data, dict) else data if isinstance(data, list) else []
                for item in items:
                    item_type = item.get('@type', '')
                    # '@type' itself may be a list; use its first entry
                    if isinstance(item_type, list):
                        item_type = item_type[0] if item_type else ''
                    if item_type in local_types:
                        result['has_local_business_schema'] = True
                        # Record which recommended fields are present and non-empty
                        important_fields = ['name', 'address', 'telephone', 'email', 'url',
                                            'openingHours', 'openingHoursSpecification', 'geo',
                                            'image', 'description', 'priceRange', 'areaServed',
                                            'aggregateRating']
                        for field in important_fields:
                            result['local_business_schema_fields'][field] = field in item and bool(item[field])
                        # Stop at the first matching item in this block
                        break
            except (json_mod.JSONDecodeError, TypeError):
                continue
        return result

    def _extract_nap(self, soup, html: str) -> Dict[str, Any]:
        """Extract Name, Address, Phone from website HTML.

        Returns a dict with keys 'name', 'address', 'phone' (each may be None).
        """
        nap = {'name': None, 'address': None, 'phone': None}
        text = soup.get_text(separator=' ')
        # Phone patterns (Polish format), tried in order against raw HTML
        phone_patterns = [
            r'(?:tel\.?|telefon|phone|zadzwoń)[:\s]*([+]?\d[\d\s\-]{7,15})',
            r'(?:href="tel:)([+]?\d[\d\-]{7,15})"',
            r'(\+48[\s\-]?\d{3}[\s\-]?\d{3}[\s\-]?\d{3})',
            r'(\d{2}[\s\-]\d{3}[\s\-]\d{2}[\s\-]\d{2})',
        ]
        for pattern in phone_patterns:
            match = re.search(pattern, html, re.IGNORECASE)
            if match:
                # Require at least 9 digits after stripping separators
                phone = re.sub(r'[\s\-]', '', match.group(1))
                if len(phone) >= 9:
                    nap['phone'] = match.group(1).strip()
                    break
        # Address patterns (Polish): "ul. <Street> <No>[, <zip> <City>]" or "<zip> <City>"
        address_patterns = [
            r'(?:ul\.?|ulica)\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144\s]+\s+\d+[a-zA-Z]?(?:/\d+)?(?:,?\s+\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+)?',
            r'\d{2}-\d{3}\s+[A-Z\u0141\u00d3\u015a\u017b\u0179\u0106\u0104\u0118\u0143][a-z\u0105\u0119\u00f3\u0142\u015b\u017c\u017a\u0107\u0144]+',
        ]
        for pattern in address_patterns:
            match = re.search(pattern, text)
            if match:
                # Truncate to 200 chars to bound stored value
                nap['address'] = match.group(0).strip()[:200]
                break
        # Business name from structured data or og:site_name
        og_site = soup.find('meta', property='og:site_name')
        if og_site and og_site.get('content'):
            nap['name'] = og_site['content'].strip()[:200]
        return nap

    def _check_google_maps(self, html: str) -> bool:
        """Check if page has embedded Google Maps."""
        # NOTE(review): the last pattern looks truncated (likely began with
        # '<iframe'); verify against version control.
        maps_patterns = [
            r'maps\.googleapis\.com',
            r'maps\.google\.com/maps',
            r'google\.com/maps/embed',
            r']*google[^>]*maps[^>]*>',
        ]
        return any(re.search(p, html, re.IGNORECASE) for p in maps_patterns)

    def _find_local_keywords(self, soup, html: str, city: str) -> List[str]:
        """Find local keywords in page content (service + city patterns).

        Returns a de-duplicated list of findings: 'service city' phrases plus
        'city_in_title:<city>' / 'city_in_description:<city>' markers.
        """
        keywords_found = []
        text = soup.get_text(separator=' ').lower()
        # Common service keywords for Polish businesses
        service_keywords = [
            'hydraulik', 'elektryk', 'mechanik', 'fryzjer', 'dentysta',
            'prawnik', 'adwokat', 'księgowy', 'architekt', 'fotograf',
            'restauracja', 'hotel', 'sklep', 'serwis', 'naprawa',
            'instalacje', 'remonty', 'transport', 'catering', 'szkolenia',
            'kursy', 'gabinet', 'klinika', 'studio',
        ]
        city_lower = city.lower() if city else 'wejherowo'
        nearby_cities = ['wejherowo', 'rumia', 'reda', 'gdynia', 'gdańsk', 'sopot', 'puck', 'luzino']
        # Polish city name declensions (common grammatical case forms)
        city_forms = {
            'wejherowo': ['wejherowo', 'wejherowa', 'wejherowie', 'wejherowem'],
            'rumia': ['rumia', 'rumi', 'rumię', 'rumią'],
            'reda': ['reda', 'redy', 'redzie', 'redą'],
            'gdynia': ['gdynia', 'gdyni', 'gdynię', 'gdynią'],
            'gdańsk': ['gdańsk', 'gdańska', 'gdańsku', 'gdańskiem'],
            'sopot': ['sopot', 'sopotu', 'sopocie', 'sopotem'],
            'puck': ['puck', 'pucka', 'pucku', 'puckiem'],
            'luzino': ['luzino', 'luzina', 'luzinie', 'luzinem'],
        }
        for keyword in service_keywords:
            for c in nearby_cities:
                forms = city_forms.get(c, [c])
                for form in forms:
                    phrase = f'{keyword} {form}'
                    if phrase in text:
                        # Record using the base (nominative) city name
                        keywords_found.append(f'{keyword} {c}')
                        break
        # Also check meta title and description
        # NOTE(review): soup.title.string is None for an empty <title> element,
        # which would make .lower() raise AttributeError — consider
        # (soup.title.string or '') here; verify against real pages.
        title = (soup.title.string if soup.title else '').lower()
        meta_desc = ''
        desc_tag = soup.find('meta', {'name': 'description'})
        if desc_tag:
            meta_desc = (desc_tag.get('content', '') or '').lower()
        # Check city name in title/description (any declension form)
        all_city_forms = city_forms.get(city_lower, [city_lower])
        for form in all_city_forms:
            if form in title:
                keywords_found.append(f'city_in_title:{city_lower}')
                break
        for form in all_city_forms:
            if form in meta_desc:
                keywords_found.append(f'city_in_description:{city_lower}')
                break
        return list(set(keywords_found))

    def _calculate_local_score(self, data: Dict) -> int:
        """Calculate Local SEO score 0-100 from the findings produced by analyze().

        Weights: schema 25 (+up to 10 for field completeness), NAP 10 each,
        maps embed 15, local keywords 15 (+5 bonus for >= 5 keywords).
        """
        score = 0
        if data.get('has_local_business_schema'):
            score += 25
            # Bonus for complete schema: proportional to filled important fields
            fields = data.get('local_business_schema_fields', {})
            filled = sum(1 for v in fields.values() if v)
            total = len(fields)
            if total > 0:
                score += int(10 * (filled / total))
        nap = data.get('nap_on_website', {})
        if nap.get('name'):
            score += 10
        if nap.get('address'):
            score += 10
        if nap.get('phone'):
            score += 10
        if data.get('has_google_maps_embed'):
            score += 15
        if data.get('has_local_keywords'):
            score += 15
            # Bonus for multiple local keywords
            kw_count = len(data.get('local_keywords_found', []))
            if kw_count >= 5:
                score += 5
        return min(score, 100)


class CitationChecker:
    """Checks company presence in Polish local business directories."""

    # Polish business directories to check (plus Google Maps/Facebook/Yelp)
    DIRECTORIES = [
        {'name': 'panoramafirm.pl', 'url': 
'https://panoramafirm.pl', 'search_domain': 'panoramafirm.pl'},
        {'name': 'pkt.pl', 'url': 'https://pkt.pl', 'search_domain': 'pkt.pl'},
        {'name': 'aleo.com', 'url': 'https://aleo.com', 'search_domain': 'aleo.com'},
        {'name': 'firmy.net', 'url': 'https://firmy.net', 'search_domain': 'firmy.net'},
        {'name': 'zumi.pl', 'url': 'https://zumi.pl', 'search_domain': 'zumi.pl'},
        {'name': 'gowork.pl', 'url': 'https://gowork.pl', 'search_domain': 'gowork.pl'},
        {'name': 'oferteo.pl', 'url': 'https://oferteo.pl', 'search_domain': 'oferteo.pl'},
        {'name': 'google.com/maps', 'url': 'https://google.com/maps', 'search_domain': 'google.com/maps'},
        {'name': 'facebook.com', 'url': 'https://facebook.com', 'search_domain': 'facebook.com'},
        {'name': 'yelp.com', 'url': 'https://yelp.com', 'search_domain': 'yelp.com'},
    ]

    def __init__(self):
        # Brave Search API key; when missing, check_citations() is a no-op.
        self.brave_api_key = os.getenv('BRAVE_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def check_citations(self, company_name: str, city: str = 'Wejherowo') -> List[Dict[str, Any]]:
        """Check if company is listed in each of DIRECTORIES.

        Args:
            company_name: Exact business name to search for.
            city: City name (currently not used in the search query itself).

        Returns:
            One dict per directory with 'status' of 'found'/'not_found'/'error'.
            Returns an empty list when BRAVE_API_KEY is not configured.
        """
        results = []
        if not self.brave_api_key:
            logger.warning("BRAVE_API_KEY not set, citation check skipped")
            return results
        for directory in self.DIRECTORIES:
            try:
                citation = self._check_single_directory(company_name, city, directory)
                results.append(citation)
                # Rate limit between Brave API calls
                time_module.sleep(0.5)
            except Exception as e:
                logger.warning(f"Citation check failed for {directory['name']}: {e}")
                results.append({
                    'directory_name': directory['name'],
                    'directory_url': directory['url'],
                    'status': 'error',
                    'listing_url': None,
                })
        return results

    def _check_single_directory(self, company_name: str, city: str, directory: Dict) -> Dict:
        """Check one directory using a Brave Search `site:` query.

        Never raises: any failure is reported as status 'error'.
        """
        query = f'"{company_name}" site:{directory["search_domain"]}'
        try:
            resp = self.session.get(
                'https://api.search.brave.com/res/v1/web/search',
                params={'q': query, 'count': 3},
                headers={'X-Subscription-Token': self.brave_api_key},
                timeout=10
            )
            resp.raise_for_status()
            data = resp.json()
            results = data.get('web', {}).get('results', [])
            if results:
                # First hit is treated as the company's listing
                return {
                    'directory_name': directory['name'],
                    'directory_url': directory['url'],
                    'listing_url': results[0].get('url'),
                    'status': 'found',
                }
            else:
                return {
                    'directory_name': directory['name'],
                    'directory_url': directory['url'],
                    'listing_url': None,
                    'status': 'not_found',
                }
        except Exception as e:
            logger.debug(f"Brave search for {directory['name']}: {e}")
            return {
                'directory_name': directory['name'],
                'directory_url': directory['url'],
                'listing_url': None,
                'status': 'error',
            }


class ContentFreshnessChecker:
    """Checks content freshness of a website."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def check_freshness(self, url: str, html_content: Optional[str] = None) -> Dict[str, Any]:
        """Check content freshness indicators.

        Combines the HTTP Last-Modified header with the most recent plausible
        date found in the page text (years 2020..now only).

        Args:
            url: Page URL; probed with a HEAD request for Last-Modified.
            html_content: Optional already-fetched HTML to scan for dates.

        Returns:
            Dict with 'last_content_update' (datetime or None) and
            'content_freshness_score' (0-100).
        """
        result = {
            'last_content_update': None,
            'content_freshness_score': 0,
        }
        # Check Last-Modified header (best-effort; failures are ignored)
        try:
            resp = self.session.head(url, timeout=10, allow_redirects=True)
            last_modified = resp.headers.get('Last-Modified')
            if last_modified:
                from email.utils import parsedate_to_datetime
                try:
                    result['last_content_update'] = parsedate_to_datetime(last_modified)
                except Exception:
                    pass
        except Exception:
            pass
        # Check dates in HTML content
        if html_content:
            soup = BeautifulSoup(html_content, 'html.parser')
            # Look for YYYY-MM-DD-like and DD.MM.YYYY-like patterns in the page
            date_patterns = [
                r'20\d{2}[-./]\d{1,2}[-./]\d{1,2}',
                r'\d{1,2}[-./]\d{1,2}[-./]20\d{2}',
            ]
            text = soup.get_text()
            latest_date = None
            for pattern in date_patterns:
                matches = re.findall(pattern, text)
                for m in matches:
                    try:
                        # Try parsing various formats; keep the newest valid date
                        for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%d.%m.%Y', '%d-%m-%Y', '%d/%m/%Y']:
                            try:
                                d = datetime.strptime(m, fmt)
                                # Only accept recent, non-future dates
                                if d.year >= 2020 and d <= datetime.now():
                                    if latest_date is None or d > latest_date:
                                        latest_date = d
                                break
                            except ValueError:
                                continue
                    except Exception:
                        continue
            if 
latest_date:
                # Prefer the in-page date if it is newer than the HTTP header date;
                # strip tzinfo so aware/naive datetimes compare safely
                existing = result['last_content_update']
                if existing is not None and existing.tzinfo is not None:
                    existing = existing.replace(tzinfo=None)
                if existing is None or latest_date > existing:
                    result['last_content_update'] = latest_date
        # Calculate freshness score from the age of the newest signal
        if result['last_content_update']:
            update_dt = result['last_content_update']
            # Normalize: strip tzinfo if present so subtraction works with naive datetime.now()
            if update_dt.tzinfo is not None:
                update_dt = update_dt.replace(tzinfo=None)
            days_old = (datetime.now() - update_dt).days
            if days_old <= 30:
                result['content_freshness_score'] = 100
            elif days_old <= 90:
                result['content_freshness_score'] = 80
            elif days_old <= 180:
                result['content_freshness_score'] = 60
            elif days_old <= 365:
                result['content_freshness_score'] = 40
            else:
                result['content_freshness_score'] = 20
        else:
            result['content_freshness_score'] = 10  # Unknown = low score
        return result


class SEOAuditor:
    """
    Main SEO auditor class that coordinates website SEO auditing.

    Follows the same pattern as SocialMediaAuditor from social_media_audit.py.
    Orchestrates PageSpeed API, on-page analysis, and technical SEO checks.
    """

    def __init__(self, database_url: str = DATABASE_URL):
        """
        Initialize SEO Auditor.

        Args:
            database_url: Database connection string.
        """
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        # Initialize analysis components
        self.pagespeed_client = GooglePageSpeedClient()
        self.onpage_analyzer = OnPageSEOAnalyzer()
        self.technical_checker = TechnicalSEOChecker()
        self.local_seo_analyzer = LocalSEOAnalyzer()
        self.citation_checker = CitationChecker()
        self.freshness_checker = ContentFreshnessChecker()
        # HTTP session for fetching pages
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def get_companies(self, company_ids: Optional[List[int]] = None,
                      batch_start: Optional[int] = None,
                      batch_end: Optional[int] = None) -> List[Dict]:
        """
        Fetch companies from database.

        Selection precedence: explicit company_ids first, then the
        batch_start/batch_end window, otherwise all companies.

        Args:
            company_ids: List of specific company IDs to fetch.
            batch_start: Start index for batch processing (1-indexed).
            batch_end: End index for batch processing (1-indexed).

        Returns:
            List of company dicts with id, name, slug, website.
        """
        with self.Session() as session:
            if company_ids:
                # Use IN clause with named bind params for SQLite/PostgreSQL compatibility
                placeholders = ', '.join([f':id_{i}' for i in range(len(company_ids))])
                query = text(f"""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    WHERE id IN ({placeholders})
                    ORDER BY id
                """)
                params = {f'id_{i}': cid for i, cid in enumerate(company_ids)}
                result = session.execute(query, params)
            elif batch_start is not None and batch_end is not None:
                query = text("""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    ORDER BY id
                    OFFSET :offset LIMIT :limit
                """)
                # Convert 1-indexed inclusive range to OFFSET/LIMIT
                result = session.execute(query, {
                    'offset': batch_start - 1,
                    'limit': batch_end - batch_start + 1
                })
            else:
                query = text("""
                    SELECT id, name, slug, website, address_city
                    FROM companies
                    ORDER BY id
                """)
                result = session.execute(query)
            return [dict(row._mapping) for row in result]

    def audit_company(self, company: Dict) -> Dict[str, Any]:
        """
        Perform full SEO audit for a single company.

        Args:
            company: Company dict with id, name, slug, website.

        Returns:
            Comprehensive SEO audit result dict.
""" logger.info(f"Auditing SEO for: {company['name']} (ID: {company['id']})") result = { 'company_id': company['id'], 'company_name': company['name'], 'company_slug': company['slug'], 'audit_date': datetime.now(), 'audit_version': SEO_AUDIT_VERSION, 'website_url': company.get('website'), 'pagespeed': None, 'onpage': None, 'technical': None, 'scores': { 'pagespeed_seo': None, 'pagespeed_performance': None, 'pagespeed_accessibility': None, 'pagespeed_best_practices': None, 'overall_seo': None, }, 'errors': [], } website_url = company.get('website') # Check if company has a website if not website_url: result['errors'].append('No website URL configured') logger.warning(f" Company {company['id']} has no website URL") return result # Normalize URL if not website_url.startswith(('http://', 'https://')): website_url = 'https://' + website_url result['website_url'] = website_url # 1. Fetch page HTML for on-page analysis html_content = None final_url = website_url http_status = None load_time_ms = None try: logger.info(f" Fetching page: {website_url}") start_time = time_module.time() response = self.session.get( website_url, timeout=REQUEST_TIMEOUT, allow_redirects=True ) load_time_ms = int((time_module.time() - start_time) * 1000) http_status = response.status_code final_url = response.url if response.status_code == 200: # Fix encoding: requests defaults to ISO-8859-1 when charset missing if response.encoding and response.encoding.lower() == 'iso-8859-1': response.encoding = response.apparent_encoding html_content = response.text logger.info(f" Page fetched successfully ({load_time_ms}ms)") else: result['errors'].append(f'HTTP {response.status_code}') logger.warning(f" HTTP {response.status_code} for {website_url}") except requests.exceptions.SSLError as e: result['errors'].append(f'SSL Error: {str(e)[:100]}') logger.warning(f" SSL error for {website_url}: {e}") # Try HTTP fallback try: http_url = website_url.replace('https://', 'http://') response = 
self.session.get(http_url, timeout=REQUEST_TIMEOUT) http_status = response.status_code final_url = response.url if response.status_code == 200: if response.encoding and response.encoding.lower() == 'iso-8859-1': response.encoding = response.apparent_encoding html_content = response.text except Exception as e2: result['errors'].append(f'HTTP fallback failed: {str(e2)[:50]}') except requests.exceptions.Timeout: result['errors'].append(f'Timeout after {REQUEST_TIMEOUT}s') logger.warning(f" Timeout for {website_url}") except requests.exceptions.ConnectionError as e: result['errors'].append(f'Connection error: {str(e)[:100]}') logger.warning(f" Connection error for {website_url}") except requests.exceptions.RequestException as e: result['errors'].append(f'Request error: {str(e)[:100]}') logger.warning(f" Request error for {website_url}: {e}") # Store HTTP info result['http_status'] = http_status result['load_time_ms'] = load_time_ms result['final_url'] = final_url # 2. On-page SEO analysis (if we have HTML) if html_content: try: logger.info(" Running on-page SEO analysis...") onpage_result = self.onpage_analyzer.analyze_html( html_content, base_url=final_url ) result['onpage'] = onpage_result.to_dict() logger.info(f" On-page analysis complete") except Exception as e: result['errors'].append(f'On-page analysis failed: {str(e)[:100]}') logger.error(f" On-page analysis error: {e}") # 3. Technical SEO checks (robots.txt, sitemap, etc.) try: logger.info(" Running technical SEO checks...") technical_result = self.technical_checker.check_url(final_url) result['technical'] = technical_result.to_dict() logger.info(f" Technical checks complete") except Exception as e: result['errors'].append(f'Technical checks failed: {str(e)[:100]}') logger.error(f" Technical checks error: {e}") # 4. 
PageSpeed Insights API (if quota available) try: remaining_quota = self.pagespeed_client.get_remaining_quota() if remaining_quota > 0: logger.info(f" Running PageSpeed Insights (quota: {remaining_quota})...") pagespeed_result = self.pagespeed_client.analyze_url( final_url, strategy=Strategy.MOBILE ) result['pagespeed'] = pagespeed_result.to_dict() # Extract scores result['scores']['pagespeed_seo'] = pagespeed_result.scores.seo result['scores']['pagespeed_performance'] = pagespeed_result.scores.performance result['scores']['pagespeed_accessibility'] = pagespeed_result.scores.accessibility result['scores']['pagespeed_best_practices'] = pagespeed_result.scores.best_practices logger.info(f" PageSpeed complete - SEO: {pagespeed_result.scores.seo}, " f"Perf: {pagespeed_result.scores.performance}") else: result['errors'].append('PageSpeed API quota exceeded') logger.warning(" PageSpeed quota exceeded, skipping") except QuotaExceededError: result['errors'].append('PageSpeed API quota exceeded') logger.warning(" PageSpeed quota exceeded") except PageSpeedAPIError as e: result['errors'].append(f'PageSpeed API error: {str(e)[:100]}') logger.error(f" PageSpeed error: {e}") except Exception as e: result['errors'].append(f'PageSpeed unexpected error: {str(e)[:100]}') logger.error(f" PageSpeed unexpected error: {e}") # 6. Local SEO analysis if html_content: try: logger.info(" Running Local SEO analysis...") local_seo = self.local_seo_analyzer.analyze(html_content, final_url, company) result['local_seo'] = local_seo logger.info(f" Local SEO score: {local_seo.get('local_seo_score', 0)}") except Exception as e: result['errors'].append(f'Local SEO analysis failed: {str(e)[:100]}') logger.error(f" Local SEO error: {e}") # 7. 
Citation check try: city = company.get('address_city', 'Wejherowo') logger.info(f" Checking citations for '{company['name']}' in {city}...") citations = self.citation_checker.check_citations(company['name'], city) result['citations'] = citations found_count = sum(1 for c in citations if c.get('status') == 'found') logger.info(f" Citations found: {found_count}/{len(citations)}") except Exception as e: result['errors'].append(f'Citation check failed: {str(e)[:100]}') logger.error(f" Citation check error: {e}") # 8. Content freshness try: logger.info(" Checking content freshness...") freshness = self.freshness_checker.check_freshness(final_url, html_content) result['freshness'] = freshness logger.info(f" Freshness score: {freshness.get('content_freshness_score', 0)}") except Exception as e: result['errors'].append(f'Freshness check failed: {str(e)[:100]}') # 5. Calculate overall SEO score result['scores']['overall_seo'] = self._calculate_overall_score(result) return result def _calculate_overall_score(self, result: Dict[str, Any]) -> Optional[int]: """ Calculate an overall SEO score based on all available metrics. Args: result: Full audit result dict. Returns: Overall SEO score 0-100, or None if insufficient data. 
""" scores = [] weights = [] # PageSpeed SEO score (weight: 3) if result.get('scores', {}).get('pagespeed_seo') is not None: scores.append(result['scores']['pagespeed_seo']) weights.append(3) # PageSpeed Performance (weight: 2) if result.get('scores', {}).get('pagespeed_performance') is not None: scores.append(result['scores']['pagespeed_performance']) weights.append(2) # On-page factors score (calculated from analysis) onpage = result.get('onpage') if onpage: onpage_score = self._calculate_onpage_score(onpage) if onpage_score is not None: scores.append(onpage_score) weights.append(2) # Technical SEO score technical = result.get('technical') if technical: technical_score = self._calculate_technical_score(technical) if technical_score is not None: scores.append(technical_score) weights.append(2) # Calculate weighted average if scores and weights: weighted_sum = sum(s * w for s, w in zip(scores, weights)) total_weight = sum(weights) return int(round(weighted_sum / total_weight)) return None def _calculate_onpage_score(self, onpage: Dict[str, Any]) -> Optional[int]: """Calculate on-page SEO score from analysis results.""" score = 100 deductions = 0 # Meta tags checks meta = onpage.get('meta_tags', {}) if not meta.get('title'): deductions += 15 elif meta.get('title_length', 0) < 30 or meta.get('title_length', 0) > 70: deductions += 5 if not meta.get('description'): deductions += 10 elif meta.get('description_length', 0) < 120 or meta.get('description_length', 0) > 160: deductions += 5 if not meta.get('canonical_url'): deductions += 5 # Headings check headings = onpage.get('headings', {}) if headings.get('h1_count', 0) == 0: deductions += 10 elif headings.get('h1_count', 0) > 1: deductions += 5 if not headings.get('has_proper_hierarchy', True): deductions += 5 # Images check images = onpage.get('images', {}) total_images = images.get('total_images', 0) images_without_alt = images.get('images_without_alt', 0) if total_images > 0 and images_without_alt > 0: alt_ratio = 
images_without_alt / total_images if alt_ratio > 0.5: deductions += 10 elif alt_ratio > 0.2: deductions += 5 # Structured data check structured = onpage.get('structured_data', {}) if not structured.get('has_structured_data', False): deductions += 5 # Open Graph check og = onpage.get('open_graph', {}) if not og.get('og_title'): deductions += 3 return max(0, score - deductions) def _calculate_technical_score(self, technical: Dict[str, Any]) -> Optional[int]: """Calculate technical SEO score from check results.""" score = 100 deductions = 0 # Robots.txt check robots = technical.get('robots_txt', {}) if not robots.get('exists', False): deductions += 10 elif robots.get('blocks_googlebot', False): deductions += 20 # Sitemap check sitemap = technical.get('sitemap', {}) if not sitemap.get('exists', False): deductions += 10 elif not sitemap.get('is_valid_xml', False): deductions += 5 # Redirect chain check redirects = technical.get('redirect_chain', {}) chain_length = redirects.get('chain_length', 0) if chain_length > 3: deductions += 10 elif chain_length > 1: deductions += 5 if redirects.get('has_redirect_loop', False): deductions += 20 # Indexability check indexability = technical.get('indexability', {}) if not indexability.get('is_indexable', True): deductions += 15 # Canonical check canonical = technical.get('canonical', {}) if canonical.get('has_canonical', False): if canonical.get('points_to_different_domain', False): deductions += 10 return max(0, score - deductions) def save_audit_result(self, result: Dict) -> bool: """ Save audit result to database. Uses ON CONFLICT DO UPDATE for idempotent upserts. Args: result: Full audit result dict. Returns: True if save was successful, False otherwise. 
""" try: with self.Session() as session: company_id = result['company_id'] # Extract values from result (use 'or {}' to handle None values) onpage = result.get('onpage') or {} technical = result.get('technical') or {} pagespeed = result.get('pagespeed') or {} meta_tags = onpage.get('meta_tags') or {} headings = onpage.get('headings') or {} images = onpage.get('images') or {} links = onpage.get('links') or {} structured_data = onpage.get('structured_data') or {} og = onpage.get('open_graph') or {} tc = onpage.get('twitter_card') or {} robots = technical.get('robots_txt') or {} sitemap = technical.get('sitemap') or {} canonical = technical.get('canonical') or {} indexability = technical.get('indexability') or {} cwv = pagespeed.get('core_web_vitals') or {} ps_scores = pagespeed.get('scores') or {} # Upsert query for company_website_analysis # Uses ON CONFLICT DO UPDATE for idempotent upserts upsert_query = text(""" INSERT INTO company_website_analysis ( company_id, analyzed_at, website_url, final_url, http_status_code, load_time_ms, -- PageSpeed Insights pagespeed_seo_score, pagespeed_performance_score, pagespeed_accessibility_score, pagespeed_best_practices_score, pagespeed_audits, -- On-page SEO meta_title, meta_description, meta_keywords, h1_count, h2_count, h3_count, h1_text, total_images, images_without_alt, images_with_alt, internal_links_count, external_links_count, broken_links_count, has_structured_data, structured_data_types, structured_data_json, -- Technical SEO has_canonical, canonical_url, is_indexable, noindex_reason, has_sitemap, has_robots_txt, viewport_configured, is_mobile_friendly, -- SSL has_ssl, ssl_expires_at, ssl_issuer, -- Core Web Vitals largest_contentful_paint_ms, interaction_to_next_paint_ms, cumulative_layout_shift, -- Open Graph has_og_tags, og_title, og_description, og_image, has_twitter_cards, -- Language & International html_lang, has_hreflang, -- Word count word_count_homepage, -- SEO Audit metadata seo_audit_version, 
seo_audited_at, seo_audit_errors, seo_overall_score, seo_health_score, seo_issues, -- Local SEO local_seo_score, has_local_business_schema, local_business_schema_fields, nap_on_website, has_google_maps_embed, has_local_keywords, local_keywords_found, -- Citations citations_found, citations_count, -- Content freshness content_freshness_score, last_content_update ) VALUES ( :company_id, :analyzed_at, :website_url, :final_url, :http_status_code, :load_time_ms, :pagespeed_seo_score, :pagespeed_performance_score, :pagespeed_accessibility_score, :pagespeed_best_practices_score, :pagespeed_audits, :meta_title, :meta_description, :meta_keywords, :h1_count, :h2_count, :h3_count, :h1_text, :total_images, :images_without_alt, :images_with_alt, :internal_links_count, :external_links_count, :broken_links_count, :has_structured_data, :structured_data_types, :structured_data_json, :has_canonical, :canonical_url, :is_indexable, :noindex_reason, :has_sitemap, :has_robots_txt, :viewport_configured, :is_mobile_friendly, :has_ssl, :ssl_expires_at, :ssl_issuer, :largest_contentful_paint_ms, :interaction_to_next_paint_ms, :cumulative_layout_shift, :has_og_tags, :og_title, :og_description, :og_image, :has_twitter_cards, :html_lang, :has_hreflang, :word_count_homepage, :seo_audit_version, :seo_audited_at, :seo_audit_errors, :seo_overall_score, :seo_health_score, :seo_issues, :local_seo_score, :has_local_business_schema, :local_business_schema_fields, :nap_on_website, :has_google_maps_embed, :has_local_keywords, :local_keywords_found, :citations_found, :citations_count, :content_freshness_score, :last_content_update ) ON CONFLICT (company_id) DO UPDATE SET analyzed_at = EXCLUDED.analyzed_at, website_url = EXCLUDED.website_url, final_url = EXCLUDED.final_url, http_status_code = EXCLUDED.http_status_code, load_time_ms = EXCLUDED.load_time_ms, pagespeed_seo_score = EXCLUDED.pagespeed_seo_score, pagespeed_performance_score = EXCLUDED.pagespeed_performance_score, pagespeed_accessibility_score = 
EXCLUDED.pagespeed_accessibility_score, pagespeed_best_practices_score = EXCLUDED.pagespeed_best_practices_score, pagespeed_audits = EXCLUDED.pagespeed_audits, meta_title = EXCLUDED.meta_title, meta_description = EXCLUDED.meta_description, meta_keywords = EXCLUDED.meta_keywords, h1_count = EXCLUDED.h1_count, h2_count = EXCLUDED.h2_count, h3_count = EXCLUDED.h3_count, h1_text = EXCLUDED.h1_text, total_images = EXCLUDED.total_images, images_without_alt = EXCLUDED.images_without_alt, images_with_alt = EXCLUDED.images_with_alt, internal_links_count = EXCLUDED.internal_links_count, external_links_count = EXCLUDED.external_links_count, broken_links_count = EXCLUDED.broken_links_count, has_structured_data = EXCLUDED.has_structured_data, structured_data_types = EXCLUDED.structured_data_types, structured_data_json = EXCLUDED.structured_data_json, has_canonical = EXCLUDED.has_canonical, canonical_url = EXCLUDED.canonical_url, is_indexable = EXCLUDED.is_indexable, noindex_reason = EXCLUDED.noindex_reason, has_sitemap = EXCLUDED.has_sitemap, has_robots_txt = EXCLUDED.has_robots_txt, viewport_configured = EXCLUDED.viewport_configured, is_mobile_friendly = EXCLUDED.is_mobile_friendly, has_ssl = EXCLUDED.has_ssl, ssl_expires_at = EXCLUDED.ssl_expires_at, ssl_issuer = EXCLUDED.ssl_issuer, largest_contentful_paint_ms = EXCLUDED.largest_contentful_paint_ms, interaction_to_next_paint_ms = EXCLUDED.interaction_to_next_paint_ms, cumulative_layout_shift = EXCLUDED.cumulative_layout_shift, has_og_tags = EXCLUDED.has_og_tags, og_title = EXCLUDED.og_title, og_description = EXCLUDED.og_description, og_image = EXCLUDED.og_image, has_twitter_cards = EXCLUDED.has_twitter_cards, html_lang = EXCLUDED.html_lang, has_hreflang = EXCLUDED.has_hreflang, word_count_homepage = EXCLUDED.word_count_homepage, seo_audit_version = EXCLUDED.seo_audit_version, seo_audited_at = EXCLUDED.seo_audited_at, seo_audit_errors = EXCLUDED.seo_audit_errors, seo_overall_score = EXCLUDED.seo_overall_score, 
seo_health_score = EXCLUDED.seo_health_score, seo_issues = EXCLUDED.seo_issues, local_seo_score = EXCLUDED.local_seo_score, has_local_business_schema = EXCLUDED.has_local_business_schema, local_business_schema_fields = EXCLUDED.local_business_schema_fields, nap_on_website = EXCLUDED.nap_on_website, has_google_maps_embed = EXCLUDED.has_google_maps_embed, has_local_keywords = EXCLUDED.has_local_keywords, local_keywords_found = EXCLUDED.local_keywords_found, citations_found = EXCLUDED.citations_found, citations_count = EXCLUDED.citations_count, content_freshness_score = EXCLUDED.content_freshness_score, last_content_update = EXCLUDED.last_content_update """) # Check SSL certificate ssl_info = {'has_ssl': False, 'ssl_expires_at': None, 'ssl_issuer': None} website_url = result.get('website_url', '') try: from urllib.parse import urlparse parsed = urlparse(website_url or result.get('final_url', '')) domain = parsed.hostname if domain: ctx = ssl.create_default_context() with socket.create_connection((domain, 443), timeout=10) as sock: with ctx.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() ssl_info['has_ssl'] = True not_after = cert.get('notAfter') if not_after: ssl_info['ssl_expires_at'] = datetime.strptime( not_after, '%b %d %H:%M:%S %Y %Z' ).date() issuer = cert.get('issuer') if issuer: issuer_dict = {} for item in issuer: for key, value in item: issuer_dict[key] = value ssl_info['ssl_issuer'] = ( issuer_dict.get('organizationName') or issuer_dict.get('commonName') or '' )[:100] except Exception: pass # SSL check failed — has_ssl stays False # Build issues list from errors issues = [] for error in result.get('errors', []): issues.append({ 'severity': 'error', 'message': error, }) # Get first H1 text h1_texts = headings.get('h1_texts', []) h1_text = h1_texts[0] if h1_texts else None session.execute(upsert_query, { 'company_id': company_id, 'analyzed_at': result['audit_date'], 'website_url': result.get('website_url'), 'final_url': 
result.get('final_url'), 'http_status_code': result.get('http_status'), 'load_time_ms': result.get('load_time_ms'), # PageSpeed scores 'pagespeed_seo_score': ps_scores.get('seo'), 'pagespeed_performance_score': ps_scores.get('performance'), 'pagespeed_accessibility_score': ps_scores.get('accessibility'), 'pagespeed_best_practices_score': ps_scores.get('best_practices'), 'pagespeed_audits': json.dumps(pagespeed.get('audits', {})) if pagespeed else None, # On-page SEO 'meta_title': meta_tags.get('title', '')[:500] if meta_tags.get('title') else None, 'meta_description': meta_tags.get('description'), 'meta_keywords': meta_tags.get('keywords'), 'h1_count': headings.get('h1_count'), 'h2_count': headings.get('h2_count'), 'h3_count': headings.get('h3_count'), 'h1_text': h1_text[:500] if h1_text else None, 'total_images': images.get('total_images'), 'images_without_alt': images.get('images_without_alt'), 'images_with_alt': images.get('images_with_alt'), 'internal_links_count': links.get('internal_links'), 'external_links_count': links.get('external_links'), 'broken_links_count': links.get('broken_links'), # May be None if not checked 'has_structured_data': structured_data.get('has_structured_data', False), 'structured_data_types': structured_data.get('all_types', []), 'structured_data_json': json.dumps(structured_data.get('json_ld_data', [])) if structured_data.get('json_ld_data') else None, # Technical SEO 'has_canonical': canonical.get('has_canonical', False), 'canonical_url': canonical.get('canonical_url', '')[:500] if canonical.get('canonical_url') else None, 'is_indexable': indexability.get('is_indexable', True), 'noindex_reason': indexability.get('noindex_source'), 'has_sitemap': sitemap.get('exists', False), 'has_robots_txt': robots.get('exists', False), # Viewport and mobile-friendliness derived from meta_tags 'viewport_configured': bool(meta_tags.get('viewport')), 'is_mobile_friendly': 'width=device-width' in (meta_tags.get('viewport') or '').lower(), # SSL 
'has_ssl': ssl_info['has_ssl'], 'ssl_expires_at': ssl_info['ssl_expires_at'], 'ssl_issuer': ssl_info['ssl_issuer'], # Core Web Vitals 'largest_contentful_paint_ms': cwv.get('lcp_ms'), 'interaction_to_next_paint_ms': cwv.get('inp_ms'), 'cumulative_layout_shift': cwv.get('cls'), # Open Graph 'has_og_tags': bool(og.get('og_title')), 'og_title': og.get('og_title', '')[:500] if og.get('og_title') else None, 'og_description': og.get('og_description'), 'og_image': og.get('og_image', '')[:500] if og.get('og_image') else None, 'has_twitter_cards': bool(tc.get('card_type')), # Language & International 'html_lang': onpage.get('lang_attribute', '')[:10] if onpage.get('lang_attribute') else None, 'has_hreflang': onpage.get('has_hreflang', False), # Detected by analyzer if present # Word count 'word_count_homepage': onpage.get('word_count'), # Audit metadata 'seo_audit_version': result.get('audit_version'), 'seo_audited_at': result['audit_date'], 'seo_audit_errors': result.get('errors', []), 'seo_overall_score': result.get('scores', {}).get('overall_seo'), 'seo_health_score': self._calculate_onpage_score(onpage) if onpage else None, 'seo_issues': json.dumps(issues) if issues else None, # Local SEO 'local_seo_score': (result.get('local_seo') or {}).get('local_seo_score'), 'has_local_business_schema': (result.get('local_seo') or {}).get('has_local_business_schema', False), 'local_business_schema_fields': json.dumps((result.get('local_seo') or {}).get('local_business_schema_fields', {})), 'nap_on_website': json.dumps((result.get('local_seo') or {}).get('nap_on_website', {})), 'has_google_maps_embed': (result.get('local_seo') or {}).get('has_google_maps_embed', False), 'has_local_keywords': (result.get('local_seo') or {}).get('has_local_keywords', False), 'local_keywords_found': json.dumps((result.get('local_seo') or {}).get('local_keywords_found', [])), # Citations 'citations_found': json.dumps(result.get('citations', [])), 'citations_count': sum(1 for c in result.get('citations', 
    def run_audit(self, company_ids: Optional[List[int]] = None,
                  batch_start: Optional[int] = None,
                  batch_end: Optional[int] = None,
                  dry_run: bool = False) -> Dict[str, Any]:
        """
        Run SEO audit for specified companies.

        Args:
            company_ids: List of specific company IDs to audit.
            batch_start: Start index for batch processing.
            batch_end: End index for batch processing.
            dry_run: If True, print results without saving to database.

        Returns:
            Summary dict with success/failed counts and results.
        """
        start_time = time_module.time()
        companies = self.get_companies(company_ids, batch_start, batch_end)

        if not companies:
            logger.warning("No companies found matching the specified criteria")
            # Early-exit summary: same shape as the full summary below, but
            # without the ssl_errors/connection_errors/quota_* bookkeeping
            # keys — downstream consumers read those via .get().
            return {
                'total': 0, 'success': 0, 'failed': 0, 'skipped': 0,
                'no_website': 0, 'unavailable': 0, 'timeout': 0,
                'quota_remaining': self.pagespeed_client.get_remaining_quota(),
                'duration_seconds': 0,
                'results': [],
            }

        # Running tallies; 'quota_remaining' and 'quota_start' both snapshot
        # the quota at kickoff — 'quota_remaining' is refreshed after the loop
        # so quota_used = quota_start - quota_remaining.
        summary = {
            'total': len(companies),
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'no_website': 0,         # Companies without website URL
            'unavailable': 0,        # Websites that returned 4xx/5xx
            'timeout': 0,            # Websites that timed out
            'ssl_errors': 0,         # SSL certificate issues
            'connection_errors': 0,  # Connection refused/DNS errors
            'quota_exceeded': False,
            'quota_remaining': self.pagespeed_client.get_remaining_quota(),
            'quota_start': self.pagespeed_client.get_remaining_quota(),
            'results': [],
        }

        logger.info("=" * 60)
        logger.info(f"SEO AUDIT STARTING")
        logger.info("=" * 60)
        logger.info(f"Companies to audit: {len(companies)}")
        logger.info(f"Mode: {'DRY RUN (no database writes)' if dry_run else 'LIVE'}")
        logger.info(f"PageSpeed API quota remaining: {summary['quota_remaining']}")
        logger.info("=" * 60)

        for i, company in enumerate(companies, 1):
            # Progress estimation: simple average time over companies already
            # processed, multiplied by the count still to go (incl. current).
            elapsed = time_module.time() - start_time
            if i > 1:
                avg_time_per_company = elapsed / (i - 1)
                remaining_companies = len(companies) - i + 1
                eta_seconds = avg_time_per_company * remaining_companies
                eta_str = str(timedelta(seconds=int(eta_seconds)))
            else:
                eta_str = "calculating..."

            logger.info("")
            logger.info(f"[{i}/{len(companies)}] {company['name']} (ID: {company['id']}) - ETA: {eta_str}")

            # Check for quota before proceeding.
            # NOTE(review): this only warns and flags the summary — the loop
            # still calls audit_company(); presumably that method skips the
            # PageSpeed call itself when quota is gone. TODO confirm.
            current_quota = self.pagespeed_client.get_remaining_quota()
            if current_quota <= 0:
                logger.warning(f"  PageSpeed quota exhausted, skipping PageSpeed analysis")
                summary['quota_exceeded'] = True

            try:
                result = self.audit_company(company)

                # Categorize the result based on errors
                result_status = self._categorize_result(result)

                if result_status == 'no_website':
                    summary['no_website'] += 1
                    summary['skipped'] += 1
                    logger.info(f"  → SKIPPED: No website URL configured")
                elif result_status == 'unavailable':
                    summary['unavailable'] += 1
                    summary['failed'] += 1
                    logger.warning(f"  → UNAVAILABLE: HTTP {result.get('http_status')}")
                elif result_status == 'timeout':
                    summary['timeout'] += 1
                    summary['failed'] += 1
                    logger.warning(f"  → TIMEOUT: Website did not respond")
                elif result_status == 'ssl_error':
                    summary['ssl_errors'] += 1
                    # Still count as success if we got data via HTTP fallback
                    if result.get('onpage'):
                        summary['success'] += 1
                        logger.info(f"  → SUCCESS (with SSL warning)")
                    else:
                        summary['failed'] += 1
                        logger.warning(f"  → FAILED: SSL error, no fallback data")
                elif result_status == 'connection_error':
                    summary['connection_errors'] += 1
                    summary['failed'] += 1
                    logger.warning(f"  → FAILED: Connection error")
                else:
                    summary['success'] += 1
                    score = result.get('scores', {}).get('overall_seo')
                    logger.info(f"  → SUCCESS: Overall SEO score: {score}")

                # Save to database or print in dry-run mode.
                # Results are persisted even for failed/partial audits —
                # only the no-website case is skipped.
                if not dry_run:
                    if result_status not in ('no_website',):
                        if self.save_audit_result(result):
                            logger.debug(f"  Saved to database")
                        else:
                            logger.error(f"  Failed to save to database")
                else:
                    self._print_dry_run_result(company, result)

                # Build result entry for the machine-readable summary
                summary['results'].append({
                    'company_id': company['id'],
                    'company_name': company['name'],
                    'status': result_status,
                    'overall_score': result.get('scores', {}).get('overall_seo'),
                    'pagespeed_seo': result.get('scores', {}).get('pagespeed_seo'),
                    'http_status': result.get('http_status'),
                    'load_time_ms': result.get('load_time_ms'),
                    'errors_count': len(result.get('errors', [])),
                    'errors': result.get('errors', []),
                })

            except QuotaExceededError:
                # Quota ran out mid-run: record the company as skipped and
                # keep iterating (remaining audits may still gather on-page
                # data depending on audit_company's behavior).
                logger.error(f"  PageSpeed API quota exceeded!")
                summary['quota_exceeded'] = True
                summary['skipped'] += 1
                summary['results'].append({
                    'company_id': company['id'],
                    'company_name': company['name'],
                    'status': 'quota_exceeded',
                    'error': 'PageSpeed API quota exceeded',
                })
            except Exception as e:
                # Top-level per-company boundary: one broken site must not
                # abort the whole batch.
                logger.error(f"  Unexpected error: {e}")
                summary['failed'] += 1
                summary['results'].append({
                    'company_id': company['id'],
                    'company_name': company['name'],
                    'status': 'error',
                    'error': str(e),
                })

        # Final summary
        summary['quota_remaining'] = self.pagespeed_client.get_remaining_quota()
        summary['quota_used'] = summary['quota_start'] - summary['quota_remaining']
        summary['duration_seconds'] = int(time_module.time() - start_time)

        return summary
{}).get('pagespeed_seo'), 'http_status': result.get('http_status'), 'load_time_ms': result.get('load_time_ms'), 'errors_count': len(result.get('errors', [])), 'errors': result.get('errors', []), }) except QuotaExceededError: logger.error(f" PageSpeed API quota exceeded!") summary['quota_exceeded'] = True summary['skipped'] += 1 summary['results'].append({ 'company_id': company['id'], 'company_name': company['name'], 'status': 'quota_exceeded', 'error': 'PageSpeed API quota exceeded', }) except Exception as e: logger.error(f" Unexpected error: {e}") summary['failed'] += 1 summary['results'].append({ 'company_id': company['id'], 'company_name': company['name'], 'status': 'error', 'error': str(e), }) # Final summary summary['quota_remaining'] = self.pagespeed_client.get_remaining_quota() summary['quota_used'] = summary['quota_start'] - summary['quota_remaining'] summary['duration_seconds'] = int(time_module.time() - start_time) return summary def _categorize_result(self, result: Dict[str, Any]) -> str: """ Categorize audit result based on errors encountered. 
    def _print_dry_run_result(self, company: Dict, result: Dict[str, Any]) -> None:
        """Print formatted result in dry-run mode.

        Writes a human-readable report card for one company to stdout;
        nothing is persisted. Sections (scores, on-page, technical, issues)
        are only shown when the audit produced data for them.
        """
        print("\n" + "-" * 60)
        print(f"Company: {company['name']} (ID: {company['id']})")
        print(f"Website: {result.get('website_url') or 'Not configured'}")
        if result.get('http_status'):
            print(f"HTTP Status: {result.get('http_status')}")
        if result.get('load_time_ms'):
            print(f"Load Time: {result.get('load_time_ms')}ms")
        if result.get('final_url') and result.get('final_url') != result.get('website_url'):
            print(f"Final URL (after redirects): {result.get('final_url')}")

        scores = result.get('scores', {})
        # NOTE: any(scores.values()) suppresses the whole Scores section when
        # every score is 0 or None — an all-zero audit prints no scores.
        if any(scores.values()):
            print(f"\nScores:")
            if scores.get('overall_seo') is not None:
                print(f" Overall SEO: {scores.get('overall_seo')}")
            if scores.get('pagespeed_seo') is not None:
                print(f" PageSpeed SEO: {scores.get('pagespeed_seo')}")
            if scores.get('pagespeed_performance') is not None:
                print(f" PageSpeed Performance: {scores.get('pagespeed_performance')}")
            if scores.get('pagespeed_accessibility') is not None:
                print(f" PageSpeed Accessibility: {scores.get('pagespeed_accessibility')}")
            if scores.get('pagespeed_best_practices') is not None:
                print(f" PageSpeed Best Practices: {scores.get('pagespeed_best_practices')}")

        # On-page summary
        onpage = result.get('onpage', {})
        if onpage:
            print(f"\nOn-Page SEO:")
            meta = onpage.get('meta_tags', {})
            if meta.get('title'):
                print(f" Title: {meta.get('title')[:60]}...")
            headings = onpage.get('headings', {})
            print(f" H1 count: {headings.get('h1_count', 0)}")
            images = onpage.get('images', {})
            if images.get('total_images'):
                print(f" Images: {images.get('total_images')} total, {images.get('images_without_alt', 0)} missing alt")
            structured = onpage.get('structured_data', {})
            print(f" Structured Data: {'Yes' if structured.get('has_structured_data') else 'No'}")

        # Technical SEO summary
        technical = result.get('technical', {})
        if technical:
            print(f"\nTechnical SEO:")
            robots = technical.get('robots_txt', {})
            print(f" robots.txt: {'Yes' if robots.get('exists') else 'No'}")
            sitemap = technical.get('sitemap', {})
            print(f" sitemap.xml: {'Yes' if sitemap.get('exists') else 'No'}")
            indexability = technical.get('indexability', {})
            print(f" Indexable: {'Yes' if indexability.get('is_indexable', True) else 'No'}")

        if result.get('errors'):
            print(f"\nIssues ({len(result['errors'])}):")
            for err in result['errors'][:5]:  # Show first 5 errors
                print(f" ⚠ {err}")
            if len(result['errors']) > 5:
                print(f" ... and {len(result['errors']) - 5} more")

        print("-" * 60)
def parse_batch_argument(batch_str: str) -> Tuple[int, int]:
    """
    Parse batch argument in format 'START-END'.

    Args:
        batch_str: String like '1-10' or '5-20'

    Returns:
        Tuple of (start, end) integers

    Raises:
        ValueError: If format is invalid
    """
    if '-' not in batch_str:
        raise ValueError(f"Invalid batch format '{batch_str}'. Use START-END (e.g., 1-10)")

    parts = batch_str.split('-')
    if len(parts) != 2:
        raise ValueError(f"Invalid batch format '{batch_str}'. Use START-END (e.g., 1-10)")

    try:
        start = int(parts[0].strip())
        end = int(parts[1].strip())
    except ValueError:
        # FIX: 'from None' suppresses the chained int() traceback — the
        # re-raised message already tells the user everything they need,
        # and without it the CLI shows a confusing "During handling of the
        # above exception..." double traceback.
        raise ValueError(f"Invalid batch values '{batch_str}'. START and END must be numbers") from None

    if start < 1:
        raise ValueError(f"Invalid batch start '{start}'. Must be >= 1")
    if end < start:
        raise ValueError(f"Invalid batch range '{start}-{end}'. END must be >= START")

    return start, end
def print_summary(summary: Dict[str, Any], dry_run: bool = False) -> None:
    """Print a formatted audit summary to stdout.

    Args:
        summary: Summary dict produced by SEOAuditor.run_audit (counts,
            quota bookkeeping, and a 'results' list of per-company entries).
        dry_run: Whether the run was a dry run (affects the Mode label only).
    """
    duration = summary.get('duration_seconds', 0)
    duration_str = str(timedelta(seconds=duration))

    print("\n")
    print("=" * 70)
    print(" SEO AUDIT COMPLETE")
    print("=" * 70)
    print("")
    print(f" Mode: {'DRY RUN' if dry_run else 'LIVE'}")
    print(f" Duration: {duration_str}")
    print("")
    print("-" * 70)
    print(" RESULTS BREAKDOWN")
    print("-" * 70)
    print(f" Total companies: {summary['total']}")
    print(f" ✓ Successful: {summary['success']}")
    print(f" ✗ Failed: {summary['failed']}")
    print(f" ○ Skipped: {summary['skipped']}")
    print("")

    # Edge case breakdown — only show categories that actually occurred.
    if summary.get('no_website', 0) > 0:
        print(f" - No website: {summary['no_website']}")
    if summary.get('unavailable', 0) > 0:
        print(f" - Unavailable: {summary['unavailable']}")
    if summary.get('timeout', 0) > 0:
        print(f" - Timeout: {summary['timeout']}")
    if summary.get('ssl_errors', 0) > 0:
        print(f" - SSL errors: {summary['ssl_errors']}")
    if summary.get('connection_errors', 0) > 0:
        print(f" - Connection errors: {summary['connection_errors']}")

    print("")
    print("-" * 70)
    print(" PAGESPEED API QUOTA")
    print("-" * 70)
    print(f" Quota at start: {summary.get('quota_start', 'N/A')}")
    print(f" Quota used: {summary.get('quota_used', 'N/A')}")
    print(f" Quota remaining: {summary.get('quota_remaining', 'N/A')}")
    if summary.get('quota_exceeded'):
        print(" ⚠ WARNING: Quota was exceeded during this run!")

    # Score distribution (only for companies that produced a score)
    results = summary.get('results', [])
    scores = [r.get('overall_score') for r in results if r.get('overall_score') is not None]
    if scores:
        avg_score = sum(scores) / len(scores)
        print("")
        print("-" * 70)
        print(" SEO SCORE DISTRIBUTION")
        print("-" * 70)
        print(f" Companies with scores: {len(scores)}")
        print(f" Average SEO score: {avg_score:.1f}")
        print(f" Highest score: {max(scores)}")
        print(f" Lowest score: {min(scores)}")
        print("")

        # Score ranges with visual bars
        excellent = sum(1 for s in scores if s >= 90)
        good = sum(1 for s in scores if 70 <= s < 90)
        fair = sum(1 for s in scores if 50 <= s < 70)
        poor = sum(1 for s in scores if s < 50)
        total = len(scores)
        # FIX: removed the unused module-local 'max_bar = 30' — the bar
        # width is governed solely by bar()'s default parameter below.

        def bar(count, total, max_bar=30):
            # Proportional bar: filled blocks for 'count' out of 'total'.
            if total == 0:
                return ""
            width = int((count / total) * max_bar)
            return "█" * width + "░" * (max_bar - width)

        print(f" Excellent (90-100): {excellent:3d} {bar(excellent, total)}")
        print(f" Good (70-89): {good:3d} {bar(good, total)}")
        print(f" Fair (50-69): {fair:3d} {bar(fair, total)}")
        print(f" Poor (<50): {poor:3d} {bar(poor, total)}")

    # List failed companies (capped at 10 to keep the report readable)
    failed_results = [r for r in results if r.get('status') in ('unavailable', 'timeout', 'connection_error', 'error')]
    if failed_results:
        print("")
        print("-" * 70)
        print(" FAILED AUDITS")
        print("-" * 70)
        for r in failed_results[:10]:  # Show first 10
            status_icon = {
                'unavailable': '🔴',
                'timeout': '⏱',
                'connection_error': '🔌',
                'error': '❌',
            }.get(r['status'], '?')
            errors = r.get('errors', [])
            error_msg = errors[0][:50] if errors else r.get('status', 'Unknown')
            print(f" {status_icon} {r['company_name'][:30]:<30} - {error_msg}")
        if len(failed_results) > 10:
            print(f" ... and {len(failed_results) - 10} more")

    print("")
    print("=" * 70)
def main():
    """Main entry point for CLI usage.

    Parses command-line arguments, runs the audit, prints (or JSON-dumps)
    the summary, and maps the outcome onto the documented exit codes.
    """
    parser = argparse.ArgumentParser(
        description='SEO Audit for Norda Biznes member websites',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python seo_audit.py --company-id 26              # Audit single company
  python seo_audit.py --batch 1-10                 # Audit companies 1-10
  python seo_audit.py --all                        # Audit all companies
  python seo_audit.py --company-id 26 --dry-run    # Test without saving
  python seo_audit.py --all --json > report.json   # Export to JSON

Exit codes:
  0 - All audits completed successfully
  1 - Argument error or invalid input
  2 - Partial failures (some audits failed)
  3 - All audits failed
  4 - Database connection error
  5 - API quota exceeded
"""
    )

    # Selection arguments (mutually exclusive in practice)
    selection = parser.add_argument_group('Company Selection (choose one)')
    selection.add_argument('--company-id', type=int, metavar='ID',
                           help='Audit single company by ID')
    selection.add_argument('--company-ids', type=str, metavar='IDS',
                           help='Audit multiple companies by IDs (comma-separated, e.g., 1,5,10)')
    selection.add_argument('--batch', type=str, metavar='RANGE',
                           help='Audit batch of companies by row offset (e.g., 1-10)')
    selection.add_argument('--all', action='store_true',
                           help='Audit all companies')

    # Options
    options = parser.add_argument_group('Options')
    options.add_argument('--dry-run', action='store_true',
                         help='Print results without saving to database')
    options.add_argument('--verbose', '-v', action='store_true',
                         help='Enable verbose/debug output')
    options.add_argument('--quiet', '-q', action='store_true',
                         help='Suppress progress output (only show summary)')
    options.add_argument('--json', action='store_true',
                         help='Output results as JSON (for scripting)')
    options.add_argument('--database-url', type=str, metavar='URL',
                         help='Database connection URL (overrides DATABASE_URL env var)')

    args = parser.parse_args()

    # Configure logging level (quiet takes precedence over verbose)
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    elif args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate that exactly one selection method is provided
    selection_count = sum([
        args.company_id is not None,
        args.company_ids is not None,
        args.batch is not None,
        args.all
    ])
    if selection_count == 0:
        parser.print_help()
        print("\n❌ Error: Please specify one of --company-id, --company-ids, --batch, or --all")
        sys.exit(EXIT_ARGUMENT_ERROR)
    if selection_count > 1:
        print("❌ Error: Please specify only one selection method (--company-id, --company-ids, --batch, or --all)")
        sys.exit(EXIT_ARGUMENT_ERROR)

    # Parse batch argument if provided
    batch_start, batch_end = None, None
    if args.batch:
        try:
            batch_start, batch_end = parse_batch_argument(args.batch)
        except ValueError as e:
            print(f"❌ Error: {e}")
            sys.exit(EXIT_ARGUMENT_ERROR)

    # Parse company IDs if provided
    company_ids = None
    if args.company_id:
        company_ids = [args.company_id]
    elif args.company_ids:
        try:
            company_ids = [int(x.strip()) for x in args.company_ids.split(',')]
            if not company_ids:
                raise ValueError("Empty list")
        except ValueError:
            print(f"❌ Error: Invalid --company-ids format. Use comma-separated integers (e.g., 1,5,10)")
            sys.exit(EXIT_ARGUMENT_ERROR)

    # Determine database URL (CLI flag wins over the environment variable)
    database_url = args.database_url or DATABASE_URL

    # Initialize auditor
    try:
        auditor = SEOAuditor(database_url=database_url)
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        print(f"❌ Error: Database connection failed: {e}")
        sys.exit(EXIT_DATABASE_ERROR)
    except Exception as e:
        logger.error(f"Failed to initialize auditor: {e}")
        print(f"❌ Error: Failed to initialize SEO auditor: {e}")
        sys.exit(EXIT_DATABASE_ERROR)

    # Run audit
    try:
        summary = auditor.run_audit(
            company_ids=company_ids,
            batch_start=batch_start,
            batch_end=batch_end,
            dry_run=args.dry_run
        )
    except QuotaExceededError:
        logger.error("PageSpeed API quota exceeded")
        print("❌ Error: PageSpeed API quota exceeded. Try again tomorrow.")
        sys.exit(EXIT_QUOTA_EXCEEDED)
    except SQLAlchemyError as e:
        logger.error(f"Database error during audit: {e}")
        print(f"❌ Error: Database error: {e}")
        sys.exit(EXIT_DATABASE_ERROR)
    except Exception as e:
        logger.error(f"Unexpected error during audit: {e}")
        print(f"❌ Error: Unexpected error: {e}")
        sys.exit(EXIT_ALL_FAILED)

    # Output results
    if args.json:
        print(json.dumps(summary, default=str, indent=2))
    else:
        print_summary(summary, dry_run=args.dry_run)

    # Determine exit code
    if summary['total'] == 0:
        logger.warning("No companies found to audit")
        sys.exit(EXIT_ARGUMENT_ERROR)
    elif summary.get('quota_exceeded'):
        sys.exit(EXIT_QUOTA_EXCEEDED)
    elif summary['failed'] > 0 and summary['failed'] == summary['total'] - summary['skipped']:
        # BUG FIX: the 'failed > 0' guard is new. Previously a run where
        # every company was skipped (failed == 0 == total - skipped) fell
        # into this branch and exited EXIT_ALL_FAILED even though nothing
        # actually failed.
        sys.exit(EXIT_ALL_FAILED)
    elif summary['failed'] > 0:
        sys.exit(EXIT_PARTIAL_FAILURES)
    else:
        sys.exit(EXIT_SUCCESS)


if __name__ == '__main__':
    main()