|
import asyncio |
|
import numpy as np |
|
import ffmpeg |
|
from time import time, sleep |
|
import math |
|
import logging |
|
import traceback |
|
from datetime import timedelta |
|
from whisperlivekit.timed_objects import ASRToken |
|
from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory |
|
from whisperlivekit.core import TranscriptionEngine |
|
|
|
|
|
# Configure root logging when this module is imported; the module's own logger
# is then switched to DEBUG so its debug messages are not filtered at the logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Unique end-of-stream marker pushed onto the processing queues; consumers
# compare against it by identity (`is SENTINEL`).
SENTINEL = object()
|
|
|
def format_time(seconds: float) -> str:
    """Render a duration in seconds as ``H:MM:SS`` text.

    The fractional part is truncated via ``int()`` and the result is the
    string form of a ``datetime.timedelta`` (so hours are not zero-padded,
    and durations of a day or more include a ``"N day(s), "`` prefix).
    """
    whole_seconds = int(seconds)
    duration = timedelta(seconds=whole_seconds)
    return str(duration)
|
|
|
class AudioProcessor:
    """
    Processes audio streams for transcription and diarization.

    Handles audio processing, state management, and result formatting.

    Incoming encoded audio is written to an FFmpeg subprocess, decoded PCM is
    read back from its stdout, and the samples are dispatched through asyncio
    queues to the transcription and diarization processors. Shared result
    state is guarded by ``self.lock`` (an ``asyncio.Lock``).
    """

    def __init__(self, **kwargs):
        """Initialize the audio processor with configuration, models, and state.

        Args:
            **kwargs: If ``transcription_engine`` is a ``TranscriptionEngine``
                instance it is reused; otherwise all keyword arguments are
                forwarded to ``TranscriptionEngine(**kwargs)``.

        Raises:
            FileNotFoundError: If the FFmpeg executable cannot be started.
        """
        engine = kwargs.get('transcription_engine')
        if isinstance(engine, TranscriptionEngine):
            models = engine
        else:
            models = TranscriptionEngine(**kwargs)

        # Audio processing settings
        self.args = models.args
        self.sample_rate = 16000
        self.channels = 1
        # NOTE: despite their names, the next two values are per *min_chunk_size*
        # interval, not per second (the names are kept for compatibility).
        self.samples_per_sec = int(self.sample_rate * self.args.min_chunk_size)
        self.bytes_per_sample = 2
        self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
        # Upper bound on the PCM bytes handed to the ASR in one go (5 s at 32000 B/s).
        self.max_bytes_per_sec = 32000 * 5
        self.last_ffmpeg_activity = time()
        self.ffmpeg_health_check_interval = 5
        self.ffmpeg_max_idle_time = 10

        # State management
        self.is_stopping = False
        self.tokens = []
        self.buffer_transcription = ""
        self.buffer_diarization = ""
        self.full_transcription = ""
        self.end_buffer = 0
        self.end_attributed_speaker = 0
        self.lock = asyncio.Lock()
        self.beg_loop = time()
        self.sep = " "
        self.last_response_content = ""
        # Timestamp of the last periodic debug message emitted by process_audio.
        self._last_heartbeat = None

        # Models and processing
        self.asr = models.asr
        self.tokenizer = models.tokenizer
        self.diarization = models.diarization
        self.ffmpeg_process = self.start_ffmpeg_decoder()
        self.transcription_queue = asyncio.Queue() if self.args.transcription else None
        self.diarization_queue = asyncio.Queue() if self.args.diarization else None
        self.pcm_buffer = bytearray()

        # Task references
        self.transcription_task = None
        self.diarization_task = None
        self.ffmpeg_reader_task = None
        self.watchdog_task = None
        self.all_tasks_for_cleanup = []

        # Always define `online` so later `self.online` checks cannot raise
        # AttributeError when transcription is disabled.
        self.online = None
        if self.args.transcription:
            self.online = online_factory(self.args, models.asr, models.tokenizer)

    def convert_pcm_to_float(self, pcm_buffer):
        """Convert PCM buffer in s16le format to normalized NumPy array.

        Args:
            pcm_buffer: Bytes-like object holding 16-bit PCM samples.

        Returns:
            A new ``float32`` array scaled by ``1 / 32768``. A trailing
            incomplete sample (odd byte count) is ignored instead of raising.
        """
        sample_width = np.dtype(np.int16).itemsize
        whole_samples = len(pcm_buffer) // sample_width
        samples = np.frombuffer(pcm_buffer, dtype=np.int16, count=whole_samples)
        return samples.astype(np.float32) / 32768.0

    def start_ffmpeg_decoder(self):
        """Start FFmpeg process for WebM to PCM conversion.

        Returns:
            The process object returned by ``run_async`` with stdin, stdout and
            stderr piped.

        Raises:
            FileNotFoundError: If FFmpeg is not installed / not on ``PATH``.
        """
        # NOTE(review): stderr is piped but nothing in this class reads it; if
        # FFmpeg writes enough to stderr the pipe could fill up and stall the
        # process -- confirm whether it should be drained or silenced.
        try:
            return (ffmpeg.input("pipe:0", format="webm")
                    .output("pipe:1", format="s16le", acodec="pcm_s16le",
                            ac=self.channels, ar=str(self.sample_rate))
                    .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True))
        except FileNotFoundError as exc:
            error = """
            FFmpeg is not installed or not found in your system's PATH.
            Please install FFmpeg to enable audio processing.

            Installation instructions:

            # Ubuntu/Debian:
            sudo apt update && sudo apt install ffmpeg

            # macOS (using Homebrew):
            brew install ffmpeg

            # Windows:
            # 1. Download the latest static build from https://ffmpeg.org/download.html
            # 2. Extract the archive (e.g., to C:\\FFmpeg).
            # 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.

            After installation, please restart the application.
            """
            logger.error(error)
            raise FileNotFoundError(error) from exc

    def _start_fresh_ffmpeg(self):
        """Spawn a new FFmpeg process and reset the PCM buffer and activity clock."""
        self.ffmpeg_process = self.start_ffmpeg_decoder()
        self.pcm_buffer = bytearray()
        self.last_ffmpeg_activity = time()

    async def restart_ffmpeg(self):
        """Restart the FFmpeg process after failure.

        A still-running process is terminated (killed after a 5 s grace
        period). A new process is then started; if that fails, one more
        attempt is made after 5 seconds. Failures are logged, not raised.
        """
        logger.warning("Restarting FFmpeg process...")

        if self.ffmpeg_process:
            try:
                # Only a process that is still running needs to be stopped.
                if self.ffmpeg_process.poll() is None:
                    logger.info("Terminating existing FFmpeg process")
                    try:
                        self.ffmpeg_process.stdin.close()
                    except OSError as e:
                        # Closing flushes buffered data and may hit a broken
                        # pipe; that must not prevent terminating the process.
                        logger.warning(f"Error closing FFmpeg stdin during restart: {e}")
                    self.ffmpeg_process.terminate()

                    # Wait for termination off the event loop, with a timeout.
                    loop = asyncio.get_running_loop()
                    try:
                        await asyncio.wait_for(
                            loop.run_in_executor(None, self.ffmpeg_process.wait),
                            timeout=5.0
                        )
                    except asyncio.TimeoutError:
                        logger.warning("FFmpeg process did not terminate, killing forcefully")
                        self.ffmpeg_process.kill()
                        await loop.run_in_executor(None, self.ffmpeg_process.wait)
            except Exception as e:
                logger.error(f"Error during FFmpeg process termination: {e}")
                logger.error(traceback.format_exc())

        try:
            logger.info("Starting new FFmpeg process")
            self._start_fresh_ffmpeg()
            logger.info("FFmpeg process restarted successfully")
        except Exception as e:
            logger.error(f"Failed to restart FFmpeg process: {e}")
            logger.error(traceback.format_exc())

            # Second (and last) attempt after a short pause.
            await asyncio.sleep(5)
            try:
                self._start_fresh_ffmpeg()
                logger.info("FFmpeg process restarted successfully on second attempt")
            except Exception as e2:
                logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}")
                logger.critical(traceback.format_exc())

    async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep):
        """Update the transcription state under ``self.lock``.

        Args:
            new_tokens: Tokens appended to ``self.tokens``.
            buffer: New value for ``self.buffer_transcription``.
            end_buffer: New value for ``self.end_buffer``.
            full_transcription: New value for ``self.full_transcription``.
            sep: New token separator.
        """
        async with self.lock:
            self.tokens.extend(new_tokens)
            self.buffer_transcription = buffer
            self.end_buffer = end_buffer
            self.full_transcription = full_transcription
            self.sep = sep

    async def update_diarization(self, end_attributed_speaker, buffer_diarization=""):
        """Update the diarization state under ``self.lock``.

        Args:
            end_attributed_speaker: New value for ``self.end_attributed_speaker``.
            buffer_diarization: Stored only when non-empty; an empty value
                leaves the current diarization buffer untouched.
        """
        async with self.lock:
            self.end_attributed_speaker = end_attributed_speaker
            if buffer_diarization:
                self.buffer_diarization = buffer_diarization

    async def add_dummy_token(self):
        """Append a one-second placeholder token ("." / speaker -1) at the current elapsed time."""
        async with self.lock:
            current_time = time() - self.beg_loop
            self.tokens.append(ASRToken(
                start=current_time, end=current_time + 1,
                text=".", speaker=-1, is_dummy=True
            ))

    async def get_current_state(self):
        """Return a snapshot of the shared state, taken under ``self.lock``.

        Returns:
            dict with keys ``tokens`` (shallow copy of the token list),
            ``buffer_transcription``, ``buffer_diarization``, ``end_buffer``,
            ``end_attributed_speaker``, ``sep``, ``remaining_time_transcription``
            (wall-clock time since ``beg_loop`` minus ``end_buffer``, floored at
            0; 0 while ``end_buffer`` is 0) and ``remaining_time_diarization``
            (latest known end time minus ``end_attributed_speaker``, floored at
            0; 0 when there are no tokens).
        """
        async with self.lock:
            current_time = time()

            remaining_transcription = 0
            if self.end_buffer > 0:
                remaining_transcription = max(
                    0, round(current_time - self.beg_loop - self.end_buffer, 2)
                )

            remaining_diarization = 0
            if self.tokens:
                latest_end = max(self.end_buffer, self.tokens[-1].end)
                remaining_diarization = max(
                    0, round(latest_end - self.end_attributed_speaker, 2)
                )

            return {
                "tokens": self.tokens.copy(),
                "buffer_transcription": self.buffer_transcription,
                "buffer_diarization": self.buffer_diarization,
                "end_buffer": self.end_buffer,
                "end_attributed_speaker": self.end_attributed_speaker,
                "sep": self.sep,
                "remaining_time_transcription": remaining_transcription,
                "remaining_time_diarization": remaining_diarization
            }

    async def reset(self):
        """Reset all state variables to initial values."""
        async with self.lock:
            self.tokens = []
            self.buffer_transcription = self.buffer_diarization = ""
            self.end_buffer = self.end_attributed_speaker = 0
            self.full_transcription = self.last_response_content = ""
            self.beg_loop = time()

    async def ffmpeg_stdout_reader(self):
        """Read audio data from FFmpeg stdout and process it.

        Decoded PCM is accumulated in ``self.pcm_buffer`` and forwarded to the
        diarization and transcription queues. When stdout reaches EOF or an
        exception occurs the loop ends and ``SENTINEL`` is put on every active
        queue so the downstream processors can finish.
        """
        loop = asyncio.get_running_loop()
        beg = time()
        # Real PCM byte rate (16000 Hz * 1 channel * 2 bytes = 32000 B/s).
        pcm_bytes_per_second = self.sample_rate * self.channels * self.bytes_per_sample

        while True:
            try:
                current_time = time()
                # Size the next read by the time elapsed since the previous one
                # (rounded down to 0.1 s), with a 4096-byte minimum.
                elapsed_time = math.floor((current_time - beg) * 10) / 10
                buffer_size = max(int(pcm_bytes_per_second * elapsed_time), 4096)
                beg = current_time

                # Restart FFmpeg when nothing has been written/read for too long.
                if current_time - self.last_ffmpeg_activity > self.ffmpeg_max_idle_time:
                    logger.warning(f"FFmpeg process idle for {current_time - self.last_ffmpeg_activity:.2f}s. Restarting...")
                    await self.restart_ffmpeg()
                    beg = time()
                    self.last_ffmpeg_activity = time()
                    continue

                # Blocking pipe read runs in the default executor.
                chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size)
                if not chunk:
                    logger.info("FFmpeg stdout closed, no more data to read.")
                    break

                self.last_ffmpeg_activity = time()
                self.pcm_buffer.extend(chunk)

                # NOTE(review): the whole accumulated buffer (not only the new
                # chunk) is sent on every read, so audio still waiting for the
                # transcription flush is re-sent to diarization -- confirm this
                # is intended.
                if self.args.diarization and self.diarization_queue:
                    await self.diarization_queue.put(
                        self.convert_pcm_to_float(self.pcm_buffer)
                    )

                # Hand audio to the ASR once at least one min-chunk is buffered.
                if len(self.pcm_buffer) >= self.bytes_per_sec:
                    if len(self.pcm_buffer) > self.max_bytes_per_sec:
                        logger.warning(
                            f"Audio buffer too large: {len(self.pcm_buffer) / pcm_bytes_per_second:.2f}s. "
                            f"Consider using a smaller model."
                        )

                    # Take at most max_bytes_per_sec, cut on a sample boundary so
                    # a dangling byte stays buffered for the next read.
                    take = min(len(self.pcm_buffer), self.max_bytes_per_sec)
                    take -= take % self.bytes_per_sample
                    pcm_array = self.convert_pcm_to_float(self.pcm_buffer[:take])
                    self.pcm_buffer = self.pcm_buffer[take:]

                    if self.args.transcription and self.transcription_queue:
                        await self.transcription_queue.put(pcm_array)

                # Nothing consumes the audio: avoid spinning.
                if not self.args.transcription and not self.args.diarization:
                    await asyncio.sleep(0.1)

            except Exception as e:
                logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
                logger.warning(f"Traceback: {traceback.format_exc()}")
                break

        logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
        if self.args.transcription and self.transcription_queue:
            await self.transcription_queue.put(SENTINEL)
            logger.debug("Sentinel put into transcription_queue.")
        if self.args.diarization and self.diarization_queue:
            await self.diarization_queue.put(SENTINEL)
            logger.debug("Sentinel put into diarization_queue.")

    async def transcription_processor(self):
        """Process audio chunks for transcription.

        Consumes arrays from ``self.transcription_queue`` until ``SENTINEL`` is
        received, feeds them to ``self.online`` and publishes new tokens and
        the current buffer through ``update_transcription``. Exceptions raised
        while handling a chunk are logged and the loop continues.
        """
        self.full_transcription = ""
        self.sep = self.online.asr.sep
        cumulative_pcm_duration_stream_time = 0.0

        while True:
            # Exactly one task_done() per dequeued item, whatever the outcome.
            item_dequeued = False
            try:
                pcm_array = await self.transcription_queue.get()
                item_dequeued = True

                if pcm_array is SENTINEL:
                    logger.debug("Transcription processor received sentinel. Finishing.")
                    break

                if not self.online:
                    logger.warning("Transcription processor: self.online not initialized.")
                    continue

                asr_internal_buffer_duration_s = len(self.online.audio_buffer) / self.online.SAMPLING_RATE
                transcription_lag_s = max(0.0, time() - self.beg_loop - self.end_buffer)

                logger.info(
                    f"ASR processing: internal_buffer={asr_internal_buffer_duration_s:.2f}s, "
                    f"lag={transcription_lag_s:.2f}s."
                )

                # Stream time (seconds of audio received so far) at the end of this chunk.
                duration_this_chunk = len(pcm_array) / self.sample_rate if isinstance(pcm_array, np.ndarray) else 0
                cumulative_pcm_duration_stream_time += duration_this_chunk
                stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time

                self.online.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
                new_tokens, current_audio_processed_upto = self.online.process_iter()

                # NOTE(review): successive batches are concatenated without a
                # separator between them -- verify that is intended when
                # `sep` is non-empty.
                if new_tokens:
                    self.full_transcription += self.sep.join([t.text for t in new_tokens])

                _buffer_transcript_obj = self.online.get_buffer()
                buffer_text = _buffer_transcript_obj.text

                # end_buffer never moves backwards: take the largest known end time.
                candidate_end_times = [self.end_buffer]
                if new_tokens:
                    candidate_end_times.append(new_tokens[-1].end)
                if _buffer_transcript_obj.end is not None:
                    candidate_end_times.append(_buffer_transcript_obj.end)
                candidate_end_times.append(current_audio_processed_upto)
                new_end_buffer = max(candidate_end_times)

                # Drop buffer text that is already part of the committed transcript.
                if buffer_text in self.full_transcription:
                    buffer_text = ""

                await self.update_transcription(
                    new_tokens, buffer_text, new_end_buffer, self.full_transcription, self.sep
                )

            except Exception as e:
                logger.warning(f"Exception in transcription_processor: {e}")
                logger.warning(f"Traceback: {traceback.format_exc()}")
            finally:
                if item_dequeued:
                    self.transcription_queue.task_done()

        logger.info("Transcription processor task finished.")

    async def diarization_processor(self, diarization_obj):
        """Process audio chunks for speaker diarization.

        Args:
            diarization_obj: Object providing an awaitable ``diarize(pcm_array)``
                and ``assign_speakers_to_tokens(end_attributed_speaker, tokens)``.

        Consumes arrays from ``self.diarization_queue`` until ``SENTINEL`` is
        received. Exceptions raised while handling a chunk are logged and the
        loop continues.
        """
        # Always empty here, so update_diarization leaves the stored buffer as is.
        buffer_diarization = ""

        while True:
            # Exactly one task_done() per dequeued item, whatever the outcome.
            item_dequeued = False
            try:
                pcm_array = await self.diarization_queue.get()
                item_dequeued = True

                if pcm_array is SENTINEL:
                    logger.debug("Diarization processor received sentinel. Finishing.")
                    break

                await diarization_obj.diarize(pcm_array)

                state = await self.get_current_state()
                new_end = diarization_obj.assign_speakers_to_tokens(
                    state["end_attributed_speaker"], state["tokens"]
                )

                await self.update_diarization(new_end, buffer_diarization)

            except Exception as e:
                logger.warning(f"Exception in diarization_processor: {e}")
                logger.warning(f"Traceback: {traceback.format_exc()}")
            finally:
                if item_dequeued:
                    self.diarization_queue.task_done()

        logger.info("Diarization processor task finished.")

    async def results_formatter(self):
        """Format processing results for output.

        Async generator. Roughly every 0.1 s it snapshots the shared state,
        groups tokens into per-speaker lines and yields a response dict
        (``status``, ``lines``, ``buffer_transcription``, ``buffer_diarization``,
        ``remaining_time_transcription``, ``remaining_time_diarization``)
        whenever its content differs from the previously yielded one. It ends
        once ``self.is_stopping`` is set and the active processor tasks are done.
        """
        while True:
            try:
                state = await self.get_current_state()
                tokens = state["tokens"]
                buffer_transcription = state["buffer_transcription"]
                buffer_diarization = state["buffer_diarization"]
                end_attributed_speaker = state["end_attributed_speaker"]
                sep = state["sep"]

                # Diarization-only mode: keep a placeholder token stream going.
                if (not tokens or tokens[-1].is_dummy) and not self.args.transcription and self.args.diarization:
                    await self.add_dummy_token()
                    # Must be the asyncio sleep: a blocking sleep here would
                    # freeze every other task on the event loop.
                    await asyncio.sleep(0.5)
                    state = await self.get_current_state()
                    tokens = state["tokens"]

                previous_speaker = -1
                lines = []
                last_end_diarized = 0
                undiarized_text = []

                for token in tokens:
                    speaker = token.speaker

                    if self.args.diarization:
                        if (speaker in [-1, 0]) and token.end >= end_attributed_speaker:
                            # Not attributed yet: shown in the diarization buffer.
                            undiarized_text.append(token.text)
                            continue
                        elif (speaker in [-1, 0]) and token.end < end_attributed_speaker:
                            # Unlabelled token inside the attributed range: inherit.
                            speaker = previous_speaker
                        if speaker not in [-1, 0]:
                            last_end_diarized = max(token.end, last_end_diarized)

                    # Start a new line on speaker change, otherwise extend the last one.
                    if speaker != previous_speaker or not lines:
                        lines.append({
                            "speaker": speaker,
                            "text": token.text,
                            "beg": format_time(token.start),
                            "end": format_time(token.end),
                            "diff": round(token.end - last_end_diarized, 2)
                        })
                        previous_speaker = speaker
                    elif token.text:
                        lines[-1]["text"] += sep + token.text
                        lines[-1]["end"] = format_time(token.end)
                        lines[-1]["diff"] = round(token.end - last_end_diarized, 2)

                if undiarized_text:
                    combined = sep.join(undiarized_text)
                    if buffer_transcription:
                        combined += sep
                    await self.update_diarization(end_attributed_speaker, combined)
                    buffer_diarization = combined

                response_status = "active_transcription"
                final_lines_for_response = lines.copy()

                if not tokens and not buffer_transcription and not buffer_diarization:
                    response_status = "no_audio_detected"
                    final_lines_for_response = []
                elif response_status == "active_transcription" and not final_lines_for_response:
                    # Only buffered text so far: emit one empty placeholder line.
                    final_lines_for_response = [{
                        "speaker": 1,
                        "text": "",
                        "beg": format_time(state.get("end_buffer", 0)),
                        "end": format_time(state.get("end_buffer", 0)),
                        "diff": 0
                    }]

                response = {
                    "status": response_status,
                    "lines": final_lines_for_response,
                    "buffer_transcription": buffer_transcription,
                    "buffer_diarization": buffer_diarization,
                    "remaining_time_transcription": state["remaining_time_transcription"],
                    "remaining_time_diarization": state["remaining_time_diarization"]
                }

                # Signature used to suppress responses identical to the last one.
                current_response_signature = f"{response_status} | " + \
                    ' '.join([f"{line['speaker']} {line['text']}" for line in final_lines_for_response]) + \
                    f" | {buffer_transcription} | {buffer_diarization}"

                if current_response_signature != self.last_response_content and \
                        (final_lines_for_response or buffer_transcription or buffer_diarization or response_status == "no_audio_detected"):
                    yield response
                    self.last_response_content = current_response_signature

                if self.is_stopping:
                    all_processors_done = True
                    if self.args.transcription and self.transcription_task and not self.transcription_task.done():
                        all_processors_done = False
                    if self.args.diarization and self.diarization_task and not self.diarization_task.done():
                        all_processors_done = False

                    if all_processors_done:
                        logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
                        return

                await asyncio.sleep(0.1)

            except Exception as e:
                logger.warning(f"Exception in results_formatter: {e}")
                logger.warning(f"Traceback: {traceback.format_exc()}")
                await asyncio.sleep(0.5)

    async def create_tasks(self):
        """Create and start processing tasks.

        Returns:
            The ``results_formatter()`` async generator for the caller to iterate.
        """
        self.all_tasks_for_cleanup = []
        processing_tasks_for_watchdog = []

        if self.args.transcription and self.online:
            self.transcription_task = asyncio.create_task(self.transcription_processor())
            self.all_tasks_for_cleanup.append(self.transcription_task)
            processing_tasks_for_watchdog.append(self.transcription_task)

        if self.args.diarization and self.diarization:
            self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
            self.all_tasks_for_cleanup.append(self.diarization_task)
            processing_tasks_for_watchdog.append(self.diarization_task)

        self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
        self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
        processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)

        # The watchdog observes the processing tasks but is not itself monitored.
        self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
        self.all_tasks_for_cleanup.append(self.watchdog_task)

        return self.results_formatter()

    async def watchdog(self, tasks_to_monitor):
        """Monitors the health of critical processing tasks.

        Every 10 seconds it logs monitored tasks that have finished (once per
        task) and checks how long FFmpeg has been idle, forcing a restart after
        30 s of inactivity unless the processor is stopping.

        Args:
            tasks_to_monitor: List of ``asyncio.Task`` objects to observe.
        """
        reported = set()  # indexes of tasks whose completion was already logged
        while True:
            try:
                await asyncio.sleep(10)
                current_time = time()

                for i, task in enumerate(tasks_to_monitor):
                    if i in reported or not task.done():
                        continue
                    reported.add(i)
                    task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
                    # task.exception() raises CancelledError for a cancelled
                    # task, which would be mistaken for the watchdog's own
                    # cancellation below -- so check cancellation first.
                    if task.cancelled():
                        logger.warning(f"{task_name} was cancelled.")
                        continue
                    exc = task.exception()
                    if exc:
                        logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
                    else:
                        logger.info(f"{task_name} completed normally.")

                ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
                if ffmpeg_idle_time > 15:
                    logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
                    if ffmpeg_idle_time > 30 and not self.is_stopping:
                        logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
                        await self.restart_ffmpeg()
            except asyncio.CancelledError:
                logger.info("Watchdog task cancelled.")
                break
            except Exception as e:
                logger.error(f"Error in watchdog task: {e}", exc_info=True)

    async def cleanup(self):
        """Clean up resources when processing is complete.

        Cancels and awaits all created tasks, closes FFmpeg's stdin, waits up
        to 5 s for the process to exit (killing it otherwise) and closes the
        diarization backend when it exposes ``close()``.
        """
        logger.info("Starting cleanup of AudioProcessor resources.")
        for task in self.all_tasks_for_cleanup:
            if task and not task.done():
                task.cancel()

        created_tasks = [t for t in self.all_tasks_for_cleanup if t]
        if created_tasks:
            # return_exceptions=True: cancellations/errors must not abort cleanup.
            await asyncio.gather(*created_tasks, return_exceptions=True)
        logger.info("All processing tasks cancelled or finished.")

        if self.ffmpeg_process:
            if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                try:
                    self.ffmpeg_process.stdin.close()
                except Exception as e:
                    logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")

            if self.ffmpeg_process.poll() is None:
                logger.info("Waiting for FFmpeg process to terminate...")
                loop = asyncio.get_running_loop()
                try:
                    # wait(5.0) raises on timeout; run off-loop as it blocks.
                    await loop.run_in_executor(None, self.ffmpeg_process.wait, 5.0)
                except Exception as e:
                    logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
                    self.ffmpeg_process.kill()
                    await loop.run_in_executor(None, self.ffmpeg_process.wait)
            logger.info("FFmpeg process terminated.")

        if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
            self.diarization.close()
        logger.info("AudioProcessor cleanup complete.")

    async def process_audio(self, message):
        """Process incoming audio data.

        Args:
            message: Encoded audio bytes to feed to FFmpeg. An empty message
                is the stop signal: it sets ``is_stopping`` and closes FFmpeg's
                stdin.

        Writes are attempted up to 3 times; FFmpeg is restarted after a write
        or flush timeout (2 s) or a pipe error.
        """
        stdin_closed = bool(
            self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed
        )
        if self.is_stopping or stdin_closed:
            logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
            if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
                try:
                    self.ffmpeg_process.stdin.close()
                except Exception as e:
                    logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
            return

        if not message:
            logger.info("Empty audio message received, initiating stop sequence.")
            self.is_stopping = True
            if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                try:
                    self.ffmpeg_process.stdin.close()
                    logger.info("FFmpeg stdin closed due to primary stop signal.")
                except Exception as e:
                    logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
            return

        retry_count = 0
        max_retries = 3

        # Periodic (at most every 10 s) debug heartbeat.
        current_time = time()
        last_heartbeat = getattr(self, '_last_heartbeat', None)
        if last_heartbeat is None or current_time - last_heartbeat >= 10:
            logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago")
            self._last_heartbeat = current_time

        while retry_count < max_retries:
            try:
                if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
                    logger.warning("FFmpeg process not available, restarting...")
                    await self.restart_ffmpeg()

                loop = asyncio.get_running_loop()
                # Blocking pipe I/O runs in the executor, bounded by a timeout.
                try:
                    await asyncio.wait_for(
                        loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)),
                        timeout=2.0
                    )
                except asyncio.TimeoutError:
                    logger.warning("FFmpeg write operation timed out, restarting...")
                    await self.restart_ffmpeg()
                    retry_count += 1
                    continue

                try:
                    await asyncio.wait_for(
                        loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
                        timeout=2.0
                    )
                except asyncio.TimeoutError:
                    logger.warning("FFmpeg flush operation timed out, restarting...")
                    await self.restart_ffmpeg()
                    retry_count += 1
                    continue

                self.last_ffmpeg_activity = time()
                return

            except (BrokenPipeError, AttributeError, OSError) as e:
                retry_count += 1
                logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")

                if retry_count < max_retries:
                    await self.restart_ffmpeg()
                    await asyncio.sleep(0.5)
                else:
                    logger.error("Maximum retries reached for FFmpeg process")
                    await self.restart_ffmpeg()
                    return

        # Reached only when every attempt timed out; the chunk is dropped.
        logger.error("Maximum retries reached for FFmpeg process")
|
|