Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
599 lines
22 KiB
Python
599 lines
22 KiB
Python
"""
Google Places API (New) Service for NordaBiz
=============================================

Comprehensive Google Places API client for fetching rich business data.
Uses the Places API (New) with field masks for efficient billing.

API Reference: https://developers.google.com/maps/documentation/places/web-service/op-overview

Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-02-06
"""

import logging
import os
import re
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional, Dict, List, Any

import requests

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# API configuration: endpoints of the Places API (New)
# ---------------------------------------------------------------------------
PLACES_API_BASE = "https://places.googleapis.com/v1/places"
PLACES_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
PLACES_NEARBY_URL = "https://places.googleapis.com/v1/places:searchNearby"

# ---------------------------------------------------------------------------
# Field masks, grouped the way the original author grouped them by billing
# tier (basic / contact / atmosphere).
# NOTE(review): the exact pricing of each group depends on Google's current
# SKU table -- confirm against the billing documentation before relying on
# any "free" tier assumption.
# ---------------------------------------------------------------------------

# Identity, address and classification of the place.
BASIC_FIELDS = [
    "id",
    "displayName",
    "formattedAddress",
    "location",
    "types",
    "primaryType",
    "primaryTypeDisplayName",
    "businessStatus",
    "googleMapsUri",
    "googleMapsLinks",
    "utcOffsetMinutes",
    "adrFormatAddress",
    "shortFormattedAddress",
]

# Phone numbers and website.
CONTACT_FIELDS = [
    "nationalPhoneNumber",
    "internationalPhoneNumber",
    "websiteUri",
]

# Opening hours (regular schedule and the currently effective one).
HOURS_FIELDS = [
    "regularOpeningHours",
    "currentOpeningHours",
]

# Ratings, reviews and descriptive summary.
ATMOSPHERE_FIELDS = [
    "rating",
    "userRatingCount",
    "reviews",
    "priceLevel",
    "editorialSummary",
]

# Photo references.
PHOTO_FIELDS = [
    "photos",
]

# Business attributes (payment, parking, accessibility, service flags, ...).
ATTRIBUTE_FIELDS = [
    "paymentOptions",
    "parkingOptions",
    "accessibilityOptions",
    "outdoorSeating",
    "liveMusic",
    "servesBreakfast",
    "servesLunch",
    "servesDinner",
    "servesBeer",
    "servesWine",
    "servesCoffee",
    "goodForChildren",
    "allowsDogs",
    "restroom",
    "goodForGroups",
    "goodForWatchingSports",
    "reservable",
    "delivery",
    "dineIn",
    "takeout",
    "curbsidePickup",
]


class GooglePlacesService:
    """Fetches rich GBP data via Places API (New).

    The service wraps a ``requests.Session`` that carries the API key header
    and offers:

    * place lookups (``get_place_details``),
    * text / nearby searches (``search_place``, ``search_places_raw``,
      ``search_nearby``),
    * pure helpers that reshape a place-details dict (``extract_*``).

    All network methods log failures and return ``None`` / ``[]`` instead of
    raising.
    """

    # Fields requested by both text-search endpoints (prefixed with
    # ``places.`` when the mask is built).
    _SEARCH_FIELDS = (
        'id', 'displayName', 'formattedAddress', 'types',
        'rating', 'userRatingCount', 'googleMapsUri',
    )

    # Tokens ignored when comparing names: legal-form fragments and common
    # Polish/English connectors.
    _SKIP_WORDS = frozenset({
        'sp', 'z', 'o', 'oo', 'sa', 'sc', 'j', 'k', 'ul', 'i', 'w',
        'do', 'na', 'po', 'ze', 'the', 'and', 'of', 'for', 'group',
    })

    # A word is a run of (Polish) letters/digits; '&' joins words (P&P, S&K).
    _WORD_RE = re.compile(r'[a-ząćęłńóśźż0-9]+(?:&[a-ząćęłńóśźż0-9]+)*')

    # Trailing legal form (Sp. z o.o., Sp.j., Sp.k., S.A., S.C., ...).
    # A separator (or start of string) is REQUIRED before the legal form so
    # that ordinary words ending in "sa"/"sc" (e.g. "Mesa") are not truncated.
    _LEGAL_FORM_RE = re.compile(
        r'(?:^|[\s,]+)'
        r'(?:sp\.?\s*z\.?\s*o\.?\s*o\.?|sp\.?\s*[jkp]\.?|s\.?\s*[ac]\.?)'
        r'\s*\.?\s*$',
        flags=re.IGNORECASE,
    )

    # What may follow a matched name prefix: whitespace, '.', ',', ';', ':'
    # or end of string -- deliberately NOT a dash (compound names).
    _PREFIX_BOUNDARY = r'(?:[\s.,;:]|$)'

    def __init__(self, api_key: Optional[str] = None):
        """Create the service.

        Args:
            api_key: Places API key; falls back to the
                ``GOOGLE_PLACES_API_KEY`` environment variable.

        Raises:
            ValueError: if no key is supplied and the variable is unset/empty.
        """
        self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
        if not self.api_key:
            raise ValueError("GOOGLE_PLACES_API_KEY not set in environment")
        self.session = requests.Session()
        self.session.headers.update({
            'X-Goog-Api-Key': self.api_key,
            'Content-Type': 'application/json',
        })

    # ------------------------------------------------------------------
    # Field-mask helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _select_fields(include_reviews: bool, include_photos: bool,
                       include_attributes: bool) -> List[str]:
        """Return the un-prefixed field names for the requested options."""
        # list(...) makes a fresh list so the module constants are never mutated.
        fields = list(BASIC_FIELDS + CONTACT_FIELDS + HOURS_FIELDS + ATMOSPHERE_FIELDS)
        if not include_reviews:
            fields = [name for name in fields if name != 'reviews']
        if include_photos:
            fields += PHOTO_FIELDS
        if include_attributes:
            fields += ATTRIBUTE_FIELDS
        return fields

    def _build_field_mask(self, include_reviews: bool = True,
                          include_photos: bool = True,
                          include_attributes: bool = True) -> str:
        """Build a ``places.``-prefixed field mask string for an API request.

        Names that already contain a dot are passed through unchanged.
        ``include_reviews=False`` drops the ``reviews`` field.
        """
        fields = self._select_fields(include_reviews, include_photos, include_attributes)
        return ','.join(f'places.{f}' if '.' not in f else f for f in fields)

    # ------------------------------------------------------------------
    # Place details
    # ------------------------------------------------------------------

    def get_place_details(self, place_id: str,
                          include_reviews: bool = True,
                          include_photos: bool = True,
                          include_attributes: bool = True) -> Optional[Dict[str, Any]]:
        """
        Fetch comprehensive place details by Place ID.

        Args:
            place_id: Google Place ID
            include_reviews: Include reviews data (billed separately)
            include_photos: Include photo references
            include_attributes: Include business attributes

        Returns:
            Dict with place details or None on error
        """
        if not place_id:
            # Without an ID the URL would address the collection, not a place.
            logger.error("Places API request error for %s: empty place_id", place_id)
            return None

        url = f"{PLACES_API_BASE}/{place_id}"
        # The details endpoint takes un-prefixed field names.
        field_mask = ','.join(
            self._select_fields(include_reviews, include_photos, include_attributes)
        )
        headers = {'X-Goog-FieldMask': field_mask}
        params = {'languageCode': 'pl'}

        try:
            response = self.session.get(url, headers=headers, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.HTTPError as e:
            logger.error(
                "Places API HTTP error for %s: %s - %s",
                place_id, e.response.status_code, e.response.text,
            )
            return None
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError: body was not valid JSON (older requests versions).
            logger.error("Places API request error for %s: %s", place_id, e)
            return None

        logger.info(
            "Fetched place details for %s: %s",
            place_id, data.get('displayName', {}).get('text', 'unknown'),
        )
        return data

    # ------------------------------------------------------------------
    # Name matching
    # ------------------------------------------------------------------

    @staticmethod
    def _tokenize_name(name: str) -> set:
        """Tokenize a company name into significant lowercase words.

        Single-character tokens and entries of ``_SKIP_WORDS`` are dropped.
        """
        cls = GooglePlacesService
        words = cls._WORD_RE.findall((name or '').lower())
        return {w for w in words if len(w) > 1 and w not in cls._SKIP_WORDS}

    @staticmethod
    def _strip_legal_form(name: str) -> str:
        """Remove a trailing legal form; return ``name`` itself if nothing is left."""
        return GooglePlacesService._LEGAL_FORM_RE.sub('', name).strip() or name

    @staticmethod
    def _name_match_score(company_name: str, google_name: str) -> float:
        """
        Compute a 0.0-1.0 name match score.

        1. Prefix rule -> 1.0. After stripping trailing legal forms, if the
           company name is a prefix of the Google name (or vice versa) and is
           followed by whitespace, '.', ',', ';', ':' or end of string.
           "INPI" matches "INPI - Infrastruktura IT".
           A dash does not count as a boundary, so "TERMO" / "TERMO-BUD" is
           not a prefix match.
        2. Otherwise the share of the company's significant words that also
           occur in the Google name (Google names are often longer, so only
           OUR words form the denominator).

        NOTE(review): the tokenizer splits on dashes, so "TERMO" vs
        "TERMO-BUD" still scores 1.0 through rule 2. Confirm whether that is
        intended.
        """
        cls = GooglePlacesService
        cn = (company_name or '').strip().lower()
        gn = (google_name or '').strip().lower()

        clean_cn = cls._strip_legal_form(cn)
        clean_gn = cls._strip_legal_form(gn)

        # Both sides must be non-empty: an empty pattern followed by the
        # boundary class would "match" any name starting with punctuation.
        if clean_cn and clean_gn:
            boundary = cls._PREFIX_BOUNDARY
            if (re.match(re.escape(clean_cn) + boundary, gn)
                    or re.match(re.escape(clean_gn) + boundary, clean_cn)):
                return 1.0

        company_words = cls._tokenize_name(company_name)
        google_words = cls._tokenize_name(google_name)
        if not company_words or not google_words:
            return 0.0

        return len(company_words & google_words) / len(company_words)

    # ------------------------------------------------------------------
    # Searches
    # ------------------------------------------------------------------

    @staticmethod
    def _text_search_body(query: str, location_bias: Optional[Dict] = None) -> Dict[str, Any]:
        """Build the JSON body shared by the text-search methods."""
        body: Dict[str, Any] = {
            "textQuery": query,
            "languageCode": "pl",
            "maxResultCount": 5,
        }
        if location_bias:
            body["locationBias"] = {
                "circle": {
                    "center": {
                        "latitude": location_bias["latitude"],
                        "longitude": location_bias["longitude"],
                    },
                    "radius": location_bias.get("radius", 5000.0),
                }
            }
        return body

    def search_place(self, query: str, location_bias: Dict = None,
                     company_name: str = None) -> Optional[Dict[str, Any]]:
        """
        Search for a place by text query.

        Args:
            query: Search text (e.g., "TERMO Wejherowo")
            location_bias: Optional location bias
                {"latitude": 54.6, "longitude": 18.2, "radius": 5000}
            company_name: Optional company name for result validation.
                If provided, verifies the result name matches before returning.

        Returns:
            Best matching place or None
        """
        body = self._text_search_body(query, location_bias)
        headers = {
            'X-Goog-FieldMask': ','.join(f'places.{f}' for f in self._SEARCH_FIELDS)
        }

        try:
            response = self.session.post(PLACES_SEARCH_URL, json=body, headers=headers, timeout=15)
            response.raise_for_status()
            places = response.json().get('places', [])
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Places search error for '%s': %s", query, e)
            return None

        if not places:
            logger.warning("No places found for query: %s", query)
            return None

        if not company_name:
            return places[0]

        # Validate: company name must significantly match Google result name.
        # - Short names (1-2 significant words): score must reach 1.0
        # - Longer names (3+ words): score must reach 0.5
        company_words = self._tokenize_name(company_name)
        min_ratio = 1.0 if len(company_words) <= 2 else 0.5

        best_place = None
        best_score = 0.0
        for place in places:
            google_name = place.get('displayName', {}).get('text', '')
            score = self._name_match_score(company_name, google_name)
            # Strict '>' keeps the earliest result among equal scores.
            if score >= min_ratio and score > best_score:
                best_score = score
                best_place = place

        if best_place:
            matched_name = best_place.get('displayName', {}).get('text', '')
            logger.info(
                "Name match for '%s': '%s' (score=%.2f)",
                company_name, matched_name, best_score,
            )
            return best_place

        logger.warning(
            "No name match for '%s' (min_ratio=%s) in Google results: %s",
            company_name, format(min_ratio, '.0%'),
            [p.get('displayName', {}).get('text', '') for p in places],
        )
        return None

    def search_places_raw(self, query: str, location_bias: Dict = None) -> List[Dict[str, Any]]:
        """
        Search for places and return ALL results (no name filtering).

        Used for manual review/matching in admin panel.
        Returns an empty list on any request error.
        """
        body = self._text_search_body(query, location_bias)
        headers = {
            'X-Goog-FieldMask': ','.join(f'places.{f}' for f in self._SEARCH_FIELDS)
        }

        try:
            response = self.session.post(PLACES_SEARCH_URL, json=body, headers=headers, timeout=15)
            response.raise_for_status()
            return response.json().get('places', [])
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Places raw search error for '%s': %s", query, e)
            return []

    def search_nearby(self, latitude: float, longitude: float,
                      radius: float = 5000.0,
                      included_types: List[str] = None,
                      max_results: int = 10) -> List[Dict[str, Any]]:
        """
        Search for nearby places (for competitor discovery).

        Args:
            latitude: Center point latitude
            longitude: Center point longitude
            radius: Search radius in meters
            included_types: Filter by place types (e.g., ["restaurant"])
            max_results: Maximum results to return (capped at 20)

        Returns:
            List of nearby places (empty on error)
        """
        body: Dict[str, Any] = {
            "locationRestriction": {
                "circle": {
                    "center": {"latitude": latitude, "longitude": longitude},
                    "radius": radius,
                }
            },
            "maxResultCount": min(max_results, 20),
            "languageCode": "pl",
        }
        if included_types:
            body["includedTypes"] = included_types

        field_mask = ','.join(f'places.{f}' for f in [
            'id', 'displayName', 'formattedAddress', 'types',
            'rating', 'userRatingCount', 'googleMapsUri',
            'websiteUri', 'primaryType', 'photos',
            'businessStatus', 'location',
        ])
        headers = {'X-Goog-FieldMask': field_mask}

        try:
            response = self.session.post(PLACES_NEARBY_URL, json=body, headers=headers, timeout=15)
            response.raise_for_status()
            return response.json().get('places', [])
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Nearby search error: %s", e)
            return []

    # ------------------------------------------------------------------
    # Pure helpers working on a place-details dict
    # ------------------------------------------------------------------

    def get_photo_url(self, photo_name: str, max_width: int = 400) -> str:
        """
        Get photo URL from photo resource name.

        Args:
            photo_name: Photo resource name from place details
            max_width: Maximum width in pixels

        Returns:
            Photo URL string
        """
        # NOTE(security): the API key is embedded in the query string. Do not
        # hand this URL to browsers/clients unless the key is restricted.
        return f"https://places.googleapis.com/v1/{photo_name}/media?maxWidthPx={max_width}&key={self.api_key}"

    def extract_reviews_data(self, place_data: Dict) -> Dict[str, Any]:
        """
        Extract and analyze reviews from place details.

        Returns:
            Dict with review statistics and individual reviews. The keys
            ``with_response`` / ``response_rate`` are always 0 here: no field
            of the review payload is read as an owner reply (the original
            code noted that the Places API (New) does not expose one).
        """
        reviews = place_data.get('reviews') or []

        rating_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
        processed_reviews = []
        for review in reviews:
            rating = review.get('rating', 0)
            if rating in rating_dist:
                rating_dist[rating] += 1

            text_block = review.get('text', {})
            processed_reviews.append({
                'author': review.get('authorAttribution', {}).get('displayName', 'Anonim'),
                'rating': rating,
                'text': text_block.get('text', ''),
                'time': review.get('publishTime', ''),
                'relative_time': review.get('relativePublishTimeDescription', ''),
                'language': text_block.get('languageCode', 'pl'),
            })

        total = len(reviews)
        with_response = 0  # owner replies are not detectable from this payload
        response_rate = (with_response / total * 100) if total > 0 else 0.0

        return {
            'total_from_api': total,
            'total_reported': place_data.get('userRatingCount', 0),
            'average_rating': place_data.get('rating'),
            'reviews': processed_reviews,
            'with_response': with_response,
            'without_response': total - with_response,
            'response_rate': round(response_rate, 1),
            'rating_distribution': rating_dist,
        }

    @staticmethod
    def _collect_flags(place_data: Dict, mapping: Dict[str, str]) -> Dict[str, Any]:
        """Copy top-level flags that are present (not None) under their output keys."""
        return {
            out_key: place_data[api_field]
            for api_field, out_key in mapping.items()
            if place_data.get(api_field) is not None
        }

    def extract_attributes(self, place_data: Dict) -> Dict[str, Any]:
        """
        Extract business attributes from place details.

        Returns:
            Dict with categorized attributes. A category key (``payment``,
            ``parking``, ``accessibility``, ``service``, ``amenities``,
            ``food_and_drink``) is present only when the place data carries
            something for it.
        """
        attributes: Dict[str, Any] = {}

        # Payment options
        payment = place_data.get('paymentOptions', {})
        if payment:
            attributes['payment'] = {
                'accepts_credit_cards': payment.get('acceptsCreditCards'),
                'accepts_debit_cards': payment.get('acceptsDebitCards'),
                'accepts_cash_only': payment.get('acceptsCashOnly'),
                'accepts_nfc': payment.get('acceptsNfc'),
            }

        # Parking
        parking = place_data.get('parkingOptions', {})
        if parking:
            attributes['parking'] = {
                'free_parking': parking.get('freeParkingLot'),
                'paid_parking': parking.get('paidParkingLot'),
                'street_parking': parking.get('freeStreetParking'),
                'garage_parking': parking.get('freeGarageParking'),
                'valet_parking': parking.get('valetParking'),
            }

        # Accessibility
        accessibility = place_data.get('accessibilityOptions', {})
        if accessibility:
            attributes['accessibility'] = {
                'wheelchair_entrance': accessibility.get('wheelchairAccessibleEntrance'),
                'wheelchair_seating': accessibility.get('wheelchairAccessibleSeating'),
                'wheelchair_restroom': accessibility.get('wheelchairAccessibleRestroom'),
                'wheelchair_parking': accessibility.get('wheelchairAccessibleParking'),
            }

        # Flat boolean flags, grouped into output categories.
        flag_groups = (
            ('service', {
                'delivery': 'delivery',
                'dineIn': 'dine_in',
                'takeout': 'takeout',
                'curbsidePickup': 'curbside_pickup',
                'reservable': 'reservable',
                'outdoorSeating': 'outdoor_seating',
            }),
            ('amenities', {
                'restroom': 'restroom',
                'goodForChildren': 'good_for_children',
                'allowsDogs': 'allows_dogs',
                'goodForGroups': 'good_for_groups',
                'liveMusic': 'live_music',
                'goodForWatchingSports': 'good_for_watching_sports',
            }),
            ('food_and_drink', {
                'servesBreakfast': 'breakfast',
                'servesLunch': 'lunch',
                'servesDinner': 'dinner',
                'servesBeer': 'beer',
                'servesWine': 'wine',
                'servesCoffee': 'coffee',
            }),
        )
        for category, mapping in flag_groups:
            flags = self._collect_flags(place_data, mapping)
            if flags:
                attributes[category] = flags

        return attributes

    def extract_hours(self, place_data: Dict) -> Dict[str, Any]:
        """Extract opening hours from place details.

        Returns a dict with ``regular`` and ``current`` (each None or a dict
        of periods / weekday descriptions / open_now), plus
        ``has_special_hours`` and ``special_hours`` taken from
        ``currentOpeningHours.specialDays``.
        """
        result: Dict[str, Any] = {
            'regular': None,
            'current': None,
            'has_special_hours': False,
            'special_hours': None,
        }

        regular = place_data.get('regularOpeningHours', {})
        if regular:
            result['regular'] = {
                'periods': regular.get('periods', []),
                'weekday_descriptions': regular.get('weekdayDescriptions', []),
                'open_now': regular.get('openNow'),
            }

        current = place_data.get('currentOpeningHours', {})
        if current:
            result['current'] = {
                'periods': current.get('periods', []),
                'weekday_descriptions': current.get('weekdayDescriptions', []),
                'open_now': current.get('openNow'),
            }
            # A non-empty specialDays list is treated as "has special hours".
            special_days = current.get('specialDays')
            if special_days:
                result['has_special_hours'] = True
                result['special_hours'] = special_days

        return result

    def extract_photos_metadata(self, place_data: Dict) -> Dict[str, Any]:
        """Extract photo metadata from place details.

        ``total_count`` counts every photo in the payload, while ``photos``
        is truncated to the first 20 entries. A photo is flagged as an owner
        photo when an attribution's display name is literally "owner" or
        "właściciel" (case-insensitive).
        """
        photos = place_data.get('photos', [])
        if not photos:
            return {
                'total_count': 0,
                'photos': [],
                'has_owner_photos': False,
            }

        owner_labels = ('owner', 'właściciel')
        photo_list = []
        has_owner = False
        for photo in photos:
            attributions = photo.get('authorAttributions', [])
            is_owner = any(
                a.get('displayName', '').lower() in owner_labels for a in attributions
            )
            has_owner = has_owner or is_owner

            photo_list.append({
                'name': photo.get('name', ''),
                'width': photo.get('widthPx', 0),
                'height': photo.get('heightPx', 0),
                'attributions': [a.get('displayName', '') for a in attributions],
                'is_owner_photo': is_owner,
            })

        return {
            'total_count': len(photo_list),
            'photos': photo_list[:20],  # Limit stored photos
            'has_owner_photos': has_owner,
        }