qfuxa commited on
Commit
566619b
·
1 Parent(s): ff49b3c

Refactor AudioProcessor methods for improved async handling and WebSocket integration

Browse files
Files changed (2) hide show
  1. audio.py +16 -15
  2. whisper_fastapi_online_server.py +16 -2
audio.py CHANGED
@@ -54,7 +54,7 @@ class AudioProcessor:
54
  / 32768.0)
55
  return pcm_array
56
 
57
- async def start_ffmpeg_decoder(self):
58
  """
59
  Start an FFmpeg process in async streaming mode that reads WebM from stdin
60
  and outputs raw s16le PCM on stdout. Returns the process object.
@@ -79,7 +79,7 @@ class AudioProcessor:
79
  await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
80
  except Exception as e:
81
  logger.warning(f"Error killing FFmpeg process: {e}")
82
- self.ffmpeg_process = await self.start_ffmpeg_decoder()
83
  self.pcm_buffer = bytearray()
84
 
85
  async def ffmpeg_stdout_reader(self):
@@ -198,10 +198,9 @@ class AudioProcessor:
198
  finally:
199
  self.diarization_queue.task_done()
200
 
201
- async def results_formatter(self, websocket):
202
  while True:
203
  try:
204
- # Get the current state
205
  state = await self.shared_state.get_current_state()
206
  tokens = state["tokens"]
207
  buffer_transcription = state["buffer_transcription"]
@@ -217,7 +216,6 @@ class AudioProcessor:
217
  sleep(0.5)
218
  state = await self.shared_state.get_current_state()
219
  tokens = state["tokens"]
220
- # Process tokens to create response
221
  previous_speaker = -1
222
  lines = []
223
  last_end_diarized = 0
@@ -273,22 +271,21 @@ class AudioProcessor:
273
  "beg": format_time(0),
274
  "end": format_time(tokens[-1].end) if tokens else format_time(0),
275
  "diff": 0
276
- }],
277
  "buffer_transcription": buffer_transcription,
278
  "buffer_diarization": buffer_diarization,
279
  "remaining_time_transcription": remaining_time_transcription,
280
  "remaining_time_diarization": remaining_time_diarization
281
-
282
  }
283
 
284
  response_content = ' '.join([str(line['speaker']) + ' ' + line["text"] for line in lines]) + ' | ' + buffer_transcription + ' | ' + buffer_diarization
285
 
286
  if response_content != self.shared_state.last_response_content:
287
  if lines or buffer_transcription or buffer_diarization:
288
- await websocket.send_json(response)
289
  self.shared_state.last_response_content = response_content
290
 
291
- # Add a small delay to avoid overwhelming the client
292
  await asyncio.sleep(0.1)
293
 
294
  except Exception as e:
@@ -296,18 +293,22 @@ class AudioProcessor:
296
  logger.warning(f"Traceback: {traceback.format_exc()}")
297
  await asyncio.sleep(0.5) # Back off on error
298
 
299
- async def create_tasks(self, websocket, diarization):
 
 
 
300
  tasks = []
301
  if self.args.transcription and self.online:
302
  tasks.append(asyncio.create_task(self.transcription_processor()))
303
- if self.args.diarization and diarization:
304
- tasks.append(asyncio.create_task(self.diarization_processor(diarization)))
305
- formatter_task = asyncio.create_task(self.results_formatter(websocket))
306
- tasks.append(formatter_task)
307
  stdout_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
308
  tasks.append(stdout_reader_task)
 
309
  self.tasks = tasks
310
- self.diarization = diarization
 
311
 
312
  async def cleanup(self):
313
  for task in self.tasks:
 
54
  / 32768.0)
55
  return pcm_array
56
 
57
+ def start_ffmpeg_decoder(self):
58
  """
59
  Start an FFmpeg process in async streaming mode that reads WebM from stdin
60
  and outputs raw s16le PCM on stdout. Returns the process object.
 
79
  await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
80
  except Exception as e:
81
  logger.warning(f"Error killing FFmpeg process: {e}")
82
+ self.ffmpeg_process = self.start_ffmpeg_decoder()
83
  self.pcm_buffer = bytearray()
84
 
85
  async def ffmpeg_stdout_reader(self):
 
198
  finally:
199
  self.diarization_queue.task_done()
200
 
201
+ async def results_formatter(self):
202
  while True:
203
  try:
 
204
  state = await self.shared_state.get_current_state()
205
  tokens = state["tokens"]
206
  buffer_transcription = state["buffer_transcription"]
 
216
  sleep(0.5)
217
  state = await self.shared_state.get_current_state()
218
  tokens = state["tokens"]
 
219
  previous_speaker = -1
220
  lines = []
221
  last_end_diarized = 0
 
271
  "beg": format_time(0),
272
  "end": format_time(tokens[-1].end) if tokens else format_time(0),
273
  "diff": 0
274
+ }],
275
  "buffer_transcription": buffer_transcription,
276
  "buffer_diarization": buffer_diarization,
277
  "remaining_time_transcription": remaining_time_transcription,
278
  "remaining_time_diarization": remaining_time_diarization
 
279
  }
280
 
281
  response_content = ' '.join([str(line['speaker']) + ' ' + line["text"] for line in lines]) + ' | ' + buffer_transcription + ' | ' + buffer_diarization
282
 
283
  if response_content != self.shared_state.last_response_content:
284
  if lines or buffer_transcription or buffer_diarization:
285
+ yield response
286
  self.shared_state.last_response_content = response_content
287
 
288
+ #small delay to avoid overwhelming the client
289
  await asyncio.sleep(0.1)
290
 
291
  except Exception as e:
 
293
  logger.warning(f"Traceback: {traceback.format_exc()}")
294
  await asyncio.sleep(0.5) # Back off on error
295
 
296
+ async def create_tasks(self, diarization=None):
297
+ if diarization:
298
+ self.diarization = diarization
299
+
300
  tasks = []
301
  if self.args.transcription and self.online:
302
  tasks.append(asyncio.create_task(self.transcription_processor()))
303
+ if self.args.diarization and self.diarization:
304
+ tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
305
+
 
306
  stdout_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
307
  tasks.append(stdout_reader_task)
308
+
309
  self.tasks = tasks
310
+
311
+ return self.results_formatter()
312
 
313
  async def cleanup(self):
314
  for task in self.tasks:
whisper_fastapi_online_server.py CHANGED
@@ -5,6 +5,7 @@ from fastapi.responses import HTMLResponse
5
  from fastapi.middleware.cors import CORSMiddleware
6
 
7
  from whisper_streaming_custom.whisper_online import backend_factory, warmup_asr
 
8
  import logging
9
  from parse_args import parse_args
10
  from audio import AudioProcessor
@@ -51,6 +52,16 @@ with open("web/live_transcription.html", "r", encoding="utf-8") as f:
51
  async def get():
52
  return HTMLResponse(html)
53
 
 
 
 
 
 
 
 
 
 
 
54
  @app.websocket("/asr")
55
  async def websocket_endpoint(websocket: WebSocket):
56
  audio_processor = AudioProcessor(args, asr, tokenizer)
@@ -58,14 +69,17 @@ async def websocket_endpoint(websocket: WebSocket):
58
  await websocket.accept()
59
  logger.info("WebSocket connection opened.")
60
 
61
- await audio_processor.create_tasks(websocket, diarization)
 
 
62
  try:
63
  while True:
64
  message = await websocket.receive_bytes()
65
- audio_processor.process_audio(message)
66
  except WebSocketDisconnect:
67
  logger.warning("WebSocket disconnected.")
68
  finally:
 
69
  audio_processor.cleanup()
70
  logger.info("WebSocket endpoint cleaned up.")
71
 
 
5
  from fastapi.middleware.cors import CORSMiddleware
6
 
7
  from whisper_streaming_custom.whisper_online import backend_factory, warmup_asr
8
+ import asyncio
9
  import logging
10
  from parse_args import parse_args
11
  from audio import AudioProcessor
 
52
  async def get():
53
  return HTMLResponse(html)
54
 
55
+
56
+ async def handle_websocket_results(websocket, results_generator):
57
+ """Consumes results from the audio processor and sends them via WebSocket."""
58
+ try:
59
+ async for response in results_generator:
60
+ await websocket.send_json(response)
61
+ except Exception as e:
62
+ logger.warning(f"Error in WebSocket results handler: {e}")
63
+
64
+
65
  @app.websocket("/asr")
66
  async def websocket_endpoint(websocket: WebSocket):
67
  audio_processor = AudioProcessor(args, asr, tokenizer)
 
69
  await websocket.accept()
70
  logger.info("WebSocket connection opened.")
71
 
72
+ results_generator = await audio_processor.create_tasks(diarization)
73
+ websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator))
74
+
75
  try:
76
  while True:
77
  message = await websocket.receive_bytes()
78
+ await audio_processor.process_audio(message)
79
  except WebSocketDisconnect:
80
  logger.warning("WebSocket disconnected.")
81
  finally:
82
+ websocket_task.cancel()
83
  audio_processor.cleanup()
84
  logger.info("WebSocket endpoint cleaned up.")
85