# chatbot_handler.py
import logging
import json
from google import genai
from google.genai import types as genai_types # Import types for GenerateContentConfig
import os
import asyncio

# Gemini API key configuration
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')

client = None
model_name = "gemini-2.0-flash" # As per user's documentation snippet, ensure this model is available with their API key type

# This will be used to create genai_types.GenerateContentConfig
generation_config_params = {
    "temperature": 0.7,
    "top_p": 1,
    "top_k": 1,
    "max_output_tokens": 2048,
    # If you need a system instruction, add it here, e.g.:
    # "system_instruction": "You are a helpful AI assistant providing insights on LinkedIn analytics."
}


try:
    if GEMINI_API_KEY:
        # Initialize client using genai.Client
        client = genai.Client(api_key=GEMINI_API_KEY)
        logging.info(f"Gemini client (genai.Client) initialized. Target model for generation: '{model_name}'")
    else:
        logging.error("Gemini API Key is not set.")
except Exception as e:
    logging.error(f"Failed to initialize Gemini client (genai.Client): {e}", exc_info=True)


def format_history_for_gemini(gradio_chat_history: list) -> list:
    """Converts Gradio chat history to Gemini content format."""
    gemini_contents = []
    for msg in gradio_chat_history:
        role = "user" if msg.get("role") == "user" else "model"
        content = msg.get("content")
        if isinstance(content, str):
            gemini_contents.append({"role": role, "parts": [{"text": content}]})
        elif isinstance(content, list) and len(content) > 0 and isinstance(content[0], dict) and "type" in content[0]:
            parts = []
            for part_item in content:
                if part_item.get("type") == "text":
                    parts.append({"text": part_item.get("text", "")})
            if parts:
                gemini_contents.append({"role": role, "parts": parts})
            else:
                logging.warning(f"Skipping complex but empty content part in chat history: {content}")
        else:
            logging.warning(f"Skipping non-string/non-standard content in chat history: {content}")
    # For `client.models.generate_content`, the `contents` parameter
    # expects a list of `Content` objects (or dicts that can be cast to them).
    # Each `Content` object has 'role' and 'parts'.
    return gemini_contents
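
# A minimal usage sketch (assuming Gradio's "messages"-style history):
#   format_history_for_gemini([{"role": "user", "content": "Hi"}])
#   -> [{"role": "user", "parts": [{"text": "Hi"}]}]
#   format_history_for_gemini([{"role": "assistant", "content": "Hello!"}])
#   -> [{"role": "model", "parts": [{"text": "Hello!"}]}]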


async def generate_llm_response(user_message: str, plot_id: str, plot_label: str, chat_history_for_plot: list, plot_data_summary: str | None = None):
    """Generate a Gemini reply for the given plot's chat; returns the reply text or a user-facing error string."""
    if not client:
        logging.error("Gemini client (genai.Client) not initialized.")
        return "The AI model is not available. Configuration error."

    gemini_formatted_history = format_history_for_gemini(chat_history_for_plot)

    if not gemini_formatted_history:
        logging.error("Formatted history for Gemini is empty.")
        return "There was an issue processing the conversation history (empty)."
    
    if not any(part.get("text","").strip() for message in gemini_formatted_history for part in message.get("parts",[])):
        logging.error("Formatted history for Gemini contains no text parts.")
        return "There was an issue processing the conversation history for the AI model (empty text)."

    try:
        response = None
        if hasattr(client, 'models') and hasattr(client.models, 'generate_content'):
            logging.debug(f"Using genai.Client.models.generate_content for model '{model_name}' (synchronous via asyncio.to_thread)")
            
            # Create the GenerateContentConfig object from our parameters
            # This can include system_instruction if added to generation_config_params
            gen_config_obj = genai_types.GenerateContentConfig(**generation_config_params)

            # Call client.models.generate_content:
            # 1. Pass model_name directly (e.g., "gemini-2.0-flash")
            # 2. Use 'config' (not 'generation_config') as the keyword argument
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=gemini_formatted_history,
                config=gen_config_obj,
            )
        else:
            logging.error(f"Gemini client (genai.Client) does not have 'models.generate_content' method. Type: {type(client)}")
            return "AI model interaction error (SDK method not found)."

        # Process response
        if hasattr(response, 'prompt_feedback') and response.prompt_feedback and response.prompt_feedback.block_reason:
            reason = response.prompt_feedback.block_reason
            reason_name = getattr(reason, 'name', str(reason))
            logging.warning(f"Blocked by prompt feedback: {reason_name}")
            return f"Blocked due to content policy: {reason_name}."

        if hasattr(response, 'text') and response.text:
            logging.debug("Response has a direct .text attribute.")
            return response.text

        logging.debug("Response does not have a direct .text attribute or it's empty, checking candidates.")
        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            # Guard against parts whose .text is None, which would break "".join()
            return "".join(part.text for part in response.candidates[0].content.parts if getattr(part, 'text', None))
        
        finish_reason = "UNKNOWN"
        if response.candidates and response.candidates[0].finish_reason:
            finish_reason_val = response.candidates[0].finish_reason
            finish_reason = getattr(finish_reason_val, 'name', str(finish_reason_val))
            
        if not (hasattr(response, 'text') and response.text) and \
           not (response.candidates and response.candidates[0].content and response.candidates[0].content.parts):
            logging.warning(f"No content parts in response and no direct .text. Finish reason: {finish_reason}")
            if finish_reason == "SAFETY":
                return f"Response generation stopped due to safety reasons. Finish reason: {finish_reason}."
            return f"The AI model returned an empty response. Finish reason: {finish_reason}."

        return f"Unexpected response structure from AI model (checked .text and .candidates). Finish reason: {finish_reason}."

    except AttributeError as ae:
        logging.error(f"AttributeError during Gemini call for plot '{plot_label}': {ae}", exc_info=True)
        return f"AI model error (Attribute): {type(ae).__name__} - {ae}."
    except Exception as e:
        logging.error(f"Error generating response for plot '{plot_label}': {e}", exc_info=True)
        if "API_KEY_INVALID" in str(e) or "API key not valid" in str(e):
            return "AI model error: API key is not valid. Please check configuration."
        if "400" in str(e) and "model" in str(e).lower() and "not found" in str(e).lower():
            return f"AI model error: Model '{model_name}' not found or not accessible with your API key."
        # Check for the specific TypeError related to generate_content arguments
        if isinstance(e, TypeError) and "got an unexpected keyword argument" in str(e):
             logging.error(f"TypeError in generate_content call: {e}. This might indicate an issue with SDK version or method signature.")
             return f"AI model error (Internal SDK call issue): {e}"
        return f"An unexpected error occurred while contacting the AI model: {type(e).__name__}."