# gemini-webui / app.py
import gradio as gr
import google.generativeai as genai
import os
import mimetypes
from PIL import Image
import io
import magic # python-magic library
from dotenv import load_dotenv
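# Assumed dependency set for this Space (the original does not pin versions):
#   pip install gradio google-generativeai pillow python-magic python-dotenv
# python-magic additionally requires the libmagic system library.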
# (Optional) Load environment variables for local testing if you have a .env file
# load_dotenv()
# TEST_API_KEY = os.getenv("GEMINI_API_KEY") # Use this ONLY for your local testing
# --- Constants ---
# Define available models (expand this list as needed)
# Include models supporting different modalities and versions
AVAILABLE_MODELS = [
"gemini-1.5-flash-latest",
"gemini-1.5-pro-latest",
"gemini-1.0-pro",
"gemini-pro-vision", # Example vision model
# "gemini-experimental", # Add other relevant models
]
# Define parameters for each model (Example structure)
# This needs meticulous mapping based on official Gemini documentation
MODEL_PARAMS = {
"gemini-1.5-flash-latest": {
"temperature": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.1, "default": 1.0},
"top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95},
"top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40},
"max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 8192},
"stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""},
# Safety settings could be added here too (as dropdowns or checkboxes)
},
"gemini-1.5-pro-latest": {
# Similar params, possibly different defaults or ranges
"temperature": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.1, "default": 1.0},
"top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95},
"top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40},
"max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 8192},
"stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""},
},
"gemini-1.0-pro": {
        # Older model: narrower temperature range and lower default output cap
        "temperature": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.1, "default": 0.9},
        "top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95},
        "top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40},
        "max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 2048},
"stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""},
},
"gemini-pro-vision": {
# Vision models might have fewer text-generation params or different ones
"temperature": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.1, "default": 0.4},
"top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95},
"top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 32},
"max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 2048},
        # stop_sequences intentionally omitted for the vision model
}
}
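# Illustrative mapping from a MODEL_PARAMS entry to the SDK call, assuming the
# defaults above (values are examples, not authoritative model limits):
#   genai.types.GenerationConfig(temperature=1.0, top_p=0.95, top_k=40,
#                                max_output_tokens=8192, stop_sequences=["END"])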
# --- Helper Functions ---
def get_mime_type(file_path):
"""Get MIME type using python-magic for reliability."""
try:
mime = magic.Magic(mime=True)
return mime.from_file(file_path)
except Exception:
# Fallback to mimetypes if magic fails
return mimetypes.guess_type(file_path)[0]
def convert_file_to_text(file_obj):
"""
Attempts to convert various file types to text.
Returns (text_content, original_filename) or (None, original_filename) if conversion fails.
"""
file_path = file_obj.name
filename = os.path.basename(file_path)
mime_type = get_mime_type(file_path)
print(f"Processing file: {filename}, MIME type: {mime_type}") # Debugging
try:
if mime_type is None:
# If MIME type is unknown, try reading as text
print(f"Warning: Unknown MIME type for {filename}. Attempting to read as text.")
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read(), filename
elif mime_type.startswith("text/"):
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read(), filename
elif mime_type == "application/pdf":
# Placeholder for PDF conversion (requires pypdf or similar)
print(f"PDF conversion not implemented yet for {filename}.")
# from pypdf import PdfReader # Example
# reader = PdfReader(file_path)
# text = ""
# for page in reader.pages:
# text += page.extract_text() + "\n"
# return text, filename
return f"[Unsupported PDF: {filename} - Conversion not implemented]", filename # Temporary
        elif mime_type in ["application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"]:
            # DOCX extraction sketch: assumes the optional `python-docx`
            # package is installed; legacy .doc files are not parseable by it.
            try:
                import docx
                doc = docx.Document(file_path)
                text = "\n".join(para.text for para in doc.paragraphs)
                return text, filename
            except Exception as doc_err:
                print(f"Word document conversion failed for {filename}: {doc_err}")
                return f"[Unsupported Word Doc: {filename} - Conversion failed]", filename
else:
# For other unsupported types, return a marker
print(f"Unsupported file type: {mime_type} for {filename}. Skipping content.")
return f"[Unsupported file type: {mime_type} - {filename}]", filename
except Exception as e:
print(f"Error converting file {filename}: {e}")
return f"[Error converting file: {filename}]", filename
def prepare_gemini_input(prompt, files):
"""Prepares the input list for Gemini, handling text and images."""
gemini_parts = []
if prompt:
gemini_parts.append(prompt)
if files:
for file_obj in files:
file_path = file_obj.name
mime_type = get_mime_type(file_path)
filename = os.path.basename(file_path)
print(f"Preparing file for Gemini: {filename}, MIME: {mime_type}")
if mime_type and mime_type.startswith("image/"):
try:
img = Image.open(file_path)
# Convert image to bytes (e.g., PNG or JPEG)
# Gemini API directly accepts PIL Images usually
gemini_parts.append(img)
print(f"Added image: {filename}")
except Exception as e:
print(f"Error processing image {filename}: {e}")
gemini_parts.append(f"[Error processing image: {filename}]")
elif mime_type and mime_type.startswith("video/"): # Gemini 1.5 Pro can handle video
# Upload file via File API first (more complex, needs google.ai.generativelanguage)
# For simplicity here, we'll just note it's a video
# or provide a basic text representation if conversion isn't implemented
print(f"Video file detected: {filename}. Full video processing requires File API.")
gemini_parts.append(f"[Video file: {filename} - Requires File API upload]")
# Placeholder: Add text conversion if feasible for your use case
# text_content, _ = convert_file_to_text(file_obj)
# if text_content:
# gemini_parts.append(f"--- Content of video file {filename} (extracted as text) ---\n{text_content}")
elif mime_type and mime_type.startswith("audio/"): # Gemini 1.5 Pro can handle audio
print(f"Audio file detected: {filename}. Full audio processing requires File API.")
gemini_parts.append(f"[Audio file: {filename} - Requires File API upload]")
# Placeholder: Add text conversion if feasible (e.g. transcript)
# text_content, _ = convert_file_to_text(file_obj) # Needs specific audio-to-text logic
# if text_content:
# gemini_parts.append(f"--- Content of audio file {filename} (extracted as text) ---\n{text_content}")
else: # Assume text or convertible to text
text_content, original_filename = convert_file_to_text(file_obj)
if text_content:
# Add context marker
gemini_parts.append(f"\n--- Content from file: {original_filename} ---\n{text_content}\n--- End of file: {original_filename} ---")
else:
gemini_parts.append(f"[Could not process file: {original_filename}]")
    # Guard against an empty parts list (e.g., every file failed to process)
    if not gemini_parts:
        gemini_parts.append("")  # Avoid sending an empty list to the API
return gemini_parts
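# Minimal File API sketch referenced above. `upload_media_via_file_api` is a
# hypothetical helper (not wired into the UI): it assumes an SDK version that
# exposes `genai.upload_file`. The returned File handle can be appended to
# `gemini_parts` in place of the placeholder strings, once verified against
# your installed google-generativeai release.
def upload_media_via_file_api(file_path):
    """Upload a video/audio file via the Gemini File API and return a handle
    usable as a content part in generate_content / send_message calls."""
    uploaded_file = genai.upload_file(path=file_path)  # assumes a recent SDK
    return uploaded_file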
# --- Gradio UI Functions ---
def validate_api_key(api_key):
"""Checks if the API key is potentially valid by trying to list models."""
if not api_key:
return "<p style='color: orange;'>Please enter an API Key.</p>"
try:
genai.configure(api_key=api_key)
models = genai.list_models()
# Check if at least one desired model is available with this key
available_core_models = [m.name for m in models if 'generateContent' in m.supported_generation_methods]
if any(model_name.split('/')[-1] in AVAILABLE_MODELS for model_name in available_core_models):
return "<p style='color: green;'>API Key seems valid (can list models).</p>"
else:
return "<p style='color: orange;'>API Key is valid but might not have access to the required Gemini models.</p>"
except Exception as e:
print(f"API Key validation error: {e}")
# Be careful not to leak too much error detail
if "API key not valid" in str(e):
return "<p style='color: red;'>API Key is invalid.</p>"
        else:
            # Surface only the exception type to avoid leaking sensitive detail
            return f"<p style='color: red;'>API Key validation failed ({type(e).__name__}).</p>"
def update_parameter_visibility(model_name):
"""Updates visibility and values of parameter controls based on selected model."""
updates = {}
params_for_model = MODEL_PARAMS.get(model_name, {})
# Define ALL possible parameter components used across models
all_param_keys = set(k for params in MODEL_PARAMS.values() for k in params)
for key in all_param_keys:
param_config = params_for_model.get(key)
if param_config:
# Parameter exists for this model: make visible and set defaults
updates[param_elements[key]] = gr.update(
visible=True,
label=key.replace("_", " ").title(), # Nicer label
value=param_config.get("default") # Set default value
# Add specific updates for slider ranges etc. if needed
# minimum=param_config.get("min"),
# maximum=param_config.get("max"),
# step=param_config.get("step")
)
else:
# Parameter does NOT exist for this model: hide it
updates[param_elements[key]] = gr.update(visible=False, value=None) # Reset value when hiding
return updates
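# Note: returning a {component: gr.update(...)} dict is Gradio's mechanism for
# updating a dynamic subset of outputs; every component that can appear as a
# key must still be listed in the outputs of the .change()/.load() wiring below.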
def handle_chat(api_key, model_name, history, message, files, *params_tuple):
"""Handles the chat interaction."""
# 1. Basic Validation
    if not api_key:
        gr.Warning("Gemini API Key is missing!")
        yield history, ""  # This function is a generator, so yield rather than return values
        return
    if not message and not files:
        gr.Warning("Please enter a message or upload files.")
        yield history, ""
        return
# 2. Configure API Key
try:
genai.configure(api_key=api_key)
except Exception as e:
gr.Error(f"Failed to configure API Key: {e}")
        yield history, message  # Keep message in textbox for retry
        return
# 3. Prepare Generation Config from *params_tuple
    # The UI passes every parameter component in sorted-key order (matching
    # ordered_param_components below); map values back by that same order and
    # keep only the keys the selected model actually defines.
    all_param_keys = sorted(set(k for params in MODEL_PARAMS.values() for k in params))
    model_param_keys = set(MODEL_PARAMS.get(model_name, {}).keys())
    generation_config_dict = {}
    if len(params_tuple) == len(all_param_keys):
        generation_config_dict = {key: val for key, val in zip(all_param_keys, params_tuple)
                                  if key in model_param_keys and val is not None}
        # Gradio sliders/number boxes may return floats; cast the integer params
        for int_key in ("top_k", "max_output_tokens"):
            if int_key in generation_config_dict:
                generation_config_dict[int_key] = int(generation_config_dict[int_key])
        # Handle stop sequences (expecting a comma-separated string)
        if 'stop_sequences' in generation_config_dict and isinstance(generation_config_dict['stop_sequences'], str):
            sequences = [s.strip() for s in generation_config_dict['stop_sequences'].split(',') if s.strip()]
            if sequences:
                generation_config_dict['stop_sequences'] = sequences
            else:
                del generation_config_dict['stop_sequences']  # Remove if empty
        print(f"Using Generation Config: {generation_config_dict}")  # Debug
    else:
        print(f"Warning: Mismatch between expected params ({len(all_param_keys)}) and received params ({len(params_tuple)})")
# 4. Prepare Model Input
gemini_input_parts = prepare_gemini_input(message, files)
print(f"Prepared Gemini Input Parts: {gemini_input_parts}") # Debugging
# 5. Initialize Model and Chat
try:
# Add safety settings if needed/configured
# safety_settings = {...}
        model = genai.GenerativeModel(model_name)  # , safety_settings=safety_settings
# Convert Gradio history (list of lists) to Gemini format (list of Content objects)
gemini_history = []
for user_msg, model_msg in history:
# Simple text history for now. Need enhancement for multimodal history.
if user_msg: gemini_history.append({'role': 'user', 'parts': [user_msg]})
if model_msg: gemini_history.append({'role': 'model', 'parts': [model_msg]})
chat = model.start_chat(history=gemini_history)
print(f"Starting chat with history (simplified): {gemini_history}") # Debugging
except Exception as e:
gr.Error(f"Failed to initialize model or chat: {e}")
        yield history, message  # Keep message in textbox
        return
# 6. Send Message and Get Response
response_text = ""
try:
# Use streaming for better UX in chat
response = chat.send_message(gemini_input_parts,
generation_config=genai.types.GenerationConfig(**generation_config_dict),
stream=True)
full_response_content = ""
for chunk in response:
# Check if the chunk has text content
if hasattr(chunk, 'text'):
chunk_text = chunk.text
print(f"Stream chunk: {chunk_text}") # Debug stream
full_response_content += chunk_text
# Yield intermediate updates to the chatbot
current_history = history + [[message or "[Input files only]", full_response_content]]
yield current_history, "" # Update chatbot, clear input
# Check for image data if model supports it (more complex parsing needed)
# elif chunk.parts and chunk.parts[0].inline_data:
# # Handle potential image output - requires modification
# pass
response_text = full_response_content # Final text response
# Check for blocked prompts or safety issues
if not response_text and response.prompt_feedback.block_reason:
block_reason = response.prompt_feedback.block_reason
safety_ratings = response.prompt_feedback.safety_ratings
gr.Warning(f"Request blocked. Reason: {block_reason}. Ratings: {safety_ratings}")
# Append a notice to history instead of an empty response
history.append([message or "[Input files only]", f"[Request blocked due to: {block_reason}]"])
return history, "" # Clear input box
except Exception as e:
gr.Error(f"Error during generation: {e}")
# Optionally add the error to history for context
history.append([message or "[Input files only]", f"[Error during generation: {str(e)}]"])
return history, "" # Clear input box
# 7. Update History and Clear Input
# The yielding above handles intermediate updates. This is the final state.
    final_history = history + [[message or "[Input files only]", response_text or "[No text content received]"]]
    yield final_history, ""  # Final update, clear input
def handle_single_response(api_key, model_name, prompt, files, *params_tuple):
"""Handles the single response interaction."""
# 1. Validations
if not api_key:
gr.Warning("Gemini API Key is missing!")
return "[Error: API Key Missing]", None # Text output, Image output
if not prompt and not files:
gr.Warning("Please enter a prompt or upload files.")
return "[Error: No input provided]", None
# 2. Configure API Key
try:
genai.configure(api_key=api_key)
except Exception as e:
gr.Error(f"Failed to configure API Key: {e}")
return f"[Error: API Key Config Failed: {e}]", None
# 3. Prepare Generation Config
    # Same mapping as handle_chat: components arrive in sorted-key order, so
    # reconstruct that order and keep only this model's parameters.
    all_param_keys = sorted(set(k for params in MODEL_PARAMS.values() for k in params))
    model_param_keys = set(MODEL_PARAMS.get(model_name, {}).keys())
    generation_config_dict = {}
    if len(params_tuple) == len(all_param_keys):
        generation_config_dict = {key: val for key, val in zip(all_param_keys, params_tuple)
                                  if key in model_param_keys and val is not None}
        # Gradio sliders/number boxes may return floats; cast the integer params
        for int_key in ("top_k", "max_output_tokens"):
            if int_key in generation_config_dict:
                generation_config_dict[int_key] = int(generation_config_dict[int_key])
        # Handle stop sequences
        if 'stop_sequences' in generation_config_dict and isinstance(generation_config_dict['stop_sequences'], str):
            sequences = [s.strip() for s in generation_config_dict['stop_sequences'].split(',') if s.strip()]
            if sequences:
                generation_config_dict['stop_sequences'] = sequences
            else:
                del generation_config_dict['stop_sequences']
        print(f"Using Generation Config: {generation_config_dict}")  # Debug
    else:
        print(f"Warning: Mismatch between expected params ({len(all_param_keys)}) and received params ({len(params_tuple)})")
# 4. Prepare Model Input
gemini_input_parts = prepare_gemini_input(prompt, files)
print(f"Prepared Gemini Input Parts: {gemini_input_parts}") # Debugging
# 5. Initialize Model
try:
# Add safety settings if needed/configured
model = genai.GenerativeModel(model_name)
except Exception as e:
gr.Error(f"Failed to initialize model: {e}")
return f"[Error: Model Initialization Failed: {e}]", None
    # 6. Generate Content (non-streaming; simpler for a single turn)
output_text = "[No text content generated]"
output_image = None # Placeholder for image output
try:
response = model.generate_content(
gemini_input_parts,
generation_config=genai.types.GenerationConfig(**generation_config_dict),
stream=False # Simpler for single turn unless very long output expected
)
# Check for blocked prompts or safety issues
if response.prompt_feedback.block_reason:
block_reason = response.prompt_feedback.block_reason
safety_ratings = response.prompt_feedback.safety_ratings
gr.Warning(f"Request blocked. Reason: {block_reason}. Ratings: {safety_ratings}")
return f"[Request blocked due to: {block_reason}]", None
        # Process response parts (may contain text and/or inline image data).
        # Prioritize text; display the first image part found, if any.
        response_text_parts = []
        for part in response.parts:
            # Proto parts expose `text` and `inline_data` as always-present
            # fields, so check for actual content rather than using hasattr().
            if getattr(part, 'text', ''):
                response_text_parts.append(part.text)
            elif getattr(part, 'inline_data', None) and part.inline_data.data and part.inline_data.mime_type.startswith('image/'):
                if output_image is None:  # Display the first image found
                    try:
                        output_image = Image.open(io.BytesIO(part.inline_data.data))
                        print("Image received in response.")
                    except Exception as img_err:
                        print(f"Error decoding image from response: {img_err}")
                        response_text_parts.append("[Error decoding image in response]")
        if response_text_parts:
            output_text = "\n".join(response_text_parts)
        elif output_image is not None:
            output_text = "[Image generated - see output below]"
        else:
            try:
                output_text = response.text  # Quick-accessor fallback
            except (ValueError, AttributeError):
                pass  # Keep the default placeholder text
except Exception as e:
gr.Error(f"Error during generation: {e}")
output_text = f"[Error during generation: {str(e)}]"
# 7. Return results
return output_text, output_image
# --- Build Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# Gemini API Interface")
gr.Markdown("Interact with Google Gemini models using your own API key. Supports chat, single responses, file uploads, and model-specific parameters.")
# API Key Section
with gr.Row():
api_key_input = gr.Textbox(
label="Gemini API Key",
placeholder="Enter your Gemini API Key here",
type="password",
scale=3
)
validate_button = gr.Button("Validate Key", scale=1)
api_key_status = gr.Markdown("<p style='color: gray;'>Enter your key and click Validate.</p>")
# Model Selection
model_dropdown = gr.Dropdown(
label="Select Gemini Model",
choices=AVAILABLE_MODELS,
value=AVAILABLE_MODELS[0], # Default model
)
# Dynamic Parameters Section (Initially hidden, updated by model selection)
param_elements = {} # Dictionary to hold parameter UI components
with gr.Accordion("Model Parameters", open=False) as params_accordion:
# Create UI elements for ALL possible parameters defined in MODEL_PARAMS
# They will be shown/hidden by the update_parameter_visibility function
all_possible_params = set(k for params in MODEL_PARAMS.values() for k in params)
for param_name in sorted(list(all_possible_params)): # Sort for consistent order
# Determine control type based on the first model that defines it (can be refined)
control_type = "textbox" # Default
config = {}
for model_cfg in MODEL_PARAMS.values():
if param_name in model_cfg:
config = model_cfg[param_name]
control_type = config.get("type", "textbox")
break # Found config for this param
if control_type == "slider":
param_elements[param_name] = gr.Slider(
label=param_name.replace("_", " ").title(),
minimum=config.get("min", 0),
maximum=config.get("max", 1),
step=config.get("step", 0.1),
value=config.get("default"),
visible=False, # Initially hidden
interactive=True
)
elif control_type == "number":
param_elements[param_name] = gr.Number(
label=param_name.replace("_", " ").title(),
minimum=config.get("min", 1),
step=config.get("step", 1),
value=config.get("default"),
visible=False,
interactive=True
)
else: # Default to Textbox for stop_sequences etc.
param_elements[param_name] = gr.Textbox(
label=param_name.replace("_", " ").title(),
lines=config.get("lines", 1),
placeholder=config.get("placeholder", ""),
value=config.get("default", ""),
visible=False,
interactive=True
)
    # Pack the parameter components into a list for function inputs/outputs.
    # IMPORTANT: sorted-key order here must match the sorted all_param_keys
    # order reconstructed inside handle_chat / handle_single_response.
    ordered_param_components = [param_elements[key] for key in sorted(param_elements.keys())]
# Main Interaction Area (Tabs)
with gr.Tabs():
# --- Chat Interface Tab ---
with gr.TabItem("Chat Interface"):
gr.Markdown("Have a conversation with the selected model. Upload files to include their content.")
chat_history_state = gr.State([]) # Holds the conversation history
chatbot_display = gr.Chatbot(label="Conversation", height=500)
with gr.Row():
chat_file_upload = gr.File(label="Upload Files (Text, Images, etc.)", file_count="multiple")
with gr.Row():
chat_message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", scale=4, lines=3)
chat_submit_button = gr.Button("Send", variant="primary", scale=1)
clear_chat_button = gr.Button("Clear Chat History")
# --- Single Response Tab ---
with gr.TabItem("Single Response"):
gr.Markdown("Send a prompt (and optionally files) to get a single response from the model.")
with gr.Row():
with gr.Column(scale=2):
single_prompt_input = gr.Textbox(label="Your Prompt", placeholder="Enter your prompt...", lines=5)
single_file_upload = gr.File(label="Upload Files (Text, Images, etc.)", file_count="multiple")
single_submit_button = gr.Button("Generate Response", variant="primary")
with gr.Column(scale=2):
gr.Markdown("**Output:**")
single_output_text = gr.Textbox(label="Text Response", lines=10, interactive=False)
single_output_image = gr.Image(label="Image Response", type="pil", interactive=False) # Display PIL images
# --- Event Wiring ---
# 1. API Key Validation
validate_button.click(
fn=validate_api_key,
inputs=[api_key_input],
outputs=[api_key_status]
)
# 2. Update Parameters UI when Model Changes
model_dropdown.change(
fn=update_parameter_visibility,
inputs=[model_dropdown],
outputs=list(param_elements.values()) # Pass the actual components
)
# Trigger initial parameter visibility update on load
demo.load(
fn=update_parameter_visibility,
inputs=[model_dropdown],
outputs=list(param_elements.values())
)
    # 3. Chat Submission Logic
    # handle_chat is a generator, so Gradio streams each yielded (history, "")
    # pair into the chatbot; .then() then copies the final history into state.
chat_submit_button.click(
fn=handle_chat,
inputs=[
api_key_input,
model_dropdown,
chat_history_state,
chat_message_input,
chat_file_upload
] + ordered_param_components, # Add dynamic params
outputs=[chatbot_display, chat_message_input] # Update chatbot, clear input box
).then(
# Update the state *after* the response is fully generated
lambda history: history, # Simple pass-through to get final history
inputs=chatbot_display,
outputs=chat_history_state
)
# Allow submitting chat by pressing Enter in the textbox
chat_message_input.submit(
fn=handle_chat,
inputs=[
api_key_input,
model_dropdown,
chat_history_state,
chat_message_input,
chat_file_upload
] + ordered_param_components,
outputs=[chatbot_display, chat_message_input]
).then(
lambda history: history,
inputs=chatbot_display,
outputs=chat_history_state
)
# 4. Clear Chat Logic
def clear_chat_history_func():
return [], [] # Clears chatbot display and history state
clear_chat_button.click(
fn=clear_chat_history_func,
inputs=[],
outputs=[chatbot_display, chat_history_state]
)
# 5. Single Response Submission Logic
single_submit_button.click(
fn=handle_single_response,
inputs=[
api_key_input,
model_dropdown,
single_prompt_input,
single_file_upload
] + ordered_param_components, # Add dynamic params
outputs=[single_output_text, single_output_image]
)
# Launch the Gradio app
if __name__ == "__main__":
demo.launch(debug=True) # Set debug=False for deployment
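# Local usage sketch (assumes the dependencies noted at the top are installed):
#   python app.py
# Then open the printed local URL and paste your Gemini API key into the UI.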