Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
821 lines
30 KiB
Python
821 lines
30 KiB
Python
"""
|
|
Google Gemini AI Service
|
|
========================
|
|
Reusable service for interacting with Google Gemini API.
|
|
|
|
Features:
|
|
- Multiple model support (Flash, Pro, Gemini 3)
|
|
- Thinking Mode support for Gemini 3 models
|
|
- Error handling and retries
|
|
- Cost tracking
|
|
- Safety settings configuration
|
|
|
|
Author: Maciej Pienczyn, InPi sp. z o.o.
|
|
Updated: 2026-01-29 (Gemini 3 SDK migration)
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import hashlib
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
# New Gemini SDK (google-genai) with thinking mode support
|
|
from google import genai
|
|
from google.genai import types
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Database imports for cost tracking
|
|
# Cost-tracking persistence is optional: when the database layer cannot be
# imported (e.g. standalone scripts), the service degrades gracefully and
# _log_api_cost becomes a no-op via the DB_AVAILABLE flag.
try:
    from database import SessionLocal, AIAPICostLog, AIUsageLog
    DB_AVAILABLE = True
except ImportError:
    logger.warning("Database not available - cost tracking disabled")
    DB_AVAILABLE = False
|
|
|
|
# Available Gemini models (2026 — Gemini 3 generation available).
# Keys are the short aliases used throughout the app; values are API model names.
GEMINI_MODELS = {
    'flash': 'gemini-2.5-flash',                        # balanced cost/quality
    'flash-lite': 'gemini-2.5-flash-lite',              # ultra cheap: $0.10/$0.40 per 1M tokens
    'pro': 'gemini-2.5-pro',                            # high quality, 2.5 generation
    '3-flash': 'gemini-3-flash-preview',                # Gemini 3 Flash — thinking mode
    '3-pro': 'gemini-3.1-pro-preview',                  # Gemini 3.1 Pro (alias kept for compatibility)
    '3.1-pro': 'gemini-3.1-pro-preview',                # Gemini 3.1 Pro — best reasoning
    '3.1-flash-lite': 'gemini-3.1-flash-lite-preview',  # Gemini 3.1 Flash Lite — fast, cheap
}

# Models that support thinking mode.
THINKING_MODELS = {'gemini-3-flash-preview', 'gemini-3.1-pro-preview', 'gemini-3.1-flash-lite-preview'}

# Preview models — monitor for GA release and switch for better stability.
# Track at: https://ai.google.dev/gemini-api/docs/models
PREVIEW_MODELS = {'gemini-3-flash-preview', 'gemini-3.1-pro-preview', 'gemini-3.1-flash-lite-preview'}

# Fallback chain for rate-limit (429) resilience — Paid Tier 1.
# Order: primary → quality fallback → cheapest last resort.
MODEL_FALLBACK_CHAIN = [
    'gemini-3-flash-preview',          # 10K RPD — thinking mode, primary
    'gemini-3.1-flash-lite-preview',   # quality fallback — generation 3.1
    'gemini-2.5-flash-lite',           # unlimited RPD — cheapest, last resort
]

# Thinking levels accepted by Gemini 3 models (app alias → API enum value).
THINKING_LEVELS = {
    'minimal': 'MINIMAL',  # lowest latency, minimal reasoning
    'low': 'LOW',          # fast, simple tasks
    'medium': 'MEDIUM',    # balanced (Gemini 3 Flash only)
    'high': 'HIGH',        # maximum reasoning depth (default)
}

# Pricing per 1M tokens (USD). Verified 2026-03-25 against
# ai.google.dev/gemini-api/docs/pricing.
# Google bills thinking tokens at the OUTPUT rate, so downstream cost math
# folds them into the output side. Flash on Free Tier is $0.00; these figures
# assume the paid tier.
GEMINI_PRICING = {
    'gemini-2.5-flash': {'input': 0.30, 'output': 2.50},
    'gemini-2.5-flash-lite': {'input': 0.10, 'output': 0.40},
    'gemini-2.5-pro': {'input': 1.25, 'output': 10.00},
    'gemini-3-flash-preview': {'input': 0.50, 'output': 3.00},         # paid tier
    'gemini-3.1-pro-preview': {'input': 2.00, 'output': 12.00},        # paid tier
    'gemini-3.1-flash-lite-preview': {'input': 0.25, 'output': 1.50},  # paid tier
}
|
|
|
|
|
|
class GeminiService:
|
|
"""Service class for Google Gemini API interactions with Thinking Mode support."""
|
|
|
|
def __init__(
|
|
self,
|
|
api_key: Optional[str] = None,
|
|
model: str = 'flash',
|
|
thinking_level: str = 'high',
|
|
include_thoughts: bool = False,
|
|
fallback_models: Optional[List[str]] = None
|
|
):
|
|
"""
|
|
Initialize Gemini service.
|
|
|
|
Args:
|
|
api_key: Google AI API key (reads from env if not provided)
|
|
model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro')
|
|
thinking_level: Reasoning depth ('minimal', 'low', 'medium', 'high')
|
|
include_thoughts: Whether to include thinking process in response (for debugging)
|
|
fallback_models: List of full model names for 429 fallback (default: MODEL_FALLBACK_CHAIN)
|
|
|
|
API Key:
|
|
- GOOGLE_GEMINI_API_KEY: Paid tier 1 (all models)
|
|
"""
|
|
# Always use paid tier API key
|
|
if api_key:
|
|
self.api_key = api_key
|
|
else:
|
|
self.api_key = os.getenv('GOOGLE_GEMINI_API_KEY')
|
|
|
|
# Debug: Log API key (masked)
|
|
if self.api_key:
|
|
logger.info("Gemini API key loaded successfully")
|
|
else:
|
|
logger.error("API key is None or empty!")
|
|
|
|
if not self.api_key or self.api_key == 'TWOJ_KLUCZ_API_TUTAJ':
|
|
raise ValueError(
|
|
"GOOGLE_GEMINI_API_KEY not configured. "
|
|
"Please add your API key to .env file."
|
|
)
|
|
|
|
# Initialize new Gemini client
|
|
self.client = genai.Client(api_key=self.api_key)
|
|
|
|
# Set model
|
|
self.model_name = GEMINI_MODELS.get(model, GEMINI_MODELS['flash'])
|
|
|
|
# Fallback chain for 429 rate limit resilience
|
|
self.fallback_models = fallback_models if fallback_models is not None else MODEL_FALLBACK_CHAIN
|
|
|
|
# Thinking mode configuration
|
|
self.thinking_level = thinking_level
|
|
self.include_thoughts = include_thoughts
|
|
self._thinking_enabled = self.model_name in THINKING_MODELS
|
|
|
|
# Safety settings
|
|
self.safety_settings = [
|
|
types.SafetySetting(
|
|
category="HARM_CATEGORY_HATE_SPEECH",
|
|
threshold="BLOCK_NONE"
|
|
),
|
|
types.SafetySetting(
|
|
category="HARM_CATEGORY_DANGEROUS_CONTENT",
|
|
threshold="BLOCK_NONE"
|
|
),
|
|
types.SafetySetting(
|
|
category="HARM_CATEGORY_SEXUALLY_EXPLICIT",
|
|
threshold="BLOCK_NONE"
|
|
),
|
|
types.SafetySetting(
|
|
category="HARM_CATEGORY_HARASSMENT",
|
|
threshold="BLOCK_NONE"
|
|
),
|
|
]
|
|
|
|
chain_str = ' → '.join(self.fallback_models)
|
|
logger.info(
|
|
f"Gemini service initialized: model={self.model_name}, "
|
|
f"thinking={self._thinking_enabled}, level={thinking_level}, "
|
|
f"fallback_chain=[{chain_str}]"
|
|
)
|
|
|
|
# Warn if using preview model — monitor for GA release
|
|
if self.model_name in PREVIEW_MODELS:
|
|
logger.warning(
|
|
f"Using PREVIEW model: {self.model_name}. "
|
|
f"Monitor https://ai.google.dev/gemini-api/docs/models for GA release. "
|
|
f"Switch to GA model when available for better stability."
|
|
)
|
|
|
|
@property
|
|
def thinking_enabled(self) -> bool:
|
|
"""Whether thinking mode is enabled for current model."""
|
|
return self._thinking_enabled
|
|
|
|
@property
|
|
def thinking_level_display(self) -> str:
|
|
"""Human-readable thinking level for UI."""
|
|
if not self._thinking_enabled:
|
|
return "Wyłączony"
|
|
return {
|
|
'minimal': 'Minimalny',
|
|
'low': 'Niski',
|
|
'medium': 'Średni',
|
|
'high': 'Wysoki'
|
|
}.get(self.thinking_level, self.thinking_level)
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get service status for UI display."""
|
|
return {
|
|
'model': self.model_name,
|
|
'thinking_enabled': self._thinking_enabled,
|
|
'thinking_level': self.thinking_level,
|
|
'thinking_level_display': self.thinking_level_display,
|
|
'include_thoughts': self.include_thoughts
|
|
}
|
|
|
|
@staticmethod
|
|
def _is_rate_limited(error: Exception) -> bool:
|
|
"""Check if error is a 429 / RESOURCE_EXHAUSTED rate limit error."""
|
|
error_str = str(error)
|
|
return '429' in error_str or 'RESOURCE_EXHAUSTED' in error_str
|
|
|
|
@staticmethod
|
|
def _is_retryable(error: Exception) -> bool:
|
|
"""Check if error is retryable (rate limit or server overload)."""
|
|
error_str = str(error)
|
|
return ('429' in error_str or 'RESOURCE_EXHAUSTED' in error_str or
|
|
'503' in error_str or 'UNAVAILABLE' in error_str)
|
|
|
|
def _build_generation_config(self, model: str, temperature: float,
|
|
max_tokens: Optional[int],
|
|
thinking_level: Optional[str],
|
|
response_schema: Optional[Dict] = None) -> types.GenerateContentConfig:
|
|
"""Build GenerateContentConfig, adjusting thinking mode per model."""
|
|
config_params = {
|
|
'temperature': temperature,
|
|
}
|
|
if max_tokens:
|
|
config_params['max_output_tokens'] = max_tokens
|
|
|
|
# Structured output: enforce JSON schema on response
|
|
if response_schema:
|
|
config_params['response_mime_type'] = 'application/json'
|
|
config_params['response_schema'] = response_schema
|
|
|
|
# Only add thinking config for models that support it
|
|
if model in THINKING_MODELS:
|
|
level = thinking_level or self.thinking_level
|
|
thinking_config = types.ThinkingConfig(
|
|
thinking_level=THINKING_LEVELS.get(level, 'HIGH'),
|
|
include_thoughts=self.include_thoughts
|
|
)
|
|
config_params['thinking_config'] = thinking_config
|
|
|
|
return types.GenerateContentConfig(
|
|
**config_params,
|
|
safety_settings=self.safety_settings
|
|
)
|
|
|
|
def generate_text(
|
|
self,
|
|
prompt: str,
|
|
temperature: float = 0.7,
|
|
max_tokens: Optional[int] = None,
|
|
stream: bool = False,
|
|
thinking_level: Optional[str] = None,
|
|
feature: str = 'general',
|
|
user_id: Optional[int] = None,
|
|
company_id: Optional[int] = None,
|
|
related_entity_type: Optional[str] = None,
|
|
related_entity_id: Optional[int] = None,
|
|
model: Optional[str] = None,
|
|
response_schema: Optional[Dict] = None
|
|
) -> str:
|
|
"""
|
|
Generate text using Gemini API with automatic fallback, cost tracking and thinking mode.
|
|
|
|
On 429/503, automatically retries with the next model in fallback chain.
|
|
|
|
Args:
|
|
prompt: Text prompt to send to the model
|
|
temperature: Sampling temperature (0.0-1.0). Higher = more creative
|
|
max_tokens: Maximum tokens to generate (None = model default)
|
|
stream: Whether to stream the response
|
|
thinking_level: Override default thinking level for this call
|
|
feature: Feature name for cost tracking ('chat', 'news_evaluation', etc.)
|
|
user_id: Optional user ID for cost tracking
|
|
company_id: Optional company ID for context
|
|
related_entity_type: Entity type ('zopk_news', 'chat_message', etc.)
|
|
related_entity_id: Entity ID for reference
|
|
model: Override model for this call (alias like '3-pro' or full name like 'gemini-3-pro-preview')
|
|
response_schema: Optional JSON schema dict to enforce structured output (Gemini format)
|
|
|
|
Returns:
|
|
Generated text response
|
|
|
|
Raises:
|
|
Exception: If API call fails on all models
|
|
"""
|
|
# Resolve model override: accept alias ('3-pro') or full name ('gemini-3-pro-preview')
|
|
primary_model = self.model_name
|
|
if model:
|
|
primary_model = GEMINI_MODELS.get(model, model)
|
|
|
|
# Build ordered list of models to try: primary first, then fallbacks
|
|
models_to_try = [primary_model]
|
|
for m in self.fallback_models:
|
|
if m not in models_to_try:
|
|
models_to_try.append(m)
|
|
|
|
start_time = time.time()
|
|
last_error = None
|
|
|
|
for model in models_to_try:
|
|
try:
|
|
generation_config = self._build_generation_config(
|
|
model=model,
|
|
temperature=temperature,
|
|
max_tokens=max_tokens,
|
|
thinking_level=thinking_level,
|
|
response_schema=response_schema
|
|
)
|
|
|
|
# Call API
|
|
response = self.client.models.generate_content(
|
|
model=model,
|
|
contents=prompt,
|
|
config=generation_config
|
|
)
|
|
|
|
if stream:
|
|
return response
|
|
|
|
# Extract response text
|
|
response_text = response.text
|
|
|
|
# Count tokens and log cost
|
|
latency_ms = int((time.time() - start_time) * 1000)
|
|
|
|
input_tokens = self._count_tokens_from_response(response, 'input')
|
|
output_tokens = self._count_tokens_from_response(response, 'output')
|
|
thinking_tokens = self._count_tokens_from_response(response, 'thinking')
|
|
|
|
# Log with model & thinking info
|
|
level = thinking_level or self.thinking_level
|
|
is_thinking = model in THINKING_MODELS
|
|
is_fallback = model != self.model_name
|
|
logger.info(
|
|
f"Gemini API call successful. "
|
|
f"Tokens: {input_tokens}+{output_tokens}"
|
|
f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, "
|
|
f"Latency: {latency_ms}ms, "
|
|
f"Model: {model}{'(fallback)' if is_fallback else ''}, "
|
|
f"Thinking: {level.upper() if is_thinking else 'OFF'}"
|
|
)
|
|
|
|
# Log to database for cost tracking (use actual model used)
|
|
self._log_api_cost(
|
|
prompt=prompt,
|
|
response_text=response_text,
|
|
input_tokens=input_tokens,
|
|
output_tokens=output_tokens,
|
|
thinking_tokens=thinking_tokens,
|
|
latency_ms=latency_ms,
|
|
success=True,
|
|
feature=feature,
|
|
user_id=user_id,
|
|
company_id=company_id,
|
|
related_entity_type=related_entity_type,
|
|
related_entity_id=related_entity_id,
|
|
model_override=model if is_fallback else None
|
|
)
|
|
|
|
return response_text
|
|
|
|
except Exception as e:
|
|
if self._is_retryable(e) and model != models_to_try[-1]:
|
|
logger.warning(f"Retryable error on {model} ({type(e).__name__}), trying next fallback...")
|
|
last_error = e
|
|
continue
|
|
|
|
# Non-retryable error or last model in chain — fail
|
|
latency_ms = int((time.time() - start_time) * 1000)
|
|
|
|
self._log_api_cost(
|
|
prompt=prompt,
|
|
response_text='',
|
|
input_tokens=self._estimate_tokens(prompt),
|
|
output_tokens=0,
|
|
thinking_tokens=0,
|
|
latency_ms=latency_ms,
|
|
success=False,
|
|
error_message=str(e),
|
|
feature=feature,
|
|
user_id=user_id,
|
|
company_id=company_id,
|
|
related_entity_type=related_entity_type,
|
|
related_entity_id=related_entity_id,
|
|
model_override=model if model != self.model_name else None
|
|
)
|
|
|
|
logger.error(f"Gemini API error on {model}: {str(e)}")
|
|
raise Exception(f"Gemini API call failed: {str(e)}")
|
|
|
|
# All models exhausted (all returned 429)
|
|
latency_ms = int((time.time() - start_time) * 1000)
|
|
logger.error(f"All fallback models exhausted. Last error: {last_error}")
|
|
self._log_api_cost(
|
|
prompt=prompt,
|
|
response_text='',
|
|
input_tokens=self._estimate_tokens(prompt),
|
|
output_tokens=0,
|
|
thinking_tokens=0,
|
|
latency_ms=latency_ms,
|
|
success=False,
|
|
error_message=f"All models rate limited: {last_error}",
|
|
feature=feature,
|
|
user_id=user_id,
|
|
company_id=company_id,
|
|
related_entity_type=related_entity_type,
|
|
related_entity_id=related_entity_id
|
|
)
|
|
raise Exception(f"All Gemini models rate limited. Last error: {last_error}")
|
|
|
|
def chat(self, messages: List[Dict[str, str]]) -> str:
|
|
"""
|
|
Multi-turn chat conversation.
|
|
|
|
Args:
|
|
messages: List of message dicts with 'role' and 'content' keys
|
|
Example: [
|
|
{'role': 'user', 'content': 'Hello'},
|
|
{'role': 'model', 'content': 'Hi there!'},
|
|
{'role': 'user', 'content': 'How are you?'}
|
|
]
|
|
|
|
Returns:
|
|
Model's response to the last message
|
|
"""
|
|
try:
|
|
# Build contents from messages
|
|
contents = []
|
|
for msg in messages:
|
|
role = 'user' if msg['role'] == 'user' else 'model'
|
|
contents.append(types.Content(
|
|
role=role,
|
|
parts=[types.Part(text=msg['content'])]
|
|
))
|
|
|
|
# Build config with thinking if available
|
|
config_params = {'temperature': 0.7}
|
|
if self._thinking_enabled:
|
|
config_params['thinking_config'] = types.ThinkingConfig(
|
|
thinking_level=THINKING_LEVELS.get(self.thinking_level, 'HIGH'),
|
|
include_thoughts=self.include_thoughts
|
|
)
|
|
|
|
generation_config = types.GenerateContentConfig(
|
|
**config_params,
|
|
safety_settings=self.safety_settings
|
|
)
|
|
|
|
response = self.client.models.generate_content(
|
|
model=self.model_name,
|
|
contents=contents,
|
|
config=generation_config
|
|
)
|
|
|
|
return response.text
|
|
|
|
except Exception as e:
|
|
logger.error(f"Gemini chat error: {str(e)}")
|
|
raise Exception(f"Gemini chat failed: {str(e)}")
|
|
|
|
def analyze_image(self, image_path: str, prompt: str) -> str:
|
|
"""
|
|
Analyze image with Gemini Vision.
|
|
|
|
Args:
|
|
image_path: Path to image file
|
|
prompt: Text prompt describing what to analyze
|
|
|
|
Returns:
|
|
Analysis result
|
|
"""
|
|
try:
|
|
import PIL.Image
|
|
|
|
img = PIL.Image.open(image_path)
|
|
|
|
# Convert image to bytes
|
|
import io
|
|
img_bytes = io.BytesIO()
|
|
img.save(img_bytes, format=img.format or 'PNG')
|
|
img_bytes = img_bytes.getvalue()
|
|
|
|
contents = [
|
|
types.Part(text=prompt),
|
|
types.Part(
|
|
inline_data=types.Blob(
|
|
mime_type=f"image/{(img.format or 'png').lower()}",
|
|
data=img_bytes
|
|
)
|
|
)
|
|
]
|
|
|
|
response = self.client.models.generate_content(
|
|
model=self.model_name,
|
|
contents=contents
|
|
)
|
|
|
|
return response.text
|
|
|
|
except Exception as e:
|
|
logger.error(f"Gemini image analysis error: {str(e)}")
|
|
raise Exception(f"Image analysis failed: {str(e)}")
|
|
|
|
def count_tokens(self, text: str) -> int:
|
|
"""
|
|
Count tokens in text.
|
|
|
|
Args:
|
|
text: Text to count tokens for
|
|
|
|
Returns:
|
|
Number of tokens
|
|
"""
|
|
try:
|
|
result = self.client.models.count_tokens(
|
|
model=self.model_name,
|
|
contents=text
|
|
)
|
|
return result.total_tokens
|
|
except Exception as e:
|
|
logger.warning(f"Token counting failed: {e}")
|
|
return self._estimate_tokens(text)
|
|
|
|
def _estimate_tokens(self, text: str) -> int:
|
|
"""Estimate tokens when API counting fails (~4 chars per token)."""
|
|
return len(text) // 4
|
|
|
|
def _count_tokens_from_response(self, response, token_type: str) -> int:
|
|
"""Extract token count from API response metadata."""
|
|
try:
|
|
usage = response.usage_metadata
|
|
if not usage:
|
|
return 0
|
|
if token_type == 'input':
|
|
return getattr(usage, 'prompt_token_count', 0) or 0
|
|
elif token_type == 'output':
|
|
return getattr(usage, 'candidates_token_count', 0) or 0
|
|
elif token_type == 'thinking':
|
|
# SDK may not expose thinking_token_count directly.
|
|
# Derive from: total - prompt - candidates = thinking tokens.
|
|
thinking = getattr(usage, 'thinking_token_count', 0) or 0
|
|
if not thinking:
|
|
total = getattr(usage, 'total_token_count', 0) or 0
|
|
prompt = getattr(usage, 'prompt_token_count', 0) or 0
|
|
candidates = getattr(usage, 'candidates_token_count', 0) or 0
|
|
thinking = max(0, total - prompt - candidates)
|
|
return thinking
|
|
except Exception:
|
|
return 0
|
|
return 0
|
|
|
|
def _log_api_cost(
|
|
self,
|
|
prompt: str,
|
|
response_text: str,
|
|
input_tokens: int,
|
|
output_tokens: int,
|
|
thinking_tokens: int = 0,
|
|
latency_ms: int = 0,
|
|
success: bool = True,
|
|
error_message: Optional[str] = None,
|
|
feature: str = 'general',
|
|
user_id: Optional[int] = None,
|
|
company_id: Optional[int] = None,
|
|
related_entity_type: Optional[str] = None,
|
|
related_entity_id: Optional[int] = None,
|
|
model_override: Optional[str] = None
|
|
):
|
|
"""
|
|
Log API call costs to database for monitoring.
|
|
|
|
Args:
|
|
prompt: Input prompt text
|
|
response_text: Output response text
|
|
input_tokens: Number of input tokens used
|
|
output_tokens: Number of output tokens generated
|
|
thinking_tokens: Number of thinking tokens (Gemini 3)
|
|
latency_ms: Response time in milliseconds
|
|
success: Whether API call succeeded
|
|
error_message: Error details if failed
|
|
feature: Feature name ('chat', 'news_evaluation', 'user_creation', etc.)
|
|
user_id: Optional user ID
|
|
company_id: Optional company ID for context
|
|
related_entity_type: Entity type ('zopk_news', 'chat_message', etc.)
|
|
related_entity_id: Entity ID for reference
|
|
model_override: Actual model used (if different from self.model_name due to fallback)
|
|
"""
|
|
if not DB_AVAILABLE:
|
|
return
|
|
|
|
actual_model = model_override or self.model_name
|
|
|
|
try:
|
|
# Calculate costs using actual model pricing
|
|
# Google bills thinking tokens at the output rate
|
|
pricing = GEMINI_PRICING.get(actual_model, {'input': 0.50, 'output': 3.00})
|
|
input_cost = (input_tokens / 1_000_000) * pricing['input']
|
|
output_cost = ((output_tokens + thinking_tokens) / 1_000_000) * pricing['output']
|
|
total_cost = input_cost + output_cost
|
|
|
|
# Cost in cents for AIUsageLog
|
|
cost_cents = total_cost * 100
|
|
|
|
# Create prompt hash (for debugging, not storing full prompt for privacy)
|
|
prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
|
|
|
|
# Save to database
|
|
db = SessionLocal()
|
|
try:
|
|
# Log to legacy AIAPICostLog table
|
|
legacy_log = AIAPICostLog(
|
|
timestamp=datetime.now(),
|
|
api_provider='gemini',
|
|
model_name=actual_model,
|
|
feature=feature,
|
|
user_id=user_id,
|
|
input_tokens=input_tokens,
|
|
output_tokens=output_tokens + thinking_tokens, # Combined for legacy
|
|
total_tokens=input_tokens + output_tokens + thinking_tokens,
|
|
input_cost=input_cost,
|
|
output_cost=output_cost, # Includes thinking (billed at output rate)
|
|
total_cost=total_cost,
|
|
success=success,
|
|
error_message=error_message,
|
|
latency_ms=latency_ms,
|
|
prompt_hash=prompt_hash
|
|
)
|
|
db.add(legacy_log)
|
|
|
|
# Log to new AIUsageLog table
|
|
usage_log = AIUsageLog(
|
|
request_type=feature,
|
|
model=actual_model,
|
|
tokens_input=input_tokens,
|
|
tokens_output=output_tokens + thinking_tokens,
|
|
cost_cents=cost_cents,
|
|
user_id=user_id,
|
|
company_id=company_id,
|
|
related_entity_type=related_entity_type,
|
|
related_entity_id=related_entity_id,
|
|
prompt_length=len(prompt),
|
|
response_length=len(response_text),
|
|
response_time_ms=latency_ms,
|
|
success=success,
|
|
error_message=error_message
|
|
)
|
|
db.add(usage_log)
|
|
|
|
db.commit()
|
|
|
|
logger.info(
|
|
f"API cost logged: {feature} - ${total_cost:.6f} "
|
|
f"({input_tokens}+{output_tokens}"
|
|
f"{f'+{thinking_tokens}t' if thinking_tokens else ''} tokens, {latency_ms}ms)"
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to log API cost: {e}")
|
|
|
|
def generate_embedding(
|
|
self,
|
|
text: str,
|
|
task_type: str = 'RETRIEVAL_DOCUMENT',
|
|
title: Optional[str] = None,
|
|
user_id: Optional[int] = None,
|
|
feature: str = 'embedding'
|
|
) -> Optional[List[float]]:
|
|
"""
|
|
Generate embedding vector for text using Google's text-embedding model.
|
|
|
|
Args:
|
|
text: Text to embed
|
|
task_type: One of:
|
|
- 'RETRIEVAL_DOCUMENT': For documents to be retrieved
|
|
- 'RETRIEVAL_QUERY': For search queries
|
|
- 'SEMANTIC_SIMILARITY': For comparing texts
|
|
- 'CLASSIFICATION': For text classification
|
|
- 'CLUSTERING': For text clustering
|
|
title: Optional title for document (improves quality)
|
|
user_id: User ID for cost tracking
|
|
feature: Feature name for cost tracking
|
|
|
|
Returns:
|
|
768-dimensional embedding vector or None on error
|
|
"""
|
|
if not text or not text.strip():
|
|
logger.warning("Empty text provided for embedding")
|
|
return None
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Build content with optional title
|
|
content_parts = []
|
|
if title:
|
|
content_parts.append(types.Part(text=f"Title: {title}\n\n"))
|
|
content_parts.append(types.Part(text=text))
|
|
|
|
result = self.client.models.embed_content(
|
|
model='gemini-embedding-001',
|
|
contents=types.Content(parts=content_parts),
|
|
config=types.EmbedContentConfig(
|
|
task_type=task_type,
|
|
output_dimensionality=768
|
|
)
|
|
)
|
|
|
|
embedding = result.embeddings[0].values if result.embeddings else None
|
|
|
|
if not embedding:
|
|
logger.error("No embedding returned from API")
|
|
return None
|
|
|
|
# Log cost
|
|
latency_ms = int((time.time() - start_time) * 1000)
|
|
token_count = len(text) // 4
|
|
cost_usd = (token_count / 1000) * 0.00001
|
|
|
|
logger.debug(
|
|
f"Embedding generated: {len(embedding)} dims, "
|
|
f"{token_count} tokens, {latency_ms}ms, ${cost_usd:.8f}"
|
|
)
|
|
|
|
return list(embedding)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Embedding generation error: {e}")
|
|
return None
|
|
|
|
def generate_embeddings_batch(
|
|
self,
|
|
texts: List[str],
|
|
task_type: str = 'RETRIEVAL_DOCUMENT',
|
|
user_id: Optional[int] = None
|
|
) -> List[Optional[List[float]]]:
|
|
"""
|
|
Generate embeddings for multiple texts.
|
|
|
|
Args:
|
|
texts: List of texts to embed
|
|
task_type: Task type for all embeddings
|
|
user_id: User ID for cost tracking
|
|
|
|
Returns:
|
|
List of embedding vectors (None for failed items)
|
|
"""
|
|
results = []
|
|
for text in texts:
|
|
embedding = self.generate_embedding(
|
|
text=text,
|
|
task_type=task_type,
|
|
user_id=user_id,
|
|
feature='embedding_batch'
|
|
)
|
|
results.append(embedding)
|
|
return results
|
|
|
|
|
|
# Module-level singleton, populated by init_gemini_service() in app.py;
# None until initialization succeeds.
_gemini_service: Optional[GeminiService] = None
|
|
|
|
|
|
def init_gemini_service(
    api_key: Optional[str] = None,
    model: str = 'flash',
    thinking_level: str = 'high'
):
    """
    Initialize the global Gemini service instance.

    Call this in app.py during Flask app initialization. On any construction
    failure the global is reset to None and the error is logged.

    Args:
        api_key: Google AI API key (optional if set in env)
        model: Model alias ('flash', 'flash-lite', 'pro', '3-flash', '3-pro')
        thinking_level: Reasoning depth for Gemini 3 models
            ('minimal', 'low', 'medium', 'high')
    """
    global _gemini_service
    try:
        service = GeminiService(
            api_key=api_key,
            model=model,
            thinking_level=thinking_level
        )
    except Exception as e:
        logger.error(f"Failed to initialize Gemini service: {e}")
        _gemini_service = None
        return
    _gemini_service = service
    logger.info("Global Gemini service initialized successfully")
|
|
|
|
|
|
def get_gemini_service() -> Optional[GeminiService]:
    """
    Return the global GeminiService instance.

    Returns:
        The instance created by init_gemini_service(), or None when the
        service has not been (successfully) initialized.
    """
    return _gemini_service
|
|
|
|
|
|
def generate_text(prompt: str, **kwargs) -> Optional[str]:
    """
    Convenience wrapper that generates text via the global service.

    Args:
        prompt: Text prompt
        **kwargs: Forwarded to GeminiService.generate_text()

    Returns:
        Generated text, or None when the global service is not initialized.
    """
    service = get_gemini_service()
    if service is None:
        logger.warning("Gemini service not initialized")
        return None
    return service.generate_text(prompt, **kwargs)
|