""" Audio Preprocessing Module for Multilingual Audio Intelligence System This module handles the standardization of diverse audio inputs into a consistent format suitable for downstream ML models. It supports various audio formats (wav, mp3, ogg, flac), sample rates (8k-48k), bit depths (4-32 bits), and handles SNR variations as specified in PS-6 requirements. Key Features: - Format conversion and standardization - Intelligent resampling to 16kHz - Stereo to mono conversion - Volume normalization for SNR robustness - Memory-efficient processing - Robust error handling Dependencies: pydub, librosa, numpy System Dependencies: ffmpeg (for format conversion) """ import os import logging import numpy as np import librosa from pydub import AudioSegment from pydub.utils import which from typing import Tuple, Optional, Union import tempfile import warnings # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Suppress librosa warnings for cleaner output warnings.filterwarnings("ignore", category=UserWarning, module="librosa") class AudioProcessor: """ Handles audio preprocessing for the multilingual audio intelligence system. This class standardizes diverse audio inputs into a consistent format: - 16kHz sample rate (optimal for ASR models) - Single channel (mono) - Float32 numpy array format - Normalized amplitude """ def __init__(self, target_sample_rate: int = 16000): """ Initialize AudioProcessor with target specifications. Args: target_sample_rate (int): Target sample rate in Hz. Default 16kHz optimized for Whisper and pyannote models. """ self.target_sample_rate = target_sample_rate self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'] # Verify ffmpeg availability if not which("ffmpeg"): logger.warning("ffmpeg not found. Some format conversions may fail.") def process_audio(self, audio_input: Union[str, bytes, np.ndarray], input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]: """ Main processing function that standardizes any audio input. 

        Args:
            audio_input: Can be file path (str), audio bytes, or numpy array
            input_sample_rate: Required if audio_input is numpy array

        Returns:
            Tuple[np.ndarray, int]: (processed_audio_array, sample_rate)

        Raises:
            ValueError: If input format is unsupported or invalid
            FileNotFoundError: If audio file doesn't exist
            Exception: For processing errors
        """
        try:
            # Determine input type and load audio
            if isinstance(audio_input, str):
                # File path input
                audio_array, original_sr = self._load_from_file(audio_input)
            elif isinstance(audio_input, bytes):
                # Bytes input (e.g., from uploaded file)
                audio_array, original_sr = self._load_from_bytes(audio_input)
            elif isinstance(audio_input, np.ndarray):
                # Numpy array input
                if input_sample_rate is None:
                    raise ValueError("input_sample_rate must be provided for numpy array input")
                audio_array = audio_input.astype(np.float32)
                original_sr = input_sample_rate
            else:
                raise ValueError(f"Unsupported input type: {type(audio_input)}")

            logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz")

            # Apply preprocessing pipeline
            processed_audio = self._preprocess_pipeline(audio_array, original_sr)

            logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz")
            return processed_audio, self.target_sample_rate

        except Exception as e:
            logger.error(f"Audio processing failed: {str(e)}")
            raise

    def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio from file path."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}")

        try:
            # Use librosa for robust loading; sr=None preserves the original sample rate
            audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False)
            return audio_array, sample_rate
        except Exception as e:
            # Fallback to pydub for format conversion
            logger.warning(f"librosa failed, trying pydub: {e}")
            return self._load_with_pydub(file_path)

    def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Load audio from bytes (e.g., uploaded file)."""
        # Create temporary file for processing
        with tempfile.NamedTemporaryFile(delete=False, suffix='.audio') as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_path = tmp_file.name

        try:
            # Try to detect format and load
            audio_array, sample_rate = self._load_with_pydub(tmp_path)
            return audio_array, sample_rate
        finally:
            # Clean up temporary file
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio using pydub with format detection."""
        try:
            # Let pydub auto-detect format
            audio_segment = AudioSegment.from_file(file_path)

            # Convert to numpy array
            samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)

            # Handle stereo audio
            if audio_segment.channels == 2:
                samples = samples.reshape((-1, 2))

            # Normalize to [-1, 1] range using the actual sample width
            # (a fixed 16-bit divisor would be wrong for 8/24/32-bit audio)
            samples = samples / float(audio_segment.max_possible_amplitude)

            return samples, audio_segment.frame_rate

        except Exception as e:
            raise Exception(f"Failed to load audio with pydub: {str(e)}")

    def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray:
        """
        Apply the complete preprocessing pipeline.

        Pipeline steps:
            1. Convert stereo to mono
            2. Resample to target sample rate
            3. Normalize amplitude
            4. Apply basic noise reduction (optional)
        """
        # Step 1: Convert to mono if stereo
        if len(audio_array.shape) > 1 and audio_array.shape[0] == 2:
            # librosa format: (channels, samples) for stereo
            audio_array = np.mean(audio_array, axis=0)
        elif len(audio_array.shape) > 1 and audio_array.shape[1] == 2:
            # pydub format: (samples, channels) for stereo
            audio_array = np.mean(audio_array, axis=1)

        # Ensure 1D array
        audio_array = audio_array.flatten()
        logger.debug(f"After mono conversion: {audio_array.shape}")

        # Step 2: Resample if necessary
        if original_sr != self.target_sample_rate:
            audio_array = librosa.resample(
                audio_array,
                orig_sr=original_sr,
                target_sr=self.target_sample_rate,
                res_type='kaiser_best'  # High quality resampling
            )
            logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz")

        # Step 3: Amplitude normalization
        audio_array = self._normalize_audio(audio_array)

        # Step 4: Basic preprocessing for robustness
        audio_array = self._apply_preprocessing_filters(audio_array)

        return audio_array.astype(np.float32)

    def _normalize_audio(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Normalize audio amplitude to handle varying SNR conditions.

        Uses RMS-based normalization for better handling of varying
        signal-to-noise ratios (-5dB to 20dB as per PS-6 requirements).
        """
        # Calculate RMS (Root Mean Square)
        rms = np.sqrt(np.mean(audio_array**2))

        if rms > 0:
            # Target RMS level (prevents over-amplification)
            target_rms = 0.1
            normalization_factor = target_rms / rms

            # Apply normalization with clipping protection
            normalized = audio_array * normalization_factor
            normalized = np.clip(normalized, -1.0, 1.0)

            logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}")
            return normalized

        return audio_array

    def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Apply basic preprocessing filters for improved robustness.

        Includes:
        - DC offset removal
        - Light high-pass filtering (removes very low frequencies)
        """
        # Remove DC offset
        audio_array = audio_array - np.mean(audio_array)

        # Simple high-pass filter to remove very low frequencies (< 80Hz)
        # This helps with background noise and rumble
        try:
            from scipy.signal import butter, filtfilt

            # Design high-pass filter
            nyquist = self.target_sample_rate / 2
            cutoff = 80 / nyquist  # 80Hz cutoff

            if cutoff < 1.0:  # Valid frequency range
                b, a = butter(N=1, Wn=cutoff, btype='high')
                audio_array = filtfilt(b, a, audio_array)
                logger.debug("Applied high-pass filter (80Hz cutoff)")

        except ImportError:
            logger.debug("scipy not available, skipping high-pass filter")
        except Exception as e:
            logger.debug(f"High-pass filter failed: {e}")

        return audio_array

    def get_audio_info(self, audio_input: Union[str, bytes]) -> dict:
        """
        Get detailed information about audio file without full processing.

        Returns:
            dict: Audio metadata including duration, sample rate, channels, etc.
""" try: if isinstance(audio_input, str): # File path if not os.path.exists(audio_input): raise FileNotFoundError(f"Audio file not found: {audio_input}") audio_segment = AudioSegment.from_file(audio_input) else: # Bytes input with tempfile.NamedTemporaryFile(delete=False) as tmp_file: tmp_file.write(audio_input) tmp_path = tmp_file.name try: audio_segment = AudioSegment.from_file(tmp_path) finally: try: os.unlink(tmp_path) except OSError: pass return { 'duration_seconds': len(audio_segment) / 1000.0, 'sample_rate': audio_segment.frame_rate, 'channels': audio_segment.channels, 'sample_width': audio_segment.sample_width, 'frame_count': audio_segment.frame_count(), 'max_possible_amplitude': audio_segment.max_possible_amplitude } except Exception as e: logger.error(f"Failed to get audio info: {e}") return {} # Utility functions for common audio operations def validate_audio_file(file_path: str) -> bool: """ Quick validation of audio file without full loading. Args: file_path (str): Path to audio file Returns: bool: True if file appears to be valid audio """ try: processor = AudioProcessor() info = processor.get_audio_info(file_path) return info.get('duration_seconds', 0) > 0 except Exception: return False def estimate_processing_time(file_path: str) -> float: """ Estimate processing time based on audio duration. Args: file_path (str): Path to audio file Returns: float: Estimated processing time in seconds """ try: processor = AudioProcessor() info = processor.get_audio_info(file_path) duration = info.get('duration_seconds', 0) # Rough estimate: 0.1x to 0.3x real-time for preprocessing # depending on format conversion needs estimated_time = duration * 0.2 return max(estimated_time, 1.0) # Minimum 1 second except Exception: return 10.0 # Default estimate if __name__ == "__main__": # Example usage and testing processor = AudioProcessor() # Test with a sample file (if available) test_files = ["sample.wav", "sample.mp3", "test_audio.flac"] for test_file in test_files: if os.path.exists(test_file): try: print(f"\nTesting {test_file}:") # Get info info = processor.get_audio_info(test_file) print(f"Info: {info}") # Process audio, sr = processor.process_audio(test_file) print(f"Processed: shape={audio.shape}, sr={sr}") # Validate is_valid = validate_audio_file(test_file) print(f"Valid: {is_valid}") except Exception as e: print(f"Error processing {test_file}: {e}")