# (HF Spaces banner residue removed) Spaces: Running on Zero
import os | |
from posixpath import isfile | |
from re import A | |
import sys | |
import os.path as osp | |
from typing import List, Dict, Tuple, Optional, Union, Any | |
import yaml | |
from omegaconf import OmegaConf | |
import math | |
import librosa | |
import soundfile | |
import numpy as np | |
from einops import rearrange | |
import torch | |
import torch.nn.functional as F | |
from pydub import AudioSegment | |
from audio_separator.separator import Separator | |
from transformers import Wav2Vec2FeatureExtractor, HubertModel | |
from src.utils.rprint import rlog as log | |
from src.utils.util import resample_audio | |
from src.models.audio.wav2vec_modified import Wav2VecModel | |
from src.models.audio.hubert import HubertModel_ as HubertModel | |
def pad_audio(audio, audio_unit=320, pad_threshold=80):
    """Symmetrically pad a batch of waveforms so the length lands near a
    whole number of audio units.

    Args:
        audio: tensor of shape (batch, samples).
        audio_unit: samples per frame unit.
        pad_threshold: slack (in samples) added to the unit-aligned target.

    Returns:
        The (possibly padded) tensor; unchanged when the input already
        exceeds the target length.
    """
    _, num_samples = audio.shape
    whole_units = num_samples // audio_unit
    side_len = math.ceil((audio_unit * whole_units + pad_threshold - num_samples) / 2)
    if side_len < 0:
        return audio
    half, remainder = divmod(side_len, 2)
    if half > 0:
        # Two successive reflect pads of `half` grow each side by exactly
        # 2*half = side_len - remainder samples (reflect pads compose).
        for _ in range(2):
            audio = F.pad(audio, (half, half), mode='reflect')
    if remainder:
        # Odd side_len: one replicate sample per side tops it up.
        audio = F.pad(audio, (1, 1), mode='replicate')
    return audio
def cut_audio(audio_path: str, save_dir: str, length=60) -> List[str]:
    """Cut audio into sub-divisions and return subfile paths. Supports wav format.
    Args:
        audio_path (str): the source audio file path
        save_dir (str): the save directory of sub-divisions
        length (int, optional): The max length of each sub-division. Defaults to 60 secs.
    Returns:
        List[str]: the subfile paths (the original path when no split is needed)
    """
    # splitext keeps dots inside the stem ("a.b.wav" -> "a.b");
    # the previous split('.')[0] truncated such names to "a".
    audio_name = osp.splitext(osp.basename(audio_path))[0]
    audio = AudioSegment.from_wav(audio_path)
    # pydub works in milliseconds; keep boundaries integral for slicing
    segment_length = int(length * 1000)
    num_segments = math.ceil(len(audio) / segment_length)
    os.makedirs(save_dir, exist_ok=True)
    if num_segments <= 1:
        return [audio_path]
    audio_list = []
    for i in range(num_segments):
        start_time = i * segment_length
        end_time = min((i + 1) * segment_length, len(audio))
        segment = audio[start_time:end_time]
        path = osp.join(save_dir, f"{audio_name}_segment_{i+1}.wav")
        audio_list.append(path)
        segment.export(path, format="wav")
    return audio_list
class AudioProcessor(object): | |
def __init__(self, cfg_path: str, is_training: bool = False, device_id=0) -> None: | |
cfg = OmegaConf.load(cfg_path) | |
self.cfg = cfg | |
self.is_training = is_training | |
log("========================================= Audio Processer =========================================") | |
log(OmegaConf.to_yaml(cfg)) | |
# setting device | |
self.device_id = device_id | |
self.use_half = cfg.device_params.flag_use_half_precision | |
if cfg.device_params.flag_force_cpu: | |
self.device = 'cpu' | |
else: | |
try: | |
if torch.backends.mps.is_available(): | |
self.device = 'mps' | |
else: | |
self.device = 'cuda:' + str(self.device_id) | |
except: | |
self.device = 'cuda:' + str(self.device_id) | |
# init audio separator | |
self.audio_separator = None | |
self.cache_dir = cfg.cache_dir | |
self.tmp_dir = cfg.tmp_dir | |
self.use_audio_separator = cfg.model_params.use_audio_separator | |
self.audio_separator_name = cfg.model_params.audio_separator_name | |
self.audio_separator_path = cfg.model_weights.audio_separator_path | |
self.set_audio_separator(cfg.cache_dir) | |
# load audio encoder, wav2vec or hubert | |
self.model_name = cfg.model_params.model_name | |
self.is_chinese = cfg.model_params.is_chinese | |
self.audio_encoder, self.feature_extractor = self.load_model( | |
model_name = cfg.model_params.model_name, | |
model_type = cfg.model_params.model_type, | |
is_chinese = cfg.model_params.is_chinese, | |
) | |
self.only_last_features = cfg.model_params.only_last_features | |
if cfg.model_params.only_last_features: | |
self.feature_shape = (1, 768) | |
else: | |
self.feature_shape = (12, 768) # features of 12 blocks | |
# init data params | |
self.sample_strategy = cfg.data_params.sample_strategy | |
self.sample_rate = cfg.data_params.sample_rate | |
self.fps = cfg.data_params.fps | |
self.audio_unit = cfg.data_params.sample_rate / cfg.data_params.fps # num of audio samples per frame | |
self.max_length = cfg.data_params.max_length | |
self.subclip_len = cfg.data_params.sub_clip_length | |
self.save_to_cpu = cfg.data_params.save_to_cpu | |
self.pad_mode = cfg.data_params.audio_pad_mode | |
log("========================================= Audio Processer: Done =========================================") | |
def load_model(self, model_name: str="wav2vec", model_type: str="base", is_chinese: bool = False): | |
assert model_name in ["wav2vec", "hubert"], f"Unknown audio model {model_name}, only support wav2vec or hubert" | |
assert model_type in ["base", "large"], f"Unknown audio model type {model_type}, only support base or large" | |
if model_name == "wav2vec": | |
# load wav2vec model weights | |
if is_chinese: | |
if model_type == "base": | |
model_weight_path = self.cfg.model_weights.wav2vec_path.chinese.base | |
else: | |
model_weight_path = self.cfg.model_weights.wav2vec_path.chinese.large | |
else: | |
if model_type == "base": | |
model_weight_path = self.cfg.model_weights.wav2vec_path.default.base | |
else: | |
model_weight_path = self.cfg.model_weights.wav2vec_path.default.large | |
if model_weight_path is None: | |
raise ValueError(f"model_weight_path is None") | |
audio_encoder = Wav2VecModel.from_pretrained(model_weight_path, local_files_only=True).to(device=self.device) | |
else: | |
if is_chinese: | |
if model_type == "base": | |
model_weight_path = self.cfg.model_weights.hubert_path.chinese.base | |
else: | |
model_weight_path = self.cfg.model_weights.hubert_path.chinese.large | |
else: | |
if model_type == "base": | |
model_weight_path = self.cfg.model_weights.hubert_path.default.base | |
else: | |
model_weight_path = self.cfg.model_weights.hubert_path.default.large | |
if model_weight_path is None: | |
raise ValueError(f"model_weight_path is None") | |
audio_encoder = HubertModel.from_pretrained(model_weight_path, local_files_only=True).to(device=self.device) | |
log(f"{model_name}-{model_type}-chinese-{is_chinese} model has beed loaded from {model_weight_path}") | |
total_params = sum(p.numel() for p in audio_encoder.parameters()) | |
print('Number of parameter: % .4fM' % (total_params / 1e6)) | |
# weights initialization | |
audio_encoder.feature_extractor._freeze_parameters() | |
if not self.cfg.model_params.is_original: | |
frozen_layers = [0, 1] | |
for name, param in audio_encoder.named_parameters(): | |
if name.startswith("feature_projection"): | |
param.requires_grad = False | |
if name.startswith("encoder.layers"): | |
layer = int(name.split(".")[2]) | |
if layer in frozen_layers: | |
param.requires_grad = False | |
audio_encoder = audio_encoder.to(self.device) | |
if self.use_half: | |
audio_encoder = audio_encoder.half() | |
audio_encoder.eval() | |
# feature extractor | |
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_weight_path) | |
return audio_encoder, feature_extractor | |
def set_audio_separator(self, output_dir: str) -> None: | |
del self.audio_separator | |
if self.audio_separator_name is not None and self.use_audio_separator: | |
try: | |
os.makedirs(output_dir, exist_ok=True) | |
except OSError as _: | |
print("Fail to create the output cache dir.") | |
self.audio_separator = Separator( | |
output_dir=output_dir, | |
output_single_stem="vocals", | |
model_file_dir=self.audio_separator_path, | |
) | |
self.audio_separator.load_model(self.audio_separator_name) | |
assert self.audio_separator.model_instance is not None, "Fail to load audio separate model." | |
else: | |
self.audio_separator=None | |
log("Use audio directly without vocals seperator.") | |
def seperate_audio(self, audio_path: str, output_dir: Union[str, None] = None) -> str: | |
if output_dir is not None: | |
if output_dir != self.cache_dir: | |
# reload audio separator | |
self.set_audio_separator(output_dir) | |
if self.audio_separator is not None: | |
# 1. separate vocals | |
# TODO: process in memory | |
try: | |
outputs = self.audio_separator.separate(audio_path) | |
if len(outputs) <= 0: | |
raise RuntimeError("Audio separate failed.") | |
vocal_audio_file = outputs[0] | |
vocal_audio_name, _ = os.path.splitext(vocal_audio_file) | |
vocal_audio_file = os.path.join(self.audio_separator.output_dir, vocal_audio_file) | |
vocal_audio_file = resample_audio(vocal_audio_file, os.path.join(self.audio_separator.output_dir, f"{vocal_audio_name}-16k.wav"), self.sample_rate) | |
except Exception as e: | |
log(f"Fail to separate vocals from {audio_path}, error info [{e}]") | |
vocal_audio_file=audio_path | |
else: | |
vocal_audio_file=audio_path | |
return vocal_audio_file | |
def load_audio(self, audio_path: str, mono: bool = True, duration: Optional[float] = None) -> Any: | |
try: | |
audio_data, sampling_rate = librosa.load(audio_path, sr=self.sample_rate, mono=mono, duration=duration) | |
except Exception as e: | |
raise RuntimeError(f"Fail to load audio from {audio_path}, error info [{e}]") | |
return audio_data, sampling_rate | |
def prepare_audio_data(self, audio_data: Union[np.ndarray, torch.Tensor], n_frames: Optional[int]=None) -> Tuple[List[Any], int]: | |
"""Prepare audio data for processing. | |
""" | |
#print(f"==========> Using Wav2Vec2FeatureExtractor to extract audio features") | |
audio_data = np.squeeze(self.feature_extractor(audio_data, sampling_rate=self.sample_rate).input_values) | |
clip_len = int(len(audio_data) / self.audio_unit) | |
if n_frames is not None: | |
if abs(n_frames - clip_len) > 7: | |
log(f"The number of frames must be close to the clip length (in 280ms), got {n_frames} and {clip_len}") | |
return [], n_frames | |
clip_len = n_frames | |
else: | |
n_frames = clip_len | |
if isinstance(audio_data, np.ndarray): | |
audio_data = torch.from_numpy(audio_data).float().to(self.device) | |
assert audio_data.ndim == 1, 'Audio must be 1D tensor.' | |
# padding | |
# padding audio to fit the clip length | |
n_audio_samples = round(self.audio_unit * clip_len) | |
n_padding_audio_samples = n_audio_samples - len(audio_data) | |
n_padding_frames = math.ceil(n_padding_audio_samples / self.audio_unit) | |
if n_padding_audio_samples > 0: | |
if self.pad_mode == 'zero': | |
padding_value = 0 | |
elif self.pad_mode == 'replicate': | |
padding_value = float(audio_data[-1]) | |
else: | |
raise ValueError(f'Unknown pad mode: {self.pad_mode}') | |
audio_data = F.pad(audio_data, (0, n_padding_audio_samples), value=padding_value) | |
# devide audio into sub-divisions for saving GPU memory | |
audio_segments = [] | |
if clip_len <= self.subclip_len: | |
n_subdivision = 1 | |
subclip_len = clip_len | |
else: | |
n_subdivision = math.ceil(clip_len / self.subclip_len) | |
subclip_len = self.subclip_len | |
for i in range(0, n_subdivision): | |
start_idx = i * subclip_len | |
end_idx = min(start_idx + subclip_len, clip_len) | |
# debug | |
#log(f"[{i+1}/{n_subdivision}] data index [{round(start_idx * self.audio_unit)}, {round(end_idx * self.audio_unit)})") | |
audio_segments.append( | |
{ | |
"data": audio_data[round(start_idx * self.audio_unit):round(end_idx * self.audio_unit)].unsqueeze(0), | |
"start_idx": start_idx, | |
"end_idx": end_idx, | |
"length": end_idx - start_idx | |
} | |
) | |
return audio_segments, n_frames | |
def get_audio_embedding(self, audio, clip_len: int) -> torch.Tensor: | |
if audio.ndim == 2: | |
# Extract audio features | |
assert audio.shape[1] == 16000 * clip_len / self.fps, \ | |
f'Incorrect audio length {audio.shape[1]}' | |
# Extract audio features | |
if self.use_half: | |
audio = audio.half() | |
embeddings = self.audio_encoder( | |
pad_audio(audio), seq_len=clip_len, sample_strategy=self.sample_strategy, output_hidden_states=True | |
) # (N, L, 768) | |
assert len(embeddings) > 0, "Fail to extract audio embedding" | |
if self.only_last_features: | |
audio_emb = embeddings.last_hidden_state.squeeze(0) | |
else: | |
audio_emb = torch.stack( | |
embeddings.hidden_states[1:], dim=1 | |
).squeeze(0) | |
audio_emb = rearrange(audio_emb, "b s d -> s b d") | |
elif audio.ndim == 3: | |
assert audio.shape[1] == clip_len, f'Incorrect audio feature length {audio.shape[1]}' | |
audio_emb = audio | |
else: | |
raise ValueError(f'Incorrect audio input shape {audio.shape}') | |
return audio_emb | |
def get_audio_embeddings(self, audio_segments: List[Any]) -> Optional[torch.Tensor]: | |
audio_embs = [] | |
for audio_segment in audio_segments: | |
if self.is_training: | |
audio_emb = self.get_audio_embedding(audio_segment["data"], audio_segment["length"]) | |
else: | |
with torch.no_grad(): | |
audio_emb = self.get_audio_embedding(audio_segment["data"], audio_segment["length"]) | |
audio_emb = audio_emb.cpu() if self.save_to_cpu else audio_emb | |
audio_embs.append(audio_emb) | |
#log(f"audio segment [{audio_segment['start_idx']}, {audio_segment['end_idx']}) has been processed.") | |
if len(audio_embs) == 0: | |
return None | |
audio_emb = torch.cat(audio_embs, dim=0) | |
return audio_emb | |
def preprocess( | |
self, | |
audio_path: str, | |
n_frames: Optional[int] = None, | |
duration: Optional[float] = None, | |
need_seperate: bool = False | |
): | |
""" Preprocess a WAV audio file by separating the vocals from the background and resampling it to a 16 kHz sample rate. | |
The separated vocal track is then converted into wav2vec2 for further processing or analysis. | |
""" | |
if need_seperate: | |
vocal_audio_file = self.seperate_audio(audio_path) | |
else: | |
vocal_audio_file = audio_path | |
audio_data, sampling_rate = self.load_audio(vocal_audio_file, duration=duration) | |
assert sampling_rate == 16000, "The sample rate of audio must be 16000" | |
audio_segments, n_frames = self.prepare_audio_data(audio_data, n_frames) | |
audio_emb = self.get_audio_embeddings(audio_segments) | |
if audio_emb is None: | |
log(f"{audio_path} has been processed, but no audio embedding, set as 'None'.") | |
#else: | |
#log(f"{audio_path} has been processed, audio embedding shape {audio_emb.shape}.") | |
return audio_emb, n_frames | |
def preprocess_long( | |
self, | |
audio_path: str, | |
need_seperate: bool = False | |
): | |
audio_list = cut_audio(audio_path, self.tmp_dir, length=self.max_length) | |
audio_emb_list = [] | |
l = 0 | |
for idx, audio_path in enumerate(audio_list): | |
padding = (idx+1) == len(audio_list) | |
emb, length = self.preprocess(audio_path, need_seperate=need_seperate) | |
audio_emb_list.append(emb) | |
log(f"Processing audio {idx+1}/{len(audio_list)}, path: {audio_path} length: {length}") | |
l += length | |
audio_emb = torch.cat(audio_emb_list) | |
audio_length = l | |
# remove tmp file | |
if len(audio_list) > 1: | |
for audio_path in audio_list: | |
os.remove(audio_path) | |
return audio_emb, audio_length | |
    def add_silent_audio(self, audio_path: str, silent_audio_path: Optional[str] = None, add_duration: float = 1., linear_fusion=False, mode="post"):
        """Pad an audio file with a silent/ambient clip at its head and/or tail.

        Writes the combined waveform to a temp file next to the source and
        returns (temp_path, frames_added_per_side). When silent_audio_path is
        None, returns (audio_path, 0) unchanged.

        Args:
            audio_path: source audio file.
            silent_audio_path: clip supplying the padding material.
            add_duration: seconds of padding taken from the silent clip.
            linear_fusion: cross-fade the padding into the source instead of
                concatenating it raw.
            mode: where to pad — "pre", "post", or "both".
        """
        # mode, pre, post, both  (NOTE(review): "Unkown" typo lives in a
        # runtime assert message; left untouched here)
        assert mode in ["pre", "post", "both"], f"Unkown mode: {mode}, only support pre, post, both"
        if silent_audio_path is None:
            return audio_path, 0
        else:
            audio_dir = osp.dirname(audio_path)
            audio_name = osp.basename(audio_path)
            temp_audio_path = osp.join(audio_dir, f"tmp_{audio_name}")
            # overwrite any stale temp file from a previous run
            if osp.isfile(temp_audio_path):
                os.remove(temp_audio_path)
            audio, sr1 = librosa.load(audio_path, mono=True, sr=16000)
            # denoise
            audio = librosa.effects.preemphasis(audio)  # enhance voice
            # load silent audio, trimmed to add_duration seconds
            silent_audio, sr2 = librosa.load(silent_audio_path, mono=True, sr=16000)
            silent_audio = silent_audio[:int(add_duration*sr2)]
            if linear_fusion:
                # cross-fade over the overlapping span of the two clips
                short_len = min(len(audio), len(silent_audio))
                fusion_ratio = np.linspace(0, 1.0, num=short_len)
                # get pre padding audio: ramps from source into silence, then
                # is flipped so the silent end faces outward
                pre_pad_audio = fusion_ratio * silent_audio[:short_len] + (1 - fusion_ratio) * audio[:short_len]
                if short_len < len(silent_audio):
                    pre_pad_audio = np.hstack((pre_pad_audio, silent_audio[short_len:]))
                pre_pad_audio = np.flip(pre_pad_audio, axis=0)
                # get post padding audio (mirror construction on the tail)
                post_pad_audio = (1 - fusion_ratio) * silent_audio[-short_len:] + fusion_ratio * audio[-short_len:]
                if short_len < len(silent_audio):
                    post_pad_audio = np.hstack((silent_audio[:-short_len], post_pad_audio))
                post_pad_audio = np.flip(post_pad_audio, axis=0)
            else:
                # plain concatenation of the raw silent clip
                pre_pad_audio = silent_audio
                post_pad_audio = silent_audio
            # padding audio according to the requested mode
            if mode == "both":
                combined_audio = np.hstack((pre_pad_audio, audio, post_pad_audio))
            elif mode == "pre":
                combined_audio = np.hstack((pre_pad_audio, audio))
            else:
                combined_audio = np.hstack((audio, post_pad_audio))
            # frames added per padded side (self.audio_unit samples per frame)
            add_nframes = math.floor(add_duration * sr2 / self.audio_unit)
            #print(f"audio length: {len(audio)}, pre_pad_audio length: {len(pre_pad_audio)}, post_pad_audio length: {len(post_pad_audio)}, combined_length: {len(combined_audio)}, total add {add_nframes*2} frames")
            #print(f"audio duration: {librosa.get_duration(audio, sr=sr1)}, silent duration: {librosa.get_duration(silent_audio, sr=sr2)}, combined duration: {librosa.get_duration(combined_audio, sr=sr2)}")
            soundfile.write(temp_audio_path, combined_audio, sr2)
            return temp_audio_path, add_nframes
def get_long_audio_emb(self, audio_path: str) -> torch.Tensor: | |
audio_emb, length = self.preprocess_long(audio_path) | |
log(f"Load audio from {osp.realpath(audio_path)} done, audio_emb shape: {audio_emb.shape}.") | |
return audio_emb | |
    def __enter__(self):
        # context-manager support: `with AudioProcessor(...) as ap:`
        # (the matching __exit__ is presumably defined past this view — confirm)
        return self