Spaces:
Running
Running
import gradio as gr | |
from huggingface_hub import InferenceClient | |
import os | |
import json | |
import base64 | |
from PIL import Image | |
import io | |
ACCESS_TOKEN = os.getenv("HF_TOKEN") | |
print("Access token loaded.") | |
def encode_image_to_base64(image): | |
"""Convert a PIL Image to a base64 string""" | |
buffered = io.BytesIO() | |
image.save(buffered, format="JPEG") | |
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
return img_str | |
def process_uploaded_images(images): | |
"""Process uploaded images and return image_url dicts for API submission""" | |
if not images: | |
return [] | |
image_contents = [] | |
for img in images: | |
if isinstance(img, str): # Path to an image | |
try: | |
image = Image.open(img) | |
base64_image = encode_image_to_base64(image) | |
image_contents.append({ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{base64_image}" | |
} | |
}) | |
except Exception as e: | |
print(f"Error processing image {img}: {e}") | |
else: # Already a PIL Image | |
try: | |
base64_image = encode_image_to_base64(img) | |
image_contents.append({ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{base64_image}" | |
} | |
}) | |
except Exception as e: | |
print(f"Error processing uploaded image: {e}") | |
return image_contents | |
def respond( | |
message, | |
images, # New parameter for uploaded images | |
history: list[tuple[str, str]], | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
frequency_penalty, | |
seed, | |
provider, | |
custom_api_key, | |
custom_model, | |
model_search_term, | |
selected_model | |
): | |
print(f"Received message: {message}") | |
print(f"Received images: {len(images) if images else 0} image(s)") | |
print(f"History: {history}") | |
print(f"System message: {system_message}") | |
print(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}") | |
print(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}") | |
print(f"Selected provider: {provider}") | |
print(f"Custom API Key provided: {bool(custom_api_key.strip())}") | |
print(f"Selected model (custom_model): {custom_model}") | |
print(f"Model search term: {model_search_term}") | |
print(f"Selected model from radio: {selected_model}") | |
# Determine which token to use - custom API key if provided, otherwise the ACCESS_TOKEN | |
token_to_use = custom_api_key if custom_api_key.strip() != "" else ACCESS_TOKEN | |
# Log which token source we're using (without printing the actual token) | |
if custom_api_key.strip() != "": | |
print("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication") | |
else: | |
print("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication") | |
# Initialize the Inference Client with the provider and appropriate token | |
client = InferenceClient(token=token_to_use, provider=provider) | |
print(f"Hugging Face Inference Client initialized with {provider} provider.") | |
# Convert seed to None if -1 (meaning random) | |
if seed == -1: | |
seed = None | |
# Prepare messages in the format expected by the API | |
messages = [{"role": "system", "content": system_message}] | |
print("Initial messages array constructed.") | |
# Add conversation history to the context | |
for val in history: | |
user_part = val[0] | |
assistant_part = val[1] | |
# Process user messages (could be multimodal) | |
if user_part: | |
# Check if the user message is already multimodal (from history) | |
if isinstance(user_part, list): | |
# Already in multimodal format, use as is | |
messages.append({"role": "user", "content": user_part}) | |
print("Added multimodal user message from history") | |
else: | |
# Simple text message | |
messages.append({"role": "user", "content": user_part}) | |
print(f"Added user message to context: {user_part}") | |
# Process assistant messages (always text) | |
if assistant_part: | |
messages.append({"role": "assistant", "content": assistant_part}) | |
print(f"Added assistant message to context: {assistant_part}") | |
# Process the current message (could include images) | |
current_message_content = [] | |
# Add text content if provided | |
if message and message.strip(): | |
current_message_content.append({ | |
"type": "text", | |
"text": message | |
}) | |
# Process and add image content if provided | |
if images: | |
image_contents = process_uploaded_images(images) | |
current_message_content.extend(image_contents) | |
# Format the final message based on content | |
if current_message_content: | |
if len(current_message_content) == 1 and "type" in current_message_content[0] and current_message_content[0]["type"] == "text": | |
# If only text, use simple string format for compatibility with all models | |
messages.append({"role": "user", "content": current_message_content[0]["text"]}) | |
print(f"Added simple text user message: {current_message_content[0]['text']}") | |
else: | |
# If multimodal content, use the array format | |
messages.append({"role": "user", "content": current_message_content}) | |
print(f"Added multimodal user message with {len(current_message_content)} parts") | |
# Determine which model to use, prioritizing custom_model if provided | |
model_to_use = custom_model.strip() if custom_model.strip() != "" else selected_model | |
print(f"Model selected for inference: {model_to_use}") | |
# Start with an empty string to build the response as tokens stream in | |
response = "" | |
print(f"Sending request to {provider} provider.") | |
# Prepare parameters for the chat completion request | |
parameters = { | |
"max_tokens": max_tokens, | |
"temperature": temperature, | |
"top_p": top_p, | |
"frequency_penalty": frequency_penalty, | |
} | |
if seed is not None: | |
parameters["seed"] = seed | |
# Use the InferenceClient for making the request | |
try: | |
# Create a generator for the streaming response | |
stream = client.chat_completion( | |
model=model_to_use, | |
messages=messages, | |
stream=True, | |
**parameters | |
) | |
# Print a starting message for token streaming | |
print("Received tokens: ", end="", flush=True) | |
# Process the streaming response | |
for chunk in stream: | |
if hasattr(chunk, 'choices') and len(chunk.choices) > 0: | |
# Extract the content from the response | |
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): | |
token_text = chunk.choices[0].delta.content | |
if token_text: | |
# Print tokens inline without newlines | |
print(token_text, end="", flush=True) | |
response += token_text | |
yield response | |
# Print a newline at the end of all tokens | |
print() | |
except Exception as e: | |
print(f"Error during inference: {e}") | |
response += f"\nError: {str(e)}" | |
yield response | |
print("Completed response generation.") | |
# Function to validate provider selection based on BYOK | |
def validate_provider(api_key, provider): | |
# If no custom API key is provided, only "hf-inference" can be used | |
if not api_key.strip() and provider != "hf-inference": | |
return gr.update(value="hf-inference") | |
return gr.update(value=provider) | |
# Function to update featured model list based on search | |
def filter_models(search_term): | |
print(f"Filtering models with search term: {search_term}") | |
filtered = [m for m in models_list if search_term.lower() in m.lower()] | |
print(f"Filtered models: {filtered}") | |
return gr.update(choices=filtered) | |
def set_custom_model_from_radio(selected): | |
""" | |
This function will get triggered whenever someone picks a model from the 'Featured Models' radio. | |
We will update the Custom Model text box with that selection automatically. | |
""" | |
print(f"Featured model selected: {selected}") | |
return selected | |
# Define multimodal models list | |
multimodal_models_list = [ | |
"meta-llama/Llama-3.3-70B-Vision-Instruct", | |
"meta-llama/Llama-3.1-8B-Vision-Instruct", | |
"Qwen/Qwen2.5-VL-7B-Chat", | |
"Qwen/Qwen2.5-VL-3B-Chat", | |
"microsoft/Phi-3-vision-instruct", | |
"mistralai/Mixtral-8x7B-Instruct-v0.1", | |
"deepseek-ai/DeepSeek-VL-7B-Chat", | |
"01-ai/Yi-VL-6B-Chat", | |
"01-ai/Yi-VL-34B-Chat", | |
"Cohere/command-vision-nightly", | |
"LLaVA/llava-1.6-34b-hf", | |
"fireworks-ai/FireworksBridge-Vision-Alpha", | |
"liuhaotian/llava-v1.6-vicuna-13b", | |
] | |
# Add multimodal models to the full model list | |
models_list = [ | |
"meta-llama/Llama-3.3-70B-Instruct", | |
"meta-llama/Llama-3.1-70B-Instruct", | |
"meta-llama/Llama-3.0-70B-Instruct", | |
"meta-llama/Llama-3.2-3B-Instruct", | |
"meta-llama/Llama-3.2-1B-Instruct", | |
"meta-llama/Llama-3.1-8B-Instruct", | |
"NousResearch/Hermes-3-Llama-3.1-8B", | |
"NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", | |
"mistralai/Mistral-Nemo-Instruct-2407", | |
"mistralai/Mixtral-8x7B-Instruct-v0.1", | |
"mistralai/Mistral-7B-Instruct-v0.3", | |
"mistralai/Mistral-7B-Instruct-v0.2", | |
"Qwen/Qwen3-235B-A22B", | |
"Qwen/Qwen3-32B", | |
"Qwen/Qwen2.5-72B-Instruct", | |
"Qwen/Qwen2.5-3B-Instruct", | |
"Qwen/Qwen2.5-0.5B-Instruct", | |
"Qwen/QwQ-32B", | |
"Qwen/Qwen2.5-Coder-32B-Instruct", | |
"microsoft/Phi-3.5-mini-instruct", | |
"microsoft/Phi-3-mini-128k-instruct", | |
"microsoft/Phi-3-mini-4k-instruct", | |
"deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", | |
"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B", | |
"HuggingFaceH4/zephyr-7b-beta", | |
"HuggingFaceTB/SmolLM2-360M-Instruct", | |
"tiiuae/falcon-7b-instruct", | |
"01-ai/Yi-1.5-34B-Chat", | |
] + multimodal_models_list # Add multimodal models to the list | |
# Create a custom ChatBot class that will display images | |
def format_history_with_images(history): | |
""" | |
Format history for display in the chatbot, handling multimodal content | |
""" | |
formatted_history = [] | |
for user_msg, assistant_msg in history: | |
# Process user message | |
if isinstance(user_msg, list): | |
# Multimodal message | |
formatted_user_msg = [] | |
for item in user_msg: | |
if item.get("type") == "text": | |
formatted_user_msg.append(item["text"]) | |
elif item.get("type") == "image_url": | |
# Extract the base64 image data | |
img_url = item.get("image_url", {}).get("url", "") | |
if img_url.startswith("data:image/"): | |
formatted_user_msg.append((img_url, "image")) | |
formatted_history.append((formatted_user_msg, assistant_msg)) | |
else: | |
# Regular text message | |
formatted_history.append((user_msg, assistant_msg)) | |
return formatted_history | |
# GRADIO UI | |
# Create a custom chatbot that can display images | |
chatbot = gr.Chatbot( | |
height=600, | |
show_copy_button=True, | |
placeholder="Select a model and begin chatting", | |
layout="panel" | |
) | |
print("Chatbot interface created.") | |
# Create a virtual column layout for the message input area | |
with gr.Blocks() as msg_input: | |
with gr.Row(): | |
with gr.Column(scale=4): | |
msg = gr.Textbox( | |
placeholder="Enter text here or upload an image", | |
show_label=False, | |
container=False, | |
lines=3 | |
) | |
with gr.Column(scale=1, min_width=50): | |
img_upload = gr.Image( | |
type="pil", | |
label="Upload Image", | |
show_label=False, | |
icon="🖼️", | |
container=True, | |
height=50, | |
width=50 | |
) | |
# Basic input components | |
system_message_box = gr.Textbox(value="", placeholder="You are a helpful assistant.", label="System Prompt") | |
with gr.Accordion("Model Settings", open=False): | |
with gr.Row(): | |
with gr.Column(): | |
max_tokens_slider = gr.Slider( | |
minimum=1, | |
maximum=4096, | |
value=512, | |
step=1, | |
label="Max tokens" | |
) | |
temperature_slider = gr.Slider( | |
minimum=0.1, | |
maximum=4.0, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
with gr.Column(): | |
top_p_slider = gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-P" | |
) | |
frequency_penalty_slider = gr.Slider( | |
minimum=-2.0, | |
maximum=2.0, | |
value=0.0, | |
step=0.1, | |
label="Frequency Penalty" | |
) | |
with gr.Row(): | |
seed_slider = gr.Slider( | |
minimum=-1, | |
maximum=65535, | |
value=-1, | |
step=1, | |
label="Seed (-1 for random)" | |
) | |
with gr.Accordion("Model Selection", open=False): | |
with gr.Row(): | |
with gr.Column(): | |
# Provider selection | |
providers_list = [ | |
"hf-inference", # Default Hugging Face Inference | |
"cerebras", # Cerebras provider | |
"together", # Together AI | |
"sambanova", # SambaNova | |
"novita", # Novita AI | |
"cohere", # Cohere | |
"fireworks-ai", # Fireworks AI | |
"hyperbolic", # Hyperbolic | |
"nebius", # Nebius | |
] | |
provider_radio = gr.Radio( | |
choices=providers_list, | |
value="hf-inference", | |
label="Inference Provider", | |
info="[View all models here](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending)" | |
) | |
# New BYOK textbox - Added for the new feature | |
byok_textbox = gr.Textbox( | |
value="", | |
label="BYOK (Bring Your Own Key)", | |
info="Enter a custom Hugging Face API key here. When empty, only 'hf-inference' provider can be used.", | |
placeholder="Enter your Hugging Face API token", | |
type="password" # Hide the API key for security | |
) | |
with gr.Column(): | |
# Model selection components | |
model_search_box = gr.Textbox( | |
label="Filter Models", | |
placeholder="Search for a featured model...", | |
lines=1 | |
) | |
featured_model_radio = gr.Radio( | |
label="Select a model below", | |
choices=models_list, | |
value="meta-llama/Llama-3.3-70B-Vision-Instruct", # Default to a multimodal model | |
interactive=True | |
) | |
# Custom model box | |
custom_model_box = gr.Textbox( | |
value="", | |
label="Custom Model", | |
info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.", | |
placeholder="meta-llama/Llama-3.3-70B-Vision-Instruct" | |
) | |
gr.Markdown("[See all multimodal models](https://huggingface.co/models?pipeline_tag=visual-question-answering&sort=trending)") | |
# Main Gradio interface | |
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo: | |
gr.Markdown("# 🤖 Serverless-MultiModal-Hub") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
# Display the chatbot | |
chatbot_interface = chatbot | |
# Custom submit function to handle multimodal inputs | |
def submit_message(message, images, history): | |
history = history or [] | |
# Format the message content based on whether there are images | |
if images: | |
# Create a multimodal message format for history display | |
user_msg = [] | |
if message: | |
user_msg.append({"type": "text", "text": message}) | |
# Add each image as an image_url item | |
for img in images: | |
if img is not None: | |
img_base64 = encode_image_to_base64(img) | |
img_url = f"data:image/jpeg;base64,{img_base64}" | |
user_msg.append({ | |
"type": "image_url", | |
"image_url": {"url": img_url} | |
}) | |
# Add to history | |
history.append([user_msg, None]) | |
else: | |
# Text-only message | |
if message: | |
history.append([message, None]) | |
else: | |
# No content to submit | |
return history | |
return history | |
# Create message input | |
with gr.Group(): | |
with gr.Row(): | |
with gr.Column(scale=4): | |
text_input = gr.Textbox( | |
placeholder="Enter text here", | |
show_label=False, | |
container=False, | |
lines=3 | |
) | |
with gr.Column(scale=1, min_width=50): | |
image_input = gr.Image( | |
type="pil", | |
label="Upload Image", | |
show_label=False, | |
sources=["upload", "clipboard"], | |
tool="editor", | |
height=100, | |
visible=True | |
) | |
# Submit button | |
submit_btn = gr.Button("Submit", variant="primary") | |
# Clear button | |
clear_btn = gr.Button("Clear") | |
with gr.Column(scale=1): | |
# Put settings here | |
system_message_box = gr.Textbox( | |
value="", | |
placeholder="You are a helpful assistant that can understand images.", | |
label="System Prompt", | |
lines=2 | |
) | |
with gr.Accordion("Model Selection", open=False): | |
# Provider selection | |
provider_radio = gr.Radio( | |
choices=providers_list, | |
value="hf-inference", | |
label="Inference Provider" | |
) | |
# BYOK textbox | |
byok_textbox = gr.Textbox( | |
value="", | |
label="API Key", | |
placeholder="Enter provider API key", | |
type="password" | |
) | |
# Model selection components | |
model_search_box = gr.Textbox( | |
label="Filter Models", | |
placeholder="Search models...", | |
lines=1 | |
) | |
featured_model_radio = gr.Radio( | |
label="Models", | |
choices=models_list, | |
value="meta-llama/Llama-3.3-70B-Vision-Instruct", | |
interactive=True | |
) | |
custom_model_box = gr.Textbox( | |
value="", | |
label="Custom Model", | |
placeholder="Enter model path" | |
) | |
gr.Markdown("[View all multimodal models](https://huggingface.co/models?pipeline_tag=visual-question-answering&sort=trending)") | |
with gr.Accordion("Model Settings", open=False): | |
max_tokens_slider = gr.Slider(minimum=1, maximum=4096, value=512, step=1, label="Max tokens") | |
temperature_slider = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature") | |
top_p_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-P") | |
frequency_penalty_slider = gr.Slider(minimum=-2.0, maximum=2.0, value=0.0, step=0.1, label="Frequency Penalty") | |
seed_slider = gr.Slider(minimum=-1, maximum=65535, value=-1, step=1, label="Seed (-1 for random)") | |
# Connect the submit button | |
submit_btn.click( | |
fn=submit_message, | |
inputs=[text_input, image_input, chatbot_interface], | |
outputs=[chatbot_interface], | |
queue=False | |
).then( | |
fn=respond, | |
inputs=[ | |
text_input, | |
image_input, | |
chatbot_interface, | |
system_message_box, | |
max_tokens_slider, | |
temperature_slider, | |
top_p_slider, | |
frequency_penalty_slider, | |
seed_slider, | |
provider_radio, | |
byok_textbox, | |
custom_model_box, | |
model_search_box, | |
featured_model_radio | |
], | |
outputs=[chatbot_interface], | |
queue=True | |
).then( | |
fn=lambda: (None, None), # Clear inputs after submission | |
inputs=None, | |
outputs=[text_input, image_input] | |
) | |
# Clear button functionality | |
clear_btn.click(lambda: None, None, chatbot_interface, queue=False) | |
# Connect the model filter to update the radio choices | |
model_search_box.change( | |
fn=filter_models, | |
inputs=model_search_box, | |
outputs=featured_model_radio | |
) | |
# Connect the featured model radio to update the custom model box | |
featured_model_radio.change( | |
fn=set_custom_model_from_radio, | |
inputs=featured_model_radio, | |
outputs=custom_model_box | |
) | |
# Connect the BYOK textbox to validate provider selection | |
byok_textbox.change( | |
fn=validate_provider, | |
inputs=[byok_textbox, provider_radio], | |
outputs=provider_radio | |
) | |
# Also validate provider when the radio changes to ensure consistency | |
provider_radio.change( | |
fn=validate_provider, | |
inputs=[byok_textbox, provider_radio], | |
outputs=provider_radio | |
) | |
if __name__ == "__main__": | |
print("Launching Serverless-MultiModal-Hub application.") | |
demo.launch(show_api=True) |