import os
import httpx
import json
import traceback
from typing import AsyncGenerator, List, Dict
from config import logger
# ===== OpenAI =====
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return
    # Rebuild the conversation as alternating user/assistant messages.
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg.get("openai"):
            messages.append({"role": "assistant", "content": msg["openai"]})
    messages.append({"role": "user", "content": query})
    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream("POST", "https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    if chunk:
                        buffer += chunk
                        # Process complete SSE lines; a partial line stays in the buffer.
                        while "\n" in buffer:
                            line, buffer = buffer.split("\n", 1)
                            if line.startswith("data: "):
                                data = line[6:]
                                if data.strip() == "[DONE]":
                                    break
                                if not data.strip():
                                    continue
                                try:
                                    json_data = json.loads(data)
                                    delta = json_data["choices"][0].get("delta", {})
                                    if "content" in delta:
                                        yield delta["content"]
                                except Exception as e:
                                    logger.error(f"OpenAI parse error: {e}")
                                    yield f"[OpenAI Error]: {e}"
    except Exception as e:
        logger.error(f"OpenAI API error: {e}")
        yield f"[OpenAI Error]: {e}"
# ===== Anthropic =====
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        yield "Error: Anthropic API key not provided."
        return
    # Rebuild the conversation as alternating user/assistant messages.
    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg.get("anthropic"):
            messages.append({"role": "assistant", "content": msg["anthropic"]})
    messages.append({"role": "user", "content": query})
    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
        "stream": True
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream("POST", "https://api.anthropic.com/v1/messages", headers=headers, json=payload) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    if chunk:
                        buffer += chunk
                        # Process complete SSE lines; text arrives in content_block_delta events.
                        while "\n" in buffer:
                            line, buffer = buffer.split("\n", 1)
                            if line.startswith("data: "):
                                data = line[6:]
                                if data.strip() == "[DONE]":
                                    break
                                if not data.strip():
                                    continue
                                try:
                                    json_data = json.loads(data)
                                    if json_data.get("type") == "content_block_delta" and "delta" in json_data:
                                        yield json_data["delta"].get("text", "")
                                except Exception as e:
                                    logger.error(f"Anthropic parse error: {e}")
                                    yield f"[Anthropic Error]: {e}"
    except Exception as e:
        logger.error(f"Anthropic API error: {e}")
        yield f"[Anthropic Error]: {e}"
# ===== Gemini =====
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        yield "Error: Gemini API key not provided."
        return
    # Gemini is prompted with a flattened transcript rather than structured messages.
    history_text = ""
    for msg in history:
        history_text += f"User: {msg['user']}\n"
        if msg.get("gemini"):
            history_text += f"Assistant: {msg['gemini']}\n"
    full_prompt = f"{history_text}User: {query}\n"
    headers = {"Content-Type": "application/json"}
    payload = {
        "contents": [{"parts": [{"text": full_prompt}]}]
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key={gemini_api_key}",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                buffer = ""
                # streamGenerateContent returns a JSON array delivered in chunks;
                # accumulate until the buffer parses as valid JSON.
                async for chunk in response.aiter_text():
                    if not chunk.strip():
                        continue
                    buffer += chunk
                    try:
                        json_data = json.loads(buffer.strip(", \n"))
                        buffer = ""
                        # Handle both list and dict formats.
                        objects = json_data if isinstance(json_data, list) else [json_data]
                        for obj in objects:
                            candidates = obj.get("candidates", [])
                            if candidates:
                                parts = candidates[0].get("content", {}).get("parts", [])
                                for part in parts:
                                    text = part.get("text", "")
                                    if text:
                                        yield text
                    except json.JSONDecodeError:
                        continue  # wait for more data
    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        yield f"[Gemini Error]: {e}"