import gradio as gr import google.generativeai as genai import os import mimetypes from PIL import Image import io import magic # python-magic library from dotenv import load_dotenv # (Optional) Load environment variables for local testing if you have a .env file # load_dotenv() # TEST_API_KEY = os.getenv("GEMINI_API_KEY") # Use this ONLY for your local testing # --- Constants --- # Define available models (expand this list as needed) # Include models supporting different modalities and versions AVAILABLE_MODELS = [ "gemini-1.5-flash-latest", "gemini-1.5-pro-latest", "gemini-1.0-pro", "gemini-pro-vision", # Example vision model # "gemini-experimental", # Add other relevant models ] # Define parameters for each model (Example structure) # This needs meticulous mapping based on official Gemini documentation MODEL_PARAMS = { "gemini-1.5-flash-latest": { "temperature": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.1, "default": 1.0}, "top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95}, "top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40}, "max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 8192}, "stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""}, # Safety settings could be added here too (as dropdowns or checkboxes) }, "gemini-1.5-pro-latest": { # Similar params, possibly different defaults or ranges "temperature": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.1, "default": 1.0}, "top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95}, "top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40}, "max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 8192}, "stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""}, }, "gemini-1.0-pro": { # Params for older model might differ slightly "temperature": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.1, "default": 0.9}, # Different max/default maybe "top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95}, "top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 40}, "max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 2048}, # Different default "stop_sequences": {"type": "textbox", "lines": 1, "placeholder": "e.g., END,STOP", "default": ""}, }, "gemini-pro-vision": { # Vision models might have fewer text-generation params or different ones "temperature": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.1, "default": 0.4}, "top_p": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "default": 0.95}, "top_k": {"type": "slider", "min": 1, "max": 100, "step": 1, "default": 32}, "max_output_tokens": {"type": "number", "min": 1, "step": 1, "default": 2048}, # No stop sequences typically needed here? Check docs. } } # --- Helper Functions --- def get_mime_type(file_path): """Get MIME type using python-magic for reliability.""" try: mime = magic.Magic(mime=True) return mime.from_file(file_path) except Exception: # Fallback to mimetypes if magic fails return mimetypes.guess_type(file_path)[0] def convert_file_to_text(file_obj): """ Attempts to convert various file types to text. Returns (text_content, original_filename) or (None, original_filename) if conversion fails. """ file_path = file_obj.name filename = os.path.basename(file_path) mime_type = get_mime_type(file_path) print(f"Processing file: {filename}, MIME type: {mime_type}") # Debugging try: if mime_type is None: # If MIME type is unknown, try reading as text print(f"Warning: Unknown MIME type for {filename}. Attempting to read as text.") with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read(), filename elif mime_type.startswith("text/"): with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read(), filename elif mime_type == "application/pdf": # Placeholder for PDF conversion (requires pypdf or similar) print(f"PDF conversion not implemented yet for {filename}.") # from pypdf import PdfReader # Example # reader = PdfReader(file_path) # text = "" # for page in reader.pages: # text += page.extract_text() + "\n" # return text, filename return f"[Unsupported PDF: {filename} - Conversion not implemented]", filename # Temporary elif mime_type in ["application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"]: # Placeholder for DOCX conversion (requires python-docx or similar) print(f"DOCX conversion not implemented yet for {filename}.") # import docx # Example # doc = docx.Document(file_path) # text = "\n".join([para.text for para in doc.paragraphs]) # return text, filename return f"[Unsupported Word Doc: {filename} - Conversion not implemented]", filename # Temporary else: # For other unsupported types, return a marker print(f"Unsupported file type: {mime_type} for {filename}. Skipping content.") return f"[Unsupported file type: {mime_type} - {filename}]", filename except Exception as e: print(f"Error converting file {filename}: {e}") return f"[Error converting file: {filename}]", filename def prepare_gemini_input(prompt, files): """Prepares the input list for Gemini, handling text and images.""" gemini_parts = [] if prompt: gemini_parts.append(prompt) if files: for file_obj in files: file_path = file_obj.name mime_type = get_mime_type(file_path) filename = os.path.basename(file_path) print(f"Preparing file for Gemini: {filename}, MIME: {mime_type}") if mime_type and mime_type.startswith("image/"): try: img = Image.open(file_path) # Convert image to bytes (e.g., PNG or JPEG) # Gemini API directly accepts PIL Images usually gemini_parts.append(img) print(f"Added image: {filename}") except Exception as e: print(f"Error processing image {filename}: {e}") gemini_parts.append(f"[Error processing image: {filename}]") elif mime_type and mime_type.startswith("video/"): # Gemini 1.5 Pro can handle video # Upload file via File API first (more complex, needs google.ai.generativelanguage) # For simplicity here, we'll just note it's a video # or provide a basic text representation if conversion isn't implemented print(f"Video file detected: {filename}. Full video processing requires File API.") gemini_parts.append(f"[Video file: {filename} - Requires File API upload]") # Placeholder: Add text conversion if feasible for your use case # text_content, _ = convert_file_to_text(file_obj) # if text_content: # gemini_parts.append(f"--- Content of video file {filename} (extracted as text) ---\n{text_content}") elif mime_type and mime_type.startswith("audio/"): # Gemini 1.5 Pro can handle audio print(f"Audio file detected: {filename}. Full audio processing requires File API.") gemini_parts.append(f"[Audio file: {filename} - Requires File API upload]") # Placeholder: Add text conversion if feasible (e.g. transcript) # text_content, _ = convert_file_to_text(file_obj) # Needs specific audio-to-text logic # if text_content: # gemini_parts.append(f"--- Content of audio file {filename} (extracted as text) ---\n{text_content}") else: # Assume text or convertible to text text_content, original_filename = convert_file_to_text(file_obj) if text_content: # Add context marker gemini_parts.append(f"\n--- Content from file: {original_filename} ---\n{text_content}\n--- End of file: {original_filename} ---") else: gemini_parts.append(f"[Could not process file: {original_filename}]") # Ensure there's at least one part (maybe an empty string if only files were given?) if not gemini_parts: gemini_parts.append("") # Avoid sending empty list return gemini_parts # --- Gradio UI Functions --- def validate_api_key(api_key): """Checks if the API key is potentially valid by trying to list models.""" if not api_key: return "
Please enter an API Key.
" try: genai.configure(api_key=api_key) models = genai.list_models() # Check if at least one desired model is available with this key available_core_models = [m.name for m in models if 'generateContent' in m.supported_generation_methods] if any(model_name.split('/')[-1] in AVAILABLE_MODELS for model_name in available_core_models): return "API Key seems valid (can list models).
" else: return "API Key is valid but might not have access to the required Gemini models.
" except Exception as e: print(f"API Key validation error: {e}") # Be careful not to leak too much error detail if "API key not valid" in str(e): return "API Key is invalid.
" else: return f"API Key validation failed. Error: {str(e)}
" def update_parameter_visibility(model_name): """Updates visibility and values of parameter controls based on selected model.""" updates = {} params_for_model = MODEL_PARAMS.get(model_name, {}) # Define ALL possible parameter components used across models all_param_keys = set(k for params in MODEL_PARAMS.values() for k in params) for key in all_param_keys: param_config = params_for_model.get(key) if param_config: # Parameter exists for this model: make visible and set defaults updates[param_elements[key]] = gr.update( visible=True, label=key.replace("_", " ").title(), # Nicer label value=param_config.get("default") # Set default value # Add specific updates for slider ranges etc. if needed # minimum=param_config.get("min"), # maximum=param_config.get("max"), # step=param_config.get("step") ) else: # Parameter does NOT exist for this model: hide it updates[param_elements[key]] = gr.update(visible=False, value=None) # Reset value when hiding return updates def handle_chat(api_key, model_name, history, message, files, *params_tuple): """Handles the chat interaction.""" # 1. Basic Validation if not api_key: gr.Warning("Gemini API Key is missing!") return history, "" # Return unchanged history and empty textbox if not message and not files: gr.Warning("Please enter a message or upload files.") return history, "" # 2. Configure API Key try: genai.configure(api_key=api_key) except Exception as e: gr.Error(f"Failed to configure API Key: {e}") return history, message # Keep message in textbox for retry # 3. Prepare Generation Config from *params_tuple param_keys = [key for key, config in MODEL_PARAMS.get(model_name, {}).items()] generation_config_dict = {} if len(params_tuple) == len(param_keys): generation_config_dict = {key: val for key, val in zip(param_keys, params_tuple) if val is not None} # Handle stop sequences (expecting comma-separated string) if 'stop_sequences' in generation_config_dict and isinstance(generation_config_dict['stop_sequences'], str): sequences = [s.strip() for s in generation_config_dict['stop_sequences'].split(',') if s.strip()] if sequences: generation_config_dict['stop_sequences'] = sequences else: del generation_config_dict['stop_sequences'] # Remove if empty/invalid print(f"Using Generation Config: {generation_config_dict}") # Debug else: print(f"Warning: Mismatch between expected params ({len(param_keys)}) and received params ({len(params_tuple)})") # 4. Prepare Model Input gemini_input_parts = prepare_gemini_input(message, files) print(f"Prepared Gemini Input Parts: {gemini_input_parts}") # Debugging # 5. Initialize Model and Chat try: # Add safety settings if needed/configured # safety_settings = {...} model = genai.GenerativeModel(model_name)#, safety_settings=safety_settings) # Convert Gradio history (list of lists) to Gemini format (list of Content objects) gemini_history = [] for user_msg, model_msg in history: # Simple text history for now. Need enhancement for multimodal history. if user_msg: gemini_history.append({'role': 'user', 'parts': [user_msg]}) if model_msg: gemini_history.append({'role': 'model', 'parts': [model_msg]}) chat = model.start_chat(history=gemini_history) print(f"Starting chat with history (simplified): {gemini_history}") # Debugging except Exception as e: gr.Error(f"Failed to initialize model or chat: {e}") return history, message # Keep message in textbox # 6. Send Message and Get Response response_text = "" try: # Use streaming for better UX in chat response = chat.send_message(gemini_input_parts, generation_config=genai.types.GenerationConfig(**generation_config_dict), stream=True) full_response_content = "" for chunk in response: # Check if the chunk has text content if hasattr(chunk, 'text'): chunk_text = chunk.text print(f"Stream chunk: {chunk_text}") # Debug stream full_response_content += chunk_text # Yield intermediate updates to the chatbot current_history = history + [[message or "[Input files only]", full_response_content]] yield current_history, "" # Update chatbot, clear input # Check for image data if model supports it (more complex parsing needed) # elif chunk.parts and chunk.parts[0].inline_data: # # Handle potential image output - requires modification # pass response_text = full_response_content # Final text response # Check for blocked prompts or safety issues if not response_text and response.prompt_feedback.block_reason: block_reason = response.prompt_feedback.block_reason safety_ratings = response.prompt_feedback.safety_ratings gr.Warning(f"Request blocked. Reason: {block_reason}. Ratings: {safety_ratings}") # Append a notice to history instead of an empty response history.append([message or "[Input files only]", f"[Request blocked due to: {block_reason}]"]) return history, "" # Clear input box except Exception as e: gr.Error(f"Error during generation: {e}") # Optionally add the error to history for context history.append([message or "[Input files only]", f"[Error during generation: {str(e)}]"]) return history, "" # Clear input box # 7. Update History and Clear Input # The yielding above handles intermediate updates. This is the final state. final_history = history + [[message or "[Input files only]", response_text or "[No text content received]"]] return final_history, "" # Final update, clear input def handle_single_response(api_key, model_name, prompt, files, *params_tuple): """Handles the single response interaction.""" # 1. Validations if not api_key: gr.Warning("Gemini API Key is missing!") return "[Error: API Key Missing]", None # Text output, Image output if not prompt and not files: gr.Warning("Please enter a prompt or upload files.") return "[Error: No input provided]", None # 2. Configure API Key try: genai.configure(api_key=api_key) except Exception as e: gr.Error(f"Failed to configure API Key: {e}") return f"[Error: API Key Config Failed: {e}]", None # 3. Prepare Generation Config param_keys = [key for key, config in MODEL_PARAMS.get(model_name, {}).items()] generation_config_dict = {} if len(params_tuple) == len(param_keys): generation_config_dict = {key: val for key, val in zip(param_keys, params_tuple) if val is not None} # Handle stop sequences if 'stop_sequences' in generation_config_dict and isinstance(generation_config_dict['stop_sequences'], str): sequences = [s.strip() for s in generation_config_dict['stop_sequences'].split(',') if s.strip()] if sequences: generation_config_dict['stop_sequences'] = sequences else: del generation_config_dict['stop_sequences'] print(f"Using Generation Config: {generation_config_dict}") # Debug else: print(f"Warning: Mismatch between expected params ({len(param_keys)}) and received params ({len(params_tuple)})") # 4. Prepare Model Input gemini_input_parts = prepare_gemini_input(prompt, files) print(f"Prepared Gemini Input Parts: {gemini_input_parts}") # Debugging # 5. Initialize Model try: # Add safety settings if needed/configured model = genai.GenerativeModel(model_name) except Exception as e: gr.Error(f"Failed to initialize model: {e}") return f"[Error: Model Initialization Failed: {e}]", None # 6. Generate Content (Non-streaming for single response usually) output_text = "[No text content generated]" output_image = None # Placeholder for image output try: response = model.generate_content( gemini_input_parts, generation_config=genai.types.GenerationConfig(**generation_config_dict), stream=False # Simpler for single turn unless very long output expected ) # Check for blocked prompts or safety issues if response.prompt_feedback.block_reason: block_reason = response.prompt_feedback.block_reason safety_ratings = response.prompt_feedback.safety_ratings gr.Warning(f"Request blocked. Reason: {block_reason}. Ratings: {safety_ratings}") return f"[Request blocked due to: {block_reason}]", None # Process response parts (could contain text and/or images) # This part needs refinement based on how Gemini API returns mixed content # For now, prioritize text and assume first image part if present response_text_parts = [] for part in response.parts: if hasattr(part, 'text'): response_text_parts.append(part.text) elif hasattr(part, 'inline_data') and part.inline_data.mime_type.startswith('image/'): if output_image is None: # Display the first image found try: image_data = part.inline_data.data img = Image.open(io.BytesIO(image_data)) output_image = img print("Image received in response.") except Exception as img_err: print(f"Error decoding image from response: {img_err}") response_text_parts.append("[Error decoding image in response]") if response_text_parts: output_text = "\n".join(response_text_parts) elif hasattr(response, 'text'): # Fallback if parts parsing fails but text attribute exists output_text = response.text # Check if only an image was returned (or intended) if not response_text_parts and output_image is not None: output_text = "[Image generated - see output below]" except Exception as e: gr.Error(f"Error during generation: {e}") output_text = f"[Error during generation: {str(e)}]" # 7. Return results return output_text, output_image # --- Build Gradio Interface --- with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Gemini API Interface") gr.Markdown("Interact with Google Gemini models using your own API key. Supports chat, single responses, file uploads, and model-specific parameters.") # API Key Section with gr.Row(): api_key_input = gr.Textbox( label="Gemini API Key", placeholder="Enter your Gemini API Key here", type="password", scale=3 ) validate_button = gr.Button("Validate Key", scale=1) api_key_status = gr.Markdown("Enter your key and click Validate.
") # Model Selection model_dropdown = gr.Dropdown( label="Select Gemini Model", choices=AVAILABLE_MODELS, value=AVAILABLE_MODELS[0], # Default model ) # Dynamic Parameters Section (Initially hidden, updated by model selection) param_elements = {} # Dictionary to hold parameter UI components with gr.Accordion("Model Parameters", open=False) as params_accordion: # Create UI elements for ALL possible parameters defined in MODEL_PARAMS # They will be shown/hidden by the update_parameter_visibility function all_possible_params = set(k for params in MODEL_PARAMS.values() for k in params) for param_name in sorted(list(all_possible_params)): # Sort for consistent order # Determine control type based on the first model that defines it (can be refined) control_type = "textbox" # Default config = {} for model_cfg in MODEL_PARAMS.values(): if param_name in model_cfg: config = model_cfg[param_name] control_type = config.get("type", "textbox") break # Found config for this param if control_type == "slider": param_elements[param_name] = gr.Slider( label=param_name.replace("_", " ").title(), minimum=config.get("min", 0), maximum=config.get("max", 1), step=config.get("step", 0.1), value=config.get("default"), visible=False, # Initially hidden interactive=True ) elif control_type == "number": param_elements[param_name] = gr.Number( label=param_name.replace("_", " ").title(), minimum=config.get("min", 1), step=config.get("step", 1), value=config.get("default"), visible=False, interactive=True ) else: # Default to Textbox for stop_sequences etc. param_elements[param_name] = gr.Textbox( label=param_name.replace("_", " ").title(), lines=config.get("lines", 1), placeholder=config.get("placeholder", ""), value=config.get("default", ""), visible=False, interactive=True ) # Pack the parameter components into a list for function inputs/outputs # IMPORTANT: The order here MUST match the order expected by handle_chat/handle_single_response ordered_param_components = [param_elements[key] for key in sorted(param_elements.keys())] # Main Interaction Area (Tabs) with gr.Tabs(): # --- Chat Interface Tab --- with gr.TabItem("Chat Interface"): gr.Markdown("Have a conversation with the selected model. Upload files to include their content.") chat_history_state = gr.State([]) # Holds the conversation history chatbot_display = gr.Chatbot(label="Conversation", height=500) with gr.Row(): chat_file_upload = gr.File(label="Upload Files (Text, Images, etc.)", file_count="multiple") with gr.Row(): chat_message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", scale=4, lines=3) chat_submit_button = gr.Button("Send", variant="primary", scale=1) clear_chat_button = gr.Button("Clear Chat History") # --- Single Response Tab --- with gr.TabItem("Single Response"): gr.Markdown("Send a prompt (and optionally files) to get a single response from the model.") with gr.Row(): with gr.Column(scale=2): single_prompt_input = gr.Textbox(label="Your Prompt", placeholder="Enter your prompt...", lines=5) single_file_upload = gr.File(label="Upload Files (Text, Images, etc.)", file_count="multiple") single_submit_button = gr.Button("Generate Response", variant="primary") with gr.Column(scale=2): gr.Markdown("**Output:**") single_output_text = gr.Textbox(label="Text Response", lines=10, interactive=False) single_output_image = gr.Image(label="Image Response", type="pil", interactive=False) # Display PIL images # --- Event Wiring --- # 1. API Key Validation validate_button.click( fn=validate_api_key, inputs=[api_key_input], outputs=[api_key_status] ) # 2. Update Parameters UI when Model Changes model_dropdown.change( fn=update_parameter_visibility, inputs=[model_dropdown], outputs=list(param_elements.values()) # Pass the actual components ) # Trigger initial parameter visibility update on load demo.load( fn=update_parameter_visibility, inputs=[model_dropdown], outputs=list(param_elements.values()) ) # 3. Chat Submission Logic (using .then() for streaming if possible, or standard submit) # Note: Gradio streaming with gr.Chatbot often uses yields chat_submit_button.click( fn=handle_chat, inputs=[ api_key_input, model_dropdown, chat_history_state, chat_message_input, chat_file_upload ] + ordered_param_components, # Add dynamic params outputs=[chatbot_display, chat_message_input] # Update chatbot, clear input box ).then( # Update the state *after* the response is fully generated lambda history: history, # Simple pass-through to get final history inputs=chatbot_display, outputs=chat_history_state ) # Allow submitting chat by pressing Enter in the textbox chat_message_input.submit( fn=handle_chat, inputs=[ api_key_input, model_dropdown, chat_history_state, chat_message_input, chat_file_upload ] + ordered_param_components, outputs=[chatbot_display, chat_message_input] ).then( lambda history: history, inputs=chatbot_display, outputs=chat_history_state ) # 4. Clear Chat Logic def clear_chat_history_func(): return [], [] # Clears chatbot display and history state clear_chat_button.click( fn=clear_chat_history_func, inputs=[], outputs=[chatbot_display, chat_history_state] ) # 5. Single Response Submission Logic single_submit_button.click( fn=handle_single_response, inputs=[ api_key_input, model_dropdown, single_prompt_input, single_file_upload ] + ordered_param_components, # Add dynamic params outputs=[single_output_text, single_output_image] ) # Launch the Gradio app if __name__ == "__main__": demo.launch(debug=True) # Set debug=False for deployment