""" Google Gemini AI Service ======================== Reusable service for interacting with Google Gemini API. Features: - Multiple model support (Flash, Pro, Gemini 3) - Thinking Mode support for Gemini 3 models - Error handling and retries - Cost tracking - Safety settings configuration Author: Maciej Pienczyn, InPi sp. z o.o. Updated: 2026-01-29 (Gemini 3 SDK migration) """ import os import logging import hashlib import time from datetime import datetime from typing import Optional, Dict, Any, List # New Gemini SDK (google-genai) with thinking mode support from google import genai from google.genai import types # Configure logging logger = logging.getLogger(__name__) # Database imports for cost tracking try: from database import SessionLocal, AIAPICostLog, AIUsageLog DB_AVAILABLE = True except ImportError: logger.warning("Database not available - cost tracking disabled") DB_AVAILABLE = False # Available Gemini models (2026 - Gemini 3 generation available) GEMINI_MODELS = { 'flash': 'gemini-2.5-flash', # Balanced cost/quality 'flash-lite': 'gemini-2.5-flash-lite', # Ultra cheap - $0.10/$0.40 per 1M tokens 'pro': 'gemini-2.5-pro', # High quality 2.5 gen '3-flash': 'gemini-3-flash-preview', # Gemini 3 Flash - thinking mode '3-pro': 'gemini-3.1-pro-preview', # Gemini 3.1 Pro (alias zachowany dla kompatybilności) '3.1-pro': 'gemini-3.1-pro-preview', # Gemini 3.1 Pro - najlepszy reasoning '3.1-flash-lite': 'gemini-3.1-flash-lite-preview', # Gemini 3.1 Flash Lite - szybki, tani } # Models that support thinking mode THINKING_MODELS = {'gemini-3-flash-preview', 'gemini-3.1-pro-preview', 'gemini-3.1-flash-lite-preview'} # Preview models — monitor for GA release to switch for better stability # Track at: https://ai.google.dev/gemini-api/docs/models PREVIEW_MODELS = {'gemini-3-flash-preview', 'gemini-3.1-pro-preview', 'gemini-3.1-flash-lite-preview'} # Fallback chain for rate limit (429) resilience — Paid Tier 1 # Order: primary → quality fallback → cheapest fallback 
MODEL_FALLBACK_CHAIN = [
    'gemini-3-flash-preview',         # 10K RPD - thinking mode, primary
    'gemini-3.1-flash-lite-preview',  # Quality fallback - gen 3.1
    'gemini-2.5-flash-lite',          # Unlimited RPD - cheapest, last resort
]

# Available thinking levels for Gemini 3 Flash.
# Keys are the lowercase values accepted by this service; values are the
# enum strings the google-genai SDK expects in ThinkingConfig.
THINKING_LEVELS = {
    'minimal': 'MINIMAL',  # Lowest latency, minimal reasoning
    'low': 'LOW',          # Fast, simple tasks
    'medium': 'MEDIUM',    # Balanced (Gemini 3 Flash only)
    'high': 'HIGH',        # Maximum reasoning depth (default)
}

# Pricing per 1M tokens (USD) - updated 2026-01-29
# Note: Flash on Free Tier = $0.00, Pro on Paid Tier = paid pricing
GEMINI_PRICING = {
    # Prices per 1M tokens (USD). Verified 2026-03-25 against ai.google.dev/gemini-api/docs/pricing
    # Google bills thinking tokens at the OUTPUT rate (included in output pricing).
    'gemini-2.5-flash': {'input': 0.30, 'output': 2.50},
    'gemini-2.5-flash-lite': {'input': 0.10, 'output': 0.40},
    'gemini-2.5-pro': {'input': 1.25, 'output': 10.00},
    'gemini-3-flash-preview': {'input': 0.50, 'output': 3.00},         # Paid tier
    'gemini-3.1-pro-preview': {'input': 2.00, 'output': 12.00},        # Paid tier
    'gemini-3.1-flash-lite-preview': {'input': 0.25, 'output': 1.50},  # Paid tier
}


class GeminiService:
    """Service class for Google Gemini API interactions with Thinking Mode support."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = 'flash',
        thinking_level: str = 'high',
        include_thoughts: bool = False,
        fallback_models: Optional[List[str]] = None
    ):
        """
        Initialize Gemini service.

        Args:
            api_key: Google AI API key (reads from env if not provided)
            model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro');
                unknown aliases silently fall back to 'flash'
            thinking_level: Reasoning depth ('minimal', 'low', 'medium', 'high')
            include_thoughts: Whether to include thinking process in response (for debugging)
            fallback_models: List of full model names for 429 fallback (default: MODEL_FALLBACK_CHAIN)

        API Key:
        - GOOGLE_GEMINI_API_KEY: Paid tier 1 (all models)

        Raises:
            ValueError: If no API key is configured (or only the .env placeholder is set).
        """
        # Always use paid tier API key
        if api_key:
            self.api_key = api_key
        else:
            self.api_key = os.getenv('GOOGLE_GEMINI_API_KEY')

        # Debug: confirm key presence only — the key itself is never logged
        if self.api_key:
            logger.info("Gemini API key loaded successfully")
        else:
            logger.error("API key is None or empty!")

        # Reject a missing key or the untouched placeholder from the .env template
        if not self.api_key or self.api_key == 'TWOJ_KLUCZ_API_TUTAJ':
            raise ValueError(
                "GOOGLE_GEMINI_API_KEY not configured. "
                "Please add your API key to .env file."
            )

        # Initialize new Gemini client
        self.client = genai.Client(api_key=self.api_key)

        # Resolve alias to full model name (unknown alias -> 'flash')
        self.model_name = GEMINI_MODELS.get(model, GEMINI_MODELS['flash'])

        # Fallback chain for 429 rate limit resilience
        self.fallback_models = fallback_models if fallback_models is not None else MODEL_FALLBACK_CHAIN

        # Thinking mode configuration (only effective for THINKING_MODELS)
        self.thinking_level = thinking_level
        self.include_thoughts = include_thoughts
        self._thinking_enabled = self.model_name in THINKING_MODELS

        # Safety settings: all categories disabled — this service is used for
        # internal/business content, not end-user moderation.
        self.safety_settings = [
            types.SafetySetting(
                category="HARM_CATEGORY_HATE_SPEECH",
                threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_DANGEROUS_CONTENT",
                threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_SEXUALLY_EXPLICIT",
                threshold="BLOCK_NONE"
            ),
            types.SafetySetting(
                category="HARM_CATEGORY_HARASSMENT",
                threshold="BLOCK_NONE"
            ),
        ]

        chain_str = ' → '.join(self.fallback_models)
        logger.info(
            f"Gemini service initialized: model={self.model_name}, "
            f"thinking={self._thinking_enabled}, level={thinking_level}, "
            f"fallback_chain=[{chain_str}]"
        )

        # Warn if using preview model — monitor for GA release
        if self.model_name in PREVIEW_MODELS:
            logger.warning(
                f"Using PREVIEW model: {self.model_name}. "
                f"Monitor https://ai.google.dev/gemini-api/docs/models for GA release. "
                f"Switch to GA model when available for better stability."
            )

    @property
    def thinking_enabled(self) -> bool:
        """Whether thinking mode is enabled for current model."""
        return self._thinking_enabled

    @property
    def thinking_level_display(self) -> str:
        """Human-readable thinking level for UI (Polish labels)."""
        if not self._thinking_enabled:
            return "Wyłączony"
        return {
            'minimal': 'Minimalny',
            'low': 'Niski',
            'medium': 'Średni',
            'high': 'Wysoki'
        }.get(self.thinking_level, self.thinking_level)

    def get_status(self) -> Dict[str, Any]:
        """Get service status for UI display."""
        return {
            'model': self.model_name,
            'thinking_enabled': self._thinking_enabled,
            'thinking_level': self.thinking_level,
            'thinking_level_display': self.thinking_level_display,
            'include_thoughts': self.include_thoughts
        }

    @staticmethod
    def _is_rate_limited(error: Exception) -> bool:
        """Check if error is a 429 / RESOURCE_EXHAUSTED rate limit error."""
        # String matching because the SDK may raise several exception types;
        # the status code/text is embedded in the message.
        error_str = str(error)
        return '429' in error_str or 'RESOURCE_EXHAUSTED' in error_str

    @staticmethod
    def _is_retryable(error: Exception) -> bool:
        """Check if error is retryable (rate limit or server overload)."""
        error_str = str(error)
        return ('429' in error_str or 'RESOURCE_EXHAUSTED' in error_str
                or '503' in error_str or 'UNAVAILABLE' in error_str)

    def _build_generation_config(self, model: str, temperature: float,
                                 max_tokens: Optional[int],
                                 thinking_level: Optional[str],
                                 response_schema: Optional[Dict] = None) -> types.GenerateContentConfig:
        """
        Build GenerateContentConfig, adjusting thinking mode per model.

        Args:
            model: Full model name the config is being built for.
            temperature: Sampling temperature.
            max_tokens: Optional output token cap (omitted when falsy).
            thinking_level: Per-call override; falls back to self.thinking_level.
            response_schema: Optional JSON schema for structured output.
        """
        config_params = {
            'temperature': temperature,
        }
        if max_tokens:
            config_params['max_output_tokens'] = max_tokens

        # Structured output: enforce JSON schema on response
        if response_schema:
            config_params['response_mime_type'] = 'application/json'
            config_params['response_schema'] = response_schema

        # Only add thinking config for models that support it
        if model in THINKING_MODELS:
            level = thinking_level or self.thinking_level
            thinking_config = types.ThinkingConfig(
                # Unknown level names fall back to 'HIGH'
                thinking_level=THINKING_LEVELS.get(level, 'HIGH'),
                include_thoughts=self.include_thoughts
            )
            config_params['thinking_config'] = thinking_config

        return types.GenerateContentConfig(
            **config_params,
            safety_settings=self.safety_settings
        )

    def generate_text(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        stream: bool = False,
        thinking_level: Optional[str] = None,
        feature: str = 'general',
        user_id: Optional[int] = None,
        company_id: Optional[int] = None,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
        model: Optional[str] = None,
        response_schema: Optional[Dict] = None
    ) -> str:
        """
        Generate text using Gemini API with automatic fallback, cost tracking and thinking mode.

        On 429/503, automatically retries with the next model in fallback chain.

        Args:
            prompt: Text prompt to send to the model
            temperature: Sampling temperature (0.0-1.0). Higher = more creative
            max_tokens: Maximum tokens to generate (None = model default)
            stream: Whether to stream the response
            thinking_level: Override default thinking level for this call
            feature: Feature name for cost tracking ('chat', 'news_evaluation', etc.)
            user_id: Optional user ID for cost tracking
            company_id: Optional company ID for context
            related_entity_type: Entity type ('zopk_news', 'chat_message', etc.)
            related_entity_id: Entity ID for reference
            model: Override model for this call (alias like '3-pro' or full name
                like 'gemini-3.1-pro-preview')
            response_schema: Optional JSON schema dict to enforce structured output (Gemini format)

        Returns:
            Generated text response.
            NOTE(review): when stream=True the raw response object is returned
            (not a str, and no cost is logged) — callers must handle that case.

        Raises:
            Exception: If API call fails on all models
        """
        # Resolve model override: accept alias ('3-pro') or full name ('gemini-3.1-pro-preview').
        # Unknown names are passed through as-is so future models work without code changes.
        primary_model = self.model_name
        if model:
            primary_model = GEMINI_MODELS.get(model, model)

        # Build ordered list of models to try: primary first, then fallbacks
        # (deduplicated in case the primary is already in the chain)
        models_to_try = [primary_model]
        for m in self.fallback_models:
            if m not in models_to_try:
                models_to_try.append(m)

        start_time = time.time()
        last_error = None

        for model in models_to_try:
            try:
                generation_config = self._build_generation_config(
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    thinking_level=thinking_level,
                    response_schema=response_schema
                )

                # Call API
                response = self.client.models.generate_content(
                    model=model,
                    contents=prompt,
                    config=generation_config
                )

                if stream:
                    # NOTE(review): returns the raw response object; see docstring.
                    return response

                # Extract response text
                response_text = response.text

                # Count tokens and log cost
                latency_ms = int((time.time() - start_time) * 1000)
                input_tokens = self._count_tokens_from_response(response, 'input')
                output_tokens = self._count_tokens_from_response(response, 'output')
                thinking_tokens = self._count_tokens_from_response(response, 'thinking')

                # Log with model & thinking info
                level = thinking_level or self.thinking_level
                is_thinking = model in THINKING_MODELS
                is_fallback = model != self.model_name
                logger.info(
                    f"Gemini API call successful. "
                    f"Tokens: {input_tokens}+{output_tokens}"
                    f"{f'+{thinking_tokens}t' if thinking_tokens else ''}, "
                    f"Latency: {latency_ms}ms, "
                    f"Model: {model}{'(fallback)' if is_fallback else ''}, "
                    f"Thinking: {level.upper() if is_thinking else 'OFF'}"
                )

                # Log to database for cost tracking (use actual model used)
                self._log_api_cost(
                    prompt=prompt,
                    response_text=response_text,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    thinking_tokens=thinking_tokens,
                    latency_ms=latency_ms,
                    success=True,
                    feature=feature,
                    user_id=user_id,
                    company_id=company_id,
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    model_override=model if is_fallback else None
                )

                return response_text

            except Exception as e:
                # Retryable (429/503) and not the last model: move on to fallback
                if self._is_retryable(e) and model != models_to_try[-1]:
                    logger.warning(f"Retryable error on {model} ({type(e).__name__}), trying next fallback...")
                    last_error = e
                    continue

                # Non-retryable error or last model in chain — fail
                latency_ms = int((time.time() - start_time) * 1000)
                self._log_api_cost(
                    prompt=prompt,
                    response_text='',
                    # No usage metadata on failure — estimate input tokens from prompt length
                    input_tokens=self._estimate_tokens(prompt),
                    output_tokens=0,
                    thinking_tokens=0,
                    latency_ms=latency_ms,
                    success=False,
                    error_message=str(e),
                    feature=feature,
                    user_id=user_id,
                    company_id=company_id,
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    model_override=model if model != self.model_name else None
                )
                logger.error(f"Gemini API error on {model}: {str(e)}")
                raise Exception(f"Gemini API call failed: {str(e)}")

        # All models exhausted (all returned 429)
        latency_ms = int((time.time() - start_time) * 1000)
        logger.error(f"All fallback models exhausted. Last error: {last_error}")
        self._log_api_cost(
            prompt=prompt,
            response_text='',
            input_tokens=self._estimate_tokens(prompt),
            output_tokens=0,
            thinking_tokens=0,
            latency_ms=latency_ms,
            success=False,
            error_message=f"All models rate limited: {last_error}",
            feature=feature,
            user_id=user_id,
            company_id=company_id,
            related_entity_type=related_entity_type,
            related_entity_id=related_entity_id
        )
        raise Exception(f"All Gemini models rate limited. Last error: {last_error}")

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """
        Multi-turn chat conversation.

        Unlike generate_text(), this method uses the primary model only
        (no 429 fallback chain) and does not log costs to the database.

        Args:
            messages: List of message dicts with 'role' and 'content' keys
                Example: [
                    {'role': 'user', 'content': 'Hello'},
                    {'role': 'model', 'content': 'Hi there!'},
                    {'role': 'user', 'content': 'How are you?'}
                ]

        Returns:
            Model's response to the last message

        Raises:
            Exception: If the API call fails.
        """
        try:
            # Build contents from messages; any role other than 'user' is
            # treated as 'model' (the Gemini API supports only these two).
            contents = []
            for msg in messages:
                role = 'user' if msg['role'] == 'user' else 'model'
                contents.append(types.Content(
                    role=role,
                    parts=[types.Part(text=msg['content'])]
                ))

            # Build config with thinking if available
            config_params = {'temperature': 0.7}
            if self._thinking_enabled:
                config_params['thinking_config'] = types.ThinkingConfig(
                    thinking_level=THINKING_LEVELS.get(self.thinking_level, 'HIGH'),
                    include_thoughts=self.include_thoughts
                )

            generation_config = types.GenerateContentConfig(
                **config_params,
                safety_settings=self.safety_settings
            )

            response = self.client.models.generate_content(
                model=self.model_name,
                contents=contents,
                config=generation_config
            )
            return response.text

        except Exception as e:
            logger.error(f"Gemini chat error: {str(e)}")
            raise Exception(f"Gemini chat failed: {str(e)}")

    def analyze_image(self, image_path: str, prompt: str) -> str:
        """
        Analyze image with Gemini Vision.

        Args:
            image_path: Path to image file
            prompt: Text prompt describing what to analyze

        Returns:
            Analysis result

        Raises:
            Exception: If the image cannot be read or the API call fails.
        """
        try:
            import PIL.Image
            img = PIL.Image.open(image_path)

            # Re-encode the image to bytes; falls back to PNG when PIL
            # cannot determine the original format
            import io
            img_bytes = io.BytesIO()
            img.save(img_bytes, format=img.format or 'PNG')
            img_bytes = img_bytes.getvalue()

            contents = [
                types.Part(text=prompt),
                types.Part(
                    inline_data=types.Blob(
                        mime_type=f"image/{(img.format or 'png').lower()}",
                        data=img_bytes
                    )
                )
            ]

            response = self.client.models.generate_content(
                model=self.model_name,
                contents=contents
            )
            return response.text

        except Exception as e:
            logger.error(f"Gemini image analysis error: {str(e)}")
            raise Exception(f"Image analysis failed: {str(e)}")

    def count_tokens(self, text: str) -> int:
        """
        Count tokens in text.

        Falls back to a local character-based estimate if the API call fails.

        Args:
            text: Text to count tokens for

        Returns:
            Number of tokens
        """
        try:
            result = self.client.models.count_tokens(
                model=self.model_name,
                contents=text
            )
            return result.total_tokens
        except Exception as e:
            logger.warning(f"Token counting failed: {e}")
            return self._estimate_tokens(text)

    def _estimate_tokens(self, text: str) -> int:
        """Estimate tokens when API counting fails (~4 chars per token)."""
        return len(text) // 4

    def _count_tokens_from_response(self, response, token_type: str) -> int:
        """
        Extract token count from API response metadata.

        Args:
            response: The generate_content response object.
            token_type: One of 'input', 'output', 'thinking'.

        Returns:
            Token count; 0 if metadata is missing, the type is unknown,
            or any attribute access fails.
        """
        try:
            usage = response.usage_metadata
            if not usage:
                return 0
            if token_type == 'input':
                return getattr(usage, 'prompt_token_count', 0) or 0
            elif token_type == 'output':
                return getattr(usage, 'candidates_token_count', 0) or 0
            elif token_type == 'thinking':
                # SDK may not expose thinking_token_count directly.
                # Derive from: total - prompt - candidates = thinking tokens.
                thinking = getattr(usage, 'thinking_token_count', 0) or 0
                if not thinking:
                    total = getattr(usage, 'total_token_count', 0) or 0
                    prompt = getattr(usage, 'prompt_token_count', 0) or 0
                    candidates = getattr(usage, 'candidates_token_count', 0) or 0
                    thinking = max(0, total - prompt - candidates)
                return thinking
        except Exception:
            return 0
        # Unknown token_type
        return 0

    def _log_api_cost(
        self,
        prompt: str,
        response_text: str,
        input_tokens: int,
        output_tokens: int,
        thinking_tokens: int = 0,
        latency_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None,
        feature: str = 'general',
        user_id: Optional[int] = None,
        company_id: Optional[int] = None,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
        model_override: Optional[str] = None
    ):
        """
        Log API call costs to database for monitoring.

        Writes to both the legacy AIAPICostLog table and the newer AIUsageLog
        table. Best-effort: any DB failure is logged and swallowed so cost
        tracking never breaks the calling request.

        Args:
            prompt: Input prompt text
            response_text: Output response text
            input_tokens: Number of input tokens used
            output_tokens: Number of output tokens generated
            thinking_tokens: Number of thinking tokens (Gemini 3)
            latency_ms: Response time in milliseconds
            success: Whether API call succeeded
            error_message: Error details if failed
            feature: Feature name ('chat', 'news_evaluation', 'user_creation', etc.)
            user_id: Optional user ID
            company_id: Optional company ID for context
            related_entity_type: Entity type ('zopk_news', 'chat_message', etc.)
            related_entity_id: Entity ID for reference
            model_override: Actual model used (if different from self.model_name due to fallback)
        """
        if not DB_AVAILABLE:
            return

        actual_model = model_override or self.model_name

        try:
            # Calculate costs using actual model pricing.
            # Google bills thinking tokens at the output rate.
            # Unknown models default to Gemini 3 Flash preview pricing.
            pricing = GEMINI_PRICING.get(actual_model, {'input': 0.50, 'output': 3.00})
            input_cost = (input_tokens / 1_000_000) * pricing['input']
            output_cost = ((output_tokens + thinking_tokens) / 1_000_000) * pricing['output']
            total_cost = input_cost + output_cost

            # Cost in cents for AIUsageLog
            cost_cents = total_cost * 100

            # Create prompt hash (for debugging, not storing full prompt for privacy)
            prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()

            # Save to database
            db = SessionLocal()
            try:
                # Log to legacy AIAPICostLog table
                legacy_log = AIAPICostLog(
                    timestamp=datetime.now(),
                    api_provider='gemini',
                    model_name=actual_model,
                    feature=feature,
                    user_id=user_id,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens + thinking_tokens,  # Combined for legacy
                    total_tokens=input_tokens + output_tokens + thinking_tokens,
                    input_cost=input_cost,
                    output_cost=output_cost,  # Includes thinking (billed at output rate)
                    total_cost=total_cost,
                    success=success,
                    error_message=error_message,
                    latency_ms=latency_ms,
                    prompt_hash=prompt_hash
                )
                db.add(legacy_log)

                # Log to new AIUsageLog table
                usage_log = AIUsageLog(
                    request_type=feature,
                    model=actual_model,
                    tokens_input=input_tokens,
                    tokens_output=output_tokens + thinking_tokens,
                    cost_cents=cost_cents,
                    user_id=user_id,
                    company_id=company_id,
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    prompt_length=len(prompt),
                    response_length=len(response_text),
                    response_time_ms=latency_ms,
                    success=success,
                    error_message=error_message
                )
                db.add(usage_log)

                db.commit()
                logger.info(
                    f"API cost logged: {feature} - ${total_cost:.6f} "
                    f"({input_tokens}+{output_tokens}"
                    f"{f'+{thinking_tokens}t' if thinking_tokens else ''} tokens, {latency_ms}ms)"
                )
            finally:
                db.close()

        except Exception as e:
            # Best-effort logging: never let cost tracking break the request
            logger.error(f"Failed to log API cost: {e}")

    def generate_embedding(
        self,
        text: str,
        task_type: str = 'RETRIEVAL_DOCUMENT',
        title: Optional[str] = None,
        user_id: Optional[int] = None,
        feature: str = 'embedding'
    ) -> Optional[List[float]]:
        """
        Generate embedding vector for text using Google's text-embedding model.

        Args:
            text: Text to embed
            task_type: One of:
                - 'RETRIEVAL_DOCUMENT': For documents to be retrieved
                - 'RETRIEVAL_QUERY': For search queries
                - 'SEMANTIC_SIMILARITY': For comparing texts
                - 'CLASSIFICATION': For text classification
                - 'CLUSTERING': For text clustering
            title: Optional title for document (improves quality)
            user_id: User ID for cost tracking
            feature: Feature name for cost tracking

        Returns:
            768-dimensional embedding vector or None on error
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for embedding")
            return None

        start_time = time.time()

        try:
            # Build content with optional title prefix
            content_parts = []
            if title:
                content_parts.append(types.Part(text=f"Title: {title}\n\n"))
            content_parts.append(types.Part(text=text))

            result = self.client.models.embed_content(
                model='gemini-embedding-001',
                contents=types.Content(parts=content_parts),
                config=types.EmbedContentConfig(
                    task_type=task_type,
                    output_dimensionality=768
                )
            )

            embedding = result.embeddings[0].values if result.embeddings else None
            if not embedding:
                logger.error("No embedding returned from API")
                return None

            # Rough cost estimate for the debug log only.
            # NOTE(review): unlike generate_text(), this cost is not persisted
            # to the database — user_id/feature params are currently unused here.
            latency_ms = int((time.time() - start_time) * 1000)
            token_count = len(text) // 4
            cost_usd = (token_count / 1000) * 0.00001
            logger.debug(
                f"Embedding generated: {len(embedding)} dims, "
                f"{token_count} tokens, {latency_ms}ms, ${cost_usd:.8f}"
            )

            return list(embedding)

        except Exception as e:
            logger.error(f"Embedding generation error: {e}")
            return None

    def generate_embeddings_batch(
        self,
        texts: List[str],
        task_type: str = 'RETRIEVAL_DOCUMENT',
        user_id: Optional[int] = None
    ) -> List[Optional[List[float]]]:
        """
        Generate embeddings for multiple texts.

        Calls generate_embedding() sequentially — one API call per text.

        Args:
            texts: List of texts to embed
            task_type: Task type for all embeddings
            user_id: User ID for cost tracking

        Returns:
            List of embedding vectors (None for failed items)
        """
        results = []
        for text in texts:
            embedding = self.generate_embedding(
                text=text,
                task_type=task_type,
                user_id=user_id,
                feature='embedding_batch'
            )
            results.append(embedding)
        return results


# Global service instance (initialized in app.py)
_gemini_service: Optional[GeminiService] = None


def init_gemini_service(
    api_key: Optional[str] = None,
    model: str = 'flash',
    thinking_level: str = 'high'
):
    """
    Initialize global Gemini service instance.

    Call this in app.py during Flask app initialization.
    On failure the global is reset to None instead of raising, so the app
    can start without AI features.

    Args:
        api_key: Google AI API key (optional if set in env)
        model: Model to use ('flash', 'flash-lite', 'pro', '3-flash', '3-pro')
        thinking_level: Reasoning depth for Gemini 3 models ('minimal', 'low', 'medium', 'high')
    """
    global _gemini_service
    try:
        _gemini_service = GeminiService(
            api_key=api_key,
            model=model,
            thinking_level=thinking_level
        )
        logger.info("Global Gemini service initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize Gemini service: {e}")
        _gemini_service = None


def get_gemini_service() -> Optional[GeminiService]:
    """
    Get global Gemini service instance.

    Returns:
        GeminiService instance or None if not initialized
    """
    return _gemini_service


def generate_text(prompt: str, **kwargs) -> Optional[str]:
    """
    Convenience function to generate text using global service.

    Args:
        prompt: Text prompt
        **kwargs: Additional arguments for GeminiService.generate_text()

    Returns:
        Generated text or None if service not initialized
    """
    service = get_gemini_service()
    if service:
        return service.generate_text(prompt, **kwargs)
    logger.warning("Gemini service not initialized")
    return None