qfuxa committed on
Commit
ca4162a
·
2 Parent(s): 2ebc949 d7ef93c

Merge pull request #46 from QuentinFuxa/solving-ffmpeg-process-freezing-unexpectedly

Browse files
Files changed (1) hide show
  1. whisper_fastapi_online_server.py +53 -36
whisper_fastapi_online_server.py CHANGED
@@ -108,18 +108,30 @@ async def websocket_endpoint(websocket: WebSocket):
108
  await websocket.accept()
109
  print("WebSocket connection opened.")
110
 
111
- ffmpeg_process = await start_ffmpeg_decoder()
112
  pcm_buffer = bytearray()
113
- print("Loading online.")
114
  online = online_factory(args, asr, tokenizer)
115
- print("Online loaded.")
116
 
117
- if args.diarization:
118
- diarization = DiartDiarization(SAMPLE_RATE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
- # Continuously read decoded PCM from ffmpeg stdout in a background task
121
  async def ffmpeg_stdout_reader():
122
- nonlocal pcm_buffer
123
  loop = asyncio.get_event_loop()
124
  full_transcription = ""
125
  beg = time()
@@ -131,9 +143,23 @@ async def websocket_endpoint(websocket: WebSocket):
131
  elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec
132
  ffmpeg_buffer_from_duration = max(int(32000 * elapsed_time), 4096)
133
  beg = time()
134
- chunk = await loop.run_in_executor(
135
- None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration
136
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  if not chunk:
138
  print("FFmpeg stdout closed.")
139
  break
@@ -151,13 +177,13 @@ async def websocket_endpoint(websocket: WebSocket):
151
 
152
  if transcription:
153
  chunk_history.append({
154
- "beg": transcription.start,
155
- "end": transcription.end,
156
- "text": transcription.text,
157
- "speaker": "0"
158
  })
159
-
160
- full_transcription += transcription.text
161
  buffer = online.get_buffer()
162
 
163
  if buffer in full_transcription: # With VAC, the buffer is not updated until the next chunk is processed
@@ -200,37 +226,28 @@ async def websocket_endpoint(websocket: WebSocket):
200
  while True:
201
  # Receive incoming WebM audio chunks from the client
202
  message = await websocket.receive_bytes()
203
- # Pass them to ffmpeg via stdin
204
- ffmpeg_process.stdin.write(message)
205
- ffmpeg_process.stdin.flush()
206
-
 
 
 
 
207
  except WebSocketDisconnect:
208
- print("WebSocket connection closed.")
209
- except Exception as e:
210
- print(f"Error in websocket loop: {e}")
211
  finally:
212
- # Clean up ffmpeg and the reader task
213
- try:
214
- ffmpeg_process.stdin.close()
215
- except:
216
- pass
217
  stdout_reader_task.cancel()
218
-
219
  try:
220
- ffmpeg_process.stdout.close()
 
221
  except:
222
  pass
223
-
224
- ffmpeg_process.wait()
225
- del online
226
-
227
  if args.diarization:
228
- # Stop Diart
229
  diarization.close()
230
 
231
 
232
 
233
-
234
  if __name__ == "__main__":
235
  import uvicorn
236
 
 
108
  await websocket.accept()
109
  print("WebSocket connection opened.")
110
 
111
+ ffmpeg_process = None
112
  pcm_buffer = bytearray()
 
113
  online = online_factory(args, asr, tokenizer)
114
+ diarization = DiartDiarization(SAMPLE_RATE) if args.diarization else None
115
 
116
+ async def restart_ffmpeg():
117
+ nonlocal ffmpeg_process, online, diarization, pcm_buffer
118
+ if ffmpeg_process:
119
+ try:
120
+ ffmpeg_process.kill()
121
+ await asyncio.get_event_loop().run_in_executor(None, ffmpeg_process.wait)
122
+ except Exception as e:
123
+ print(f"Error killing FFmpeg process: {e}")
124
+ ffmpeg_process = await start_ffmpeg_decoder()
125
+ pcm_buffer = bytearray()
126
+ online = online_factory(args, asr, tokenizer)
127
+ if args.diarization:
128
+ diarization = DiartDiarization(SAMPLE_RATE)
129
+ print("FFmpeg process started.")
130
+
131
+ await restart_ffmpeg()
132
 
 
133
  async def ffmpeg_stdout_reader():
134
+ nonlocal ffmpeg_process, online, diarization, pcm_buffer
135
  loop = asyncio.get_event_loop()
136
  full_transcription = ""
137
  beg = time()
 
143
  elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec
144
  ffmpeg_buffer_from_duration = max(int(32000 * elapsed_time), 4096)
145
  beg = time()
146
+
147
+ # Read chunk with timeout
148
+ try:
149
+ chunk = await asyncio.wait_for(
150
+ loop.run_in_executor(
151
+ None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration
152
+ ),
153
+ timeout=5.0
154
+ )
155
+ except asyncio.TimeoutError:
156
+ print("FFmpeg read timeout. Restarting...")
157
+ await restart_ffmpeg()
158
+ full_transcription = ""
159
+ chunk_history = []
160
+ beg = time()
161
+ continue # Skip processing and read from new process
162
+
163
  if not chunk:
164
  print("FFmpeg stdout closed.")
165
  break
 
177
 
178
  if transcription:
179
  chunk_history.append({
180
+ "beg": transcription.start,
181
+ "end": transcription.end,
182
+ "text": transcription.text,
183
+ "speaker": "0"
184
  })
185
+
186
+ full_transcription += transcription.text if transcription else ""
187
  buffer = online.get_buffer()
188
 
189
  if buffer in full_transcription: # With VAC, the buffer is not updated until the next chunk is processed
 
226
  while True:
227
  # Receive incoming WebM audio chunks from the client
228
  message = await websocket.receive_bytes()
229
+ try:
230
+ ffmpeg_process.stdin.write(message)
231
+ ffmpeg_process.stdin.flush()
232
+ except (BrokenPipeError, AttributeError) as e:
233
+ print(f"Error writing to FFmpeg: {e}. Restarting...")
234
+ await restart_ffmpeg()
235
+ ffmpeg_process.stdin.write(message)
236
+ ffmpeg_process.stdin.flush()
237
  except WebSocketDisconnect:
238
+ print("WebSocket disconnected.")
 
 
239
  finally:
 
 
 
 
 
240
  stdout_reader_task.cancel()
 
241
  try:
242
+ ffmpeg_process.stdin.close()
243
+ ffmpeg_process.wait()
244
  except:
245
  pass
 
 
 
 
246
  if args.diarization:
 
247
  diarization.close()
248
 
249
 
250
 
 
251
  if __name__ == "__main__":
252
  import uvicorn
253