qfuxa committed on
Commit c2bd36d · 1 Parent(s): 4f4f3a0

Use a sentinel object at the end of transcription to properly terminate tasks
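For context, the core idea is the classic sentinel-termination pattern for asyncio queues: the producer puts one unique object on the queue when the stream ends, and each consumer breaks out of its loop when it sees that exact object. A minimal, self-contained sketch of the pattern (illustrative only, not this repository's code):

    import asyncio

    SENTINEL = object()  # unique end-of-stream marker; identity can never collide with real data

    async def producer(queue: asyncio.Queue):
        for chunk in (b"audio-1", b"audio-2"):
            await queue.put(chunk)
        await queue.put(SENTINEL)  # signal that no more chunks will arrive

    async def consumer(queue: asyncio.Queue):
        while True:
            item = await queue.get()
            if item is SENTINEL:   # identity check, not equality
                queue.task_done()
                break              # the task can now finish instead of waiting forever
            # ... process the chunk ...
            queue.task_done()

    async def main():
        queue = asyncio.Queue()
        consumer_task = asyncio.create_task(consumer(queue))
        await producer(queue)
        await consumer_task        # returns once the sentinel has been consumed

    asyncio.run(main())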

Files changed (1)
  1. whisperlivekit/audio_processor.py +154 -51
whisperlivekit/audio_processor.py CHANGED
@@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
+SENTINEL = object() # unique sentinel object for end of stream marker
+
 def format_time(seconds: float) -> str:
     """Format seconds as HH:MM:SS."""
     return str(timedelta(seconds=int(seconds)))
@@ -41,8 +43,9 @@ class AudioProcessor:
         self.last_ffmpeg_activity = time()
         self.ffmpeg_health_check_interval = 5
         self.ffmpeg_max_idle_time = 10
-
+
         # State management
+        self.is_stopping = False
         self.tokens = []
         self.buffer_transcription = ""
         self.buffer_diarization = ""
@@ -62,6 +65,13 @@
         self.transcription_queue = asyncio.Queue() if self.args.transcription else None
         self.diarization_queue = asyncio.Queue() if self.args.diarization else None
         self.pcm_buffer = bytearray()
+
+        # Task references
+        self.transcription_task = None
+        self.diarization_task = None
+        self.ffmpeg_reader_task = None
+        self.watchdog_task = None
+        self.all_tasks_for_cleanup = []
 
         # Initialize transcription engine if enabled
         if self.args.transcription:
@@ -210,7 +220,7 @@
                 self.last_ffmpeg_activity = time()
 
                 if not chunk:
-                    logger.info("FFmpeg stdout closed.")
+                    logger.info("FFmpeg stdout closed, no more data to read.")
                     break
 
                 self.pcm_buffer.extend(chunk)
@@ -245,6 +255,15 @@
                 logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
                 logger.warning(f"Traceback: {traceback.format_exc()}")
                 break
+
+        logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
+        if self.args.transcription and self.transcription_queue:
+            await self.transcription_queue.put(SENTINEL)
+            logger.debug("Sentinel put into transcription_queue.")
+        if self.args.diarization and self.diarization_queue:
+            await self.diarization_queue.put(SENTINEL)
+            logger.debug("Sentinel put into diarization_queue.")
+
 
     async def transcription_processor(self):
         """Process audio chunks for transcription."""
@@ -254,7 +273,16 @@
         while True:
             try:
                 pcm_array = await self.transcription_queue.get()
+                if pcm_array is SENTINEL:
+                    logger.debug("Transcription processor received sentinel. Finishing.")
+                    self.transcription_queue.task_done()
+                    break
 
+                if not self.online: # Should not happen if queue is used
+                    logger.warning("Transcription processor: self.online not initialized.")
+                    self.transcription_queue.task_done()
+                    continue
+
                 logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.")
 
                 # Process transcription
@@ -278,12 +306,15 @@
                 await self.update_transcription(
                     new_tokens, buffer, end_buffer, self.full_transcription, self.sep
                 )
+                self.transcription_queue.task_done()
 
             except Exception as e:
                 logger.warning(f"Exception in transcription_processor: {e}")
                 logger.warning(f"Traceback: {traceback.format_exc()}")
-            finally:
-                self.transcription_queue.task_done()
+                if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
+                    self.transcription_queue.task_done()
+        logger.info("Transcription processor task finished.")
+
 
     async def diarization_processor(self, diarization_obj):
         """Process audio chunks for speaker diarization."""
@@ -292,6 +323,10 @@
         while True:
             try:
                 pcm_array = await self.diarization_queue.get()
+                if pcm_array is SENTINEL:
+                    logger.debug("Diarization processor received sentinel. Finishing.")
+                    self.diarization_queue.task_done()
+                    break
 
                 # Process diarization
                 await diarization_obj.diarize(pcm_array)
@@ -303,12 +338,15 @@
                 )
 
                 await self.update_diarization(new_end, buffer_diarization)
+                self.diarization_queue.task_done()
 
             except Exception as e:
                 logger.warning(f"Exception in diarization_processor: {e}")
                 logger.warning(f"Traceback: {traceback.format_exc()}")
-            finally:
-                self.diarization_queue.task_done()
+                if 'pcm_array' in locals() and pcm_array is not SENTINEL:
+                    self.diarization_queue.task_done()
+        logger.info("Diarization processor task finished.")
+
 
     async def results_formatter(self):
         """Format processing results for output."""
@@ -398,6 +436,19 @@
                     yield response
                     self.last_response_content = response_content
 
+                # Check for termination condition
+                if self.is_stopping:
+                    all_processors_done = True
+                    if self.args.transcription and self.transcription_task and not self.transcription_task.done():
+                        all_processors_done = False
+                    if self.args.diarization and self.diarization_task and not self.diarization_task.done():
+                        all_processors_done = False
+
+                    if all_processors_done:
+                        logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
+                        final_state = await self.get_current_state()
+                        return
+
                 await asyncio.sleep(0.1) # Avoid overwhelming the client
 
             except Exception as e:
@@ -407,65 +458,117 @@
 
     async def create_tasks(self):
         """Create and start processing tasks."""
-
-        tasks = []
+        self.all_tasks_for_cleanup = []
+        processing_tasks_for_watchdog = []
+
         if self.args.transcription and self.online:
-            tasks.append(asyncio.create_task(self.transcription_processor()))
+            self.transcription_task = asyncio.create_task(self.transcription_processor())
+            self.all_tasks_for_cleanup.append(self.transcription_task)
+            processing_tasks_for_watchdog.append(self.transcription_task)
 
         if self.args.diarization and self.diarization:
-            tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
-
-        tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
+            self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
+            self.all_tasks_for_cleanup.append(self.diarization_task)
+            processing_tasks_for_watchdog.append(self.diarization_task)
 
-        # Monitor overall system health
-        async def watchdog():
-            while True:
-                try:
-                    await asyncio.sleep(10) # Check every 10 seconds instead of 60
-
-                    current_time = time()
-                    # Check for stalled tasks
-                    for i, task in enumerate(tasks):
-                        if task.done():
-                            exc = task.exception() if task.done() else None
-                            task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
-                            logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
-
-                    # Check for FFmpeg process health with shorter thresholds
-                    ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
-                    if ffmpeg_idle_time > 15: # 15 seconds instead of 180
-                        logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
-
-                    # Force restart after 30 seconds of inactivity (instead of 600)
-                    if ffmpeg_idle_time > 30:
-                        logger.error("FFmpeg idle for too long, forcing restart")
-                        await self.restart_ffmpeg()
-
-                except Exception as e:
-                    logger.error(f"Error in watchdog task: {e}")
+        self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
+        self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
+        processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
 
-        tasks.append(asyncio.create_task(watchdog()))
-        self.tasks = tasks
+        # Monitor overall system health
+        self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
+        self.all_tasks_for_cleanup.append(self.watchdog_task)
 
         return self.results_formatter()
+
+    async def watchdog(self, tasks_to_monitor):
+        """Monitors the health of critical processing tasks."""
+        while True:
+            try:
+                await asyncio.sleep(10)
+                current_time = time()
+
+                for i, task in enumerate(tasks_to_monitor):
+                    if task.done():
+                        exc = task.exception()
+                        task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
+                        if exc:
+                            logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
+                        else:
+                            logger.info(f"{task_name} completed normally.")
+
+                ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
+                if ffmpeg_idle_time > 15:
+                    logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
+                if ffmpeg_idle_time > 30 and not self.is_stopping:
+                    logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
+                    await self.restart_ffmpeg()
+            except asyncio.CancelledError:
+                logger.info("Watchdog task cancelled.")
+                break
+            except Exception as e:
+                logger.error(f"Error in watchdog task: {e}", exc_info=True)
 
     async def cleanup(self):
         """Clean up resources when processing is complete."""
-        for task in self.tasks:
-            task.cancel()
-
-        try:
-            await asyncio.gather(*self.tasks, return_exceptions=True)
-            self.ffmpeg_process.stdin.close()
-            self.ffmpeg_process.wait()
-        except Exception as e:
-            logger.warning(f"Error during cleanup: {e}")
+        logger.info("Starting cleanup of AudioProcessor resources.")
+        for task in self.all_tasks_for_cleanup:
+            if task and not task.done():
+                task.cancel()
+
+        created_tasks = [t for t in self.all_tasks_for_cleanup if t]
+        if created_tasks:
+            await asyncio.gather(*created_tasks, return_exceptions=True)
+        logger.info("All processing tasks cancelled or finished.")
+
+        if self.ffmpeg_process:
+            if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
+                try:
+                    self.ffmpeg_process.stdin.close()
+                except Exception as e:
+                    logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
 
-        if self.args.diarization and hasattr(self, 'diarization'):
+            # Wait for ffmpeg process to terminate
+            if self.ffmpeg_process.poll() is None: # Check if process is still running
+                logger.info("Waiting for FFmpeg process to terminate...")
+                try:
+                    # Run wait in executor to avoid blocking async loop
+                    await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
+                except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
+                    logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
+                    self.ffmpeg_process.kill()
+                    await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
+            logger.info("FFmpeg process terminated.")
+
+        if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
             self.diarization.close()
+        logger.info("AudioProcessor cleanup complete.")
+
 
     async def process_audio(self, message):
         """Process incoming audio data."""
+        # If already stopping or stdin is closed, ignore further audio, especially residual chunks.
+        if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
+            logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
+            if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
+                logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
+                try:
+                    self.ffmpeg_process.stdin.close()
+                except Exception as e:
+                    logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
+            return
+
+        if not message: # primary signal to start stopping
+            logger.info("Empty audio message received, initiating stop sequence.")
+            self.is_stopping = True
+            if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
+                try:
+                    self.ffmpeg_process.stdin.close()
+                    logger.info("FFmpeg stdin closed due to primary stop signal.")
+                except Exception as e:
+                    logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
            return
+
         retry_count = 0
         max_retries = 3
 
@@ -517,4 +620,4 @@
                 else:
                     logger.error("Maximum retries reached for FFmpeg process")
                     await self.restart_ffmpeg()
-                    return
+                    return
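Related to the new cleanup() above: Popen.wait() is blocking, so the commit pushes it onto the default executor instead of stalling the event loop. A stand-alone sketch of the same idea with a kill fallback (function and variable names here are illustrative, not from the repository):

    import asyncio
    import functools
    import subprocess

    async def shutdown_process(proc: subprocess.Popen, timeout: float = 5.0):
        """Close stdin, then wait for exit in a worker thread so the event loop stays responsive."""
        if proc.stdin and not proc.stdin.closed:
            proc.stdin.close()                 # most pipeline tools (ffmpeg, cat, ...) exit on stdin EOF
        loop = asyncio.get_running_loop()
        try:
            # Popen.wait() blocks, so run it in the default thread-pool executor.
            await loop.run_in_executor(None, functools.partial(proc.wait, timeout=timeout))
        except subprocess.TimeoutExpired:
            proc.kill()                        # escalate if the process ignored EOF
            await loop.run_in_executor(None, proc.wait)

    async def main():
        proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL)
        await shutdown_process(proc)
        print("exit code:", proc.returncode)

    asyncio.run(main())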