Ganesh Chintalapati committed on
Commit
27f3c72
·
1 Parent(s): 7f76d3e

All working except Google streaming

Browse files
Files changed (1) hide show
  1. app.py +121 -71
app.py CHANGED
@@ -82,11 +82,12 @@ async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerato
82
  logger.error(f"OpenAI Error: {str(e)}")
83
  yield f"Error: OpenAI Error: {str(e)}"
84
 
85
- async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
86
  anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
87
  if not anthropic_api_key:
88
  logger.error("Anthropic API key not provided")
89
- return "Error: Anthropic API key not provided."
 
90
 
91
  # Build message history with user and assistant roles
92
  messages = []
@@ -105,42 +106,55 @@ async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> str:
105
  payload = {
106
  "model": "claude-3-5-sonnet-20241022",
107
  "max_tokens": 1024,
108
- "messages": messages
 
109
  }
110
 
111
  try:
112
  async with httpx.AsyncClient(timeout=30.0) as client:
113
- logger.info(f"Sending Anthropic request: {payload}")
114
- response = await client.post("https://api.anthropic.com/v1/messages", headers=headers, json=payload)
115
-
116
- response.raise_for_status()
117
- response_json = response.json()
118
- logger.info(f"Anthropic response: {response_json}")
119
-
120
- # Validate response structure
121
- if not isinstance(response_json, dict) or "content" not in response_json or not response_json["content"]:
122
- logger.error(f"Invalid Anthropic response structure: {response_json}")
123
- return f"Error: Invalid Anthropic response structure"
124
-
125
- content = response_json["content"]
126
- if not isinstance(content, list) or not content or "text" not in content[0]:
127
- logger.error(f"Invalid Anthropic content format: {content}")
128
- return f"Error: Invalid Anthropic content format"
129
-
130
- return response_json["content"][0]["text"]
 
 
 
 
 
 
 
 
 
 
 
131
  except httpx.HTTPStatusError as e:
132
  response_text = await e.response.aread()
133
  logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}")
134
- return f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}"
135
  except Exception as e:
136
  logger.error(f"Anthropic Error: {str(e)}\nStack trace: {traceback.format_exc()}")
137
- return f"Error: Anthropic Error: {str(e)}"
138
 
139
- async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
140
  gemini_api_key = os.getenv("GEMINI_API_KEY")
141
  if not gemini_api_key:
142
  logger.error("Gemini API key not provided")
143
- return "Error: Gemini API key not provided."
 
144
 
145
  # Concatenate history as text for Gemini
146
  history_text = ""
@@ -158,45 +172,71 @@ async def ask_gemini(query: str, history: List[Dict[str, str]]) -> str:
158
 
159
  try:
160
  async with httpx.AsyncClient(timeout=30.0) as client:
161
- logger.info(f"Sending Gemini request: {payload}")
162
- response = await client.post(
163
- f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_api_key}",
 
164
  headers=headers,
165
  json=payload
166
- )
167
-
168
- response.raise_for_status()
169
- response_json = response.json()
170
- logger.info(f"Gemini response: {response_json}")
171
-
172
- # Validate response structure
173
- if not isinstance(response_json, dict) or "candidates" not in response_json or not response_json["candidates"]:
174
- logger.error(f"Invalid Gemini response structure: {response_json}")
175
- return f"Error: Invalid Gemini response structure"
176
-
177
- candidates = response_json["candidates"]
178
- if not isinstance(candidates, list) or not candidates or "content" not in candidates[0]:
179
- logger.error(f"Invalid Gemini candidates format: {candidates}")
180
- return f"Error: Invalid Gemini candidates format"
181
-
182
- content = candidates[0]["content"]
183
- if not isinstance(content, dict) or "parts" not in content or not content["parts"]:
184
- logger.error(f"Invalid Gemini content format: {content}")
185
- return f"Error: Invalid Gemini content format"
186
-
187
- parts = content["parts"]
188
- if not isinstance(parts, list) or not parts or "text" not in parts[0]:
189
- logger.error(f"Invalid Gemini parts format: {parts}")
190
- return f"Error: Invalid Gemini parts format"
191
-
192
- return parts[0]["text"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
  except httpx.HTTPStatusError as e:
194
  response_text = await e.response.aread()
195
  logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}")
196
- return f"Error: Gemini HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}"
197
  except Exception as e:
198
  logger.error(f"Gemini Error: {str(e)}\nStack trace: {traceback.format_exc()}")
199
- return f"Error: Gemini Error: {str(e)}"
200
 
201
  async def query_model(query: str, providers: List[str], history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]], None]:
202
  logger.info(f"Processing query with providers: {providers}")
@@ -225,30 +265,40 @@ async def query_model(query: str, providers: List[str], history: List[Dict[str,
225
  elif line.startswith("[Gemini]:"):
226
  gemini_messages.append({"role": "assistant", "content": line[len("[Gemini]:"):].strip()})
227
 
228
- # Handle OpenAI (streaming)
229
  if "OpenAI" in providers:
230
  openai_messages.append({"role": "user", "content": query})
 
 
 
 
 
 
 
 
 
 
231
  async for chunk in ask_openai(query, history):
232
  openai_response += chunk
233
- # Yield streaming updates for OpenAI
234
  openai_messages[-1] = {"role": "assistant", "content": openai_response}
235
  yield "", openai_messages, anthropic_messages, gemini_messages
236
- if openai_response.strip() and not openai_response.startswith("Error:"):
237
- openai_messages[-1] = {"role": "assistant", "content": openai_response}
238
 
239
- # Handle Anthropic (non-streaming)
240
  if "Anthropic" in providers:
241
- anthropic_messages.append({"role": "user", "content": query})
242
- anthropic_response = await ask_anthropic(query, history)
243
- if anthropic_response.strip() and not anthropic_response.startswith("Error:"):
244
- anthropic_messages.append({"role": "assistant", "content": anthropic_response})
 
245
 
246
- # Handle Gemini (non-streaming)
247
  if "Gemini" in providers:
248
- gemini_messages.append({"role": "user", "content": query})
249
- gemini_response = await ask_gemini(query, history)
250
- if gemini_response.strip() and not gemini_response.startswith("Error:"):
251
- gemini_messages.append({"role": "assistant", "content": gemini_response})
 
252
 
253
  # Combine responses for history
254
  responses = []
 
82
  logger.error(f"OpenAI Error: {str(e)}")
83
  yield f"Error: OpenAI Error: {str(e)}"
84
 
85
+ async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
86
  anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
87
  if not anthropic_api_key:
88
  logger.error("Anthropic API key not provided")
89
+ yield "Error: Anthropic API key not provided."
90
+ return
91
 
92
  # Build message history with user and assistant roles
93
  messages = []
 
106
  payload = {
107
  "model": "claude-3-5-sonnet-20241022",
108
  "max_tokens": 1024,
109
+ "messages": messages,
110
+ "stream": True
111
  }
112
 
113
  try:
114
  async with httpx.AsyncClient(timeout=30.0) as client:
115
+ logger.info(f"Sending Anthropic streaming request: {payload}")
116
+ async with client.stream("POST", "https://api.anthropic.com/v1/messages", headers=headers, json=payload) as response:
117
+ response.raise_for_status()
118
+ buffer = ""
119
+ async for chunk in response.aiter_text():
120
+ if chunk:
121
+ buffer += chunk
122
+ # Process complete JSON lines
123
+ while "\n" in buffer:
124
+ line, buffer = buffer.split("\n", 1)
125
+ if line.startswith("data: "):
126
+ data = line[6:] # Remove "data: " prefix
127
+ if data.strip() == "[DONE]":
128
+ break
129
+ if not data.strip():
130
+ continue
131
+ try:
132
+ json_data = json.loads(data)
133
+ if json_data.get("type") == "content_block_delta" and "delta" in json_data and "text" in json_data["delta"]:
134
+ yield json_data["delta"]["text"]
135
+ elif json_data.get("type") == "message_start" or json_data.get("type") == "message_delta":
136
+ continue # Skip metadata events
137
+ except json.JSONDecodeError as e:
138
+ logger.error(f"Error parsing Anthropic stream chunk: {str(e)} - Data: {data}")
139
+ yield f"Error parsing stream: {str(e)}"
140
+ except Exception as e:
141
+ logger.error(f"Unexpected error in Anthropic stream: {str(e)} - Data: {data}")
142
+ yield f"Error in stream: {str(e)}"
143
+
144
  except httpx.HTTPStatusError as e:
145
  response_text = await e.response.aread()
146
  logger.error(f"Anthropic HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}")
147
+ yield f"Error: Anthropic HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}"
148
  except Exception as e:
149
  logger.error(f"Anthropic Error: {str(e)}\nStack trace: {traceback.format_exc()}")
150
+ yield f"Error: Anthropic Error: {str(e)}"
151
 
152
+ async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
153
  gemini_api_key = os.getenv("GEMINI_API_KEY")
154
  if not gemini_api_key:
155
  logger.error("Gemini API key not provided")
156
+ yield "Error: Gemini API key not provided."
157
+ return
158
 
159
  # Concatenate history as text for Gemini
160
  history_text = ""
 
172
 
173
  try:
174
  async with httpx.AsyncClient(timeout=30.0) as client:
175
+ logger.info(f"Sending Gemini streaming request: {payload}")
176
+ async with client.stream(
177
+ "POST",
178
+ f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key={gemini_api_key}",
179
  headers=headers,
180
  json=payload
181
+ ) as response:
182
+ response.raise_for_status()
183
+ buffer = ""
184
+ async for chunk in response.aiter_text():
185
+ if chunk:
186
+ buffer += chunk
187
+ logger.info(f"Gemini stream chunk: {chunk}")
188
+ # Try to parse buffer as multiple JSON objects
189
+ while buffer.strip():
190
+ try:
191
+ # Attempt to parse the buffer as JSON
192
+ json_data = json.loads(buffer)
193
+ logger.info(f"Parsed Gemini JSON: {json_data}")
194
+ buffer = "" # Reset buffer after successful parse
195
+ # Handle both single object and list of objects
196
+ objects = json_data if isinstance(json_data, list) else [json_data]
197
+ for obj in objects:
198
+ if isinstance(obj, dict) and "candidates" in obj and obj["candidates"]:
199
+ content = obj["candidates"][0].get("content", {})
200
+ if "parts" in content and content["parts"]:
201
+ text = content["parts"][0].get("text", "")
202
+ if text:
203
+ yield text
204
+ break # Exit loop after successful parse
205
+ except json.JSONDecodeError as e:
206
+ # Check if buffer might contain a partial object followed by a comma
207
+ comma_index = buffer.rfind(",")
208
+ if comma_index != -1:
209
+ # Try parsing up to the last comma
210
+ try:
211
+ json_data = json.loads(buffer[:comma_index])
212
+ logger.info(f"Parsed Gemini JSON (before comma): {json_data}")
213
+ buffer = buffer[comma_index + 1:].strip()
214
+ objects = json_data if isinstance(json_data, list) else [json_data]
215
+ for obj in objects:
216
+ if isinstance(obj, dict) and "candidates" in obj and obj["candidates"]:
217
+ content = obj["candidates"][0].get("content", {})
218
+ if "parts" in content and content["parts"]:
219
+ text = content["parts"][0].get("text", "")
220
+ if text:
221
+ yield text
222
+ continue # Continue processing remaining buffer
223
+ except json.JSONDecodeError:
224
+ pass # Continue accumulating buffer
225
+ # If parsing fails, accumulate more data
226
+ break
227
+ except Exception as e:
228
+ logger.error(f"Unexpected error in Gemini stream: {str(e)} - Data: {buffer}")
229
+ yield f"Error in stream: {str(e)}"
230
+ buffer = ""
231
+ break
232
+
233
  except httpx.HTTPStatusError as e:
234
  response_text = await e.response.aread()
235
  logger.error(f"Gemini HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}")
236
+ yield f"Error: Gemini HTTP Status Error: {e.response.status_code}, {response_text.decode('utf-8')}"
237
  except Exception as e:
238
  logger.error(f"Gemini Error: {str(e)}\nStack trace: {traceback.format_exc()}")
239
+ yield f"Error: Gemini Error: {str(e)}"
240
 
241
  async def query_model(query: str, providers: List[str], history: List[Dict[str, str]]) -> AsyncGenerator[Tuple[str, List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]], None]:
242
  logger.info(f"Processing query with providers: {providers}")
 
265
  elif line.startswith("[Gemini]:"):
266
  gemini_messages.append({"role": "assistant", "content": line[len("[Gemini]:"):].strip()})
267
 
268
+ # Append the current query to all chatbots
269
  if "OpenAI" in providers:
270
  openai_messages.append({"role": "user", "content": query})
271
+ openai_messages.append({"role": "assistant", "content": ""})
272
+ if "Anthropic" in providers:
273
+ anthropic_messages.append({"role": "user", "content": query})
274
+ anthropic_messages.append({"role": "assistant", "content": ""})
275
+ if "Gemini" in providers:
276
+ gemini_messages.append({"role": "user", "content": query})
277
+ gemini_messages.append({"role": "assistant", "content": ""})
278
+
279
+ # Handle OpenAI (streaming)
280
+ if "OpenAI" in providers:
281
  async for chunk in ask_openai(query, history):
282
  openai_response += chunk
283
+ # Update OpenAI chatbot with streaming response
284
  openai_messages[-1] = {"role": "assistant", "content": openai_response}
285
  yield "", openai_messages, anthropic_messages, gemini_messages
 
 
286
 
287
+ # Handle Anthropic (streaming)
288
  if "Anthropic" in providers:
289
+ async for chunk in ask_anthropic(query, history):
290
+ anthropic_response += chunk
291
+ # Update Anthropic chatbot with streaming response
292
+ anthropic_messages[-1] = {"role": "assistant", "content": anthropic_response}
293
+ yield "", openai_messages, anthropic_messages, gemini_messages
294
 
295
+ # Handle Gemini (streaming)
296
  if "Gemini" in providers:
297
+ async for chunk in ask_gemini(query, history):
298
+ gemini_response += chunk
299
+ # Update Gemini chatbot with streaming response
300
+ gemini_messages[-1] = {"role": "assistant", "content": gemini_response}
301
+ yield "", openai_messages, anthropic_messages, gemini_messages
302
 
303
  # Combine responses for history
304
  responses = []