nordabiz/competitor_monitoring_service.py
Maciej Pienczyn 02696c59a4
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: Add new services, scripts, and competitor dashboard
- google_places_service.py: Google Places API integration
- competitor_monitoring_service.py: Competitor tracking service
- scripts/competitor_monitor_cron.py, scripts/generate_audit_report.py
- blueprints/admin/routes_competitors.py, templates/admin/competitor_dashboard.html

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 12:00:54 +01:00

384 lines
15 KiB
Python

"""
Competitor Monitoring Service for NordaBiz
==========================================
Discovers and monitors competitors via Google Places API.
Tracks changes in ratings, reviews, and profile activity.
Author: NordaBiz Development Team
Created: 2026-02-06
"""
import os
import json
import logging
from datetime import datetime, date, timedelta
from typing import Optional, Dict, List, Any
from decimal import Decimal
from sqlalchemy.orm import Session
from database import (
Company, CompanyCompetitor, CompetitorSnapshot,
CompanyWebsiteAnalysis, SessionLocal
)
try:
from google_places_service import GooglePlacesService
except ImportError:
GooglePlacesService = None
logger = logging.getLogger(__name__)
class CompetitorMonitoringService:
"""Discovers and monitors competitors for companies."""
def __init__(self, db: Session):
self.db = db
self._places_service = None
@property
def places_service(self) -> Optional['GooglePlacesService']:
"""Lazy-init Google Places service."""
if self._places_service is None and GooglePlacesService:
try:
self._places_service = GooglePlacesService()
except ValueError:
logger.warning("Google Places API key not configured")
return self._places_service
def discover_competitors(self, company_id: int, max_results: int = 10) -> List[Dict[str, Any]]:
"""
Find competitors via Google Places Nearby Search.
Uses the company's Google Place data to find similar businesses nearby.
"""
if not self.places_service:
logger.error("Google Places API not available for competitor discovery")
return []
company = self.db.query(Company).filter(Company.id == company_id).first()
if not company:
raise ValueError(f"Company {company_id} not found")
# Get company's Google location
analysis = self.db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.company_id == company_id
).first()
place_id = analysis.google_place_id if analysis else None
if not place_id:
logger.warning(f"Company {company_id} has no Google Place ID, searching by name")
# Search for the company first
place = self.places_service.search_place(
f"{company.name} {company.address_city or 'Wejherowo'}"
)
if not place:
return []
place_id = place.get('id', '').replace('places/', '')
# Get place details to get location
place_data = self.places_service.get_place_details(place_id)
if not place_data:
return []
location = place_data.get('location', {})
latitude = location.get('latitude')
longitude = location.get('longitude')
primary_type = place_data.get('primaryType')
if not latitude or not longitude:
logger.error(f"No location data for company {company_id}")
return []
# Search nearby for similar businesses
included_types = [primary_type] if primary_type else None
nearby = self.places_service.search_nearby(
latitude=latitude,
longitude=longitude,
radius=5000.0,
included_types=included_types,
max_results=max_results + 1 # +1 because our own business may appear
)
# Filter out own business and save competitors
competitors = []
for place in nearby:
competitor_place_id = place.get('id', '').replace('places/', '')
if competitor_place_id == place_id:
continue # Skip own business
display_name = place.get('displayName', {})
name = display_name.get('text', '') if isinstance(display_name, dict) else str(display_name)
competitor_data = {
'competitor_place_id': competitor_place_id,
'competitor_name': name,
'competitor_address': place.get('formattedAddress', ''),
'competitor_rating': place.get('rating'),
'competitor_review_count': place.get('userRatingCount'),
'competitor_category': place.get('primaryType', ''),
'competitor_website': place.get('websiteUri', ''),
}
competitors.append(competitor_data)
if len(competitors) >= max_results:
break
return competitors
def save_competitors(self, company_id: int, competitors: List[Dict]) -> int:
"""Save discovered competitors to database. Returns count saved."""
saved = 0
for comp_data in competitors:
existing = self.db.query(CompanyCompetitor).filter(
CompanyCompetitor.company_id == company_id,
CompanyCompetitor.competitor_place_id == comp_data['competitor_place_id']
).first()
if existing:
# Update existing
existing.competitor_name = comp_data.get('competitor_name') or existing.competitor_name
existing.competitor_rating = comp_data.get('competitor_rating') or existing.competitor_rating
existing.competitor_review_count = comp_data.get('competitor_review_count') or existing.competitor_review_count
existing.updated_at = datetime.now()
else:
# Create new
competitor = CompanyCompetitor(
company_id=company_id,
competitor_place_id=comp_data['competitor_place_id'],
competitor_name=comp_data.get('competitor_name'),
competitor_address=comp_data.get('competitor_address'),
competitor_rating=comp_data.get('competitor_rating'),
competitor_review_count=comp_data.get('competitor_review_count'),
competitor_category=comp_data.get('competitor_category'),
competitor_website=comp_data.get('competitor_website'),
added_by='auto',
)
self.db.add(competitor)
saved += 1
self.db.commit()
return saved
def take_snapshot(self, competitor_id: int) -> Optional[CompetitorSnapshot]:
"""
Take a snapshot of a competitor's current state.
Compares with previous snapshot to detect changes.
"""
competitor = self.db.query(CompanyCompetitor).filter(
CompanyCompetitor.id == competitor_id
).first()
if not competitor or not competitor.is_active:
return None
if not self.places_service:
logger.error("Google Places API not available for snapshots")
return None
# Fetch current data from Google
place_data = self.places_service.get_place_details(competitor.competitor_place_id)
if not place_data:
logger.warning(f"Could not fetch data for competitor {competitor.competitor_name}")
return None
today = date.today()
# Check if snapshot already exists for today
existing = self.db.query(CompetitorSnapshot).filter(
CompetitorSnapshot.competitor_id == competitor_id,
CompetitorSnapshot.snapshot_date == today
).first()
if existing:
logger.info(f"Snapshot already exists for competitor {competitor_id} on {today}")
return existing
# Build snapshot data
photos = place_data.get('photos', [])
display_name = place_data.get('displayName', {})
snapshot_data = {
'name': display_name.get('text', '') if isinstance(display_name, dict) else str(display_name),
'address': place_data.get('formattedAddress', ''),
'rating': place_data.get('rating'),
'review_count': place_data.get('userRatingCount', 0),
'photo_count': len(photos),
'business_status': place_data.get('businessStatus', ''),
'types': place_data.get('types', []),
'website': place_data.get('websiteUri', ''),
}
# Get previous snapshot for change detection
previous = self.db.query(CompetitorSnapshot).filter(
CompetitorSnapshot.competitor_id == competitor_id
).order_by(CompetitorSnapshot.snapshot_date.desc()).first()
changes = {}
if previous:
prev_data = previous.data or {}
if snapshot_data.get('rating') != prev_data.get('rating'):
changes['rating'] = {
'old': prev_data.get('rating'),
'new': snapshot_data.get('rating')
}
if snapshot_data.get('review_count', 0) != prev_data.get('review_count', 0):
changes['review_count'] = {
'old': prev_data.get('review_count', 0),
'new': snapshot_data.get('review_count', 0),
'delta': (snapshot_data.get('review_count', 0) or 0) - (prev_data.get('review_count', 0) or 0)
}
if snapshot_data.get('photo_count', 0) != prev_data.get('photo_count', 0):
changes['photo_count'] = {
'old': prev_data.get('photo_count', 0),
'new': snapshot_data.get('photo_count', 0)
}
# Create snapshot
snapshot = CompetitorSnapshot(
competitor_id=competitor_id,
snapshot_date=today,
rating=snapshot_data.get('rating'),
review_count=snapshot_data.get('review_count'),
photo_count=snapshot_data.get('photo_count'),
has_website=bool(snapshot_data.get('website')),
has_description=bool(place_data.get('editorialSummary')),
data=snapshot_data,
changes=changes if changes else None,
)
self.db.add(snapshot)
# Update competitor record with latest data
competitor.competitor_rating = snapshot_data.get('rating')
competitor.competitor_review_count = snapshot_data.get('review_count')
competitor.updated_at = datetime.now()
self.db.commit()
self.db.refresh(snapshot)
if changes:
logger.info(f"Changes detected for {competitor.competitor_name}: {list(changes.keys())}")
return snapshot
def take_all_snapshots(self, company_id: int) -> Dict[str, Any]:
"""Take snapshots for all active competitors of a company."""
competitors = self.db.query(CompanyCompetitor).filter(
CompanyCompetitor.company_id == company_id,
CompanyCompetitor.is_active == True
).all()
results = {
'total': len(competitors),
'success': 0,
'failed': 0,
'changes_detected': 0,
}
for competitor in competitors:
try:
snapshot = self.take_snapshot(competitor.id)
if snapshot:
results['success'] += 1
if snapshot.changes:
results['changes_detected'] += 1
else:
results['failed'] += 1
except Exception as e:
logger.error(f"Snapshot failed for competitor {competitor.id}: {e}")
results['failed'] += 1
return results
def get_changes_report(self, company_id: int, days: int = 30) -> List[Dict[str, Any]]:
"""Get competitor changes report for the last N days."""
since = date.today() - timedelta(days=days)
competitors = self.db.query(CompanyCompetitor).filter(
CompanyCompetitor.company_id == company_id,
CompanyCompetitor.is_active == True
).all()
report = []
for competitor in competitors:
snapshots = self.db.query(CompetitorSnapshot).filter(
CompetitorSnapshot.competitor_id == competitor.id,
CompetitorSnapshot.snapshot_date >= since
).order_by(CompetitorSnapshot.snapshot_date.asc()).all()
changes = []
for snap in snapshots:
if snap.changes:
changes.append({
'date': snap.snapshot_date.isoformat(),
'changes': snap.changes,
})
report.append({
'competitor_id': competitor.id,
'competitor_name': competitor.competitor_name,
'current_rating': float(competitor.competitor_rating) if competitor.competitor_rating else None,
'current_review_count': competitor.competitor_review_count,
'snapshots_count': len(snapshots),
'changes': changes,
})
return report
def get_comparison(self, company_id: int) -> Dict[str, Any]:
"""Compare company with its competitors."""
company = self.db.query(Company).filter(Company.id == company_id).first()
if not company:
return {}
analysis = self.db.query(CompanyWebsiteAnalysis).filter(
CompanyWebsiteAnalysis.company_id == company_id
).first()
company_data = {
'name': company.name,
'rating': float(analysis.google_rating) if analysis and analysis.google_rating else None,
'review_count': analysis.google_reviews_count if analysis else 0,
'photo_count': analysis.google_photos_count if analysis else 0,
}
competitors = self.db.query(CompanyCompetitor).filter(
CompanyCompetitor.company_id == company_id,
CompanyCompetitor.is_active == True
).all()
competitor_data = []
for comp in competitors:
competitor_data.append({
'name': comp.competitor_name,
'rating': float(comp.competitor_rating) if comp.competitor_rating else None,
'review_count': comp.competitor_review_count or 0,
})
# Calculate averages
ratings = [c['rating'] for c in competitor_data if c['rating']]
review_counts = [c['review_count'] for c in competitor_data]
return {
'company': company_data,
'competitors': competitor_data,
'avg_competitor_rating': round(sum(ratings) / len(ratings), 1) if ratings else None,
'avg_competitor_reviews': round(sum(review_counts) / len(review_counts)) if review_counts else 0,
'company_rank_by_rating': self._calculate_rank(company_data.get('rating'), ratings),
'company_rank_by_reviews': self._calculate_rank(company_data.get('review_count'), review_counts),
}
@staticmethod
def _calculate_rank(value, others: List) -> Optional[int]:
"""Calculate rank (1 = best) among competitors."""
if value is None or not others:
return None
all_values = sorted([value] + [v for v in others if v is not None], reverse=True)
try:
return all_values.index(value) + 1
except ValueError:
return None