bibibi12345 committed · Commit 0a33ddd · Parent: 04c79ee
Files changed (2):
  1. app/api_helpers.py +84 -28
  2. app/openai_handler.py +49 -21
app/api_helpers.py CHANGED
@@ -34,6 +34,7 @@ class StreamingReasoningProcessor:
         self.tag_buffer = ""
         self.inside_tag = False
         self.reasoning_buffer = ""
+        self.partial_tag_buffer = ""  # Buffer for potential partial tags
 
     def process_chunk(self, content: str) -> tuple[str, str]:
         """
@@ -45,9 +46,14 @@ class StreamingReasoningProcessor:
         Returns:
             A tuple of:
             - processed_content: Content with reasoning tags removed
-            - current_reasoning: Complete reasoning text if a closing tag was found
+            - current_reasoning: Reasoning text found in this chunk (partial or complete)
         """
-        # Add new content to buffer
+        # Add new content to buffer, but also handle any partial tag from before
+        if self.partial_tag_buffer:
+            # We had a partial tag from the previous chunk
+            content = self.partial_tag_buffer + content
+            self.partial_tag_buffer = ""
+
         self.tag_buffer += content
 
         processed_content = ""
@@ -58,12 +64,27 @@ class StreamingReasoningProcessor:
                 # Look for opening tag
                 open_pos = self.tag_buffer.find(self.open_tag)
                 if open_pos == -1:
-                    # No opening tag found
-                    if len(self.tag_buffer) >= len(self.open_tag):
-                        # Safe to output all but the last few chars (in case tag is split)
-                        safe_length = len(self.tag_buffer) - len(self.open_tag) + 1
-                        processed_content += self.tag_buffer[:safe_length]
-                        self.tag_buffer = self.tag_buffer[safe_length:]
+                    # No complete opening tag found
+                    # Check if we might have a partial tag at the end
+                    partial_match = False
+                    for i in range(1, min(len(self.open_tag), len(self.tag_buffer) + 1)):
+                        if self.tag_buffer[-i:] == self.open_tag[:i]:
+                            partial_match = True
+                            # Output everything except the potential partial tag
+                            if len(self.tag_buffer) > i:
+                                processed_content += self.tag_buffer[:-i]
+                                self.partial_tag_buffer = self.tag_buffer[-i:]
+                                self.tag_buffer = ""
+                            else:
+                                # Entire buffer is partial tag
+                                self.partial_tag_buffer = self.tag_buffer
+                                self.tag_buffer = ""
+                            break
+
+                    if not partial_match:
+                        # No partial tag, output everything
+                        processed_content += self.tag_buffer
+                        self.tag_buffer = ""
                     break
                 else:
                     # Found opening tag
@@ -74,18 +95,40 @@ class StreamingReasoningProcessor:
                 # Inside tag, look for closing tag
                 close_pos = self.tag_buffer.find(self.close_tag)
                 if close_pos == -1:
-                    # No closing tag yet
-                    if len(self.tag_buffer) >= len(self.close_tag):
-                        # Safe to add to reasoning buffer
-                        safe_length = len(self.tag_buffer) - len(self.close_tag) + 1
-                        self.reasoning_buffer += self.tag_buffer[:safe_length]
-                        self.tag_buffer = self.tag_buffer[safe_length:]
+                    # No complete closing tag yet
+                    # Check for partial closing tag
+                    partial_match = False
+                    for i in range(1, min(len(self.close_tag), len(self.tag_buffer) + 1)):
+                        if self.tag_buffer[-i:] == self.close_tag[:i]:
+                            partial_match = True
+                            # Add everything except potential partial tag to reasoning
+                            if len(self.tag_buffer) > i:
+                                new_reasoning = self.tag_buffer[:-i]
+                                self.reasoning_buffer += new_reasoning
+                                if new_reasoning:  # Stream reasoning as it arrives
+                                    current_reasoning = new_reasoning
+                                self.partial_tag_buffer = self.tag_buffer[-i:]
+                                self.tag_buffer = ""
+                            else:
+                                # Entire buffer is partial tag
+                                self.partial_tag_buffer = self.tag_buffer
+                                self.tag_buffer = ""
+                            break
+
+                    if not partial_match:
+                        # No partial tag, add all to reasoning and stream it
+                        if self.tag_buffer:
+                            self.reasoning_buffer += self.tag_buffer
+                            current_reasoning = self.tag_buffer
+                        self.tag_buffer = ""
                     break
                 else:
                     # Found closing tag
-                    self.reasoning_buffer += self.tag_buffer[:close_pos]
-                    current_reasoning = self.reasoning_buffer
-                    self.reasoning_buffer = ""
+                    final_reasoning_chunk = self.tag_buffer[:close_pos]
+                    self.reasoning_buffer += final_reasoning_chunk
+                    if final_reasoning_chunk:  # Include the last chunk of reasoning
+                        current_reasoning = final_reasoning_chunk
+                    self.reasoning_buffer = ""  # Clear buffer after complete tag
                    self.tag_buffer = self.tag_buffer[close_pos + len(self.close_tag):]
                    self.inside_tag = False
 
@@ -103,17 +146,30 @@ class StreamingReasoningProcessor:
         remaining_content = ""
         remaining_reasoning = ""
 
-        if self.tag_buffer and not self.inside_tag:
-            # If we have buffered content and we're not inside a tag,
-            # it's safe to output all of it
-            remaining_content = self.tag_buffer
-            self.tag_buffer = ""
-        elif self.inside_tag:
-            # If we're inside a tag when the stream ends, we have an unclosed tag
-            # Return the partial content as regular content (including the opening tag)
-            remaining_content = f"<{self.tag_name}>{self.reasoning_buffer}{self.tag_buffer}"
-            self.reasoning_buffer = ""
-            self.tag_buffer = ""
+        # First handle any partial tag buffer
+        if self.partial_tag_buffer:
+            # The partial tag wasn't completed, so treat it as regular content
+            remaining_content += self.partial_tag_buffer
+            self.partial_tag_buffer = ""
+
+        if not self.inside_tag:
+            # If we're not inside a tag, output any remaining buffer
+            if self.tag_buffer:
+                remaining_content += self.tag_buffer
+                self.tag_buffer = ""
+        else:
+            # If we're inside a tag when stream ends, we have incomplete reasoning
+            # First, yield any reasoning we've accumulated
+            if self.reasoning_buffer:
+                remaining_reasoning = self.reasoning_buffer
+                self.reasoning_buffer = ""
+
+            # Then output the remaining buffer as content (it's an incomplete tag)
+            if self.tag_buffer:
+                # Don't include the opening tag in output - just the buffer content
+                remaining_content += self.tag_buffer
+                self.tag_buffer = ""
+
         self.inside_tag = False
 
         return remaining_content, remaining_reasoning
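
To make the new partial-tag handling concrete, here is a minimal usage sketch (not part of the commit). It assumes the constructor takes the bare tag name and derives open_tag as "<name>" and close_tag as "</name>", which the old flush_remaining's f"<{self.tag_name}>" suggests; "think" below is a stand-in for the real VERTEX_REASONING_TAG value, and the elided "found opening tag" branch is assumed to strip the tag and set inside_tag.

from app.api_helpers import StreamingReasoningProcessor

# "think" is a placeholder tag name; the real call site passes VERTEX_REASONING_TAG.
proc = StreamingReasoningProcessor("think")

visible, hidden = [], []
# Both tags arrive split across chunk boundaries ("<th" + "ink>", "</th" + "ink>"),
# which the old safe_length logic could mishandle and the prefix check now catches.
for piece in ["Hello <th", "ink>step one", " and step two</th", "ink> world"]:
    content, reasoning = proc.process_chunk(piece)
    visible.append(content)
    hidden.append(reasoning)

# At end of stream, flush whatever is still buffered; an unclosed tag
# degrades to plain content instead of being swallowed.
content, reasoning = proc.flush_remaining()
visible.append(content)
hidden.append(reasoning)

print("".join(visible))  # -> "Hello  world" (tags and reasoning removed)
print("".join(hidden))   # -> "step one and step two", emitted incrementally

Because process_chunk now returns reasoning incrementally instead of only on the closing tag, a caller can forward it as it arrives, which is exactly what the openai_handler.py changes below do.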
app/openai_handler.py CHANGED
@@ -121,6 +121,7 @@ class OpenAIDirectHandler:
         # Create processor for tag-based extraction across chunks
         reasoning_processor = StreamingReasoningProcessor(VERTEX_REASONING_TAG)
         chunk_count = 0
+        has_sent_content = False
 
         async for chunk in stream_response:
             chunk_count += 1
@@ -145,20 +146,36 @@ class OpenAIDirectHandler:
                         if processed_content or current_reasoning:
                             print(f"DEBUG: Chunk {chunk_count} - Processed content: '{processed_content}', Reasoning: '{current_reasoning[:50]}...' if len(current_reasoning) > 50 else '{current_reasoning}'")
 
-                        # Update delta with processed content
+                        # Send chunks for both reasoning and content as they arrive
+                        chunks_to_send = []
+
+                        # If we have reasoning content, send it
                         if current_reasoning:
-                            delta['reasoning_content'] = current_reasoning
+                            reasoning_chunk = chunk_as_dict.copy()
+                            reasoning_chunk['choices'][0]['delta'] = {'reasoning_content': current_reasoning}
+                            chunks_to_send.append(reasoning_chunk)
+
+                        # If we have regular content, send it
                         if processed_content:
-                            delta['content'] = processed_content
-                        elif 'content' in delta:
-                            del delta['content']
-
-                        yield f"data: {json.dumps(chunk_as_dict)}\n\n"
+                            content_chunk = chunk_as_dict.copy()
+                            content_chunk['choices'][0]['delta'] = {'content': processed_content}
+                            chunks_to_send.append(content_chunk)
+                            has_sent_content = True
+
+                        # Send all chunks
+                        for chunk_to_send in chunks_to_send:
+                            yield f"data: {json.dumps(chunk_to_send)}\n\n"
+                    else:
+                        # Still yield the chunk even if no content (could have other delta fields)
+                        yield f"data: {json.dumps(chunk_as_dict)}\n\n"
+                else:
+                    # Yield chunks without choices too (they might contain metadata)
+                    yield f"data: {json.dumps(chunk_as_dict)}\n\n"
 
             except Exception as chunk_error:
                 error_msg = f"Error processing OpenAI chunk for {request.model}: {str(chunk_error)}"
                 print(f"ERROR: {error_msg}")
-                if len(error_msg) > 1024:
+                if len(error_msg) > 1024:
                     error_msg = error_msg[:1024] + "..."
                 error_response = create_openai_error_response(500, error_msg, "server_error")
                 yield f"data: {json.dumps(error_response)}\n\n"
@@ -173,35 +190,46 @@ class OpenAIDirectHandler:
            # Flush any remaining buffered content
            remaining_content, remaining_reasoning = reasoning_processor.flush_remaining()
 
-            if remaining_content:
-                print(f"DEBUG: Flushing remaining content: '{remaining_content}'")
-                final_chunk = {
+            # Send any remaining reasoning first
+            if remaining_reasoning:
+                print(f"DEBUG: Flushing remaining reasoning: '{remaining_reasoning[:50]}...' if len(remaining_reasoning) > 50 else '{remaining_reasoning}'")
+                reasoning_chunk = {
                    "id": f"chatcmpl-{int(time.time())}",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": request.model,
-                    "choices": [{"index": 0, "delta": {"content": remaining_content}, "finish_reason": None}]
+                    "choices": [{"index": 0, "delta": {"reasoning_content": remaining_reasoning}, "finish_reason": None}]
                }
-                yield f"data: {json.dumps(final_chunk)}\n\n"
-
-                # Send a proper finish reason chunk
-                finish_chunk = {
+                yield f"data: {json.dumps(reasoning_chunk)}\n\n"
+
+            # Send any remaining content
+            if remaining_content:
+                print(f"DEBUG: Flushing remaining content: '{remaining_content}'")
+                final_chunk = {
                    "id": f"chatcmpl-{int(time.time())}",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": request.model,
-                    "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
+                    "choices": [{"index": 0, "delta": {"content": remaining_content}, "finish_reason": None}]
                }
-                yield f"data: {json.dumps(finish_chunk)}\n\n"
+                yield f"data: {json.dumps(final_chunk)}\n\n"
+                has_sent_content = True
 
-            # Note: remaining_reasoning is not used here since incomplete reasoning
-            # is treated as regular content when tags are unclosed
+            # Always send a finish reason chunk
+            finish_chunk = {
+                "id": f"chatcmpl-{int(time.time())}",
+                "object": "chat.completion.chunk",
+                "created": int(time.time()),
+                "model": request.model,
+                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
+            }
+            yield f"data: {json.dumps(finish_chunk)}\n\n"
 
            yield "data: [DONE]\n\n"
 
        except Exception as stream_error:
            error_msg = str(stream_error)
-            if len(error_msg) > 1024:
+            if len(error_msg) > 1024:
                error_msg = error_msg[:1024] + "..."
            error_msg_full = f"Error during OpenAI streaming for {request.model}: {error_msg}"
            print(f"ERROR: {error_msg_full}")