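"""Multi-model chat: a Gradio app that sends each user query to the selected
providers (OpenAI, Anthropic, Gemini) and shows their combined replies in a
single chat window. OpenAI responses stream token by token; the others are
returned whole."""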

import json
import logging
import os
from typing import AsyncGenerator, Dict, List, Tuple

import gradio as gr
import httpx
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()
logger.info("Environment variables loaded from .env file")
logger.info(f"OPENAI_API_KEY present: {'OPENAI_API_KEY' in os.environ}")
logger.info(f"ANTHROPIC_API_KEY present: {'ANTHROPIC_API_KEY' in os.environ}")
logger.info(f"GEMINI_API_KEY present: {'GEMINI_API_KEY' in os.environ}")


async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    # Build message history with alternating user and assistant roles
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                if response.status_code >= 400:
                    # Read the error body while the stream is still open;
                    # reading it after the context manager exits would fail
                    error_text = (await response.aread()).decode("utf-8")
                    logger.error(f"OpenAI HTTP Status Error: {response.status_code}, {error_text}")
                    yield f"Error: OpenAI HTTP Status Error: {response.status_code}, {error_text}"
                    return
                buffer = ""
                async for chunk in response.aiter_text():
                    if not chunk:
                        continue
                    buffer += chunk
                    # Process complete lines; a trailing partial line stays buffered
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        if not line.startswith("data: "):
                            continue
                        data = line[6:]  # Strip the "data: " prefix
                        if data == "[DONE]":
                            return  # Sentinel: the stream is complete
                        if not data.strip():
                            continue
                        try:
                            json_data = json.loads(data)
                            if "choices" in json_data and json_data["choices"]:
                                delta = json_data["choices"][0].get("delta", {})
                                if delta.get("content") is not None:
                                    yield delta["content"]
                        except json.JSONDecodeError as e:
                            logger.error(f"Error parsing OpenAI stream chunk: {e} - Data: {data}")
                            yield f"Error parsing stream: {e}"
                        except Exception as e:
                            logger.error(f"Unexpected error in OpenAI stream: {e} - Data: {data}")
                            yield f"Error in stream: {e}"
    except Exception as e:
        logger.error(f"OpenAI Error: {str(e)}")
        yield f"Error: OpenAI Error: {str(e)}"


async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        return "Error: Anthropic API key not provided."

    # Build message history with alternating user and assistant roles
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
    }
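    # Note: the Messages API requires an explicit max_tokens cap, and the
    # response body carries a list of content blocks; only the text of the
    # first block is returned below.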
    try:
        async with httpx.AsyncClient() as client:
            logger.info(f"Sending Anthropic request: {payload}")
            response = await client.post(
                "https://api.anthropic.com/v1/messages", headers=headers, json=payload
            )
            response.raise_for_status()
            logger.info(f"Anthropic response: {response.json()}")
            return response.json()["content"][0]["text"]
    except httpx.HTTPStatusError as e:
        logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Anthropic Error: {str(e)}")
        return f"Error: Anthropic Error: {str(e)}"


async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        return "Error: Gemini API key not provided."

    # Flatten the chat history into plain text, since this request sends a
    # single-part prompt rather than Gemini's role-tagged multi-turn format
    history_text = ""
    for msg in history:
        history_text += f"User: {msg['user']}\n"
        if msg["bot"]:
            history_text += f"Assistant: {msg['bot']}\n"
    full_query = history_text + f"User: {query}\n"

    headers = {"Content-Type": "application/json"}
    payload = {
        "contents": [{"parts": [{"text": full_query}]}]
    }
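    # generateContent takes a list of "contents" entries, each holding "parts";
    # a multi-turn chat could instead send one role-tagged entry per turn
    # ("user" / "model"), but here the whole transcript rides in one text part.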
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_api_key}",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            return response.json()["candidates"][0]["content"]["parts"][0]["text"]
    except httpx.HTTPStatusError as e:
        logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Gemini Error: {str(e)}")
        return f"Error: Gemini Error: {str(e)}"


async def query_model(
    query: str, providers: List[str], history: List[Dict[str, str]]
) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    logger.info(f"Processing query with providers: {providers}")
    responses = []  # Collects one labelled response per provider
    streaming_response = ""

    # Handle OpenAI (streaming): rebuild the chat display on every chunk so
    # partial output appears live
    if "OpenAI" in providers:
        async for chunk in ask_openai(query, history):
            streaming_response += chunk
            chatbot_messages = []
            for msg in history:
                chatbot_messages.append({"role": "user", "content": msg["user"]})
                if msg["bot"]:
                    chatbot_messages.append({"role": "assistant", "content": msg["bot"]})
            chatbot_messages.append({"role": "user", "content": query})
            chatbot_messages.append({"role": "assistant", "content": streaming_response})
            yield "", chatbot_messages, history  # Partial update
        if streaming_response.strip():
            responses.append(f"[OpenAI]: {streaming_response}")

    # Handle Anthropic (non-streaming)
    if "Anthropic" in providers:
        response = await ask_anthropic(query, history)
        if response.strip():
            responses.append(f"[Anthropic]: {response}")

    # Handle Gemini (non-streaming)
    if "Gemini" in providers:
        response = await ask_gemini(query, history)
        if response.strip():
            responses.append(f"[Gemini]: {response}")

    # Combine the labelled responses into a single assistant turn
    combined_response = "\n\n".join(responses) if responses else "No valid responses received."

    # Record the turn in history and yield the final chat display
    updated_history = history + [{"user": query, "bot": combined_response}]
    logger.info(f"Updated history: {updated_history}")
    chatbot_messages = []
    for msg in updated_history:
        chatbot_messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            chatbot_messages.append({"role": "assistant", "content": msg["bot"]})
    yield "", chatbot_messages, updated_history


async def submit_query(
    query: str, providers: List[str], history: List[Dict[str, str]]
) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    if not query.strip():
        chatbot_messages = [{"role": "assistant", "content": "Please enter a query."}]
        yield "", chatbot_messages, history
        return
    if not providers:
        chatbot_messages = [{"role": "assistant", "content": "Please select at least one provider."}]
        yield "", chatbot_messages, history
        return
    # Relay updates from query_model; the empty first element keeps the
    # query textbox cleared on every update, including the final one
    async for _, chatbot_messages, updated_history in query_model(query, providers, history):
        yield "", chatbot_messages, updated_history


# Gradio interface
def clear_history():
    return [], []


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Multi-Model Chat")
    gr.Markdown("Chat with OpenAI, Anthropic, or Gemini. Select providers and start typing!")
    providers = gr.CheckboxGroup(
        choices=["OpenAI", "Anthropic", "Gemini"], label="Select Providers", value=["OpenAI"]
    )
    history_state = gr.State(value=[])
    chatbot = gr.Chatbot(label="Conversation", type="messages")
    query = gr.Textbox(label="Enter your query", placeholder="e.g., What is the capital of the United States?")
    submit_button = gr.Button("Submit")
    clear_button = gr.Button("Clear History")

    submit_button.click(
        fn=submit_query,
        inputs=[query, providers, history_state],
        outputs=[query, chatbot, history_state],
    )
    clear_button.click(
        fn=clear_history,
        inputs=[],
        outputs=[chatbot, history_state],
    )

# Launch the Gradio app
demo.launch()
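
# To run locally (assuming this file is saved as app.py next to a .env file):
#   pip install gradio httpx python-dotenv
#   python app.py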