Solves stdin flushes blocking IOhttps://github.com/QuentinFuxa/WhisperLiveKit/issues/110
Browse fileshttps://github.com/QuentinFuxa/WhisperLiveKit/issues/106
https://github.com/QuentinFuxa/WhisperLiveKit/issues/90
https://github.com/QuentinFuxa/WhisperLiveKit/issues/87
https://github.com/QuentinFuxa/WhisperLiveKit/issues/81
https://github.com/QuentinFuxa/WhisperLiveKit/issues/2
whisperlivekit/audio_processor.py
CHANGED
@@ -205,22 +205,10 @@ class AudioProcessor:
|
|
205 |
self.last_ffmpeg_activity = time()
|
206 |
continue
|
207 |
|
208 |
-
|
209 |
-
|
210 |
-
chunk = await asyncio.wait_for(
|
211 |
-
loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
|
212 |
-
timeout=5.0 # Shorter timeout (5 seconds instead of 15)
|
213 |
-
)
|
214 |
-
if chunk:
|
215 |
-
self.last_ffmpeg_activity = time()
|
216 |
-
|
217 |
-
except asyncio.TimeoutError:
|
218 |
-
logger.warning("FFmpeg read timeout. Restarting...")
|
219 |
-
await self.restart_ffmpeg()
|
220 |
-
beg = time()
|
221 |
self.last_ffmpeg_activity = time()
|
222 |
-
|
223 |
-
|
224 |
if not chunk:
|
225 |
logger.info("FFmpeg stdout closed.")
|
226 |
break
|
@@ -233,7 +221,7 @@ class AudioProcessor:
|
|
233 |
self.convert_pcm_to_float(self.pcm_buffer).copy()
|
234 |
)
|
235 |
|
236 |
-
# Process when
|
237 |
if len(self.pcm_buffer) >= self.bytes_per_sec:
|
238 |
if len(self.pcm_buffer) > self.max_bytes_per_sec:
|
239 |
logger.warning(
|
@@ -492,19 +480,40 @@ class AudioProcessor:
|
|
492 |
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
|
493 |
logger.warning("FFmpeg process not available, restarting...")
|
494 |
await self.restart_ffmpeg()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
495 |
|
496 |
-
self.
|
497 |
-
self.ffmpeg_process.stdin.flush()
|
498 |
-
self.last_ffmpeg_activity = time() # Update activity timestamp
|
499 |
return
|
500 |
-
|
501 |
except (BrokenPipeError, AttributeError, OSError) as e:
|
502 |
retry_count += 1
|
503 |
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
|
504 |
|
505 |
if retry_count < max_retries:
|
506 |
await self.restart_ffmpeg()
|
507 |
-
await asyncio.sleep(0.5)
|
508 |
else:
|
509 |
logger.error("Maximum retries reached for FFmpeg process")
|
510 |
await self.restart_ffmpeg()
|
|
|
205 |
self.last_ffmpeg_activity = time()
|
206 |
continue
|
207 |
|
208 |
+
chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size)
|
209 |
+
if chunk:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
210 |
self.last_ffmpeg_activity = time()
|
211 |
+
|
|
|
212 |
if not chunk:
|
213 |
logger.info("FFmpeg stdout closed.")
|
214 |
break
|
|
|
221 |
self.convert_pcm_to_float(self.pcm_buffer).copy()
|
222 |
)
|
223 |
|
224 |
+
# Process when enough data
|
225 |
if len(self.pcm_buffer) >= self.bytes_per_sec:
|
226 |
if len(self.pcm_buffer) > self.max_bytes_per_sec:
|
227 |
logger.warning(
|
|
|
480 |
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
|
481 |
logger.warning("FFmpeg process not available, restarting...")
|
482 |
await self.restart_ffmpeg()
|
483 |
+
|
484 |
+
loop = asyncio.get_running_loop()
|
485 |
+
try:
|
486 |
+
await asyncio.wait_for(
|
487 |
+
loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)),
|
488 |
+
timeout=2.0
|
489 |
+
)
|
490 |
+
except asyncio.TimeoutError:
|
491 |
+
logger.warning("FFmpeg write operation timed out, restarting...")
|
492 |
+
await self.restart_ffmpeg()
|
493 |
+
retry_count += 1
|
494 |
+
continue
|
495 |
+
|
496 |
+
try:
|
497 |
+
await asyncio.wait_for(
|
498 |
+
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
|
499 |
+
timeout=2.0
|
500 |
+
)
|
501 |
+
except asyncio.TimeoutError:
|
502 |
+
logger.warning("FFmpeg flush operation timed out, restarting...")
|
503 |
+
await self.restart_ffmpeg()
|
504 |
+
retry_count += 1
|
505 |
+
continue
|
506 |
|
507 |
+
self.last_ffmpeg_activity = time()
|
|
|
|
|
508 |
return
|
509 |
+
|
510 |
except (BrokenPipeError, AttributeError, OSError) as e:
|
511 |
retry_count += 1
|
512 |
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
|
513 |
|
514 |
if retry_count < max_retries:
|
515 |
await self.restart_ffmpeg()
|
516 |
+
await asyncio.sleep(0.5)
|
517 |
else:
|
518 |
logger.error("Maximum retries reached for FFmpeg process")
|
519 |
await self.restart_ffmpeg()
|