Spaces:
Sleeping
Sleeping
import os | |
import httpx | |
import json | |
import traceback | |
from typing import AsyncGenerator, List, Dict | |
from config import logger | |
# ===== OpenAI ===== | |
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: | |
openai_api_key = os.getenv("OPENAI_API_KEY") | |
if not openai_api_key: | |
logger.error("OpenAI API key not provided") | |
yield "Error: OpenAI API key not provided." | |
return | |
messages = [] | |
for msg in history: | |
messages.append({"role": "user", "content": msg["user"]}) | |
if msg.get("openai"): | |
messages.append({"role": "assistant", "content": msg["openai"]}) | |
messages.append({"role": "user", "content": query}) | |
headers = { | |
"Authorization": f"Bearer {openai_api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"model": "gpt-3.5-turbo", | |
"messages": messages, | |
"stream": True | |
} | |
try: | |
async with httpx.AsyncClient() as client: | |
async with client.stream("POST", "https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as response: | |
response.raise_for_status() | |
buffer = "" | |
async for chunk in response.aiter_text(): | |
if chunk: | |
buffer += chunk | |
while "\n" in buffer: | |
line, buffer = buffer.split("\n", 1) | |
if line.startswith("data: "): | |
data = line[6:] | |
if data.strip() == "[DONE]": | |
break | |
if not data.strip(): | |
continue | |
try: | |
json_data = json.loads(data) | |
delta = json_data["choices"][0].get("delta", {}) | |
if "content" in delta: | |
yield delta["content"] | |
except Exception as e: | |
logger.error(f"OpenAI parse error: {e}") | |
yield f"[OpenAI Error]: {e}" | |
except Exception as e: | |
logger.error(f"OpenAI API error: {e}") | |
yield f"[OpenAI Error]: {e}" | |
# ===== Anthropic ===== | |
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: | |
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") | |
if not anthropic_api_key: | |
logger.error("Anthropic API key not provided") | |
yield "Error: Anthropic API key not provided." | |
return | |
messages = [] | |
for msg in history: | |
messages.append({"role": "user", "content": msg["user"]}) | |
if msg.get("anthropic"): | |
messages.append({"role": "assistant", "content": msg["anthropic"]}) | |
messages.append({"role": "user", "content": query}) | |
headers = { | |
"x-api-key": anthropic_api_key, | |
"anthropic-version": "2023-06-01", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"model": "claude-3-5-sonnet-20241022", | |
"max_tokens": 1024, | |
"messages": messages, | |
"stream": True | |
} | |
try: | |
async with httpx.AsyncClient() as client: | |
async with client.stream("POST", "https://api.anthropic.com/v1/messages", headers=headers, json=payload) as response: | |
response.raise_for_status() | |
buffer = "" | |
async for chunk in response.aiter_text(): | |
if chunk: | |
buffer += chunk | |
while "\n" in buffer: | |
line, buffer = buffer.split("\n", 1) | |
if line.startswith("data: "): | |
data = line[6:] | |
if data.strip() == "[DONE]": | |
break | |
if not data.strip(): | |
continue | |
try: | |
json_data = json.loads(data) | |
if json_data.get("type") == "content_block_delta" and "delta" in json_data: | |
yield json_data["delta"].get("text", "") | |
except Exception as e: | |
logger.error(f"Anthropic parse error: {e}") | |
yield f"[Anthropic Error]: {e}" | |
except Exception as e: | |
logger.error(f"Anthropic API error: {e}") | |
yield f"[Anthropic Error]: {e}" | |
# ===== Gemini ===== | |
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: | |
gemini_api_key = os.getenv("GEMINI_API_KEY") | |
if not gemini_api_key: | |
logger.error("Gemini API key not provided") | |
yield "Error: Gemini API key not provided." | |
return | |
history_text = "" | |
for msg in history: | |
history_text += f"User: {msg['user']}\n" | |
if msg.get("gemini"): | |
history_text += f"Assistant: {msg['gemini']}\n" | |
full_prompt = f"{history_text}User: {query}\n" | |
headers = {"Content-Type": "application/json"} | |
payload = { | |
"contents": [{"parts": [{"text": full_prompt}]}] | |
} | |
try: | |
async with httpx.AsyncClient() as client: | |
async with client.stream( | |
"POST", | |
f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key={gemini_api_key}", | |
headers=headers, | |
json=payload | |
) as response: | |
response.raise_for_status() | |
buffer = "" | |
async for chunk in response.aiter_text(): | |
if not chunk.strip(): | |
continue | |
buffer += chunk | |
try: | |
json_data = json.loads(buffer.strip(", \n")) | |
buffer = "" | |
# handle both list and dict format | |
objects = json_data if isinstance(json_data, list) else [json_data] | |
for obj in objects: | |
candidates = obj.get("candidates", []) | |
if candidates: | |
parts = candidates[0].get("content", {}).get("parts", []) | |
for part in parts: | |
text = part.get("text", "") | |
if text: | |
yield text | |
except json.JSONDecodeError: | |
continue # wait for more data | |
except Exception as e: | |
logger.error(f"Gemini API error: {e}") | |
yield f"[Gemini Error]: {e}" | |