import os import logging import httpx from dotenv import load_dotenv import gradio as gr from typing import AsyncGenerator, List, Dict, Tuple # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() logger.info("Environment variables loaded from .env file") logger.info(f"OPENAI_API_KEY present: {'OPENAI_API_KEY' in os.environ}") logger.info(f"ANTHROPIC_API_KEY present: {'ANTHROPIC_API_KEY' in os.environ}") logger.info(f"GEMINI_API_KEY present: {'GEMINI_API_KEY' in os.environ}") async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: openai_api_key = os.getenv("OPENAI_API_KEY") if not openai_api_key: logger.error("OpenAI API key not provided") yield "Error: OpenAI API key not provided." return # Build message history messages = [] for msg in history: messages.append({"role": "user", "content": msg["user"]}) if msg["bot"]: messages.append({"role": "assistant", "content": msg["bot"]}) messages.append({"role": "user", "content": query}) headers = { "Authorization": f"Bearer {openai_api_key}", "Content-Type": "application/json" } payload = { "model": "gpt-3.5-turbo", "messages": messages, "stream": True } try: async with httpx.AsyncClient() as client: async with client.stream("POST", "https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as response: response.raise_for_status() async for chunk in response.intro_text(): if chunk: # Parse the streaming chunk (JSON lines) lines = chunk.splitlines() for line in lines: if line.startswith("data: "): data = line[6:] # Remove "data: " prefix if data == "[DONE]": break try: json_data = eval(data) # Safely parse JSON if "choices" in json_data and json_data["choices"]: delta = json_data["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except Exception as e: logger.error(f"Error parsing OpenAI stream chunk: {str(e)}") yield f"Error parsing stream: {str(e)}" except httpx.HTTPStatusError as e: logger.error(f"OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}") yield f"Error: OpenAI HTTP Status Error: {e.response.status_code}, {e.response.text}" except Exception as e: logger.error(f"OpenAI Error: {str(e)}") yield f"Error: OpenAI Error: {str(e)}" async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str: anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") if not anthropic_api_key: logger.error("Anthropic API key not provided") return "Error: Anthropic API key not provided." # Build message history messages = [] for msg in history: messages.append({"role": "user", "content": msg["user"]}) if msg["bot"]: messages.append({"role": "assistant", "content": msg["bot"]}) messages.append({"role": "user", "content": query}) headers = { "x-api-key": anthropic_api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" } payload = { "model": "claude-3-5-sonnet-20241022", "max_tokens": 1024, "messages": messages } try: async with httpx.AsyncClient() as client: logger.info(f"Sending Anthropic request: {payload}") response = await client.post("https://api.anthropic.com/v1/messages", headers=headers, json=payload) response.raise_for_status() logger.info(f"Anthropic response: {response.json()}") return response.json()['content'][0]['text'] except httpx.HTTPStatusError as e: logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}") return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {e.response.text}" except Exception as e: logger.error(f"Anthropic Error: {str(e)}") return f"Error: Anthropic Error: {str(e)}" async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str: gemini_api_key = os.getenv("GEMINI_API_KEY") if not gemini_api_key: logger.error("Gemini API key not provided") return "Error: Gemini API key not provided." # Gemini doesn't natively support chat history in the same way, so we concatenate history as text history_text = "" for msg in history: history_text += f"User: {msg['user']}\nAssistant: {msg['bot']}\n" if msg["bot"] else f"User: {msg['user']}\n" full_query = history_text + f"User: {query}\n" headers = { "Content-Type": "application/json" } payload = { "contents": [{"parts": [{"text": full_query}]}] } try: async with httpx.AsyncClient() as client: response = await client.post( f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_api_key}", headers=headers, json=payload ) response.raise_for_status() return response.json()['candidates'][0]['content']['parts'][0]['text'] except httpx.HTTPStatusError as e: logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}") return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {e.response.text}" except Exception as e: logger.error(f"Gemini Error: {str(e)}") return f"Error: Gemini Error: {str(e)}" async def query_model(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]]], None]: provider = provider.lower() response = "" if provider == "openai": async for chunk in ask_openai(query, history): response += chunk yield chunk, history # Yield partial response for streaming elif provider == "anthropic": response = await ask_anthropic(query, history) yield response, history elif provider == "gemini": response = await ask_gemini(query, history) yield response, history else: response = f"Error: Unknown provider: {provider}" yield response, history # Update history with the new query and response updated_history = history + [{"user": query, "bot": response}] logger.info(f"Updated history: {updated_history}") yield response, updated_history # Final yield with updated history async def submit_query(query: str, provider: str, history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]]], None]: if not query.strip(): yield "", history, history return response = "" async for response_chunk, updated_history in query_model(query, provider, history): response += response_chunk yield "", updated_history, updated_history # Yield intermediate updates for streaming yield "", updated_history, updated_history # Final yield with cleared query # Gradio interface def clear_history(): return [], [] with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Multi-Model Chat") gr.Markdown("Chat with OpenAI, Anthropic, or Gemini. Select a provider and start typing!") provider = gr.Dropdown(choices=["OpenAI", "Anthropic", "Gemini"], label="Select Provider", value="OpenAI") history_state = gr.State(value=[]) chatbot = gr.Chatbot(label="Conversation", type="messages") query = gr.Textbox(label="Enter your query", placeholder="e.g., What is the capital of the United States?") submit_button = gr.Button("Submit") clear_button = gr.Button("Clear History") submit_button.click( fn=submit_query, inputs=[query, provider, history_state], outputs=[query, chatbot, history_state] ) clear_button.click( fn=clear_history, inputs=[], outputs=[chatbot, history_state] ) # Launch the Gradio app demo.launch()