Ganesh Chintalapati
OpenAI and Gemini works
7a83934
raw
history blame
10.7 kB
import os
import logging
import httpx
import json
from dotenv import load_dotenv
import gradio as gr
from typing import AsyncGenerator, List, Dict, Tuple
# Logging setup: root handler at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables, then report which provider keys are visible.
# Only presence is logged, never the key values themselves.
load_dotenv()
logger.info("Environment variables loaded from .env file")
logger.info("OPENAI_API_KEY present: %s", "OPENAI_API_KEY" in os.environ)
logger.info("ANTHROPIC_API_KEY present: %s", "ANTHROPIC_API_KEY" in os.environ)
logger.info("GEMINI_API_KEY present: %s", "GEMINI_API_KEY" in os.environ)
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a gpt-3.5-turbo chat completion for ``query`` as text fragments.

    Args:
        query: The new user message.
        history: Earlier turns, each a dict with a ``"user"`` entry and a
            ``"bot"`` entry. A turn whose ``"bot"`` value is falsy contributes
            only its user message.

    Yields:
        Content fragments in arrival order. Failures (missing key, HTTP error,
        malformed stream data, transport error) are reported in-band as
        strings beginning with ``"Error"`` instead of being raised.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    # Build message history with user and assistant roles
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }

    try:
        # httpx defaults to a 5 second timeout; allow longer gaps between
        # chunks while keeping connection setup bounded.
        timeout = httpx.Timeout(60.0, connect=10.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                try:
                    response.raise_for_status()
                except httpx.HTTPStatusError as e:
                    # A streamed body can only be read while the stream is
                    # still open, so the error is handled here rather than
                    # after the ``async with`` has closed the response.
                    response_text = (await e.response.aread()).decode("utf-8", errors="replace")
                    logger.error(f"OpenAI HTTP Status Error: {e.response.status_code}, {response_text}")
                    yield f"Error: OpenAI HTTP Status Error: {e.response.status_code}, {response_text}"
                    return

                # Server-sent events: one "data: ..." record per line.
                async for raw_line in response.aiter_lines():
                    line = raw_line.rstrip("\r\n")
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]  # Remove "data: " prefix
                    if data.strip() == "[DONE]":
                        # End-of-stream sentinel: stop reading entirely.
                        return
                    if not data.strip():
                        continue
                    content = None
                    try:
                        json_data = json.loads(data)
                        if "choices" in json_data and json_data["choices"]:
                            delta = json_data["choices"][0].get("delta", {})
                            content = delta.get("content")
                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing OpenAI stream chunk: {str(e)} - Data: {data}")
                        yield f"Error parsing stream: {str(e)}"
                        continue
                    except Exception as e:
                        logger.error(f"Unexpected error in OpenAI stream: {str(e)} - Data: {data}")
                        yield f"Error in stream: {str(e)}"
                        continue
                    if content is not None:
                        yield content
    except Exception as e:
        # Boundary handler: transport failures are surfaced to the caller as text.
        logger.error(f"OpenAI Error: {str(e)}")
        yield f"Error: OpenAI Error: {str(e)}"
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
    """Request a single (non-streaming) completion from the Anthropic Messages API.

    Args:
        query: The new user message.
        history: Earlier turns, each a dict with a ``"user"`` entry and a
            ``"bot"`` entry. A turn whose ``"bot"`` value is falsy contributes
            only its user message.

    Returns:
        The ``text`` of the first content item in the response, or a string
        beginning with ``"Error"`` when the key is missing or the request
        fails. Exceptions are not propagated.
    """
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        return "Error: Anthropic API key not provided."

    # Build message history with user and assistant roles
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            messages.append({"role": "assistant", "content": msg["bot"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
    }

    try:
        # The whole completion arrives in one response, so nothing is received
        # until generation finishes. httpx's default 5 second timeout is too
        # short for that; allow a longer read while bounding connection setup.
        timeout = httpx.Timeout(60.0, connect=10.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            logger.info(f"Sending Anthropic request: {payload}")
            response = await client.post("https://api.anthropic.com/v1/messages", headers=headers, json=payload)
            response.raise_for_status()
            body = response.json()  # parse once; reused for logging and the result
            logger.info(f"Anthropic response: {body}")
            return body['content'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        # Boundary handler: transport failures and unexpected response shapes
        # are reported to the caller as text.
        logger.error(f"Anthropic Error: {str(e)}")
        return f"Error: Anthropic Error: {str(e)}"
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
    """Request a single (non-streaming) completion from Gemini 1.5 Flash.

    The conversation is flattened into one text prompt of ``User:`` /
    ``Assistant:`` lines followed by the new query.

    Args:
        query: The new user message.
        history: Earlier turns, each a dict with a ``"user"`` entry and a
            ``"bot"`` entry. A turn whose ``"bot"`` value is falsy contributes
            only its ``User:`` line.

    Returns:
        The text of the first part of the first candidate, or a string
        beginning with ``"Error"`` when the key is missing or the request
        fails. Exceptions are not propagated.
    """
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        return "Error: Gemini API key not provided."

    # Concatenate history as text for Gemini
    history_text = "".join(
        f"User: {msg['user']}\nAssistant: {msg['bot']}\n" if msg["bot"] else f"User: {msg['user']}\n"
        for msg in history
    )
    full_query = history_text + f"User: {query}\n"

    headers = {
        "Content-Type": "application/json",
        # Send the key as a header rather than a ``?key=`` query parameter:
        # URLs are commonly recorded in request logs and exception messages,
        # which would expose the secret.
        "x-goog-api-key": gemini_api_key,
    }
    payload = {
        "contents": [{"parts": [{"text": full_query}]}]
    }

    try:
        # Nothing is received until generation finishes, so httpx's default
        # 5 second timeout is too short; allow a longer read.
        timeout = httpx.Timeout(60.0, connect=10.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            return response.json()['candidates'][0]['content']['parts'][0]['text']
    except httpx.HTTPStatusError as e:
        logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}")
        return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}"
    except Exception as e:
        # Boundary handler: transport failures and unexpected response shapes
        # are reported to the caller as text.
        logger.error(f"Gemini Error: {str(e)}")
        return f"Error: Gemini Error: {str(e)}"
def _history_to_chat_messages(turns: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Convert ``{"user", "bot"}`` turns into role/content chat messages."""
    chat_messages = []
    for msg in turns:
        chat_messages.append({"role": "user", "content": msg["user"]})
        if msg["bot"]:
            chat_messages.append({"role": "assistant", "content": msg["bot"]})
    return chat_messages


async def query_model(
    query: str,
    providers: List[str],
    history: List[Dict[str, str]],
) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    """Send ``query`` to each selected provider and yield chat updates.

    OpenAI output is streamed: every received fragment produces an
    intermediate update showing the partial answer. Anthropic and Gemini are
    then awaited one after the other. The last update contains the combined,
    provider-labelled answer and the extended history.

    Args:
        query: The new user message.
        providers: Provider names to use; ``"OpenAI"``, ``"Anthropic"`` and
            ``"Gemini"`` are recognised.
        history: Earlier turns as ``{"user": ..., "bot": ...}`` dicts. The
            list is not modified; the final update carries a new list.

    Yields:
        ``("", chatbot_messages, history_value)`` tuples, where
        ``chatbot_messages`` is a list of role/content dicts.
    """
    logger.info(f"Processing query with providers: {providers}")
    responses = []  # To collect responses from each provider
    streaming_response = ""

    # Handle OpenAI (streaming)
    if "OpenAI" in providers:
        # The earlier conversation does not change while streaming, so it is
        # converted once instead of once per received fragment.
        base_messages = _history_to_chat_messages(history)
        base_messages.append({"role": "user", "content": query})
        async for chunk in ask_openai(query, history):
            streaming_response += chunk
            partial = base_messages + [{"role": "assistant", "content": streaming_response}]
            yield "", partial, history  # Yield partial updates
        if streaming_response.strip():
            responses.append(f"[OpenAI]: {streaming_response}")

    # Handle Anthropic (non-streaming)
    if "Anthropic" in providers:
        response = await ask_anthropic(query, history)
        if response.strip():
            responses.append(f"[Anthropic]: {response}")

    # Handle Gemini (non-streaming)
    if "Gemini" in providers:
        response = await ask_gemini(query, history)
        if response.strip():
            responses.append(f"[Gemini]: {response}")

    # Combine responses
    combined_response = "\n\n".join(responses) if responses else "No valid responses received."

    # Update history with the combined response
    updated_history = history + [{"user": query, "bot": combined_response}]
    logger.info(f"Updated history: {updated_history}")

    # Yield final response
    yield "", _history_to_chat_messages(updated_history), updated_history
async def submit_query(query: str, providers: List[str], history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]:
    """Validate the submission, then relay updates from ``query_model``.

    Args:
        query: Text from the query textbox.
        providers: Selected provider names.
        history: Earlier turns as ``{"user": ..., "bot": ...}`` dicts.

    Yields:
        ``(textbox_value, chatbot_messages, history_value)`` tuples.

        When validation fails, exactly one tuple is yielded. The existing
        conversation stays on screen with a notice appended, and ``history``
        is returned unchanged. A blank query resets the textbox to ``""``.
        A missing provider selection returns ``query`` as typed, so the text
        is not lost.

        Otherwise every update from ``query_model`` is forwarded with ``""``
        as the textbox value, which clears the textbox.
    """
    if not query.strip():
        textbox_value, notice = "", "Please enter a query."
    elif not providers:
        textbox_value, notice = query, "Please select at least one provider."
    else:
        textbox_value, notice = "", None

    if notice is not None:
        # Rebuild the visible conversation so the notice is appended to it
        # instead of replacing it.
        chatbot_messages = []
        for msg in history:
            chatbot_messages.append({"role": "user", "content": msg["user"]})
            if msg["bot"]:
                chatbot_messages.append({"role": "assistant", "content": msg["bot"]})
        chatbot_messages.append({"role": "assistant", "content": notice})
        yield textbox_value, chatbot_messages, history
        return

    # The last forwarded update already carries the final state, so no extra
    # trailing yield is needed.
    async for _, chatbot_messages, updated_history in query_model(query, providers, history):
        yield "", chatbot_messages, updated_history
# Gradio interface
def clear_history():
    """Return two fresh empty lists: one for the chatbot display, one for the history state."""
    empty_chat = []
    empty_history = []
    return empty_chat, empty_history
# Assemble the Gradio page: heading, provider selector, conversation view,
# query box and the two action buttons, then wire the buttons to their handlers.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Multi-Model Chat")
    gr.Markdown("Chat with OpenAI, Anthropic, or Gemini. Select providers and start typing!")

    providers = gr.CheckboxGroup(
        choices=["OpenAI", "Anthropic", "Gemini"],
        label="Select Providers",
        value=["OpenAI"],
    )
    # Per-session conversation turns, kept separately from what the chatbot shows.
    history_state = gr.State(value=[])
    chatbot = gr.Chatbot(label="Conversation", type="messages")
    query = gr.Textbox(
        label="Enter your query",
        placeholder="e.g., What is the capital of the United States?",
    )
    submit_button = gr.Button("Submit")
    clear_button = gr.Button("Clear History")

    # Submit: the handler's three outputs map to textbox, chatbot and history.
    submit_button.click(fn=submit_query, inputs=[query, providers, history_state], outputs=[query, chatbot, history_state])

    # Clear: reset both the visible conversation and the stored history.
    clear_button.click(fn=clear_history, inputs=[], outputs=[chatbot, history_state])

# Start the app.
demo.launch()