import os
import re
import sys
import codecs
import librosa
import logging

import numpy as np
import soundfile as sf

from pydub import AudioSegment

sys.path.append(os.getcwd())

from main.tools import huggingface
from main.configs.config import Config

for l in ["httpx", "httpcore"]:
    logging.getLogger(l).setLevel(logging.ERROR)

translations = Config().translations

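# Download the F0 predictor weights (RMVPE, FCPE, CREPE, WORLD) required by the selected
# pitch-extraction method; the Hugging Face base URL is stored ROT13-encoded.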
def check_predictors(method, f0_onnx=False):
    if f0_onnx and method not in ["harvest", "dio"]: method += "-onnx"

    def download(predictors):
        if not os.path.exists(os.path.join("assets", "models", "predictors", predictors)): huggingface.HF_download_file(codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/cerqvpgbef/", "rot13") + predictors, os.path.join("assets", "models", "predictors", predictors))

    model_dict = {
        **dict.fromkeys(["rmvpe", "rmvpe-legacy"], "rmvpe.pt"),
        **dict.fromkeys(["rmvpe-onnx", "rmvpe-legacy-onnx"], "rmvpe.onnx"),
        **dict.fromkeys(["fcpe"], "fcpe.pt"),
        **dict.fromkeys(["fcpe-legacy"], "fcpe_legacy.pt"),
        **dict.fromkeys(["fcpe-onnx"], "fcpe.onnx"),
        **dict.fromkeys(["fcpe-legacy-onnx"], "fcpe_legacy.onnx"),
        **dict.fromkeys(["crepe-full", "mangio-crepe-full"], "crepe_full.pth"),
        **dict.fromkeys(["crepe-full-onnx", "mangio-crepe-full-onnx"], "crepe_full.onnx"),
        **dict.fromkeys(["crepe-large", "mangio-crepe-large"], "crepe_large.pth"),
        **dict.fromkeys(["crepe-large-onnx", "mangio-crepe-large-onnx"], "crepe_large.onnx"),
        **dict.fromkeys(["crepe-medium", "mangio-crepe-medium"], "crepe_medium.pth"),
        **dict.fromkeys(["crepe-medium-onnx", "mangio-crepe-medium-onnx"], "crepe_medium.onnx"),
        **dict.fromkeys(["crepe-small", "mangio-crepe-small"], "crepe_small.pth"),
        **dict.fromkeys(["crepe-small-onnx", "mangio-crepe-small-onnx"], "crepe_small.onnx"),
        **dict.fromkeys(["crepe-tiny", "mangio-crepe-tiny"], "crepe_tiny.pth"),
        **dict.fromkeys(["crepe-tiny-onnx", "mangio-crepe-tiny-onnx"], "crepe_tiny.onnx"),
        **dict.fromkeys(["harvest", "dio"], "world.pth"),
    }

    if "hybrid" in method:
        methods_str = re.search(r"hybrid\[(.+)\]", method)
        if methods_str: methods = [method.strip() for method in methods_str.group(1).split("+")]

        for method in methods:
            if method in model_dict: download(model_dict[method])
    elif method in model_dict: download(model_dict[method])

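# Fetch the requested speech embedder (fairseq .pt, ONNX, or transformers safetensors
# checkpoint) into assets/models/embedders if it is not already present.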
def check_embedders(hubert, embedders_mode="fairseq"):
    huggingface_url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/rzorqqref/", "rot13")
    if embedders_mode == "spin": embedders_mode, hubert = "transformers", "spin"

    if hubert in ["contentvec_base", "hubert_base", "japanese_hubert_base", "korean_hubert_base", "chinese_hubert_base", "portuguese_hubert_base", "spin"]:
        if embedders_mode == "fairseq": hubert += ".pt"
        elif embedders_mode == "onnx": hubert += ".onnx"

        model_path = os.path.join("assets", "models", "embedders", hubert)

        if embedders_mode == "fairseq":
            if not os.path.exists(model_path): huggingface.HF_download_file("".join([huggingface_url, "fairseq/", hubert]), model_path)
        elif embedders_mode == "onnx":
            if not os.path.exists(model_path): huggingface.HF_download_file("".join([huggingface_url, "onnx/", hubert]), model_path)
        elif embedders_mode == "transformers":
            url, hubert = ("transformers/", hubert) if hubert != "spin" else ("spin", "")

            bin_file = os.path.join(model_path, "model.safetensors")
            config_file = os.path.join(model_path, "config.json")

            os.makedirs(model_path, exist_ok=True)

            if not os.path.exists(bin_file): huggingface.HF_download_file("".join([huggingface_url, url, hubert, "/model.safetensors"]), bin_file)
            if not os.path.exists(config_file): huggingface.HF_download_file("".join([huggingface_url, url, hubert, "/config.json"]), config_file)
        else: raise ValueError(translations["option_not_valid"])

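# Download the Whisper checkpoint and the SpeechBrain speaker-embedding files used for
# speaker diarization.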
def check_spk_diarization(model_size):
    whisper_model = os.path.join("assets", "models", "speaker_diarization", "models", f"{model_size}.pt")
    if not os.path.exists(whisper_model): huggingface.HF_download_file("".join([codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/fcrnxre_qvnevmngvba/", "rot13"), model_size, ".pt"]), whisper_model)

    speechbrain_path = os.path.join("assets", "models", "speaker_diarization", "models", "speechbrain")
    if not os.path.exists(speechbrain_path): os.makedirs(speechbrain_path, exist_ok=True)

    for f in ["classifier.ckpt", "config.json", "embedding_model.ckpt", "hyperparams.yaml", "mean_var_norm_emb.ckpt"]:
        speechbrain_model = os.path.join(speechbrain_path, f)

        if not os.path.exists(speechbrain_model): huggingface.HF_download_file(codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/fcrnxre_qvnevmngvba/fcrrpuoenva/", "rot13") + f, speechbrain_model)

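# Make sure every sub-folder and weight file of the selected AudioLDM2 pipeline
# (feature extractor, language model, UNet, VAE, vocoder, ...) exists locally.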
def check_audioldm2(model):
    for f in ["feature_extractor", "language_model", "projection_model", "scheduler", "text_encoder", "text_encoder_2", "tokenizer", "tokenizer_2", "unet", "vae", "vocoder"]:
        folder_path = os.path.join("assets", "models", "audioldm2", model, f)

        if not os.path.exists(folder_path): os.makedirs(folder_path, exist_ok=True)

    for f in [
        "feature_extractor/preprocessor_config.json",
        "language_model/config.json", "language_model/model.safetensors",
        "model_index.json",
        "projection_model/config.json", "projection_model/diffusion_pytorch_model.safetensors",
        "scheduler/scheduler_config.json",
        "text_encoder/config.json", "text_encoder/model.safetensors",
        "text_encoder_2/config.json", "text_encoder_2/model.safetensors",
        "tokenizer/merges.txt", "tokenizer/special_tokens_map.json", "tokenizer/tokenizer.json", "tokenizer/tokenizer_config.json", "tokenizer/vocab.json",
        "tokenizer_2/special_tokens_map.json", "tokenizer_2/spiece.model", "tokenizer_2/tokenizer.json", "tokenizer_2/tokenizer_config.json",
        "unet/config.json", "unet/diffusion_pytorch_model.safetensors",
        "vae/config.json", "vae/diffusion_pytorch_model.safetensors",
        "vocoder/config.json", "vocoder/model.safetensors",
    ]:
        model_path = os.path.join("assets", "models", "audioldm2", model, f)

        if not os.path.exists(model_path): huggingface.HF_download_file("".join([codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/nhqvbyqz/", "rot13"), model, "/", f]), model_path)

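# Read an audio file (soundfile first, librosa as fallback), downmix to mono, resample
# to `sample_rate`, and optionally apply STFT-based formant shifting.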
def load_audio(logger, file, sample_rate=16000, formant_shifting=False, formant_qfrency=0.8, formant_timbre=0.8):
    try:
        file = file.strip(' "\n')
        if not os.path.isfile(file): raise FileNotFoundError(translations["not_found"].format(name=file))

        try:
            logger.debug(translations['read_sf'])
            audio, sr = sf.read(file, dtype=np.float32)
        except Exception:
            logger.debug(translations['read_librosa'])
            audio, sr = librosa.load(file, sr=None)

        if len(audio.shape) > 1: audio = librosa.to_mono(audio.T)
        if sr != sample_rate: audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate, res_type="soxr_vhq")

        if formant_shifting:
            from main.library.algorithm.stftpitchshift import StftPitchShift

            pitchshifter = StftPitchShift(1024, 32, sample_rate)
            audio = pitchshifter.shiftpitch(audio, factors=1, quefrency=formant_qfrency * 1e-3, distortion=formant_timbre)
    except Exception as e:
        raise RuntimeError(f"{translations['errors_loading_audio']}: {e}")

    return audio.flatten()

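# Rebuild a pydub AudioSegment from its raw data, interpreting the buffer as 16-bit samples.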
def pydub_convert(audio):
    samples = np.frombuffer(audio.raw_data, dtype=np.int16)
    if samples.dtype != np.int16: samples = (samples * 32767).astype(np.int16)

    return AudioSegment(samples.tobytes(), frame_rate=audio.frame_rate, sample_width=samples.dtype.itemsize, channels=audio.channels)

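# Load an audio file with pydub, picking the decoder from the file extension and
# falling back to the generic from_file() for anything else.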
def pydub_load(input_path):
    try:
        if input_path.endswith(".wav"): audio = AudioSegment.from_wav(input_path)
        elif input_path.endswith(".mp3"): audio = AudioSegment.from_mp3(input_path)
        elif input_path.endswith(".ogg"): audio = AudioSegment.from_ogg(input_path)
        else: audio = AudioSegment.from_file(input_path)
    except Exception:
        audio = AudioSegment.from_file(input_path)

    return audio

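# Load the embedder in the requested backend (fairseq, ONNX Runtime, or transformers)
# and return (model, config, checkpoint suffix).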
def load_embedders_model(embedder_model, embedders_mode="fairseq", providers=None):
    if embedders_mode == "fairseq": embedder_model += ".pt"
    elif embedders_mode == "onnx": embedder_model += ".onnx"
    elif embedders_mode == "spin": embedders_mode, embedder_model = "transformers", "spin"

    embedder_model_path = os.path.join("assets", "models", "embedders", embedder_model)
    if not os.path.exists(embedder_model_path): raise FileNotFoundError(f"{translations['not_found'].format(name=translations['model'])}: {embedder_model}")

    try:
        if embedders_mode == "fairseq":
            from main.library.architectures import fairseq

            models, saved_cfg, _ = fairseq.load_model(embedder_model_path)

            embed_suffix = ".pt"
            hubert_model = models[0]
        elif embedders_mode == "onnx":
            import onnxruntime

            sess_options = onnxruntime.SessionOptions()
            sess_options.log_severity_level = 3

            embed_suffix, saved_cfg = ".onnx", None
            hubert_model = onnxruntime.InferenceSession(embedder_model_path, sess_options=sess_options, providers=providers)
        elif embedders_mode == "transformers":
            from torch import nn
            from transformers import HubertModel

            class HubertModelWithFinalProj(HubertModel):
                def __init__(self, config):
                    super().__init__(config)
                    self.final_proj = nn.Linear(config.hidden_size, config.classifier_proj_size)

            embed_suffix, saved_cfg = ".safetensors", None
            hubert_model = HubertModelWithFinalProj.from_pretrained(embedder_model_path)
        else: raise ValueError(translations["option_not_valid"])
    except Exception as e:
        raise RuntimeError(translations["read_model_error"].format(e=e))

    return hubert_model, saved_cfg, embed_suffix

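# Slice an audio signal on silence using an RMS threshold; returns (chunk, start, end)
# tuples measured in samples so the chunks can be reassembled later with restore().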
def cut(audio, sr, db_thresh=-60, min_interval=250):
    from main.inference.preprocess import Slicer, get_rms

    class Slicer2(Slicer):
        def slice2(self, waveform):
            samples = waveform.mean(axis=0) if len(waveform.shape) > 1 else waveform

            if samples.shape[0] <= self.min_length: return [(waveform, 0, samples.shape[0])]
            rms_list = get_rms(y=samples, frame_length=self.win_size, hop_length=self.hop_size).squeeze(0)

            sil_tags = []
            silence_start, clip_start = None, 0

            for i, rms in enumerate(rms_list):
                if rms < self.threshold:
                    if silence_start is None: silence_start = i
                    continue

                if silence_start is None: continue

                is_leading_silence = silence_start == 0 and i > self.max_sil_kept
                need_slice_middle = (i - silence_start >= self.min_interval and i - clip_start >= self.min_length)

                if not is_leading_silence and not need_slice_middle:
                    silence_start = None
                    continue

                if i - silence_start <= self.max_sil_kept:
                    pos = rms_list[silence_start : i + 1].argmin() + silence_start
                    sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))
                    clip_start = pos
                elif i - silence_start <= self.max_sil_kept * 2:
                    pos = rms_list[i - self.max_sil_kept : silence_start + self.max_sil_kept + 1].argmin()
                    pos += i - self.max_sil_kept

                    pos_r = (rms_list[i - self.max_sil_kept : i + 1].argmin() + i - self.max_sil_kept)

                    if silence_start == 0:
                        sil_tags.append((0, pos_r))
                        clip_start = pos_r
                    else:
                        sil_tags.append((min((rms_list[silence_start : silence_start + self.max_sil_kept + 1].argmin() + silence_start), pos), max(pos_r, pos)))
                        clip_start = max(pos_r, pos)
                else:
                    pos_r = (rms_list[i - self.max_sil_kept : i + 1].argmin() + i - self.max_sil_kept)
                    sil_tags.append((0, pos_r) if silence_start == 0 else ((rms_list[silence_start : silence_start + self.max_sil_kept + 1].argmin() + silence_start), pos_r))
                    clip_start = pos_r

                silence_start = None

            total_frames = rms_list.shape[0]
            if (silence_start is not None and total_frames - silence_start >= self.min_interval): sil_tags.append((rms_list[silence_start : min(total_frames, silence_start + self.max_sil_kept) + 1].argmin() + silence_start, total_frames + 1))

            if not sil_tags: return [(waveform, 0, samples.shape[-1])]
            else:
                chunks = []
                if sil_tags[0][0] > 0: chunks.append((self._apply_slice(waveform, 0, sil_tags[0][0]), 0, sil_tags[0][0] * self.hop_size))

                for i in range(len(sil_tags) - 1):
                    chunks.append((self._apply_slice(waveform, sil_tags[i][1], sil_tags[i + 1][0]), sil_tags[i][1] * self.hop_size, sil_tags[i + 1][0] * self.hop_size))

                if sil_tags[-1][1] < total_frames: chunks.append((self._apply_slice(waveform, sil_tags[-1][1], total_frames), sil_tags[-1][1] * self.hop_size, samples.shape[-1]))
                return chunks

    slicer = Slicer2(sr=sr, threshold=db_thresh, min_interval=min_interval)
    return slicer.slice2(audio)

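# Stitch processed chunks back into a single signal, zero-filling the silent gaps that
# were removed by cut().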
def restore(segments, total_len, dtype=np.float32):
    out = []
    last_end = 0

    for start, end, processed_seg in segments:
        if start > last_end: out.append(np.zeros(start - last_end, dtype=dtype))

        out.append(processed_seg)
        last_end = end

    if last_end < total_len: out.append(np.zeros(total_len - last_end, dtype=dtype))
    return np.concatenate(out, axis=-1)
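
# Usage sketch for cut()/restore(); `process` stands for any hypothetical per-chunk function:
#   segments = [(start, end, process(chunk)) for chunk, start, end in cut(audio, sr)]
#   result = restore(segments, total_len=audio.shape[-1])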