# -*- coding: utf-8 -*-
import os
import gradio as gr
import google.generativeai as genai
# The proto types used below (Tool, FunctionDeclaration, Schema, Content, Part, ...)
# live under genai.protos in this SDK; alias the module as `types` to keep call sites short
from google.generativeai import protos as types
from google.generativeai.types import HarmCategory, HarmBlockThreshold # For safety settings
import requests
import markdownify
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse
import traceback
import json # Although not directly used in the final code, useful for debugging args
# --- Browser/Web Tool Functions ---
def can_crawl_url(url: str, user_agent: str = "PythonGoogleGenAIAgent/1.0") -> bool:
"""Check robots.txt permissions for a URL"""
# RobotFileParser itself falls back to the '*' rules when the specific agent has none
if not url:
print("No URL provided to can_crawl_url")
return False
try:
parsed_url = urlparse(url)
if not parsed_url.scheme or not parsed_url.netloc:
print(f"Invalid URL format for robots.txt check: {url}")
return False # Cannot determine robots.txt location
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
print(f"Checking robots.txt at: {robots_url} for URL: {url}")
# Using RobotFileParser's default opener which handles redirects
rp = RobotFileParser()
rp.set_url(robots_url)
rp.read()
can_fetch = rp.can_fetch(user_agent, url)
print(f"Can fetch {url} with agent '{user_agent}': {can_fetch}")
return can_fetch
except Exception as e:
print(f"Error checking robots.txt for {url}: {e}")
# Default to false if unsure, to be polite to servers
return False
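# Illustrative example (not executed): for url="https://example.com/docs/page",
# the function above fetches https://example.com/robots.txt and asks whether our
# user agent may fetch /docs/page under that site's rules.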
def load_page(url: str) -> str:
"""
Load webpage content as markdown. Designed to be used as a Gemini Function.
Args:
url: The URL of the webpage to load.
Returns:
Markdown content of the page or an error message.
"""
print(f"Attempting to load page: {url}")
if not url:
return "Error: No URL provided."
if not url.startswith(('http://', 'https://')):
return f"Error: Invalid URL scheme. Please provide http or https URL. Got: {url}"
USER_AGENT = "PythonGoogleGenAIAgent/1.0 (Function Calling)" # Be identifiable
if not can_crawl_url(url, user_agent=USER_AGENT):
print(f"URL {url} failed robots.txt check for agent {USER_AGENT}")
return f"Error: Access denied by robots.txt for URL {url}"
try:
headers = {'User-Agent': USER_AGENT}
response = requests.get(url, timeout=15, headers=headers, allow_redirects=True)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
# Check content type - try to only process HTML
content_type = response.headers.get('content-type', '').lower()
if 'html' not in content_type:
print(f"Non-HTML content type '{content_type}' at {url}. Returning summary.")
# Return limited info for non-html types
return f"Content at {url} is of type '{content_type}'. Size: {len(response.content)} bytes. Cannot convert to Markdown."
# Limit content size before markdown conversion to avoid excessive memory/CPU
MAX_CONTENT_SIZE = 1_000_000 # 1MB limit
if len(response.content) > MAX_CONTENT_SIZE:
print(f"Content size {len(response.content)} exceeds limit {MAX_CONTENT_SIZE}. Truncating.")
# Decode potentially large content carefully
try:
html_content = response.content[:MAX_CONTENT_SIZE].decode(response.apparent_encoding or 'utf-8', errors='ignore')
except Exception as decode_err:
print(f"Decoding error after truncation: {decode_err}. Falling back to utf-8 ignore.")
html_content = response.content[:MAX_CONTENT_SIZE].decode('utf-8', errors='ignore')
truncated_msg = "\n\n[Content truncated due to size limit]"
else:
html_content = response.text # Use response.text which handles encoding better for smaller content
truncated_msg = ""
# Convert to Markdown
# Added heading_style for potentially better formatting
markdown_content = markdownify.markdownify(html_content, heading_style="ATX", strip=['script', 'style'], escape_underscores=False)
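# Options used above: heading_style="ATX" produces '#'-style headings, the strip
# list excludes <script>/<style> tags from the conversion, and
# escape_underscores=False keeps identifiers like snake_case_names readable.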
# Simple cleaning (optional, can be expanded)
markdown_content = '\n'.join([line.strip() for line in markdown_content.splitlines() if line.strip()])
print(f"Successfully loaded and converted {url} to markdown.")
# Add URL source attribution
return f"Content from {url}:\n\n" + markdown_content + truncated_msg
except requests.exceptions.Timeout:
print(f"Timeout error loading page: {url}")
return f"Error: Timeout while trying to load {url}"
except requests.exceptions.RequestException as e:
print(f"Request error loading page {url}: {str(e)}")
return f"Error loading page {url}: {str(e)}"
except Exception as e:
print(f"General error loading page {url}: {str(e)}")
traceback.print_exc() # Print full traceback for debugging
return f"Error loading page {url}: An unexpected error occurred ({type(e).__name__})."
# --- Gemini Client Initialization and Configuration ---
try:
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY environment variable not set.")
genai.configure(api_key=api_key)
# *** Use the requested experimental model ***
MODEL_NAME = "gemini-2.5-pro-exp-03-25"
print(f"Attempting to use EXPERIMENTAL model: {MODEL_NAME}")
# Define the browse tool using FunctionDeclaration
browse_tool = types.Tool(
function_declarations=[
types.FunctionDeclaration(
name='load_page',
description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
'url': types.Schema(type=types.Type.STRING, description="The *full* URL of the webpage to load (must start with http:// or https://).")
},
required=['url']
)
)
]
)
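# For reference, the declaration above corresponds to roughly this JSON in the
# underlying API request (an illustrative sketch; exact field casing may differ):
# {"function_declarations": [{
#     "name": "load_page",
#     "description": "Fetches the content of a specific web page URL as Markdown text. ...",
#     "parameters": {"type": "OBJECT",
#                    "properties": {"url": {"type": "STRING", "description": "..."}},
#                    "required": ["url"]}}]}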
# Define the code execution tool
# Enables the model to suggest and potentially execute Python code.
code_execution_tool = types.Tool(code_execution=types.CodeExecution()) # CodeExecution is the proto name in this SDK
# Combine tools that the model can use
tools = [browse_tool, code_execution_tool]
# Create the model instance
model = genai.GenerativeModel(
model_name=MODEL_NAME,
tools=tools,
# Relax safety settings slightly *if needed* for code/complex generation,
# but be aware of the implications. BLOCK_NONE is risky. Use with caution.
# Consider BLOCK_LOW_AND_ABOVE or MEDIUM as safer alternatives.
safety_settings={
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
# Adjust specific categories if you face frequent blocking for safe content.
},
# System instruction (optional but recommended for setting context)
system_instruction="You are a helpful AI assistant called Gemini-Toolkit. You can browse specific web pages provided by the user via the 'load_page' tool. You can also execute Python code using the 'code_execution' tool to perform calculations, analyze data, or demonstrate programming concepts. Explain your reasoning and the steps you take. If asked to browse, confirm the URL you are accessing. If providing code, explain what it does.",
)
print(f"Gemini client initialized with model: {MODEL_NAME} and tools.")
except Exception as e:
print(f"CRITICAL ERROR: Error initializing Gemini client: {e}")
traceback.print_exc()
# Fall back gracefully so the UI below can still launch and surface the error
model = None
tools = []
MODEL_NAME = "N/A (initialization failed)" # Referenced by the UI header below
# Consider exiting if the core functionality is unavailable
# raise SystemExit("Failed to initialize core Gemini model.") from e
# --- Gradio App Logic ---
def handle_function_call(function_call):
"""Executes the function call requested by the model."""
function_name = function_call.name
args = function_call.args # This is now a dict-like object
print(f"Executing Function Call: {function_name} with args: {dict(args)}") # Log args
try:
if function_name == 'load_page':
url = args.get('url')
if url:
# Execute the actual function
function_response_content = load_page(url=url)
# Limit response size to send back to Gemini
MAX_RESPONSE_LEN = 50000 # Limit characters sent back
if len(function_response_content) > MAX_RESPONSE_LEN:
print(f"Tool Response truncated from {len(function_response_content)} to {MAX_RESPONSE_LEN} chars.")
function_response_content = function_response_content[:MAX_RESPONSE_LEN] + "\n\n[... Tool Response Truncated Due to Size Limit ...]"
else:
function_response_content = "Error: URL parameter was missing in the function call. Please ensure the 'url' argument is provided."
else:
# Should not happen if tools are defined correctly and model uses them
print(f"Error: Received call for unknown function '{function_name}'")
function_response_content = f"Error: Unknown function '{function_name}' called by the model."
# Create the FunctionResponse part to send back to the model
# API expects the response arg to be a dict, typically {'content': <result>}
function_response_part = types.Part(
function_response=types.FunctionResponse(
name=function_name,
response={'content': function_response_content}
)
)
print(f"Function Response generated for {function_name}")
return function_response_part
except Exception as e:
print(f"Error during execution of function '{function_name}': {e}")
traceback.print_exc()
# Return an error message back to the model
return types.Part(
function_response=types.FunctionResponse(
name=function_name,
response={'error': f"Failed to execute function {function_name}: {str(e)}"}
)
)
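# The round trip handled above looks roughly like this (illustrative values):
#   model turn: Part(function_call=FunctionCall(name='load_page',
#                                               args={'url': 'https://example.com'}))
#   our reply:  Part(function_response=FunctionResponse(name='load_page',
#                                                       response={'content': '...markdown...'}))
# The response dict is free-form; {'content': ...} / {'error': ...} is this app's convention.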
def generate_response_with_tools(user_input, history_state):
"""Handles user input, interacts with Gemini (incl. tools), and manages history."""
if not model:
# Handle case where model initialization failed
return "Error: The AI model (Gemini) could not be initialized. Please check the logs or API key configuration.", history_state or []
if not user_input.strip():
# Guarded upstream by user_message_update, but kept as a safety net. Note that
# this returns a one-message display list without touching the stored history state.
return [[None, "Please enter a valid query."]], history_state or []
# --- History Management ---
# Load history from state (should be list of Content objects)
# Initialize if state is None or empty
conversation_history = history_state if isinstance(history_state, list) else []
# Append the user's new message to the history
conversation_history.append(types.Content(role="user", parts=[types.Part(text=user_input)]))
print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
# Trim history *before* sending to the API to bound token usage/cost. A "turn" here
# is one user message plus one model response (which may include tool traffic).
MAX_HISTORY_TURNS = 10
max_history_items = MAX_HISTORY_TURNS * 2 + (1 if conversation_history and conversation_history[0].role == "system" else 0) # Approx. items to keep
if len(conversation_history) > max_history_items:
print(f"Trimming conversation history from {len(conversation_history)} items to ~{max_history_items}")
if conversation_history[0].role == "system":
# Keep system instruction and the latest items
conversation_history = [conversation_history[0]] + conversation_history[-(max_history_items-1):]
else:
# Just keep the latest items
conversation_history = conversation_history[-max_history_items:]
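# Worked example: with MAX_HISTORY_TURNS = 10 and no leading system entry,
# max_history_items = 10 * 2 + 0 = 20, so a 25-item history keeps only its 20 most
# recent user/model items. (With this SDK the system instruction lives on the model
# object rather than in the history, so the "system" branch is purely defensive.)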
# --- Interaction Loop (for potential tool calls) ---
MAX_TOOL_LOOPS = 5 # Prevent infinite loops if the model keeps calling tools without finishing
loop_count = 0
final_bot_message = "" # Initialized up front: the loop-limit path below reads it even if no break assigned it
current_history_for_api = list(conversation_history) # Work with a copy in the loop
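# Tool-call protocol implemented by the loop below (one API round trip per pass):
#   1. send the accumulated history -> the model returns text and/or function_call parts
#   2. if a function_call is present: run it locally, append the function_response
#      as a new turn, and continue the loop so the model can use the result
#   3. otherwise the text is final -> format it (plus any code parts) and break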
try:
while loop_count < MAX_TOOL_LOOPS:
loop_count += 1
print(f"Generation loop {loop_count}/{MAX_TOOL_LOOPS}...")
# Send context and query to Gemini
# Use the potentially trimmed history for this API call
response = model.generate_content(
current_history_for_api,
request_options={"timeout": 120}, # Increase timeout for complex/tool calls
# generation_config=genai.types.GenerationConfig( # If you need temperature etc.
# temperature=0.7
# )
)
# --- Process Response Candidate ---
if not response.candidates:
print("Warning: No candidates received from Gemini.")
# Append a message indicating no response
final_bot_message = "[No response generated by the model.]"
current_history_for_api.append(types.Content(role="model", parts=[types.Part(text=final_bot_message)]))
break # Exit loop
candidate = response.candidates[0]
# Check for abnormal finish reasons (SAFETY, MAX_TOKENS, RECITATION, ...). In this
# SDK a pending function call still finishes with reason STOP, so STOP is the only
# "normal" value; tool calls are detected from the content parts further below.
if candidate.finish_reason != types.Candidate.FinishReason.STOP:
print(f"Warning: Generation stopped unexpectedly. Reason: {candidate.finish_reason.name}")
# Append the reason to the conversation for context, if desired
stop_reason_msg = f"[Model stopped generating. Reason: {candidate.finish_reason.name}]"
# Check if there's any text content before adding the stop reason
if candidate.content and candidate.content.parts and any(p.text for p in candidate.content.parts):
current_history_for_api.append(candidate.content) # Add what content there was
# Extract text to display if needed, before adding stop reason
final_bot_message = "".join([p.text for p in candidate.content.parts if p.text]) + f"\n{stop_reason_msg}"
else:
# No text, just add the stop reason message as the model turn
final_bot_message = stop_reason_msg
current_history_for_api.append(types.Content(role="model", parts=[types.Part(text=final_bot_message)]))
break # Exit loop
# --- Handle Potential Tool Call ---
# Detected by inspecting the parts; there is no separate TOOL_CALL finish reason
has_tool_call = any(part.function_call for part in candidate.content.parts)
# Append the model's response (which might contain text and/or tool calls) to history *before* execution
# The API expects the model's turn asking for the tool first.
current_history_for_api.append(candidate.content)
if has_tool_call:
print("Tool call requested by model.")
tool_calls_to_process = [part.function_call for part in candidate.content.parts if part.function_call]
if not tool_calls_to_process:
# Unreachable given the any() check above, but kept as a cheap safeguard
print("Warning: has_tool_call was set but no function_call part was found.")
final_bot_message = "".join([p.text for p in candidate.content.parts if p.text]) or "[Model indicated tool use but provided no details or text.]"
break # Exit loop as we cannot proceed with the tool call
# Execute the function(s) and get responses
tool_responses = []
for function_call in tool_calls_to_process:
function_response_part = handle_function_call(function_call)
tool_responses.append(function_response_part)
# Add the tool execution results to history for the *next* API call
# The v1beta API only defines "user" and "model" content roles, so the tool results
# go back in a user-role turn (a literal "tool" role may be rejected by the API)
current_history_for_api.append(types.Content(role="user", parts=tool_responses))
print("Added tool response(s) to history. Continuing loop...")
continue # Go back to the start of the while loop to call the API again
else:
# No tool call, this is the final response from the model
print("No tool call requested. Final response received.")
final_bot_message = "".join([part.text for part in candidate.content.parts if part.text])
# Also check for code execution *suggestions* or *results* in the final turn
code_parts_display = []
for part in candidate.content.parts:
if part.executable_code:
lang = part.executable_code.language.name.lower() if part.executable_code.language else "python"
code = part.executable_code.code
code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'language_unspecified' else lang}\n{code}\n```")
elif part.code_execution_result:
outcome_str = "Success" if part.code_execution_result.outcome == types.CodeExecutionResult.Outcome.OUTCOME_OK else "Failure"
code_parts_display.append(f"Code Execution Result ({outcome_str}):\n```\n{part.code_execution_result.output}\n```")
if code_parts_display:
final_bot_message += "\n\n" + "\n\n".join(code_parts_display)
# Handle empty final message case
if not final_bot_message.strip():
final_bot_message = "[Assistant completed its turn without generating text output.]"
break # Exit the while loop
# End of while loop
if loop_count >= MAX_TOOL_LOOPS:
print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
final_bot_message = (final_bot_message + "\n\n" if final_bot_message else "") + f"[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
# Ensure the last model message is added even if loop limit reached
if current_history_for_api[-1].role != "model":
current_history_for_api.append(types.Content(role="model", parts=[types.Part(text=final_bot_message)]))
print("--- Response Generation Complete ---")
# Update the main history state with the final state of the conversation
# We return the *final* bot message text for display, and the *full* history state
# The chatbot UI needs [[user, bot], [user, bot], ...] format
# Create the Gradio chatbot display format from our history
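# Target shape (Gradio's "tuples" chatbot format), e.g.:
#   [["Summarize example.com", "The page says ..."],
#    ["And 2**10?", "1024"]]
# A None in the second slot marks a user message still awaiting its reply.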
chatbot_display_list = []
user_msg = None
for i, content in enumerate(current_history_for_api):
# Skip system instruction for display
if content.role == "system": continue
# Combine multi-part messages for display
msg_text = ""
for part in content.parts:
if part.text:
msg_text += part.text + "\n"
# Display code suggestions nicely
elif part.executable_code:
lang = part.executable_code.language.name.lower() if part.executable_code.language else "python"
code = part.executable_code.code
msg_text += f"\nSuggested Code ({lang}):\n```{'python' if lang == 'language_unspecified' else lang}\n{code}\n```\n"
# We don't display tool calls/responses directly in chat bubbles usually
# elif part.function_call: msg_text += f"[Requesting tool: {part.function_call.name}]\n"
# elif part.function_response: msg_text += f"[Tool response received for {part.function_response.name}]\n"
elif part.code_execution_result:
outcome_str = "Success" if part.code_execution_result.outcome == types.CodeExecutionResult.Outcome.OUTCOME_OK else "Failure"
msg_text += f"\nCode Execution Result ({outcome_str}):\n```\n{part.code_execution_result.output}\n```\n"
msg_text = msg_text.strip()
if not msg_text: continue # Skip empty parts/turns
if content.role == "user":
# If there was a pending user message, start a new pair
user_msg = msg_text
# Append None temporarily for the bot response, it will be filled if available
chatbot_display_list.append([user_msg, None])
elif content.role == "model":
if chatbot_display_list and chatbot_display_list[-1][1] is None:
# Fill in the bot response for the last user message
chatbot_display_list[-1][1] = msg_text
else:
# Model message without a preceding user message (unlikely here, but handle)
# Or potentially consecutive model messages after tool use. Append as separate bot message.
chatbot_display_list.append([None, msg_text])
user_msg = None # Reset pending user message
# The final bot message is already present in current_history_for_api at this point,
# so the formatting loop above has captured it; no extra handling is needed here
return chatbot_display_list, current_history_for_api # Return display list and history state
except Exception as e:
print(f"ERROR during Gemini generation or tool processing: {str(e)}")
traceback.print_exc()
error_message = f"An error occurred while processing your request: {str(e)}"
# Return error in chatbot format and the history state *before* the error
chatbot_error_display = [[None, error_message]]
# Try to get the display history before error if possible
if 'current_history_for_api' in locals():
# Rebuild display list up to the point before error for continuity
# (This is simplified, full rebuild might be complex)
existing_display = []
for c in current_history_for_api[:-1]: # Exclude potentially problematic last addition
if c.role == "user": existing_display.append(["".join(p.text for p in c.parts if p.text), None])
elif c.role == "model" and existing_display and existing_display[-1][1] is None:
existing_display[-1][1] = "".join([p.text for p in c.parts if p.text])
existing_display.append([None, error_message]) # Add error message at end
chatbot_error_display = existing_display
# Return the history as it stood before the failed model turn (it still includes
# this turn's user message, which was appended near the top of this function)
return chatbot_error_display, conversation_history # Revert state to before this turn
# --- Gradio Interface ---
with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as demo:
gr.Markdown(f"# πŸš€ Gemini AI Assistant ({MODEL_NAME})")
gr.Markdown("Ask questions, request info from specific URLs, or ask for code/calculations. Uses function calling and code execution.")
# Chatbot component to display conversation
chatbot_display = gr.Chatbot(
label="Conversation",
bubble_full_width=False,
height=600, # Increased height
show_copy_button=True,
render_markdown=True # Ensure markdown inc code blocks is rendered
)
# Textbox for user input
msg_input = gr.Textbox(
label="Your Query",
placeholder="Ask anything... (e.g., 'Summarize example.com', 'Calculate 2^64', 'Write python code to list files')",
lines=3, # Start with more lines
scale=4 # Take more horizontal space
)
# Use ClearButton which handles multiple components
clear_btn = gr.ClearButton(value="πŸ—‘οΈ Clear Chat")
# Submit button (using default value seems fine)
send_btn = gr.Button("➑️ Send", variant="primary", scale=1)
# Hidden state to store the raw conversation history (list of genai.types.Content)
chat_history_state = gr.State([])
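# Two parallel representations are kept in sync: chatbot_display holds plain
# [[user, bot], ...] strings for rendering, while chat_history_state holds the raw
# types.Content list that is actually replayed to the Gemini API on every turn.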
def user_message_update(user_message, history_display_list):
"""Appends the user's message to the display list and clears the input."""
if not user_message.strip(): # Avoid adding empty messages
return gr.update(value=""), history_display_list # Clear input, return unchanged history display
# Append user message with None placeholder for bot response
return gr.update(value=""), history_display_list + [[user_message, None]]
def bot_response_update(history_display_list, history_state):
"""Calls the backend Gemini function and updates display/state."""
if not history_display_list or history_display_list[-1][0] is None:
# Should not happen if user_message_update ran first, but safeguard
print("Warning: bot_response_update called without preceding user message in display.")
# Return the current display and state unchanged
return history_display_list, history_state
user_message = history_display_list[-1][0] # Get the last user message from display list
print(f"User message being sent to backend: {user_message}")
# Call the main Gemini interaction function
# It now returns the *entire* chat history for display, and the updated state
updated_display_list, updated_history_state = generate_response_with_tools(user_message, history_state)
# Pass the new display list and updated state straight through to Gradio
return updated_display_list, updated_history_state
# Define the action for sending a message (Enter key in Textbox)
msg_input.submit(
user_message_update, # 1. Update display with user msg, clear input
[msg_input, chatbot_display],
[msg_input, chatbot_display],
queue=False, # Run this UI update immediately, without queueing
).then(
bot_response_update, # 2. Call backend, get full display list & new state
[chatbot_display, chat_history_state], # Pass current display (for last msg) & state
[chatbot_display, chat_history_state] # Update display & state from backend return
)
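# The two-step chain above is the usual Gradio pattern: a fast, unqueued UI update
# (echo the user message, clear the textbox) followed by the queued, potentially
# slow backend call that fills in the bot reply.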
# Define the action for clicking the Send button
send_btn.click(
user_message_update,
[msg_input, chatbot_display],
[msg_input, chatbot_display],
queue=False,
).then(
bot_response_update,
[chatbot_display, chat_history_state],
[chatbot_display, chat_history_state]
)
# Setup the ClearButton to target the necessary components, including the state
clear_btn.add(components=[msg_input, chatbot_display, chat_history_state])
# The ClearButton itself doesn't need a custom function when using .add()
# It will set components to their default/initial values (Textbox="", Chatbot=None, State=[])
if __name__ == "__main__":
print("Starting Gradio App...")
# Enable queue for handling potentially long API calls/tool executions
# Set share=True to get a public link (remove if only running locally)
demo.queue().launch(server_name="0.0.0.0", server_port=7860)
print("Gradio App Stopped.")