Ganesh Chintalapati
Fix syntaxerror
fac7191
raw
history blame
8.69 kB
import json
import logging
import os
from typing import AsyncGenerator, List, Dict, Tuple

import gradio as gr
import httpx
from dotenv import load_dotenv
# Module-wide logging: INFO and above goes to the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pull variables from a local .env file into the process environment.
load_dotenv()
logger.info("Environment variables loaded from .env file")

# Report only whether each provider key is set -- never the key itself.
for _api_key_name in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GEMINI_API_KEY"):
    logger.info(f"{_api_key_name} present: {_api_key_name in os.environ}")
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a chat completion from OpenAI, yielding text fragments as they arrive.

    Args:
        query: The new user message.
        history: Earlier turns as dicts with "user" and "bot" keys. A turn
            whose "bot" value is falsy contributes only its user message.

    Yields:
        Text fragments of the assistant reply. Failures are never raised to
        the caller: a missing API key, an HTTP error status, or any other
        exception is logged and yielded as a single "Error: ..." string.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    # Build message history
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }

    try:
        # httpx defaults to a 5-second timeout; allow more time for the model
        # to start (and keep) producing tokens.
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                if response.status_code >= 400:
                    # A streamed body is not loaded automatically. Read it now
                    # so the HTTPStatusError handler can use response.text
                    # without hitting httpx.ResponseNotRead.
                    await response.aread()
                response.raise_for_status()

                # aiter_lines() reassembles lines that were split across
                # network chunks, so every item is one complete SSE line.
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data = line[6:].strip()  # Remove "data: " prefix
                    if data == "[DONE]":
                        break
                    try:
                        # json.loads, not eval: the payload is untrusted network
                        # data and contains JSON literals such as null/true.
                        json_data = json.loads(data)
                        choices = json_data.get("choices") or []
                        delta = choices[0].get("delta", {}) if choices else {}
                        content = delta.get("content")
                    except (ValueError, AttributeError, TypeError, IndexError, KeyError) as e:
                        logger.error(f"Error parsing OpenAI stream chunk: {str(e)}")
                        yield f"Error parsing stream: {str(e)}"
                        continue
                    # Role-only and final deltas carry no (or null) content.
                    if content:
                        yield content
    except httpx.HTTPStatusError as e:
        logger.error(f"OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}")
        yield f"Error: OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"OpenAI Error: {str(e)}")
        yield f"Error: OpenAI Error: {str(e)}"
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
    """Send the conversation to Anthropic's Messages API and return the reply text.

    Args:
        query: The new user message.
        history: Earlier turns as dicts with "user" and "bot" keys. A turn
            whose "bot" value is falsy contributes only its user message.

    Returns:
        The text of the first content item of the response, or an
        "Error: ..." string when the API key is missing, the server returns
        an error status, or any other exception occurs. Nothing is raised.
    """
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        return "Error: Anthropic API key not provided."

    # Build message history
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
    }

    try:
        # httpx defaults to a 5-second timeout. This call is not streamed, so
        # the whole completion must arrive within the read timeout.
        async with httpx.AsyncClient(timeout=60.0) as client:
            logger.info(f"Sending Anthropic request: {payload}")
            response = await client.post(
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            body = response.json()  # parse once; reused for the log and the result
            logger.info(f"Anthropic response: {body}")
            return body['content'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Anthropic Error: {str(e)}")
        return f"Error: Anthropic Error: {str(e)}"
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
    """Send the conversation to Gemini's generateContent endpoint and return the reply text.

    Args:
        query: The new user message.
        history: Earlier turns as dicts with "user" and "bot" keys.

    Returns:
        The text of the first part of the first candidate, or an
        "Error: ..." string when the API key is missing, the server returns
        an error status, or any other exception occurs. Nothing is raised.
    """
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        return "Error: Gemini API key not provided."

    # The conversation is flattened into one text prompt ("User: ..." /
    # "Assistant: ..." lines) rather than sent as structured multi-turn contents.
    history_text = "".join(
        f"User: {msg['user']}\nAssistant: {msg['bot']}\n" if msg["bot"] else f"User: {msg['user']}\n"
        for msg in history
    )
    full_query = history_text + f"User: {query}\n"

    headers = {
        "Content-Type": "application/json",
        # Send the key as a header instead of a ?key= query parameter, so it
        # does not appear in the request URL (which HTTP client logs include).
        "x-goog-api-key": gemini_api_key,
    }
    payload = {
        "contents": [{"parts": [{"text": full_query}]}]
    }

    try:
        # httpx defaults to a 5-second timeout. This call is not streamed, so
        # the whole completion must arrive within the read timeout.
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            return response.json()['candidates'][0]['content']['parts'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        logger.error(f"Gemini Error: {str(e)}")
        return f"Error: Gemini Error: {str(e)}"
async def query_model(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]]], None]:
    """Route a query to the chosen provider and yield its reply alongside the history.

    The provider name is matched case-insensitively. While the reply is being
    produced, each yield pairs a piece of text with the *unchanged* history:
    one pair per streamed chunk for OpenAI, a single pair for Anthropic,
    Gemini, or an unknown provider. The last yield pairs the complete reply
    with a new history list that has the {"user", "bot"} turn appended.
    """
    provider = provider.lower()
    answer = ""

    if provider == "openai":
        # Streaming provider: forward every chunk while accumulating the reply.
        async for piece in ask_openai(query, history):
            answer += piece
            yield piece, history
    elif provider in ("anthropic", "gemini"):
        # Non-streaming providers: the whole reply arrives in one piece.
        if provider == "anthropic":
            answer = await ask_anthropic(query, history)
        else:
            answer = await ask_gemini(query, history)
        yield answer, history
    else:
        answer = f"Error: Unknown provider: {provider}"
        yield answer, history

    # Record the finished exchange in a fresh list; the input list is not mutated.
    updated_history = history + [{"user": query, "bot": answer}]
    logger.info(f"Updated history: {updated_history}")
    yield answer, updated_history
def _history_to_messages(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Convert {"user", "bot"} turns into the role/content dicts a messages-type Chatbot displays."""
    messages = []
    for turn in history:
        messages.append({"role": "user", "content": turn["user"]})
        if turn["bot"]:
            messages.append({"role": "assistant", "content": turn["bot"]})
    return messages


async def submit_query(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    """Handle a submitted query and stream UI updates.

    Args:
        query: Text from the input box. Blank or whitespace-only input is ignored.
        provider: Provider name, passed through to query_model.
        history: Conversation state as {"user", "bot"} dicts.

    Yields:
        Tuples of (new textbox value, chatbot messages, history state).
        The textbox value is always "" so the input is cleared. The chatbot
        value is a list of {"role", "content"} dicts. The history state keeps
        the {"user", "bot"} shape and only changes once the reply is complete.
    """
    if not query.strip():
        yield "", _history_to_messages(history), history
        return

    shown_so_far = _history_to_messages(history)
    pending_user = {"role": "user", "content": query}
    partial = ""

    async for response_chunk, updated_history in query_model(query, provider, history):
        if len(updated_history) > len(history):
            # Final item from query_model: the finished turn is now part of
            # the history, so render from it and persist the new state.
            yield "", _history_to_messages(updated_history), updated_history
            continue

        # Intermediate item: history is unchanged, so show the in-progress
        # exchange on top of the existing conversation.
        partial += response_chunk
        in_progress = shown_so_far + [pending_user]
        if partial:
            in_progress.append({"role": "assistant", "content": partial})
        yield "", in_progress, updated_history
# Gradio interface
def clear_history():
    """Return two fresh empty lists: one for the chatbot display, one for the history state."""
    cleared_chat = []
    cleared_state = []
    return cleared_chat, cleared_state
# Components are created in display order inside the Blocks context.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Page header
    gr.Markdown("# Multi-Model Chat")
    gr.Markdown("Chat with OpenAI, Anthropic, or Gemini. Select a provider and start typing!")

    # Inputs and conversation state
    provider = gr.Dropdown(
        choices=["OpenAI", "Anthropic", "Gemini"],
        label="Select Provider",
        value="OpenAI",
    )
    history_state = gr.State(value=[])
    chatbot = gr.Chatbot(label="Conversation", type="messages")
    query = gr.Textbox(
        label="Enter your query",
        placeholder="e.g., What is the capital of the United States?",
    )
    submit_button = gr.Button("Submit")
    clear_button = gr.Button("Clear History")

    # Submit: clears the textbox, refreshes the chatbot, and stores the new history.
    submit_button.click(
        fn=submit_query,
        inputs=[query, provider, history_state],
        outputs=[query, chatbot, history_state],
    )

    # Clear: empties both the visible conversation and the stored history.
    clear_button.click(
        fn=clear_history,
        inputs=[],
        outputs=[chatbot, history_state],
    )

# Launch the Gradio app
demo.launch()