Spaces:
Sleeping
Sleeping
import os | |
import logging | |
import httpx | |
from dotenv import load_dotenv | |
import gradio as gr | |
from typing import AsyncGenerator, List, Dict, Tuple | |
# Configure logging: root logger at INFO, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from a .env file into os.environ.
load_dotenv()
logger.info("Environment variables loaded from .env file")

# Report only whether each provider key is set; the key values are never logged.
for _key_name in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GEMINI_API_KEY"):
    logger.info(f"{_key_name} present: {_key_name in os.environ}")
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a chat completion from OpenAI, yielding text fragments as they arrive.

    Args:
        query: The new user message.
        history: Earlier turns; each entry is read via its ``"user"`` and
            ``"bot"`` keys. Turns with a falsy ``"bot"`` value contribute only
            the user message.

    Yields:
        Fragments of the assistant reply. Errors (missing key, HTTP failure,
        malformed stream event) are yielded as ``"Error..."`` strings instead
        of being raised.
    """
    # Function-scope import: the module's top-level imports do not include json.
    import json

    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    # Build message history
    messages = []
    for turn in history:
        messages.append({"role": "user", "content": turn["user"]})
        if turn["bot"]:
            messages.append({"role": "assistant", "content": turn["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }

    try:
        # httpx defaults to a 5-second timeout, which is easily exceeded while
        # waiting for a model to start responding.
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                if response.is_error:
                    # A streamed body must be read before `.text` is usable in
                    # the HTTPStatusError handler below.
                    await response.aread()
                response.raise_for_status()

                # aiter_lines() reassembles lines that span network chunks, so
                # each "data: ..." event is parsed whole.
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data = line[6:].strip()  # Remove "data: " prefix
                    if data == "[DONE]":
                        return  # end-of-stream sentinel: stop reading entirely

                    try:
                        # json.loads, not eval: the payload is untrusted network
                        # data and may contain null/true/false.
                        event = json.loads(data)
                        choices = event.get("choices") or []
                        delta = choices[0].get("delta", {}) if choices else {}
                        content = delta.get("content")
                    except (ValueError, AttributeError, TypeError) as e:
                        logger.error(f"Error parsing OpenAI stream chunk: {str(e)}")
                        yield f"Error parsing stream: {str(e)}"
                        continue

                    # Role-only deltas carry no (or null) content; skip them so
                    # callers can safely concatenate the fragments.
                    if content:
                        yield content
    except httpx.HTTPStatusError as e:
        logger.error(f"OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}")
        yield f"Error: OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"OpenAI Error: {str(e)}")
        yield f"Error: OpenAI Error: {str(e)}"
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
    """Send the conversation to the Anthropic Messages API and return the reply text.

    Args:
        query: The new user message.
        history: Earlier turns; each entry is read via its ``"user"`` and
            ``"bot"`` keys. Turns with a falsy ``"bot"`` value contribute only
            the user message.

    Returns:
        The text of the first content block of the response, or an
        ``"Error: ..."`` string if the key is missing or the request fails.
    """
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        return "Error: Anthropic API key not provided."

    # Build message history
    messages = []
    for turn in history:
        messages.append({"role": "user", "content": turn["user"]})
        if turn["bot"]:
            messages.append({"role": "assistant", "content": turn["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
    }

    try:
        # This is a non-streaming call: nothing arrives until the whole reply is
        # generated, so httpx's default 5-second timeout is too short.
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
            logger.info(f"Sending Anthropic request: {payload}")
            response = await client.post(
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            body = response.json()  # parse once; reused for logging and the result
            logger.info(f"Anthropic response: {body}")
            return body['content'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Anthropic Error: {str(e)}")
        return f"Error: Anthropic Error: {str(e)}"
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
    """Send the conversation to the Gemini generateContent endpoint and return the reply text.

    The history is flattened into a single ``User:`` / ``Assistant:`` transcript
    and sent as one text part, followed by the new query.

    Args:
        query: The new user message.
        history: Earlier turns; each entry is read via its ``"user"`` and
            ``"bot"`` keys. Turns with a falsy ``"bot"`` value contribute only
            the ``User:`` line.

    Returns:
        The text of the first part of the first candidate, or an
        ``"Error: ..."`` string if the key is missing or the request fails.
    """
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        return "Error: Gemini API key not provided."

    # Concatenate the history as plain text, one line per speaker turn.
    transcript = []
    for turn in history:
        transcript.append(f"User: {turn['user']}\n")
        if turn["bot"]:
            transcript.append(f"Assistant: {turn['bot']}\n")
    full_query = "".join(transcript) + f"User: {query}\n"

    headers = {
        "Content-Type": "application/json",
        # Send the key as a header rather than a ?key= query parameter so it
        # does not appear in request URLs (which HTTP client logs may print).
        "x-goog-api-key": gemini_api_key,
    }
    payload = {
        "contents": [{"parts": [{"text": full_query}]}]
    }

    try:
        # This is a non-streaming call, so httpx's default 5-second timeout is
        # too short for a generated reply.
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
            response = await client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            return response.json()['candidates'][0]['content']['parts'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Gemini Error: {str(e)}")
        return f"Error: Gemini Error: {str(e)}"
async def query_model(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]]], None]:
    """Route a query to the selected provider and yield its output with the history.

    The provider name is matched case-insensitively. Every yield before the
    last one pairs a piece of output with the *unchanged* ``history``; the last
    yield pairs the complete response with a new list that extends ``history``
    by ``{"user": query, "bot": response}``.

    Args:
        query: The new user message.
        provider: ``"openai"``, ``"anthropic"`` or ``"gemini"`` (any case).
            Anything else produces an error string as the response.
        history: Earlier turns; not mutated.

    Yields:
        ``(text, history)`` tuples as described above.
    """
    provider = provider.lower()
    response = ""

    if provider == "openai":
        # Streaming provider: forward each fragment while accumulating the whole reply.
        async for piece in ask_openai(query, history):
            response += piece
            yield piece, history
    elif provider in ("anthropic", "gemini"):
        # Non-streaming providers: one call, one intermediate yield.
        ask = ask_anthropic if provider == "anthropic" else ask_gemini
        response = await ask(query, history)
        yield response, history
    else:
        response = f"Error: Unknown provider: {provider}"
        yield response, history

    # Record the finished exchange in a new list and emit it as the final item.
    updated_history = history + [{"user": query, "bot": response}]
    logger.info(f"Updated history: {updated_history}")
    yield response, updated_history
def _history_to_chat_messages(turns: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Convert ``{"user", "bot"}`` turns into role/content dicts for the chat display."""
    chat = []
    for turn in turns:
        chat.append({"role": "user", "content": turn["user"]})
        if turn["bot"]:
            chat.append({"role": "assistant", "content": turn["bot"]})
    return chat


async def submit_query(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    """Handle a submission: run the query and stream updates for the UI.

    Each yielded tuple is ``(textbox_value, chat_messages, history_state)``:
    the textbox value is always ``""`` (clearing the input), ``chat_messages``
    is the conversation as ``{"role", "content"}`` dicts, and ``history_state``
    keeps the ``{"user", "bot"}`` turn format that the provider functions read.

    Args:
        query: Text from the input box; whitespace-only input is a no-op.
        provider: Provider name passed through to ``query_model``.
        history: Stored turns from previous submissions; not mutated.

    Yields:
        UI update tuples as described above, ending with one that carries the
        updated history.
    """
    if not query.strip():
        yield "", _history_to_chat_messages(history), history
        return

    # Messages shown while the reply is in progress: past turns plus the new question.
    shown_so_far = _history_to_chat_messages(history) + [{"role": "user", "content": query}]
    partial_reply = ""
    final_history = history

    async for response_chunk, updated_history in query_model(query, provider, history):
        if len(updated_history) > len(history):
            # query_model's last item carries the extended history and repeats
            # the full response, so it must not be appended to the partial text.
            final_history = updated_history
            continue
        partial_reply += response_chunk
        in_progress = list(shown_so_far)
        if partial_reply:
            in_progress.append({"role": "assistant", "content": partial_reply})
        # The stored history stays unchanged until the exchange is complete.
        yield "", in_progress, history

    yield "", _history_to_chat_messages(final_history), final_history
# Gradio interface | |
def clear_history():
    """Return two new empty lists: one per output wired to the clear button."""
    empty_chat = []
    empty_state = []
    return empty_chat, empty_state
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Title and short usage hint.
    gr.Markdown("# Multi-Model Chat")
    gr.Markdown("Chat with OpenAI, Anthropic, or Gemini. Select a provider and start typing!")

    # Components are created in display order; do not reorder.
    provider = gr.Dropdown(
        choices=["OpenAI", "Anthropic", "Gemini"],
        label="Select Provider",
        value="OpenAI",
    )
    history_state = gr.State(value=[])
    chatbot = gr.Chatbot(label="Conversation", type="messages")
    query = gr.Textbox(
        label="Enter your query",
        placeholder="e.g., What is the capital of the United States?",
    )
    submit_button = gr.Button("Submit")
    clear_button = gr.Button("Clear History")

    # Submit: feeds the query, provider and stored history to submit_query,
    # whose outputs go to the textbox, the chat display and the stored history.
    submit_button.click(
        fn=submit_query,
        inputs=[query, provider, history_state],
        outputs=[query, chatbot, history_state],
    )

    # Clear: resets both the chat display and the stored history.
    clear_button.click(
        fn=clear_history,
        inputs=[],
        outputs=[chatbot, history_state],
    )

# Launch the Gradio app
demo.launch()