Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| # chatbot_handler.py | |
| import logging | |
| import json | |
| from google import genai | |
| from google.genai import types as genai_types # Import types for GenerateContentConfig | |
| import os | |
| import asyncio | |
| # Gemini API key configuration | |
| GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '') | |
| client = None | |
| model_name = "gemini-2.0-flash" # As per user's documentation snippet, ensure this model is available with their API key type | |
| # This will be used to create genai_types.GenerateContentConfig | |
| generation_config_params = { | |
| "temperature": 0.7, | |
| "top_p": 1, | |
| "top_k": 1, | |
| "max_output_tokens": 2048, | |
| # If you need a system instruction, add it here, e.g.: | |
| # "system_instruction": "You are a helpful AI assistant providing insights on LinkedIn analytics." | |
| } | |
| try: | |
| if GEMINI_API_KEY: | |
| # Initialize client using genai.Client | |
| client = genai.Client(api_key=GEMINI_API_KEY) | |
| logging.info(f"Gemini client (genai.Client) initialized. Target model for generation: '{model_name}'") | |
| else: | |
| logging.error("Gemini API Key is not set.") | |
| except Exception as e: | |
| logging.error(f"Failed to initialize Gemini client (genai.Client): {e}", exc_info=True) | |
| def format_history_for_gemini(gradio_chat_history: list) -> list: | |
| """Converts Gradio chat history to Gemini content format.""" | |
| gemini_contents = [] | |
| for msg in gradio_chat_history: | |
| role = "user" if msg.get("role") == "user" else "model" | |
| content = msg.get("content") | |
| if isinstance(content, str): | |
| gemini_contents.append({"role": role, "parts": [{"text": content}]}) | |
| elif isinstance(content, list) and len(content) > 0 and isinstance(content[0], dict) and "type" in content[0]: | |
| parts = [] | |
| for part_item in content: | |
| if part_item.get("type") == "text": | |
| parts.append({"text": part_item.get("text", "")}) | |
| if parts: | |
| gemini_contents.append({"role": role, "parts": parts}) | |
| else: | |
| logging.warning(f"Skipping complex but empty content part in chat history: {content}") | |
| else: | |
| logging.warning(f"Skipping non-string/non-standard content in chat history: {content}") | |
| # For `client.models.generate_content`, the `contents` parameter | |
| # expects a list of `Content` objects (or dicts that can be cast to them). | |
| # Each `Content` object has 'role' and 'parts'. | |
| return gemini_contents | |
| async def generate_llm_response(user_message: str, plot_id: str, plot_label: str, chat_history_for_plot: list, plot_data_summary: str = None): | |
| if not client: | |
| logging.error("Gemini client (genai.Client) not initialized.") | |
| return "The AI model is not available. Configuration error." | |
| gemini_formatted_history = format_history_for_gemini(chat_history_for_plot) | |
| if not gemini_formatted_history: | |
| logging.error("Formatted history for Gemini is empty.") | |
| return "There was an issue processing the conversation history (empty)." | |
| if not any(part.get("text","").strip() for message in gemini_formatted_history for part in message.get("parts",[])): | |
| logging.error("Formatted history for Gemini contains no text parts.") | |
| return "There was an issue processing the conversation history for the AI model (empty text)." | |
| try: | |
| response = None | |
| if hasattr(client, 'models') and hasattr(client.models, 'generate_content'): | |
| logging.debug(f"Using genai.Client.models.generate_content for model '{model_name}' (synchronous via asyncio.to_thread)") | |
| # Create the GenerateContentConfig object from our parameters | |
| # This can include system_instruction if added to generation_config_params | |
| gen_config_obj = genai_types.GenerateContentConfig(**generation_config_params) | |
| # Call client.models.generate_content | |
| # 1. Use model_name directly (e.g., "gemini-1.5-flash-latest") | |
| # 2. Use 'config' instead of 'generation_config' for the keyword argument | |
| response = await asyncio.to_thread( | |
| client.models.generate_content, | |
| model=model_name, # Use model_name directly | |
| contents=gemini_formatted_history, | |
| config=gen_config_obj # Corrected keyword argument | |
| ) | |
| else: | |
| logging.error(f"Gemini client (genai.Client) does not have 'models.generate_content' method. Type: {type(client)}") | |
| return "AI model interaction error (SDK method not found)." | |
| # Process response | |
| if hasattr(response, 'prompt_feedback') and response.prompt_feedback and response.prompt_feedback.block_reason: | |
| reason = response.prompt_feedback.block_reason | |
| reason_name = getattr(reason, 'name', str(reason)) | |
| logging.warning(f"Blocked by prompt feedback: {reason_name}") | |
| return f"Blocked due to content policy: {reason_name}." | |
| if hasattr(response, 'text') and response.text: | |
| logging.debug("Response has a direct .text attribute.") | |
| return response.text | |
| logging.debug("Response does not have a direct .text attribute or it's empty, checking candidates.") | |
| if response.candidates and response.candidates[0].content and response.candidates[0].content.parts: | |
| return "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text')) | |
| finish_reason = "UNKNOWN" | |
| if response.candidates and response.candidates[0].finish_reason: | |
| finish_reason_val = response.candidates[0].finish_reason | |
| finish_reason = getattr(finish_reason_val, 'name', str(finish_reason_val)) | |
| if not (hasattr(response, 'text') and response.text) and \ | |
| not (response.candidates and response.candidates[0].content and response.candidates[0].content.parts): | |
| logging.warning(f"No content parts in response and no direct .text. Finish reason: {finish_reason}") | |
| if finish_reason == "SAFETY": | |
| return f"Response generation stopped due to safety reasons. Finish reason: {finish_reason}." | |
| return f"The AI model returned an empty response. Finish reason: {finish_reason}." | |
| return f"Unexpected response structure from AI model (checked .text and .candidates). Finish reason: {finish_reason}." | |
| except AttributeError as ae: | |
| logging.error(f"AttributeError during Gemini call for plot '{plot_label}': {ae}", exc_info=True) | |
| return f"AI model error (Attribute): {type(ae).__name__} - {ae}." | |
| except Exception as e: | |
| logging.error(f"Error generating response for plot '{plot_label}': {e}", exc_info=True) | |
| if "API_KEY_INVALID" in str(e) or "API key not valid" in str(e): | |
| return "AI model error: API key is not valid. Please check configuration." | |
| if "400" in str(e) and "model" in str(e).lower() and "not found" in str(e).lower(): | |
| return f"AI model error: Model '{model_name}' not found or not accessible with your API key." | |
| # Check for the specific TypeError related to generate_content arguments | |
| if isinstance(e, TypeError) and "got an unexpected keyword argument" in str(e): | |
| logging.error(f"TypeError in generate_content call: {e}. This might indicate an issue with SDK version or method signature.") | |
| return f"AI model error (Internal SDK call issue): {e}" | |
| return f"An unexpected error occurred while contacting the AI model: {type(e).__name__}." | |