qfuxa commited on
Commit
333d52d
·
1 Parent(s): d2e410d

Logs for https://github.com/QuentinFuxa/WhisperLiveKit/issues/110 https://github.com/QuentinFuxa/WhisperLiveKit/issues/106

Browse files

https://github.com/QuentinFuxa/WhisperLiveKit/issues/90
https://github.com/QuentinFuxa/WhisperLiveKit/issues/87
https://github.com/QuentinFuxa/WhisperLiveKit/issues/81
https://github.com/QuentinFuxa/WhisperLiveKit/issues/2

Files changed (1) hide show
  1. whisperlivekit/audio_processor.py +121 -19
whisperlivekit/audio_processor.py CHANGED
@@ -6,7 +6,6 @@ import math
6
  import logging
7
  import traceback
8
  from datetime import timedelta
9
- from typing import List, Dict, Any
10
  from whisperlivekit.timed_objects import ASRToken
11
  from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory
12
  from whisperlivekit.core import WhisperLiveKit
@@ -39,7 +38,10 @@ class AudioProcessor:
39
  self.bytes_per_sample = 2
40
  self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
41
  self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
42
-
 
 
 
43
  # State management
44
  self.tokens = []
45
  self.buffer_transcription = ""
@@ -78,14 +80,50 @@ class AudioProcessor:
78
 
79
  async def restart_ffmpeg(self):
80
  """Restart the FFmpeg process after failure."""
 
 
81
  if self.ffmpeg_process:
82
  try:
83
- self.ffmpeg_process.kill()
84
- await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  except Exception as e:
86
- logger.warning(f"Error killing FFmpeg process: {e}")
 
 
 
 
 
87
  self.ffmpeg_process = self.start_ffmpeg_decoder()
88
  self.pcm_buffer = bytearray()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep):
91
  """Thread-safe update of transcription with new data."""
@@ -154,21 +192,33 @@ class AudioProcessor:
154
 
155
  while True:
156
  try:
157
- # Calculate buffer size based on elapsed time
158
- elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec
159
  buffer_size = max(int(32000 * elapsed_time), 4096)
160
- beg = time()
161
 
162
- # Read chunk with timeout
 
 
 
 
 
 
 
 
163
  try:
164
  chunk = await asyncio.wait_for(
165
  loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
166
- timeout=15.0
167
  )
 
 
 
168
  except asyncio.TimeoutError:
169
  logger.warning("FFmpeg read timeout. Restarting...")
170
  await self.restart_ffmpeg()
171
  beg = time()
 
172
  continue
173
 
174
  if not chunk:
@@ -366,7 +416,7 @@ class AudioProcessor:
366
  logger.warning(f"Exception in results_formatter: {e}")
367
  logger.warning(f"Traceback: {traceback.format_exc()}")
368
  await asyncio.sleep(0.5) # Back off on error
369
-
370
  async def create_tasks(self):
371
  """Create and start processing tasks."""
372
 
@@ -378,6 +428,35 @@ class AudioProcessor:
378
  tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
379
 
380
  tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
381
  self.tasks = tasks
382
 
383
  return self.results_formatter()
@@ -399,11 +478,34 @@ class AudioProcessor:
399
 
400
  async def process_audio(self, message):
401
  """Process incoming audio data."""
402
- try:
403
- self.ffmpeg_process.stdin.write(message)
404
- self.ffmpeg_process.stdin.flush()
405
- except (BrokenPipeError, AttributeError) as e:
406
- logger.warning(f"Error writing to FFmpeg: {e}. Restarting...")
407
- await self.restart_ffmpeg()
408
- self.ffmpeg_process.stdin.write(message)
409
- self.ffmpeg_process.stdin.flush()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  import logging
7
  import traceback
8
  from datetime import timedelta
 
9
  from whisperlivekit.timed_objects import ASRToken
10
  from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory
11
  from whisperlivekit.core import WhisperLiveKit
 
38
  self.bytes_per_sample = 2
39
  self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
40
  self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
41
+ self.last_ffmpeg_activity = time()
42
+ self.ffmpeg_health_check_interval = 5
43
+ self.ffmpeg_max_idle_time = 10
44
+
45
  # State management
46
  self.tokens = []
47
  self.buffer_transcription = ""
 
80
 
81
  async def restart_ffmpeg(self):
82
  """Restart the FFmpeg process after failure."""
83
+ logger.warning("Restarting FFmpeg process...")
84
+
85
  if self.ffmpeg_process:
86
  try:
87
+ # we check if process is still running
88
+ if self.ffmpeg_process.poll() is None:
89
+ logger.info("Terminating existing FFmpeg process")
90
+ self.ffmpeg_process.stdin.close()
91
+ self.ffmpeg_process.terminate()
92
+
93
+ # wait for termination with timeout
94
+ try:
95
+ await asyncio.wait_for(
96
+ asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait),
97
+ timeout=5.0
98
+ )
99
+ except asyncio.TimeoutError:
100
+ logger.warning("FFmpeg process did not terminate, killing forcefully")
101
+ self.ffmpeg_process.kill()
102
+ await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
103
  except Exception as e:
104
+ logger.error(f"Error during FFmpeg process termination: {e}")
105
+ logger.error(traceback.format_exc())
106
+
107
+ # we start new process
108
+ try:
109
+ logger.info("Starting new FFmpeg process")
110
  self.ffmpeg_process = self.start_ffmpeg_decoder()
111
  self.pcm_buffer = bytearray()
112
+ self.last_ffmpeg_activity = time()
113
+ logger.info("FFmpeg process restarted successfully")
114
+ except Exception as e:
115
+ logger.error(f"Failed to restart FFmpeg process: {e}")
116
+ logger.error(traceback.format_exc())
117
+ # try again after 5s
118
+ await asyncio.sleep(5)
119
+ try:
120
+ self.ffmpeg_process = self.start_ffmpeg_decoder()
121
+ self.pcm_buffer = bytearray()
122
+ self.last_ffmpeg_activity = time()
123
+ logger.info("FFmpeg process restarted successfully on second attempt")
124
+ except Exception as e2:
125
+ logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}")
126
+ logger.critical(traceback.format_exc())
127
 
128
  async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep):
129
  """Thread-safe update of transcription with new data."""
 
192
 
193
  while True:
194
  try:
195
+ current_time = time()
196
+ elapsed_time = math.floor((current_time - beg) * 10) / 10
197
  buffer_size = max(int(32000 * elapsed_time), 4096)
198
+ beg = current_time
199
 
200
+ # Detect idle state much more quickly
201
+ if current_time - self.last_ffmpeg_activity > self.ffmpeg_max_idle_time:
202
+ logger.warning(f"FFmpeg process idle for {current_time - self.last_ffmpeg_activity:.2f}s. Restarting...")
203
+ await self.restart_ffmpeg()
204
+ beg = time()
205
+ self.last_ffmpeg_activity = time()
206
+ continue
207
+
208
+ # Reduce timeout for reading from FFmpeg
209
  try:
210
  chunk = await asyncio.wait_for(
211
  loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
212
+ timeout=5.0 # Shorter timeout (5 seconds instead of 15)
213
  )
214
+ if chunk:
215
+ self.last_ffmpeg_activity = time()
216
+
217
  except asyncio.TimeoutError:
218
  logger.warning("FFmpeg read timeout. Restarting...")
219
  await self.restart_ffmpeg()
220
  beg = time()
221
+ self.last_ffmpeg_activity = time()
222
  continue
223
 
224
  if not chunk:
 
416
  logger.warning(f"Exception in results_formatter: {e}")
417
  logger.warning(f"Traceback: {traceback.format_exc()}")
418
  await asyncio.sleep(0.5) # Back off on error
419
+
420
  async def create_tasks(self):
421
  """Create and start processing tasks."""
422
 
 
428
  tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
429
 
430
  tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
431
+
432
+ # Monitor overall system health
433
+ async def watchdog():
434
+ while True:
435
+ try:
436
+ await asyncio.sleep(10) # Check every 10 seconds instead of 60
437
+
438
+ current_time = time()
439
+ # Check for stalled tasks
440
+ for i, task in enumerate(tasks):
441
+ if task.done():
442
+ exc = task.exception() if task.done() else None
443
+ task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
444
+ logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
445
+
446
+ # Check for FFmpeg process health with shorter thresholds
447
+ ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
448
+ if ffmpeg_idle_time > 15: # 15 seconds instead of 180
449
+ logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
450
+
451
+ # Force restart after 30 seconds of inactivity (instead of 600)
452
+ if ffmpeg_idle_time > 30:
453
+ logger.error("FFmpeg idle for too long, forcing restart")
454
+ await self.restart_ffmpeg()
455
+
456
+ except Exception as e:
457
+ logger.error(f"Error in watchdog task: {e}")
458
+
459
+ tasks.append(asyncio.create_task(watchdog()))
460
  self.tasks = tasks
461
 
462
  return self.results_formatter()
 
478
 
479
  async def process_audio(self, message):
480
  """Process incoming audio data."""
481
+ retry_count = 0
482
+ max_retries = 3
483
+
484
+ # Log periodic heartbeats showing ongoing audio proc
485
+ current_time = time()
486
+ if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10:
487
+ logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago")
488
+ self._last_heartbeat = current_time
489
+
490
+ while retry_count < max_retries:
491
+ try:
492
+ if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
493
+ logger.warning("FFmpeg process not available, restarting...")
494
+ await self.restart_ffmpeg()
495
+
496
+ self.ffmpeg_process.stdin.write(message)
497
+ self.ffmpeg_process.stdin.flush()
498
+ self.last_ffmpeg_activity = time() # Update activity timestamp
499
+ return
500
+
501
+ except (BrokenPipeError, AttributeError, OSError) as e:
502
+ retry_count += 1
503
+ logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
504
+
505
+ if retry_count < max_retries:
506
+ await self.restart_ffmpeg()
507
+ await asyncio.sleep(0.5) # Shorter pause between retries
508
+ else:
509
+ logger.error("Maximum retries reached for FFmpeg process")
510
+ await self.restart_ffmpeg()
511
+ return