|
import base64
import json
import os
import time
from io import BytesIO
from typing import Optional

import gradio as gr
import requests
from PIL import Image
|
|
|
|
|
# OpenRouter API key, read once from the environment at import time.
# Falls back to an empty string when the variable is not set.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
|
|
|
|
|
# Models offered in the UI, as (display name, OpenRouter model id) pairs.
# Order matters: the first entry is used as the fallback model elsewhere
# in this module.
free_models = [
    # Google
    ("Google: Gemini Pro 2.0 Experimental", "google/gemini-2.0-pro-exp-02-05:free"),
    ("Google: Gemini 2.0 Flash", "google/gemini-2.0-flash-exp:free"),
    ("Google: Gemini Pro 2.5 Experimental", "google/gemini-2.5-pro-exp-03-25:free"),
    # Other providers
    ("Meta: Llama 3.2 11B Vision", "meta-llama/llama-3.2-11b-vision-instruct:free"),
    ("Qwen: Qwen2.5 VL 72B", "qwen/qwen2.5-vl-72b-instruct:free"),
    ("DeepSeek: DeepSeek R1", "deepseek/deepseek-r1:free"),
    ("Meta: Llama 3.1 8B", "meta-llama/llama-3.1-8b-instruct:free"),
    ("Mistral: Mistral Small 3.1 24B", "mistralai/mistral-small-3.1-24b-instruct:free"),
]
|
|
|
|
|
def encode_image(image):
    """Serialize a PIL image as a base64-encoded JPEG string.

    JPEG cannot store an alpha channel or a colour palette, so images in
    modes such as ``RGBA``, ``LA`` or ``P`` (common for PNG/GIF uploads) are
    converted to ``RGB`` before saving; writing them directly would raise
    ``OSError``.

    Args:
        image: A ``PIL.Image.Image`` (any object exposing ``mode``,
            ``convert`` and ``save`` with the same semantics works).

    Returns:
        The JPEG bytes encoded as a base64 ``str``.
    """
    # Only RGB and greyscale are passed through untouched; everything else
    # is normalised to RGB so the JPEG encoder accepts it.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")
|
|
|
def encode_file(file_path):
    """Read an uploaded text file and return its contents as a string.

    Args:
        file_path: Either a filesystem path (``str``, ``bytes`` or
            ``os.PathLike``) or a file-like upload object that exposes the
            path of the underlying file through a ``name`` attribute.

    Returns:
        The file contents decoded as UTF-8. This function never raises: on
        any failure it returns a string of the form
        ``"Error reading file: <reason>"`` so the caller can embed it in the
        chat message.
    """
    # Real paths are used as-is. This check must come first because
    # pathlib.Path also has a ``name`` attribute (the basename only).
    if isinstance(file_path, (str, bytes, os.PathLike)):
        path = file_path
    else:
        path = getattr(file_path, "name", file_path)

    try:
        with open(path, "r", encoding="utf-8") as file:
            return file.read()
    except Exception as e:
        # Deliberate catch-all: the error text is surfaced to the user
        # instead of aborting the chat request.
        return f"Error reading file: {e}"
|
|
|
# (connect, read) timeouts in seconds so a stalled upstream cannot hang the UI.
_REQUEST_TIMEOUT = (10, 120)


def _history_to_messages(chat_history):
    """Convert Chatbot-style (user, assistant) pairs into API chat messages."""
    messages = []
    for turn in chat_history:
        # Gradio's pair-format Chatbot passes each turn back as a list, while
        # this module appends tuples, so both shapes have to be accepted.
        if not isinstance(turn, (tuple, list)) or len(turn) != 2:
            continue
        user_msg, ai_msg = turn
        # Skip empty or non-text entries (e.g. None placeholders); the API
        # expects string content for these plain-text turns.
        if isinstance(user_msg, str) and user_msg:
            messages.append({"role": "user", "content": user_msg})
        if isinstance(ai_msg, str) and ai_msg:
            messages.append({"role": "assistant", "content": ai_msg})
    return messages


def _iter_sse_data(byte_chunks):
    """Yield the payload of every ``data:`` line in a server-sent-event byte stream.

    Iteration ends at the ``[DONE]`` sentinel or when the stream is exhausted.
    """
    pending = b""
    for chunk in byte_chunks:
        if not chunk:
            continue
        pending += chunk
        # Split on raw bytes and decode only complete lines: a network chunk
        # may end in the middle of a multi-byte UTF-8 character.
        while b"\n" in pending:
            raw_line, pending = pending.split(b"\n", 1)
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line.startswith("data:"):
                continue  # comments, keep-alives and blank separators
            payload = line[5:].strip()
            if payload == "[DONE]":
                return
            yield payload


def generate_response(message, chat_history, model_name, uploaded_image=None, uploaded_file=None,
                      temp=0.7, max_tok=1000, use_stream=True):
    """Send the conversation to OpenRouter and yield the updated chat history.

    This is a generator: with streaming enabled it yields the history after
    every received text fragment, otherwise it yields once with the full
    reply. Request or parsing failures are not raised; they are written into
    the last history entry as ``"Error: <reason>"`` and yielded.

    Args:
        message: The user's text for this turn.
        chat_history: List of ``(user, assistant)`` pairs (tuples or lists).
            It is mutated in place and is also the object that is yielded.
        model_name: Display name from ``free_models``; unknown names fall
            back to the first configured model.
        uploaded_image: Optional PIL image attached to this turn.
        uploaded_file: Optional text file whose contents are appended to
            ``message`` inside a fenced block.
        temp: Sampling temperature forwarded to the API.
        max_tok: ``max_tokens`` limit forwarded to the API.
        use_stream: Whether to request and consume a streamed response.

    Yields:
        ``chat_history`` with its last entry updated to the reply so far.
    """
    model_id = next((mid for name, mid in free_models if name == model_name), free_models[0][1])

    if chat_history is None:
        chat_history = []
    messages = _history_to_messages(chat_history)

    if uploaded_file:
        file_content = encode_file(uploaded_file)
        message = f"{message}\n\nFile content:\n```\n{file_content}\n```"

    if uploaded_image is not None:
        base64_image = encode_image(uploaded_image)
        content = [
            {"type": "text", "text": message},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}"
                },
            },
        ]
    else:
        content = message
    messages.append({"role": "user", "content": content})

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://huggingface.co/spaces",
    }
    url = "https://openrouter.ai/api/v1/chat/completions"
    payload = {
        "model": model_id,
        "messages": messages,
        "stream": use_stream,
        "temperature": temp,
        "max_tokens": max_tok,
    }

    # Placeholder entry for this turn; its reply part is filled in below.
    chat_history.append((message, ""))

    try:
        if use_stream:
            full_response = ""
            with requests.post(url, headers=headers, json=payload, stream=True,
                               timeout=_REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                for event in _iter_sse_data(response.iter_content(chunk_size=1024)):
                    try:
                        delta_content = json.loads(event)["choices"][0]["delta"].get("content", "")
                    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
                        # Best effort: ignore events that are not well-formed
                        # content deltas instead of aborting the stream.
                        continue
                    if delta_content:
                        full_response += delta_content
                        chat_history[-1] = (message, full_response)
                        yield chat_history

            # Always emit a final state so the UI updates even when the
            # stream carried no text at all.
            chat_history[-1] = (message, full_response or "No response")
            yield chat_history
        else:
            response = requests.post(url, headers=headers, json=payload, timeout=_REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()

            reply = result.get("choices", [{}])[0].get("message", {}).get("content", "No response")
            chat_history[-1] = (message, reply)
            yield chat_history

    except Exception as e:
        # UI boundary: show the failure in the chat rather than raising.
        error_msg = f"Error: {str(e)}"
        chat_history[-1] = (message, error_msg)
        yield chat_history
|
|
|
def clear_chat():
    """Return a fresh, empty history used to reset the chatbot."""
    empty_history = list()
    return empty_history
|
|
|
def clear_input():
    """Return reset values for the three input widgets.

    Returns:
        A 3-tuple: an empty string for the message box followed by ``None``
        for the image upload and ``None`` for the file upload.
    """
    blank_text, no_image, no_file = "", None, None
    return blank_text, no_image, no_file
|
|
|
|
|
# Gradio UI: chat panel on the left, model/sampling settings on the right.
# NOTE(review): the source this was rebuilt from had lost its indentation, so
# the nesting of the upload/button rows inside the Group is an assumption —
# confirm against the intended layout.
with gr.Blocks(theme=gr.themes.Default()) as demo:
    gr.Markdown("# 🔆 CrispChat")

    with gr.Row():
        with gr.Column(scale=3):
            # `avatar_images` is intentionally not passed: Gradio documents it
            # as a pair of image file paths or URLs, so the emoji strings used
            # previously were not valid values for it.
            chatbot = gr.Chatbot(
                height=500,
                layout="bubble",
                show_copy_button=True,
                show_share_button=False,
            )

            with gr.Group():
                user_message = gr.Textbox(
                    placeholder="Type your message here...",
                    lines=3,
                    show_label=False,
                )

                with gr.Row():
                    image_upload = gr.Image(
                        type="pil",
                        label="Image (optional)",
                        show_label=True,
                    )

                    file_upload = gr.File(
                        label="Text File (optional)",
                        file_types=[".txt", ".md", ".py", ".js", ".html", ".css", ".json"],
                    )

                with gr.Row():
                    submit_btn = gr.Button("Send", variant="primary")
                    clear_chat_btn = gr.Button("Clear Chat")

        with gr.Column(scale=1):
            model_selector = gr.Dropdown(
                choices=[name for name, _ in free_models],
                value=free_models[0][0],
                label="Select Model",
            )

            temperature = gr.Slider(
                minimum=0.1,
                maximum=2.0,
                value=0.7,
                step=0.1,
                label="Temperature",
            )

            max_tokens = gr.Slider(
                minimum=100,
                maximum=4000,
                value=1000,
                step=100,
                label="Max Tokens",
            )

            streaming = gr.Checkbox(
                label="Streaming",
                value=True,
            )

    # Components passed to generate_response, in its positional parameter order.
    chat_inputs = [
        user_message,
        chatbot,
        model_selector,
        image_upload,
        file_upload,
        temperature,
        max_tokens,
        streaming,
    ]
    # Components reset by clear_input once a response has finished.
    reset_outputs = [user_message, image_upload, file_upload]

    # The Send button and the textbox's submit event run the same pipeline:
    # generate the reply into the chatbot, then clear the inputs.
    for trigger in (submit_btn.click, user_message.submit):
        trigger(
            fn=generate_response,
            inputs=chat_inputs,
            outputs=chatbot,
        ).then(
            fn=clear_input,
            outputs=reset_outputs,
        )

    clear_chat_btn.click(
        fn=clear_chat,
        outputs=chatbot,
    )
|
|
|
|
|
from fastapi import FastAPI |
|
from pydantic import BaseModel |
|
|
|
# FastAPI application that carries the JSON API route defined below; the
# Gradio UI is mounted onto it later in this module.
app = FastAPI()
|
|
|
class GenerateRequest(BaseModel):
    """Request body accepted by ``POST /api/generate``.

    The optional fields are declared ``Optional[str]`` so that clients may
    either omit them or send an explicit JSON ``null``; a bare ``str``
    annotation with a ``None`` default rejects ``null`` under Pydantic v2.
    """

    # Prompt text sent to the model as the user message.
    message: str
    # Model id forwarded to the API; the endpoint substitutes its default
    # model when this is missing or empty.
    model: Optional[str] = None
    # Base64-encoded image bytes to attach to the prompt, if any.
    image_data: Optional[str] = None
|
|
|
@app.post("/api/generate")
async def api_generate(request: GenerateRequest):
    """Generate a single, non-streamed completion for a JSON API client.

    Args:
        request: Parsed request body. ``message`` is the prompt, ``model`` an
            optional model id (the first entry of ``free_models`` is used when
            it is missing or empty) and ``image_data`` an optional
            base64-encoded image that is re-encoded as JPEG and attached.

    Returns:
        ``{"response": <reply text>}`` on success. Failures are not raised:
        they are reported as ``{"error": "Image processing error: ..."}`` for
        an undecodable image, or ``{"error": "Error: ..."}`` for anything
        else (HTTP errors, timeouts, malformed replies).
    """
    import asyncio  # local import: only this handler needs it

    try:
        model_id = request.model or free_models[0][1]

        if request.image_data:
            try:
                image_bytes = base64.b64decode(request.image_data)
                image = Image.open(BytesIO(image_bytes))
                base64_image = encode_image(image)
            except Exception as e:
                return {"error": f"Image processing error: {str(e)}"}
            content = [
                {"type": "text", "text": request.message},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}"
                    },
                },
            ]
        else:
            content = request.message
        messages = [{"role": "user", "content": content}]

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "HTTP-Referer": "https://huggingface.co/spaces",
        }

        url = "https://openrouter.ai/api/v1/chat/completions"

        data = {
            "model": model_id,
            "messages": messages,
            "temperature": 0.7,
        }

        # `requests` is blocking. Calling it directly inside this coroutine
        # would stall the event loop (and every other request served by this
        # app) until the upstream answers, so run it in a worker thread. The
        # (connect, read) timeout keeps a dead upstream from pinning the thread.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(url, headers=headers, json=data, timeout=(10, 120)),
        )
        response.raise_for_status()

        result = response.json()
        reply = result.get("choices", [{}])[0].get("message", {}).get("content", "No response")

        return {"response": reply}

    except Exception as e:
        # Endpoint boundary: report the failure in the JSON body.
        return {"error": f"Error: {str(e)}"}
|
|
|
|
|
# Mount the Gradio UI at the root path of the FastAPI application; the
# returned application replaces `app`.
app = gr.mount_gradio_app(app, demo, path="/")


# When executed as a script, launch the Gradio interface.
# NOTE(review): this launches `demo`, not `app`. The `/api/generate` route is
# registered on `app`, so it is presumably only reachable when `app` itself is
# served by an ASGI server — confirm how this module is deployed.
if __name__ == "__main__":
    demo.launch()