nordabiz/google_places_service.py
Maciej Pienczyn 5030b71beb
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
chore: update Author to Maciej Pienczyn, InPi sp. z o.o. across all files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 08:20:47 +02:00

599 lines
22 KiB
Python

"""
Google Places API (New) Service for NordaBiz
=============================================
Comprehensive Google Places API client for fetching rich business data.
Uses the Places API (New) with field masks for efficient billing.
API Reference: https://developers.google.com/maps/documentation/places/web-service/op-overview
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2026-02-06
"""
import os
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from decimal import Decimal
import requests
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Places API (New) endpoints
# ---------------------------------------------------------------------------
PLACES_API_BASE = "https://places.googleapis.com/v1/places"
PLACES_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
PLACES_NEARBY_URL = "https://places.googleapis.com/v1/places:searchNearby"

# ---------------------------------------------------------------------------
# Field-mask building blocks
# ---------------------------------------------------------------------------
# The lists below are concatenated to form the X-Goog-FieldMask header value.
# They are grouped by billing tier as noted by the original author:
#   - basic fields (noted as "no charge"): id, displayName, formattedAddress, ...
#   - contact fields: nationalPhoneNumber, websiteUri, ...
#   - atmosphere fields: reviews, rating, ...
BASIC_FIELDS = [
    "id",
    "displayName",
    "formattedAddress",
    "location",
    "types",
    "primaryType",
    "primaryTypeDisplayName",
    "businessStatus",
    "googleMapsUri",
    "googleMapsLinks",
    "utcOffsetMinutes",
    "adrFormatAddress",
    "shortFormattedAddress",
]

CONTACT_FIELDS = [
    "nationalPhoneNumber",
    "internationalPhoneNumber",
    "websiteUri",
]

HOURS_FIELDS = [
    "regularOpeningHours",
    "currentOpeningHours",
]

ATMOSPHERE_FIELDS = [
    "rating",
    "userRatingCount",
    "reviews",
    "priceLevel",
    "editorialSummary",
]

PHOTO_FIELDS = [
    "photos",
]

# Boolean / structured business attributes (payment, parking, food service, ...).
ATTRIBUTE_FIELDS = [
    "paymentOptions",
    "parkingOptions",
    "accessibilityOptions",
    "outdoorSeating",
    "liveMusic",
    "servesBreakfast",
    "servesLunch",
    "servesDinner",
    "servesBeer",
    "servesWine",
    "servesCoffee",
    "goodForChildren",
    "allowsDogs",
    "restroom",
    "goodForGroups",
    "goodForWatchingSports",
    "reservable",
    "delivery",
    "dineIn",
    "takeout",
    "curbsidePickup",
]
class GooglePlacesService:
"""Fetches rich GBP data via Places API (New)."""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.getenv('GOOGLE_PLACES_API_KEY')
if not self.api_key:
raise ValueError("GOOGLE_PLACES_API_KEY not set in environment")
self.session = requests.Session()
self.session.headers.update({
'X-Goog-Api-Key': self.api_key,
'Content-Type': 'application/json'
})
def _build_field_mask(self, include_reviews: bool = True,
                      include_photos: bool = True,
                      include_attributes: bool = True) -> str:
    """
    Build the comma-separated field mask string for an API request.

    The mask always contains the basic, contact, hours and atmosphere
    fields. Every field name that does not already contain a dot is
    prefixed with ``places.``.

    Args:
        include_reviews: When False, the ``reviews`` field is left out of
            the mask (previously this flag was accepted but ignored).
        include_photos: Append the photo fields.
        include_attributes: Append the business-attribute fields.

    Returns:
        The field mask as a single comma-separated string.
    """
    # Concatenation creates a fresh list, so the module-level constants
    # are never mutated by the extensions below.
    fields = BASIC_FIELDS + CONTACT_FIELDS + HOURS_FIELDS + ATMOSPHERE_FIELDS
    if not include_reviews:
        fields = [name for name in fields if name != 'reviews']
    if include_photos:
        fields.extend(PHOTO_FIELDS)
    if include_attributes:
        fields.extend(ATTRIBUTE_FIELDS)
    return ','.join(name if '.' in name else f'places.{name}' for name in fields)
def get_place_details(self, place_id: str,
                      include_reviews: bool = True,
                      include_photos: bool = True,
                      include_attributes: bool = True) -> Optional[Dict[str, Any]]:
    """
    Fetch comprehensive place details by Place ID.

    Sends a GET request to ``{PLACES_API_BASE}/{place_id}`` with
    ``languageCode=pl`` and a field mask built from the module-level
    field lists (field names are sent without any prefix).

    Args:
        place_id: Google Place ID
        include_reviews: Include the ``reviews`` field in the request.
            When False the field is dropped from the mask (previously the
            flag was accepted but had no effect).
        include_photos: Include photo references
        include_attributes: Include business attributes

    Returns:
        Dict with place details, or None on any HTTP/request error
        (the error is logged, not raised).
    """
    url = f"{PLACES_API_BASE}/{place_id}"

    # Build field mask; list() guarantees the module constants stay untouched.
    fields = list(BASIC_FIELDS + CONTACT_FIELDS + HOURS_FIELDS + ATMOSPHERE_FIELDS)
    if not include_reviews:
        # Only the review payload is dropped; rating and userRatingCount
        # stay in the mask.
        fields = [name for name in fields if name != 'reviews']
    if include_photos:
        fields += PHOTO_FIELDS
    if include_attributes:
        fields += ATTRIBUTE_FIELDS

    headers = {
        'X-Goog-FieldMask': ','.join(fields)
    }
    params = {
        'languageCode': 'pl',
    }

    try:
        response = self.session.get(url, headers=headers, params=params, timeout=15)
        response.raise_for_status()
        data = response.json()
        logger.info(f"Fetched place details for {place_id}: {data.get('displayName', {}).get('text', 'unknown')}")
        return data
    except requests.exceptions.HTTPError as e:
        # HTTPError is handled first so the status code and response body
        # end up in the log.
        logger.error(f"Places API HTTP error for {place_id}: {e.response.status_code} - {e.response.text}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Places API request error for {place_id}: {e}")
        return None
@staticmethod
def _tokenize_name(name: str) -> set:
"""Tokenize a company name into significant lowercase words."""
import re as _re
skip_words = {
'sp', 'z', 'o', 'oo', 'sa', 'sc', 'j', 'k', 'ul', 'i', 'w',
'do', 'na', 'po', 'ze', 'the', 'and', 'of', 'for', 'group',
}
# Split on non-alphanumeric, keep words; treat & as connector (P&P, S&K)
words = _re.findall(r'[a-ząćęłńóśźż0-9]+(?:&[a-ząćęłńóśźż0-9]+)*', name.lower())
return {w for w in words if len(w) > 1 and w not in skip_words}
@staticmethod
def _name_match_score(company_name: str, google_name: str) -> float:
"""
Compute name match score with prefix detection and bidirectional fallback.
1. If Google name starts with company name (word boundary) → 1.0
"INPI" matches "INPI - Infrastruktura IT" (same company, extra description)
"TERMO" does NOT match "TERMO-BUD" (compound name, no space separator)
2. Otherwise, bidirectional word matching with max() denominator.
"""
import re as _re
cn = company_name.strip().lower()
gn = google_name.strip().lower()
# Strip legal forms for prefix comparison (Sp. z o.o., S.A., Sp.j., etc.)
clean_cn = _re.sub(
r'\s*(sp\.?\s*z\.?\s*o\.?\s*o\.?|sp\.?\s*[jkp]\.?|s\.?\s*[ac]\.?)\s*\.?\s*$',
'', cn, flags=_re.IGNORECASE
).strip() or cn
# Also strip legal forms from Google name for reverse prefix check
clean_gn = _re.sub(
r'\s*(sp\.?\s*z\.?\s*o\.?\s*o\.?|sp\.?\s*[jkp]\.?|s\.?\s*[ac]\.?)\s*\.?\s*$',
'', gn, flags=_re.IGNORECASE
).strip() or gn
# Prefix check: company name at start of Google name, or vice versa,
# followed by space, period, comma, or end — NOT dash (compound names)
wb = r'(?:[\s.,;:]|$)'
if clean_cn and (_re.match(_re.escape(clean_cn) + wb, gn) or
_re.match(_re.escape(clean_gn) + wb, clean_cn)):
return 1.0
company_words = GooglePlacesService._tokenize_name(company_name)
google_words = GooglePlacesService._tokenize_name(google_name)
if not company_words:
return 0.0
if not google_words:
return 0.0
matched = company_words & google_words
# Score based on how many of OUR words are found in Google name
# (Google names are often longer — "Sprzedaż i Wynajem Wózków Widłowych" for "Sprzedaż Wózków")
return len(matched) / len(company_words)
def search_place(self, query: str, location_bias: Dict = None,
                 company_name: str = None) -> Optional[Dict[str, Any]]:
    """
    Run a text search and return the best matching place.

    Args:
        query: Search text (e.g., "TERMO Wejherowo")
        location_bias: Optional bias circle, e.g.
            {"latitude": 54.6, "longitude": 18.2, "radius": 5000};
            the radius defaults to 5000.0 when the key is absent.
        company_name: Optional company name for result validation.
            Without it the first result is returned as-is. With it, only a
            result whose display name scores high enough is returned.

    Returns:
        The chosen place dict, or None when nothing was found, nothing
        matched the company name, or the request failed (errors are logged).
    """
    payload = {
        "textQuery": query,
        "languageCode": "pl",
        "maxResultCount": 5
    }
    if location_bias:
        centre = {
            "latitude": location_bias["latitude"],
            "longitude": location_bias["longitude"]
        }
        payload["locationBias"] = {
            "circle": {
                "center": centre,
                "radius": location_bias.get("radius", 5000.0)
            }
        }

    requested = ('id', 'displayName', 'formattedAddress', 'types',
                 'rating', 'userRatingCount', 'googleMapsUri')
    mask_header = {'X-Goog-FieldMask': ','.join(f'places.{name}' for name in requested)}

    try:
        reply = self.session.post(PLACES_SEARCH_URL, json=payload, headers=mask_header, timeout=15)
        reply.raise_for_status()
        places = reply.json().get('places', [])

        if not places:
            logger.warning(f"No places found for query: {query}")
            return None
        if not company_name:
            return places[0]

        # Acceptance threshold for the name score:
        #   - 1-2 significant words in our name: every word must match
        #   - 3+ words: at least half of them must match
        threshold = 1.0 if len(self._tokenize_name(company_name)) <= 2 else 0.5

        winner = None
        top_score = 0.0
        for candidate in places:
            label = candidate.get('displayName', {}).get('text', '')
            candidate_score = self._name_match_score(company_name, label)
            # Skip anything below the threshold or not strictly better than
            # the current best (so the earliest result wins ties).
            if candidate_score < threshold or candidate_score <= top_score:
                continue
            winner = candidate
            top_score = candidate_score

        if winner:
            winner_label = winner.get('displayName', {}).get('text', '')
            logger.info(
                f"Name match for '{company_name}': '{winner_label}' (score={top_score:.2f})"
            )
            return winner

        logger.warning(
            f"No name match for '{company_name}' (min_ratio={threshold:.0%}) in Google results: "
            f"{[p.get('displayName', {}).get('text', '') for p in places]}"
        )
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Places search error for '{query}': {e}")
        return None
def search_places_raw(self, query: str, location_bias: Dict = None) -> List[Dict[str, Any]]:
    """
    Run a text search and return every result, without name filtering.

    Used for manual review/matching in admin panel.

    Args:
        query: Search text.
        location_bias: Optional bias circle with "latitude", "longitude"
            and an optional "radius" (defaults to 5000.0).

    Returns:
        The list under the response's "places" key, or an empty list when
        the key is missing or the request failed (errors are logged).
    """
    payload = {
        "textQuery": query,
        "languageCode": "pl",
        "maxResultCount": 5
    }
    if location_bias:
        centre = {
            "latitude": location_bias["latitude"],
            "longitude": location_bias["longitude"]
        }
        payload["locationBias"] = {
            "circle": {
                "center": centre,
                "radius": location_bias.get("radius", 5000.0)
            }
        }

    requested = (
        'id', 'displayName', 'formattedAddress', 'types',
        'rating', 'userRatingCount', 'googleMapsUri',
    )
    mask_header = {'X-Goog-FieldMask': ','.join(f'places.{name}' for name in requested)}

    try:
        reply = self.session.post(PLACES_SEARCH_URL, json=payload, headers=mask_header, timeout=15)
        reply.raise_for_status()
        found = reply.json().get('places', [])
        return found
    except requests.exceptions.RequestException as e:
        logger.error(f"Places raw search error for '{query}': {e}")
        return []
def search_nearby(self, latitude: float, longitude: float,
                  radius: float = 5000.0,
                  included_types: List[str] = None,
                  max_results: int = 10) -> List[Dict[str, Any]]:
    """
    Search for places inside a circle (for competitor discovery).

    Args:
        latitude: Center point latitude
        longitude: Center point longitude
        radius: Search radius in meters
        included_types: Filter by place types (e.g., ["restaurant"]);
            omitted from the request when empty or None.
        max_results: Maximum results to return; values above 20 are
            reduced to 20 before being sent.

    Returns:
        The list under the response's "places" key, or an empty list when
        the key is missing or the request failed (errors are logged).
    """
    search_circle = {
        "center": {
            "latitude": latitude,
            "longitude": longitude
        },
        "radius": radius
    }
    payload = {
        "locationRestriction": {"circle": search_circle},
        "maxResultCount": min(max_results, 20),
        "languageCode": "pl"
    }
    if included_types:
        payload["includedTypes"] = included_types

    requested = (
        'id', 'displayName', 'formattedAddress', 'types',
        'rating', 'userRatingCount', 'googleMapsUri',
        'websiteUri', 'primaryType', 'photos',
        'businessStatus', 'location',
    )
    mask_header = {'X-Goog-FieldMask': ','.join(f'places.{name}' for name in requested)}

    try:
        reply = self.session.post(PLACES_NEARBY_URL, json=payload, headers=mask_header, timeout=15)
        reply.raise_for_status()
        return reply.json().get('places', [])
    except requests.exceptions.RequestException as e:
        logger.error(f"Nearby search error: {e}")
        return []
def get_photo_url(self, photo_name: str, max_width: int = 400) -> str:
"""
Get photo URL from photo resource name.
Args:
photo_name: Photo resource name from place details
max_width: Maximum width in pixels
Returns:
Photo URL string
"""
return f"https://places.googleapis.com/v1/{photo_name}/media?maxWidthPx={max_width}&key={self.api_key}"
def extract_reviews_data(self, place_data: Dict) -> Dict[str, Any]:
"""
Extract and analyze reviews from place details.
Returns:
Dict with review statistics and individual reviews
"""
reviews = place_data.get('reviews', [])
if not reviews:
return {
'total_from_api': 0,
'total_reported': place_data.get('userRatingCount', 0),
'average_rating': place_data.get('rating'),
'reviews': [],
'with_response': 0,
'without_response': 0,
'response_rate': 0.0,
'rating_distribution': {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
}
rating_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
with_response = 0
processed_reviews = []
for review in reviews:
rating = review.get('rating', 0)
if rating in rating_dist:
rating_dist[rating] += 1
has_response = bool(review.get('authorAttribution', {}).get('displayName'))
# Check if there's an owner response (Google marks these differently)
# The Places API (New) doesn't directly expose owner responses in the same way
# We'll check for the presence of a response field
processed_reviews.append({
'author': review.get('authorAttribution', {}).get('displayName', 'Anonim'),
'rating': rating,
'text': review.get('text', {}).get('text', ''),
'time': review.get('publishTime', ''),
'relative_time': review.get('relativePublishTimeDescription', ''),
'language': review.get('text', {}).get('languageCode', 'pl'),
})
total = len(reviews)
response_rate = (with_response / total * 100) if total > 0 else 0.0
return {
'total_from_api': total,
'total_reported': place_data.get('userRatingCount', 0),
'average_rating': place_data.get('rating'),
'reviews': processed_reviews,
'with_response': with_response,
'without_response': total - with_response,
'response_rate': round(response_rate, 1),
'rating_distribution': rating_dist
}
def extract_attributes(self, place_data: Dict) -> Dict[str, Any]:
"""
Extract business attributes from place details.
Returns:
Dict with categorized attributes
"""
attributes = {}
# Payment options
payment = place_data.get('paymentOptions', {})
if payment:
attributes['payment'] = {
'accepts_credit_cards': payment.get('acceptsCreditCards'),
'accepts_debit_cards': payment.get('acceptsDebitCards'),
'accepts_cash_only': payment.get('acceptsCashOnly'),
'accepts_nfc': payment.get('acceptsNfc'),
}
# Parking
parking = place_data.get('parkingOptions', {})
if parking:
attributes['parking'] = {
'free_parking': parking.get('freeParkingLot'),
'paid_parking': parking.get('paidParkingLot'),
'street_parking': parking.get('freeStreetParking'),
'garage_parking': parking.get('freeGarageParking'),
'valet_parking': parking.get('valetParking'),
}
# Accessibility
accessibility = place_data.get('accessibilityOptions', {})
if accessibility:
attributes['accessibility'] = {
'wheelchair_entrance': accessibility.get('wheelchairAccessibleEntrance'),
'wheelchair_seating': accessibility.get('wheelchairAccessibleSeating'),
'wheelchair_restroom': accessibility.get('wheelchairAccessibleRestroom'),
'wheelchair_parking': accessibility.get('wheelchairAccessibleParking'),
}
# Service options
service = {}
bool_fields = {
'delivery': 'delivery',
'dineIn': 'dine_in',
'takeout': 'takeout',
'curbsidePickup': 'curbside_pickup',
'reservable': 'reservable',
'outdoorSeating': 'outdoor_seating',
}
for api_field, key in bool_fields.items():
val = place_data.get(api_field)
if val is not None:
service[key] = val
if service:
attributes['service'] = service
# Amenities
amenities = {}
amenity_fields = {
'restroom': 'restroom',
'goodForChildren': 'good_for_children',
'allowsDogs': 'allows_dogs',
'goodForGroups': 'good_for_groups',
'liveMusic': 'live_music',
'goodForWatchingSports': 'good_for_watching_sports',
}
for api_field, key in amenity_fields.items():
val = place_data.get(api_field)
if val is not None:
amenities[key] = val
if amenities:
attributes['amenities'] = amenities
# Food & Drink
food = {}
food_fields = {
'servesBreakfast': 'breakfast',
'servesLunch': 'lunch',
'servesDinner': 'dinner',
'servesBeer': 'beer',
'servesWine': 'wine',
'servesCoffee': 'coffee',
}
for api_field, key in food_fields.items():
val = place_data.get(api_field)
if val is not None:
food[key] = val
if food:
attributes['food_and_drink'] = food
return attributes
def extract_hours(self, place_data: Dict) -> Dict[str, Any]:
"""Extract opening hours from place details."""
result = {
'regular': None,
'current': None,
'has_special_hours': False,
'special_hours': None
}
regular = place_data.get('regularOpeningHours', {})
if regular:
result['regular'] = {
'periods': regular.get('periods', []),
'weekday_descriptions': regular.get('weekdayDescriptions', []),
'open_now': regular.get('openNow')
}
current = place_data.get('currentOpeningHours', {})
if current:
result['current'] = {
'periods': current.get('periods', []),
'weekday_descriptions': current.get('weekdayDescriptions', []),
'open_now': current.get('openNow')
}
# If current differs from regular, there are special hours
if current.get('specialDays'):
result['has_special_hours'] = True
result['special_hours'] = current.get('specialDays', [])
return result
def extract_photos_metadata(self, place_data: Dict) -> Dict[str, Any]:
"""Extract photo metadata from place details."""
photos = place_data.get('photos', [])
if not photos:
return {
'total_count': 0,
'photos': [],
'has_owner_photos': False
}
photo_list = []
has_owner = False
for photo in photos:
attributions = photo.get('authorAttributions', [])
is_owner = any(a.get('displayName', '').lower() in ['owner', 'właściciel']
for a in attributions)
if is_owner:
has_owner = True
photo_list.append({
'name': photo.get('name', ''),
'width': photo.get('widthPx', 0),
'height': photo.get('heightPx', 0),
'attributions': [a.get('displayName', '') for a in attributions],
'is_owner_photo': is_owner
})
return {
'total_count': len(photo_list),
'photos': photo_list[:20], # Limit stored photos
'has_owner_photos': has_owner
}