Spaces:
Paused
Paused
"""
Voice Activity Detector for Flare
==================================
Detects speech/silence in audio streams
"""
import struct
from datetime import datetime
from typing import Optional, Tuple

from utils.logger import log_debug, log_warning
class VoiceActivityDetector:
    """Energy-based voice activity detector for 16-bit PCM (LINEAR16) audio.

    Classifies each incoming chunk as speech or silence using a simple RMS
    energy threshold, and tracks state across chunks so callers can tell when
    an utterance has ended (i.e. silence has lasted at least
    ``silence_threshold_ms`` after speech).
    """

    def __init__(self,
                 energy_threshold: float = 500,
                 silence_threshold_ms: int = 2000,
                 sample_rate: int = 16000):
        """
        Initialize VAD.

        Args:
            energy_threshold: RMS energy threshold for speech detection,
                in raw int16 sample units.
            silence_threshold_ms: Milliseconds of silence before considering
                speech ended.
            sample_rate: Audio sample rate. Stored for reference; the energy
                computation itself does not use it.
        """
        self.energy_threshold = energy_threshold
        self.silence_threshold_ms = silence_threshold_ms
        self.sample_rate = sample_rate

        # State tracking
        self.is_speaking = False
        self.silence_start: Optional[datetime] = None
        self.speech_start: Optional[datetime] = None
        self.last_speech_time: Optional[datetime] = None

        # Lifetime statistics (deliberately NOT cleared by reset())
        self.total_speech_chunks = 0
        self.total_silence_chunks = 0

    def process_chunk(self, audio_chunk: bytes) -> Tuple[bool, int]:
        """
        Process one audio chunk and update the speech/silence state machine.

        Args:
            audio_chunk: Raw PCM audio data (LINEAR16).

        Returns:
            Tuple of (is_speech, silence_duration_ms). While speech is ongoing
            and silence is being measured, silence_duration_ms reports the
            running silence length; once it reaches silence_threshold_ms the
            speaking state is cleared (speech ended).
        """
        try:
            rms_energy = self._calculate_rms_energy(audio_chunk)
            is_speech = rms_energy > self.energy_threshold
            now = datetime.utcnow()

            if is_speech:
                if not self.is_speaking:
                    # Transition: silence -> speech
                    self.is_speaking = True
                    self.speech_start = now
                    log_debug(f"π€ Speech started (RMS: {rms_energy})")
                self.last_speech_time = now
                self.silence_start = None
                self.total_speech_chunks += 1
                return True, 0

            # Silence detected
            self.total_silence_chunks += 1

            if self.is_speaking:
                if self.silence_start is None:
                    # First silent chunk after speech
                    self.silence_start = now
                    log_debug(f"π Silence started (RMS: {rms_energy})")

                silence_duration = (now - self.silence_start).total_seconds() * 1000
                if silence_duration >= self.silence_threshold_ms:
                    # Silence has lasted long enough: speech has ended
                    self.is_speaking = False
                    log_debug(f"π¬ Speech ended after {silence_duration:.0f}ms of silence")
                # BUGFIX: the original fell off the end of the function when
                # silence_duration < silence_threshold_ms, implicitly returning
                # None and violating the Tuple[bool, int] contract. Always
                # report the running silence duration instead.
                return False, int(silence_duration)

            # Already in silence (no active speech to end)
            return False, 0

        except Exception as e:
            # Deliberate best-effort: on any processing error, assume speech
            # so a live utterance is never cut off.
            log_warning(f"VAD processing error: {e}")
            return True, 0

    def _calculate_rms_energy(self, audio_chunk: bytes) -> float:
        """Return the RMS energy of an int16 PCM chunk (0.0 on error/empty)."""
        try:
            # Empty or sub-sample chunk carries no energy
            if not audio_chunk or len(audio_chunk) < 2:
                return 0.0

            # Drop a trailing odd byte so the buffer is whole int16 samples
            if len(audio_chunk) % 2 != 0:
                audio_chunk = audio_chunk[:-1]

            # Decode as native-endian int16 samples
            num_samples = len(audio_chunk) // 2
            samples = struct.unpack(f'{num_samples}h', audio_chunk)
            if not samples:
                return 0.0

            # RMS = sqrt(mean of squared samples)
            sum_squares = sum(s * s for s in samples)
            return (sum_squares / len(samples)) ** 0.5

        except Exception as e:
            log_warning(f"RMS calculation error: {e}")
            return 0.0

    def reset(self):
        """Reset speech/silence state (lifetime chunk counters are kept)."""
        self.is_speaking = False
        self.silence_start = None
        self.speech_start = None
        self.last_speech_time = None
        log_debug("π VAD state reset")

    def get_speech_duration(self) -> float:
        """Return seconds of ongoing speech, or 0.0 if not speaking."""
        if self.speech_start and self.is_speaking:
            return (datetime.utcnow() - self.speech_start).total_seconds()
        return 0.0

    def get_silence_duration(self) -> float:
        """Return seconds of ongoing silence, or 0.0 if still speaking."""
        if self.silence_start and not self.is_speaking:
            return (datetime.utcnow() - self.silence_start).total_seconds()
        return 0.0

    def get_stats(self) -> dict:
        """Return a snapshot of VAD state, counters and configuration."""
        return {
            "is_speaking": self.is_speaking,
            "speech_chunks": self.total_speech_chunks,
            "silence_chunks": self.total_silence_chunks,
            "speech_duration": self.get_speech_duration(),
            "silence_duration": self.get_silence_duration(),
            "energy_threshold": self.energy_threshold,
            "silence_threshold_ms": self.silence_threshold_ms
        }