|
""" |
|
Audio Preprocessing Module for Multilingual Audio Intelligence System |
|
|
|
This module handles the standardization of diverse audio inputs into a consistent |
|
format suitable for downstream ML models. It supports various audio formats |
|
(wav, mp3, ogg, flac), sample rates (8k-48k), bit depths (4-32 bits), and |
|
handles SNR variations as specified in PS-6 requirements. |
|
|
|
Key Features: |
|
- Format conversion and standardization |
|
- Intelligent resampling to 16kHz |
|
- Stereo to mono conversion |
|
- Volume normalization for SNR robustness |
|
- Memory-efficient processing |
|
- Robust error handling |
|
|
|
Dependencies: pydub, librosa, numpy |
|
System Dependencies: ffmpeg (for format conversion) |
|
""" |
|
|
|
import os |
|
import logging |
|
import numpy as np |
|
import librosa |
|
from pydub import AudioSegment |
|
from pydub.utils import which |
|
from typing import Tuple, Optional, Union |
|
import tempfile |
|
import warnings |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
warnings.filterwarnings("ignore", category=UserWarning, module="librosa") |
|
|
|
|
|
class AudioProcessor: |
|
""" |
|
Handles audio preprocessing for the multilingual audio intelligence system. |
|
|
|
This class standardizes diverse audio inputs into a consistent format: |
|
- 16kHz sample rate (optimal for ASR models) |
|
- Single channel (mono) |
|
- Float32 numpy array format |
|
- Normalized amplitude |
|
""" |
|
|
|
def __init__(self, target_sample_rate: int = 16000): |
|
""" |
|
Initialize AudioProcessor with target specifications. |
|
|
|
Args: |
|
target_sample_rate (int): Target sample rate in Hz. Default 16kHz |
|
optimized for Whisper and pyannote models. |
|
""" |
|
self.target_sample_rate = target_sample_rate |
|
self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'] |
|
|
|
|
|
if not which("ffmpeg"): |
|
logger.warning("ffmpeg not found. Some format conversions may fail.") |
|
|
|
def process_audio(self, audio_input: Union[str, bytes, np.ndarray], |
|
input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]: |
|
""" |
|
Main processing function that standardizes any audio input. |
|
|
|
Args: |
|
audio_input: Can be file path (str), audio bytes, or numpy array |
|
input_sample_rate: Required if audio_input is numpy array |
|
|
|
Returns: |
|
Tuple[np.ndarray, int]: (processed_audio_array, sample_rate) |
|
|
|
Raises: |
|
ValueError: If input format is unsupported or invalid |
|
FileNotFoundError: If audio file doesn't exist |
|
Exception: For processing errors |
|
""" |
|
try: |
|
|
|
if isinstance(audio_input, str): |
|
|
|
audio_array, original_sr = self._load_from_file(audio_input) |
|
elif isinstance(audio_input, bytes): |
|
|
|
audio_array, original_sr = self._load_from_bytes(audio_input) |
|
elif isinstance(audio_input, np.ndarray): |
|
|
|
if input_sample_rate is None: |
|
raise ValueError("input_sample_rate must be provided for numpy array input") |
|
audio_array = audio_input.astype(np.float32) |
|
original_sr = input_sample_rate |
|
else: |
|
raise ValueError(f"Unsupported input type: {type(audio_input)}") |
|
|
|
logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz") |
|
|
|
|
|
processed_audio = self._preprocess_pipeline(audio_array, original_sr) |
|
|
|
logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz") |
|
|
|
return processed_audio, self.target_sample_rate |
|
|
|
except Exception as e: |
|
logger.error(f"Audio processing failed: {str(e)}") |
|
raise |
|
|
|
def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]: |
|
"""Load audio from file path.""" |
|
if not os.path.exists(file_path): |
|
raise FileNotFoundError(f"Audio file not found: {file_path}") |
|
|
|
file_ext = os.path.splitext(file_path)[1].lower() |
|
if file_ext not in self.supported_formats: |
|
raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}") |
|
|
|
try: |
|
|
|
audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False) |
|
return audio_array, sample_rate |
|
except Exception as e: |
|
|
|
logger.warning(f"librosa failed, trying pydub: {e}") |
|
return self._load_with_pydub(file_path) |
|
|
|
def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]: |
|
"""Load audio from bytes (e.g., uploaded file).""" |
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.audio') as tmp_file: |
|
tmp_file.write(audio_bytes) |
|
tmp_path = tmp_file.name |
|
|
|
try: |
|
|
|
audio_array, sample_rate = self._load_with_pydub(tmp_path) |
|
return audio_array, sample_rate |
|
finally: |
|
|
|
try: |
|
os.unlink(tmp_path) |
|
except OSError: |
|
pass |
|
|
|
def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]: |
|
"""Load audio using pydub with format detection.""" |
|
try: |
|
|
|
audio_segment = AudioSegment.from_file(file_path) |
|
|
|
|
|
samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32) |
|
|
|
|
|
if audio_segment.channels == 2: |
|
samples = samples.reshape((-1, 2)) |
|
|
|
|
|
samples = samples / (2**15) |
|
|
|
return samples, audio_segment.frame_rate |
|
|
|
except Exception as e: |
|
raise Exception(f"Failed to load audio with pydub: {str(e)}") |
|
|
|
def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray: |
|
""" |
|
Apply the complete preprocessing pipeline. |
|
|
|
Pipeline steps: |
|
1. Convert stereo to mono |
|
2. Resample to target sample rate |
|
3. Normalize amplitude |
|
4. Apply basic noise reduction (optional) |
|
""" |
|
|
|
if len(audio_array.shape) > 1 and audio_array.shape[0] == 2: |
|
|
|
audio_array = np.mean(audio_array, axis=0) |
|
elif len(audio_array.shape) > 1 and audio_array.shape[1] == 2: |
|
|
|
audio_array = np.mean(audio_array, axis=1) |
|
|
|
|
|
audio_array = audio_array.flatten() |
|
|
|
logger.debug(f"After mono conversion: {audio_array.shape}") |
|
|
|
|
|
if original_sr != self.target_sample_rate: |
|
audio_array = librosa.resample( |
|
audio_array, |
|
orig_sr=original_sr, |
|
target_sr=self.target_sample_rate, |
|
res_type='kaiser_best' |
|
) |
|
logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz") |
|
|
|
|
|
audio_array = self._normalize_audio(audio_array) |
|
|
|
|
|
audio_array = self._apply_preprocessing_filters(audio_array) |
|
|
|
return audio_array.astype(np.float32) |
|
|
|
def _normalize_audio(self, audio_array: np.ndarray) -> np.ndarray: |
|
""" |
|
Normalize audio amplitude to handle varying SNR conditions. |
|
|
|
Uses RMS-based normalization for better handling of varying |
|
signal-to-noise ratios (-5dB to 20dB as per PS-6 requirements). |
|
""" |
|
|
|
rms = np.sqrt(np.mean(audio_array**2)) |
|
|
|
if rms > 0: |
|
|
|
target_rms = 0.1 |
|
normalization_factor = target_rms / rms |
|
|
|
|
|
normalized = audio_array * normalization_factor |
|
normalized = np.clip(normalized, -1.0, 1.0) |
|
|
|
logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}") |
|
return normalized |
|
|
|
return audio_array |
|
|
|
def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray: |
|
""" |
|
Apply basic preprocessing filters for improved robustness. |
|
|
|
Includes: |
|
- DC offset removal |
|
- Light high-pass filtering (removes very low frequencies) |
|
""" |
|
|
|
audio_array = audio_array - np.mean(audio_array) |
|
|
|
|
|
|
|
try: |
|
from scipy.signal import butter, filtfilt |
|
|
|
|
|
nyquist = self.target_sample_rate / 2 |
|
cutoff = 80 / nyquist |
|
|
|
if cutoff < 1.0: |
|
b, a = butter(N=1, Wn=cutoff, btype='high') |
|
audio_array = filtfilt(b, a, audio_array) |
|
logger.debug("Applied high-pass filter (80Hz cutoff)") |
|
|
|
except ImportError: |
|
logger.debug("scipy not available, skipping high-pass filter") |
|
except Exception as e: |
|
logger.debug(f"High-pass filter failed: {e}") |
|
|
|
return audio_array |
|
|
|
def get_audio_info(self, audio_input: Union[str, bytes]) -> dict: |
|
""" |
|
Get detailed information about audio file without full processing. |
|
|
|
Returns: |
|
dict: Audio metadata including duration, sample rate, channels, etc. |
|
""" |
|
try: |
|
if isinstance(audio_input, str): |
|
|
|
if not os.path.exists(audio_input): |
|
raise FileNotFoundError(f"Audio file not found: {audio_input}") |
|
audio_segment = AudioSegment.from_file(audio_input) |
|
else: |
|
|
|
with tempfile.NamedTemporaryFile(delete=False) as tmp_file: |
|
tmp_file.write(audio_input) |
|
tmp_path = tmp_file.name |
|
|
|
try: |
|
audio_segment = AudioSegment.from_file(tmp_path) |
|
finally: |
|
try: |
|
os.unlink(tmp_path) |
|
except OSError: |
|
pass |
|
|
|
return { |
|
'duration_seconds': len(audio_segment) / 1000.0, |
|
'sample_rate': audio_segment.frame_rate, |
|
'channels': audio_segment.channels, |
|
'sample_width': audio_segment.sample_width, |
|
'frame_count': audio_segment.frame_count(), |
|
'max_possible_amplitude': audio_segment.max_possible_amplitude |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to get audio info: {e}") |
|
return {} |
|
|
|
|
|
|
|
def validate_audio_file(file_path: str) -> bool: |
|
""" |
|
Quick validation of audio file without full loading. |
|
|
|
Args: |
|
file_path (str): Path to audio file |
|
|
|
Returns: |
|
bool: True if file appears to be valid audio |
|
""" |
|
try: |
|
processor = AudioProcessor() |
|
info = processor.get_audio_info(file_path) |
|
return info.get('duration_seconds', 0) > 0 |
|
except Exception: |
|
return False |
|
|
|
|
|
def estimate_processing_time(file_path: str) -> float: |
|
""" |
|
Estimate processing time based on audio duration. |
|
|
|
Args: |
|
file_path (str): Path to audio file |
|
|
|
Returns: |
|
float: Estimated processing time in seconds |
|
""" |
|
try: |
|
processor = AudioProcessor() |
|
info = processor.get_audio_info(file_path) |
|
duration = info.get('duration_seconds', 0) |
|
|
|
|
|
|
|
estimated_time = duration * 0.2 |
|
return max(estimated_time, 1.0) |
|
except Exception: |
|
return 10.0 |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
processor = AudioProcessor() |
|
|
|
|
|
test_files = ["sample.wav", "sample.mp3", "test_audio.flac"] |
|
|
|
for test_file in test_files: |
|
if os.path.exists(test_file): |
|
try: |
|
print(f"\nTesting {test_file}:") |
|
|
|
|
|
info = processor.get_audio_info(test_file) |
|
print(f"Info: {info}") |
|
|
|
|
|
audio, sr = processor.process_audio(test_file) |
|
print(f"Processed: shape={audio.shape}, sr={sr}") |
|
|
|
|
|
is_valid = validate_audio_file(test_file) |
|
print(f"Valid: {is_valid}") |
|
|
|
except Exception as e: |
|
print(f"Error processing {test_file}: {e}") |