nordabiz/scripts/competitor_monitor_cron.py
Maciej Pienczyn 02696c59a4
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: Add new services, scripts, and competitor dashboard
- google_places_service.py: Google Places API integration
- competitor_monitoring_service.py: Competitor tracking service
- scripts/competitor_monitor_cron.py, scripts/generate_audit_report.py
- blueprints/admin/routes_competitors.py, templates/admin/competitor_dashboard.html

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 12:00:54 +01:00

185 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
Competitor Monitor Cron Job
===========================
Periodically takes snapshots of competitors for all companies with
Google Place IDs. Designed to run weekly via cron.
Usage:
python competitor_monitor_cron.py
python competitor_monitor_cron.py --company-id 26
python competitor_monitor_cron.py --discover # Auto-discover competitors first
Cron entry (weekly, Sunday 3 AM):
0 3 * * 0 cd /var/www/nordabiznes && /var/www/nordabiznes/venv/bin/python3 scripts/competitor_monitor_cron.py >> /var/log/nordabiznes/competitor_monitor.log 2>&1
Author: NordaBiz Development Team
Created: 2026-02-06
"""
import os
import sys
import argparse
import logging
from datetime import datetime
from pathlib import Path
# Best-effort loading of the project root's .env file. python-dotenv is
# optional: when it is not installed we simply fall through and rely on
# the process environment alone.
try:
    from dotenv import load_dotenv

    _project_root = Path(__file__).resolve().parent.parent
    _env_file = _project_root / '.env'
    if _env_file.exists():
        load_dotenv(_env_file)
except ImportError:
    pass

# Make the project root importable so sibling modules (database,
# competitor_monitoring_service, ...) resolve when run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from database import Company, CompanyCompetitor, CompanyWebsiteAnalysis
from competitor_monitoring_service import CompetitorMonitoringService
# Module-wide logging: timestamped INFO-level records (cron redirects
# stdout/stderr to the log file, so plain basicConfig is enough).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# Connection string for the application database; the environment
# variable overrides the placeholder default.
DATABASE_URL = os.environ.get(
    'DATABASE_URL',
    'postgresql://nordabiz_app:CHANGE_ME@127.0.0.1:5432/nordabiz',
)
def run_snapshots(session, company_ids=None):
    """Take snapshots of all tracked competitors.

    Args:
        session: SQLAlchemy session bound to the application database.
        company_ids: Optional list of company IDs to restrict processing
            to. When None, every company with at least one tracked
            competitor is processed.

    Returns:
        dict of aggregate counters: 'companies_processed',
        'total_snapshots', 'total_changes', 'errors'.
    """
    service = CompetitorMonitoringService(session)

    if company_ids:
        companies = session.query(Company).filter(Company.id.in_(company_ids)).all()
    else:
        # Companies that have competitors tracked. Use the DISTINCT query
        # directly as a subquery instead of materializing the IDs in
        # Python and issuing a second round-trip.
        tracked_ids = session.query(CompanyCompetitor.company_id).distinct()
        companies = session.query(Company).filter(Company.id.in_(tracked_ids)).all()

    total_results = {
        'companies_processed': 0,
        'total_snapshots': 0,
        'total_changes': 0,
        'errors': 0,
    }

    for company in companies:
        # Lazy %-style args: no string formatting when the level is off.
        logger.info("Processing competitors for: %s (ID: %s)", company.name, company.id)
        try:
            results = service.take_all_snapshots(company.id)
            total_results['companies_processed'] += 1
            total_results['total_snapshots'] += results['success']
            total_results['total_changes'] += results['changes_detected']
            total_results['errors'] += results['failed']
            logger.info(
                " Snapshots: %s/%s, Changes: %s",
                results['success'], results['total'], results['changes_detected'],
            )
        except Exception as e:
            # logger.exception keeps the traceback in the cron log for
            # post-mortem debugging (logger.error dropped it).
            logger.exception(" Error: %s", e)
            total_results['errors'] += 1

    return total_results
def run_discovery(session, company_ids=None, max_per_company=5):
    """Discover and persist competitors for companies.

    Args:
        session: SQLAlchemy session bound to the application database.
        company_ids: Optional list of company IDs to restrict discovery
            to. When None, discovery runs for companies that have a
            Google Place ID but no tracked competitors yet.
        max_per_company: Maximum competitors to request per company.

    Returns:
        int: total number of newly saved competitors.
    """
    service = CompetitorMonitoringService(session)

    if company_ids:
        companies = session.query(Company).filter(Company.id.in_(company_ids)).all()
    else:
        # Companies with a Google Place ID that have no competitors yet.
        # A single anti-join replaces the original per-company COUNT
        # query (classic N+1 pattern). NOTE(review): assumes
        # CompanyCompetitor.company_id is non-nullable, otherwise the
        # NOT IN subquery could match nothing — confirm on the model.
        tracked_ids = session.query(CompanyCompetitor.company_id).distinct()
        companies = session.query(Company).join(
            CompanyWebsiteAnalysis,
            Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            CompanyWebsiteAnalysis.google_place_id.isnot(None),
            ~Company.id.in_(tracked_ids)
        ).all()

    total_discovered = 0
    for company in companies:
        logger.info("Discovering competitors for: %s (ID: %s)", company.name, company.id)
        try:
            competitors = service.discover_competitors(company.id, max_results=max_per_company)
            saved = service.save_competitors(company.id, competitors)
            total_discovered += saved
            logger.info(" Found %s competitors, saved %s new", len(competitors), saved)
        except Exception as e:
            # Keep the traceback in the log; one failure must not abort
            # discovery for the remaining companies.
            logger.exception(" Discovery error: %s", e)

    return total_discovered
def main():
    """CLI entry point: parse arguments, open a session, run the job."""
    parser = argparse.ArgumentParser(description='Competitor Monitor Cron Job')
    parser.add_argument('--company-id', type=int, help='Process single company')
    parser.add_argument('--discover', action='store_true', help='Discover competitors first')
    parser.add_argument('--max-competitors', type=int, default=5, help='Max competitors per company')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    engine = create_engine(DATABASE_URL)
    db_session = sessionmaker(bind=engine)()

    target_ids = [args.company_id] if args.company_id else None
    banner = "=" * 60

    try:
        logger.info(banner)
        logger.info("COMPETITOR MONITOR CRON JOB")
        logger.info(f"Started at: {datetime.now()}")
        logger.info(banner)

        if args.discover:
            logger.info("\n--- COMPETITOR DISCOVERY ---")
            discovered = run_discovery(db_session, target_ids, args.max_competitors)
            logger.info(f"Total new competitors discovered: {discovered}")

        logger.info("\n--- COMPETITOR SNAPSHOTS ---")
        results = run_snapshots(db_session, target_ids)

        logger.info("\n" + banner)
        logger.info("SUMMARY")
        logger.info(banner)
        logger.info(f"Companies processed: {results['companies_processed']}")
        logger.info(f"Snapshots taken: {results['total_snapshots']}")
        logger.info(f"Changes detected: {results['total_changes']}")
        logger.info(f"Errors: {results['errors']}")
        logger.info(banner)
    finally:
        # Always release the DB connection, even if the run blew up.
        db_session.close()


if __name__ == '__main__':
    main()