# chatbot_handler.py
import logging
from google import genai
from google.genai import types as genai_types # Import types for GenerateContentConfig
import os
import asyncio

# Gemini API key configuration
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')

client = None
model_name = "gemini-1.5-flash-latest" # Using a more recent Flash model
# model_name = "gemini-2.0-flash" # As per user's documentation snippet, ensure this model is available with their API key type

# This will be used to create genai_types.GenerateContentConfig
generation_config_params = {
    "temperature": 0.7,
    "top_p": 1,
    "top_k": 1,
    "max_output_tokens": 2048,
}

# Safety settings list
common_safety_settings = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
]
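
# Each dict above is accepted wherever genai_types.SafetySetting is expected
# (the SDK validates plain dicts into its typed models). The explicit
# equivalent, as a sketch, would be:
#   genai_types.SafetySetting(
#       category="HARM_CATEGORY_HARASSMENT",
#       threshold="BLOCK_MEDIUM_AND_ABOVE",
#   )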

try:
    if GEMINI_API_KEY:
        # Initialize the client; the google-genai SDK uses genai.Client(api_key=...)
        client = genai.Client(api_key=GEMINI_API_KEY)
        logging.info(f"Gemini client (genai.Client) initialized. Target model for generation: '{model_name}'")
    else:
        logging.error("Gemini API Key is not set.")
except Exception as e:
    logging.error(f"Failed to initialize Gemini client (genai.Client): {e}", exc_info=True)


def format_history_for_gemini(gradio_chat_history: list) -> list:
    """Converts Gradio chat history to Gemini content format."""
    gemini_contents = []
    for msg in gradio_chat_history:
        role = "user" if msg.get("role") == "user" else "model"
        content = msg.get("content")
        if isinstance(content, str):
            gemini_contents.append({"role": role, "parts": [{"text": content}]})
        elif isinstance(content, list) and len(content) > 0 and isinstance(content[0], dict) and "type" in content[0]:
            parts = []
            for part_item in content:
                if part_item.get("type") == "text":
                    parts.append({"text": part_item.get("text", "")})
            if parts:
                 gemini_contents.append({"role": role, "parts": parts})
            else:
                logging.warning(f"Skipping complex but empty content part in chat history: {content}")
        else:
            logging.warning(f"Skipping non-string/non-standard content in chat history: {content}")
    # Note on the `contents` format: for multi-turn conversations,
    # client.models.generate_content accepts a list of Content objects (or
    # dicts castable to them), each carrying a 'role' and 'parts', which is
    # exactly what is built above. Single-turn calls may instead pass a flat
    # list of parts, e.g. contents=["Explain how AI works"].
    return gemini_contents
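
# Illustrative example (not executed anywhere in this module): the conversion
# performed by format_history_for_gemini on hypothetical Gradio-style input.
#
#   history = [
#       {"role": "user", "content": "Explain how AI works"},
#       {"role": "assistant", "content": "AI learns patterns from data."},
#   ]
#   format_history_for_gemini(history)
#   # -> [{"role": "user", "parts": [{"text": "Explain how AI works"}]},
#   #     {"role": "model", "parts": [{"text": "AI learns patterns from data."}]}]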


async def generate_llm_response(user_message: str, plot_id: str, plot_label: str, chat_history_for_plot: list, plot_data_summary: str = None):
    if not client:
        logging.error("Gemini client (genai.Client) not initialized.")
        return "The AI model is not available. Configuration error."

    # gemini_formatted_history will be a list of {"role": ..., "parts": ...} dicts
    gemini_formatted_history = format_history_for_gemini(chat_history_for_plot)

    if not gemini_formatted_history: # Should not happen if chat_history_for_plot has at least one message
        logging.error("Formatted history for Gemini is empty.")
        return "There was an issue processing the conversation history (empty)."
    
    # Ensure the history contains at least one non-empty text part
    if not any(part.get("text","").strip() for message in gemini_formatted_history for part in message.get("parts",[])):
        logging.error("Formatted history for Gemini contains no text parts.")
        return "There was an issue processing the conversation history for the AI model (empty text)."

    try:
        response = None
        # client.models.generate_content is the generation entry point on genai.Client.
        if hasattr(client, 'models') and hasattr(client.models, 'generate_content'):
            logging.debug(f"Using genai.Client.models.generate_content for model '{model_name}' (synchronous, run via asyncio.to_thread)")

            # The SDK accepts both bare model names ("gemini-1.5-flash-latest")
            # and fully qualified ones ("models/gemini-1.5-flash-latest"), so
            # model_name can be passed through unchanged.

            # Generation parameters and safety settings both belong on
            # GenerateContentConfig; generate_content takes a single `config`
            # keyword rather than separate generation_config/safety_settings
            # arguments.
            gen_config_obj = genai_types.GenerateContentConfig(
                **generation_config_params,
                safety_settings=common_safety_settings,
            )

            # generate_content is synchronous, so run it off the event loop.
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=gemini_formatted_history,  # list of role/parts Content dicts
                config=gen_config_obj,
            )
        else:
            logging.error(f"Gemini client (genai.Client) has no 'models.generate_content' method. Type: {type(client)}")
            return "AI model interaction error (SDK method not found)."

        # Process response (this part should be largely consistent)
        if hasattr(response, 'prompt_feedback') and response.prompt_feedback and response.prompt_feedback.block_reason:
            reason = response.prompt_feedback.block_reason
            reason_name = getattr(reason, 'name', str(reason)) # .name might not exist
            logging.warning(f"Blocked by prompt feedback: {reason_name}")
            return f"Blocked due to content policy: {reason_name}."

        # The SDK exposes the concatenated text of the first candidate as
        # response.text; prefer it when present.
        if hasattr(response, 'text') and response.text:
            logging.debug("Response has a direct .text attribute.")
            return response.text

        # Fall back to the candidates structure if .text is missing or empty.
        logging.debug("No usable .text attribute; checking candidates.")
        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            return "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text'))
        
        finish_reason = "UNKNOWN"
        if response.candidates and response.candidates[0].finish_reason:
            finish_reason_val = response.candidates[0].finish_reason
            finish_reason = getattr(finish_reason_val, 'name', str(finish_reason_val)) # .name might not exist
            
        if not (hasattr(response, 'text') and response.text) and \
           not (response.candidates and response.candidates[0].content and response.candidates[0].content.parts):
            logging.warning(f"No content parts in response and no direct .text. Finish reason: {finish_reason}")
            if finish_reason == "SAFETY": # Or other relevant finish reasons
                 return f"Response generation stopped due to safety reasons. Finish reason: {finish_reason}."
            return f"The AI model returned an empty response. Finish reason: {finish_reason}."

        # If we reach here, it means .text was empty and candidates structure was also empty/problematic
        return f"Unexpected response structure from AI model (checked .text and .candidates). Finish reason: {finish_reason}."

    except AttributeError as ae:
        logging.error(f"AttributeError during Gemini call for plot '{plot_label}': {ae}", exc_info=True)
        return f"AI model error (Attribute): {type(ae).__name__} - {ae}."
    except Exception as e:
        logging.error(f"Error generating response for plot '{plot_label}': {e}", exc_info=True)
        # Check for specific API errors if possible
        if "API_KEY_INVALID" in str(e) or "API key not valid" in str(e):
             return "AI model error: API key is not valid. Please check configuration."
        if "400" in str(e) and "model" in str(e).lower() and "not found" in str(e).lower(): # Example for model not found
             return f"AI model error: Model '{model_name}' not found or not accessible with your API key."
        return f"An unexpected error occurred while contacting the AI model: {type(e).__name__}."