xtts-v2 / test.py
kevinwang676's picture
Create test.py
a360a9d verified
import os,shutil,sys,pdb,re
now_dir = os.getcwd()
sys.path.append(now_dir)
import json,yaml,warnings,torch
import platform
import psutil
import signal
from pathlib import Path
warnings.filterwarnings("ignore")
torch.manual_seed(233333)
tmp = os.path.join(now_dir, "TEMP")
os.makedirs(tmp, exist_ok=True)
os.environ["TEMP"] = tmp
if(os.path.exists(tmp)):
for name in os.listdir(tmp):
if(name=="jieba.cache"):continue
path="%s/%s"%(tmp,name)
delete=os.remove if os.path.isfile(path) else shutil.rmtree
try:
delete(path)
except Exception as e:
print(str(e))
pass
import site
site_packages_roots = []
for path in site.getsitepackages():
if "packages" in path:
site_packages_roots.append(path)
if(site_packages_roots==[]):site_packages_roots=["%s/runtime/Lib/site-packages" % now_dir]
#os.environ["OPENBLAS_NUM_THREADS"] = "4"
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
os.environ["all_proxy"] = ""
for site_packages_root in site_packages_roots:
if os.path.exists(site_packages_root):
try:
with open("%s/users.pth" % (site_packages_root), "w") as f:
f.write(
"%s\n%s/tools\n%s/tools/damo_asr\n%s/GPT_SoVITS\n%s/tools/uvr5"
% (now_dir, now_dir, now_dir, now_dir, now_dir)
)
break
except PermissionError:
pass
from tools import my_utils
import traceback
import shutil
import pdb
import gradio as gr
from subprocess import Popen
import signal
from config import python_exec,infer_device,is_half,exp_root,webui_port_main,webui_port_infer_tts,webui_port_uvr5,webui_port_subfix,is_share
from tools.i18n.i18n import I18nAuto
i18n = I18nAuto()
from scipy.io import wavfile
from tools.my_utils import load_audio
from multiprocessing import cpu_count
import argparse
import os
import sys
import tempfile
import gradio as gr
import librosa.display
import numpy as np
import torch
import torchaudio
import traceback
from TTS.demos.xtts_ft_demo.utils.formatter import format_audio_list
from TTS.demos.xtts_ft_demo.utils.gpt_train import train_gpt
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
# from .list to .csv
import pandas as pd
from sklearn.model_selection import train_test_split
def split_csv(input_csv, train_csv, eval_csv, eval_size=0.15):
# Load the data from the CSV file
data = pd.read_csv(input_csv, delimiter='|', header=0)
# Split the data into training and evaluation sets
train_data, eval_data = train_test_split(data, test_size=eval_size, random_state=42)
# Save the training data to a CSV file
train_data.to_csv(train_csv, index=False, sep='|')
# Save the evaluation data to a CSV file
eval_data.to_csv(eval_csv, index=False, sep='|')
print("CSV files have been successfully split.")
def convert_list_to_csv(input_file, output_file):
try:
# Open the input .list file to read
with open(input_file, 'r', encoding='utf-8') as infile:
# Open the output .csv file to write
with open(output_file, 'w', encoding='utf-8') as outfile:
# Write the header to the CSV
outfile.write("audio_file|text|speaker_name\n")
# Process each line in the input file
for line in infile:
parts = line.strip().split('|')
if len(parts) == 4:
# Extract relevant parts: WAV file path and transcription
wav_path = parts[0]
transcription = parts[3]
# Write the formatted line to the CSV file
outfile.write(f"{wav_path}|{transcription}|coqui\n")
print("Conversion to CSV completed successfully.")
split_csv(output_file, "train.csv", "eval.csv")
print("Split completed successfully")
return "train.csv", "eval.csv"
except Exception as e:
print(f"An error occurred: {e}")
def clear_gpu_cache():
# clear the GPU cache
if torch.cuda.is_available():
torch.cuda.empty_cache()
XTTS_MODEL = None
def load_model(xtts_checkpoint, xtts_config, xtts_vocab):
global XTTS_MODEL
clear_gpu_cache()
if not xtts_checkpoint or not xtts_config or not xtts_vocab:
return "You need to run the previous steps or manually set the `XTTS checkpoint path`, `XTTS config path`, and `XTTS vocab path` fields !!"
config = XttsConfig()
config.load_json(xtts_config)
XTTS_MODEL = Xtts.init_from_config(config)
print("Loading XTTS model! ")
XTTS_MODEL.load_checkpoint(config, checkpoint_path=xtts_checkpoint, vocab_path=xtts_vocab, use_deepspeed=False)
if torch.cuda.is_available():
XTTS_MODEL.cuda()
print("Model Loaded!")
return "Model Loaded!"
def run_tts(lang, tts_text, speaker_audio_file):
if XTTS_MODEL is None or not speaker_audio_file:
return "You need to run the previous step to load the model !!", None, None
gpt_cond_latent, speaker_embedding = XTTS_MODEL.get_conditioning_latents(audio_path=speaker_audio_file, gpt_cond_len=XTTS_MODEL.config.gpt_cond_len, max_ref_length=XTTS_MODEL.config.max_ref_len, sound_norm_refs=XTTS_MODEL.config.sound_norm_refs)
out = XTTS_MODEL.inference(
text=tts_text,
language=lang,
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
temperature=XTTS_MODEL.config.temperature, # Add custom parameters here
length_penalty=XTTS_MODEL.config.length_penalty,
repetition_penalty=XTTS_MODEL.config.repetition_penalty,
top_k=XTTS_MODEL.config.top_k,
top_p=XTTS_MODEL.config.top_p,
)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as fp:
out["wav"] = torch.tensor(out["wav"]).unsqueeze(0)
out_path = fp.name
torchaudio.save(out_path, out["wav"], 24000)
return "Speech generated !", out_path, speaker_audio_file
# define a logger to redirect
class Logger:
def __init__(self, filename="log.out"):
self.log_file = filename
self.terminal = sys.stdout
self.log = open(self.log_file, "w")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
self.terminal.flush()
self.log.flush()
def isatty(self):
return False
# redirect stdout and stderr to a file
sys.stdout = Logger()
sys.stderr = sys.stdout
# logging.basicConfig(stream=sys.stdout, level=logging.INFO)
import logging
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
def read_logs():
sys.stdout.flush()
with open(sys.stdout.log_file, "r") as f:
return f.read()
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1' # 当遇到mps不支持的步骤时使用cpu
n_cpu=cpu_count()
ngpu = torch.cuda.device_count()
gpu_infos = []
mem = []
if_gpu_ok = False
# 判断是否有能用来训练和加速推理的N卡
if torch.cuda.is_available() or ngpu != 0:
for i in range(ngpu):
gpu_name = torch.cuda.get_device_name(i)
if any(value in gpu_name.upper()for value in ["10","16","20","30","40","A2","A3","A4","P4","A50","500","A60","70","80","90","M4","T4","TITAN","L4","4060"]):
# A10#A100#V100#A40#P40#M40#K80#A4500
if_gpu_ok = True # 至少有一张能用的N卡
gpu_infos.append("%s\t%s" % (i, gpu_name))
mem.append(int(torch.cuda.get_device_properties(i).total_memory/ 1024/ 1024/ 1024+ 0.4))
# 判断是否支持mps加速
if torch.backends.mps.is_available():
if_gpu_ok = True
gpu_infos.append("%s\t%s" % ("0", "Apple GPU"))
mem.append(psutil.virtual_memory().total/ 1024 / 1024 / 1024) # 实测使用系统内存作为显存不会爆显存
if if_gpu_ok and len(gpu_infos) > 0:
gpu_info = "\n".join(gpu_infos)
default_batch_size = min(mem) // 2
else:
gpu_info = i18n("很遗憾您这没有能用的显卡来支持您训练")
default_batch_size = 1
gpus = "-".join([i[0] for i in gpu_infos])
pretrained_sovits_name="GPT_SoVITS/pretrained_models/s2G488k.pth"
pretrained_gpt_name="GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt"
def get_weights_names():
SoVITS_names = [pretrained_sovits_name]
for name in os.listdir(SoVITS_weight_root):
if name.endswith(".pth"):SoVITS_names.append(name)
GPT_names = [pretrained_gpt_name]
for name in os.listdir(GPT_weight_root):
if name.endswith(".ckpt"): GPT_names.append(name)
return SoVITS_names,GPT_names
SoVITS_weight_root="SoVITS_weights"
GPT_weight_root="GPT_weights"
os.makedirs(SoVITS_weight_root,exist_ok=True)
os.makedirs(GPT_weight_root,exist_ok=True)
SoVITS_names,GPT_names = get_weights_names()
def custom_sort_key(s):
# 使用正则表达式提取字符串中的数字部分和非数字部分
parts = re.split('(\d+)', s)
# 将数字部分转换为整数,非数字部分保持不变
parts = [int(part) if part.isdigit() else part for part in parts]
return parts
def change_choices():
SoVITS_names, GPT_names = get_weights_names()
return {"choices": sorted(SoVITS_names,key=custom_sort_key), "__type__": "update"}, {"choices": sorted(GPT_names,key=custom_sort_key), "__type__": "update"}
p_label=None
p_uvr5=None
p_asr=None
p_denoise=None
p_tts_inference=None
def kill_proc_tree(pid, including_parent=True):
try:
parent = psutil.Process(pid)
except psutil.NoSuchProcess:
# Process already terminated
return
children = parent.children(recursive=True)
for child in children:
try:
os.kill(child.pid, signal.SIGTERM) # or signal.SIGKILL
except OSError:
pass
if including_parent:
try:
os.kill(parent.pid, signal.SIGTERM) # or signal.SIGKILL
except OSError:
pass
system=platform.system()
def kill_process(pid):
if(system=="Windows"):
cmd = "taskkill /t /f /pid %s" % pid
os.system(cmd)
else:
kill_proc_tree(pid)
def change_label(if_label,path_list):
global p_label
if(if_label==True and p_label==None):
path_list=my_utils.clean_path(path_list)
cmd = '"%s" tools/subfix_webui.py --load_list "%s" --webui_port %s --is_share %s'%(python_exec,path_list,webui_port_subfix,is_share)
yield i18n("打标工具WebUI已开启")
print(cmd)
p_label = Popen(cmd, shell=True)
elif(if_label==False and p_label!=None):
kill_process(p_label.pid)
p_label=None
yield i18n("打标工具WebUI已关闭")
def change_uvr5(if_uvr5):
global p_uvr5
if(if_uvr5==True and p_uvr5==None):
cmd = '"%s" tools/uvr5/webui.py "%s" %s %s %s'%(python_exec,infer_device,is_half,webui_port_uvr5,is_share)
yield i18n("UVR5已开启")
print(cmd)
p_uvr5 = Popen(cmd, shell=True)
elif(if_uvr5==False and p_uvr5!=None):
kill_process(p_uvr5.pid)
p_uvr5=None
yield i18n("UVR5已关闭")
def change_tts_inference(if_tts,bert_path,cnhubert_base_path,gpu_number,gpt_path,sovits_path):
global p_tts_inference
if(if_tts==True and p_tts_inference==None):
os.environ["gpt_path"]=gpt_path if "/" in gpt_path else "%s/%s"%(GPT_weight_root,gpt_path)
os.environ["sovits_path"]=sovits_path if "/"in sovits_path else "%s/%s"%(SoVITS_weight_root,sovits_path)
os.environ["cnhubert_base_path"]=cnhubert_base_path
os.environ["bert_path"]=bert_path
os.environ["_CUDA_VISIBLE_DEVICES"]=gpu_number
os.environ["is_half"]=str(is_half)
os.environ["infer_ttswebui"]=str(webui_port_infer_tts)
os.environ["is_share"]=str(is_share)
cmd = '"%s" GPT_SoVITS/inference_webui.py'%(python_exec)
yield i18n("TTS推理进程已开启")
print(cmd)
p_tts_inference = Popen(cmd, shell=True)
elif(if_tts==False and p_tts_inference!=None):
kill_process(p_tts_inference.pid)
p_tts_inference=None
yield i18n("TTS推理进程已关闭")
from tools.asr.config import asr_dict
def open_asr(asr_inp_dir, asr_opt_dir, asr_model, asr_model_size, asr_lang):
global p_asr
if(p_asr==None):
asr_inp_dir=my_utils.clean_path(asr_inp_dir)
cmd = f'"{python_exec}" tools/asr/{asr_dict[asr_model]["path"]}'
cmd += f' -i "{asr_inp_dir}"'
cmd += f' -o "{asr_opt_dir}"'
cmd += f' -s {asr_model_size}'
cmd += f' -l {asr_lang}'
cmd += " -p %s"%("float16"if is_half==True else "float32")
yield "ASR任务开启:%s"%cmd,{"__type__":"update","visible":False},{"__type__":"update","visible":True}
print(cmd)
p_asr = Popen(cmd, shell=True)
p_asr.wait()
p_asr=None
yield f"ASR任务完成, 查看终端进行下一步",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的ASR任务,需先终止才能开启下一次任务",{"__type__":"update","visible":False},{"__type__":"update","visible":True}
# return None
def close_asr():
global p_asr
if(p_asr!=None):
kill_process(p_asr.pid)
p_asr=None
return "已终止ASR进程",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
def open_denoise(denoise_inp_dir, denoise_opt_dir):
global p_denoise
if(p_denoise==None):
denoise_inp_dir=my_utils.clean_path(denoise_inp_dir)
denoise_opt_dir=my_utils.clean_path(denoise_opt_dir)
cmd = '"%s" tools/cmd-denoise.py -i "%s" -o "%s" -p %s'%(python_exec,denoise_inp_dir,denoise_opt_dir,"float16"if is_half==True else "float32")
yield "语音降噪任务开启:%s"%cmd,{"__type__":"update","visible":False},{"__type__":"update","visible":True}
print(cmd)
p_denoise = Popen(cmd, shell=True)
p_denoise.wait()
p_denoise=None
yield f"语音降噪任务完成, 查看终端进行下一步",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的语音降噪任务,需先终止才能开启下一次任务",{"__type__":"update","visible":False},{"__type__":"update","visible":True}
# return None
def close_denoise():
global p_denoise
if(p_denoise!=None):
kill_process(p_denoise.pid)
p_denoise=None
return "已终止语音降噪进程",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
p_train_SoVITS=None
def open1Ba(batch_size,total_epoch,exp_name,text_low_lr_rate,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers1Ba,pretrained_s2G,pretrained_s2D):
global p_train_SoVITS
if(p_train_SoVITS==None):
with open("GPT_SoVITS/configs/s2.json")as f:
data=f.read()
data=json.loads(data)
s2_dir="%s/%s"%(exp_root,exp_name)
os.makedirs("%s/logs_s2"%(s2_dir),exist_ok=True)
if(is_half==False):
data["train"]["fp16_run"]=False
batch_size=max(1,batch_size//2)
data["train"]["batch_size"]=batch_size
data["train"]["epochs"]=total_epoch
data["train"]["text_low_lr_rate"]=text_low_lr_rate
data["train"]["pretrained_s2G"]=pretrained_s2G
data["train"]["pretrained_s2D"]=pretrained_s2D
data["train"]["if_save_latest"]=if_save_latest
data["train"]["if_save_every_weights"]=if_save_every_weights
data["train"]["save_every_epoch"]=save_every_epoch
data["train"]["gpu_numbers"]=gpu_numbers1Ba
data["data"]["exp_dir"]=data["s2_ckpt_dir"]=s2_dir
data["save_weight_dir"]=SoVITS_weight_root
data["name"]=exp_name
tmp_config_path="%s/tmp_s2.json"%tmp
with open(tmp_config_path,"w")as f:f.write(json.dumps(data))
cmd = '"%s" GPT_SoVITS/s2_train.py --config "%s"'%(python_exec,tmp_config_path)
yield "SoVITS训练开始:%s"%cmd,{"__type__":"update","visible":False},{"__type__":"update","visible":True}
print(cmd)
p_train_SoVITS = Popen(cmd, shell=True)
p_train_SoVITS.wait()
p_train_SoVITS=None
yield "SoVITS训练完成",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的SoVITS训练任务,需先终止才能开启下一次任务",{"__type__":"update","visible":False},{"__type__":"update","visible":True}
def close1Ba():
global p_train_SoVITS
if(p_train_SoVITS!=None):
kill_process(p_train_SoVITS.pid)
p_train_SoVITS=None
return "已终止SoVITS训练",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
p_train_GPT=None
def open1Bb(batch_size,total_epoch,exp_name,if_dpo,if_save_latest,if_save_every_weights,save_every_epoch,gpu_numbers,pretrained_s1):
global p_train_GPT
if(p_train_GPT==None):
with open("GPT_SoVITS/configs/s1longer.yaml")as f:
data=f.read()
data=yaml.load(data, Loader=yaml.FullLoader)
s1_dir="%s/%s"%(exp_root,exp_name)
os.makedirs("%s/logs_s1"%(s1_dir),exist_ok=True)
if(is_half==False):
data["train"]["precision"]="32"
batch_size = max(1, batch_size // 2)
data["train"]["batch_size"]=batch_size
data["train"]["epochs"]=total_epoch
data["pretrained_s1"]=pretrained_s1
data["train"]["save_every_n_epoch"]=save_every_epoch
data["train"]["if_save_every_weights"]=if_save_every_weights
data["train"]["if_save_latest"]=if_save_latest
data["train"]["if_dpo"]=if_dpo
data["train"]["half_weights_save_dir"]=GPT_weight_root
data["train"]["exp_name"]=exp_name
data["train_semantic_path"]="%s/6-name2semantic.tsv"%s1_dir
data["train_phoneme_path"]="%s/2-name2text.txt"%s1_dir
data["output_dir"]="%s/logs_s1"%s1_dir
os.environ["_CUDA_VISIBLE_DEVICES"]=gpu_numbers.replace("-",",")
os.environ["hz"]="25hz"
tmp_config_path="%s/tmp_s1.yaml"%tmp
with open(tmp_config_path, "w") as f:f.write(yaml.dump(data, default_flow_style=False))
# cmd = '"%s" GPT_SoVITS/s1_train.py --config_file "%s" --train_semantic_path "%s/6-name2semantic.tsv" --train_phoneme_path "%s/2-name2text.txt" --output_dir "%s/logs_s1"'%(python_exec,tmp_config_path,s1_dir,s1_dir,s1_dir)
cmd = '"%s" GPT_SoVITS/s1_train.py --config_file "%s" '%(python_exec,tmp_config_path)
yield "GPT训练开始:%s"%cmd,{"__type__":"update","visible":False},{"__type__":"update","visible":True}
print(cmd)
p_train_GPT = Popen(cmd, shell=True)
p_train_GPT.wait()
p_train_GPT=None
yield "GPT训练完成",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的GPT训练任务,需先终止才能开启下一次任务",{"__type__":"update","visible":False},{"__type__":"update","visible":True}
def close1Bb():
global p_train_GPT
if(p_train_GPT!=None):
kill_process(p_train_GPT.pid)
p_train_GPT=None
return "已终止GPT训练",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
ps_slice=[]
def open_slice(inp,opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_parts):
global ps_slice
inp = my_utils.clean_path(inp)
opt_root = my_utils.clean_path(opt_root)
if(os.path.exists(inp)==False):
yield "输入路径不存在",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
return
if os.path.isfile(inp):n_parts=1
elif os.path.isdir(inp):pass
else:
yield "输入路径存在但既不是文件也不是文件夹",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
return
if (ps_slice == []):
for i_part in range(n_parts):
cmd = '"%s" tools/slice_audio.py "%s" "%s" %s %s %s %s %s %s %s %s %s''' % (python_exec,inp, opt_root, threshold, min_length, min_interval, hop_size, max_sil_kept, _max, alpha, i_part, n_parts)
print(cmd)
p = Popen(cmd, shell=True)
ps_slice.append(p)
yield "切割执行中", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps_slice:
p.wait()
ps_slice=[]
yield "切割结束",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的切割任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
def close_slice():
global ps_slice
if (ps_slice != []):
for p_slice in ps_slice:
try:
kill_process(p_slice.pid)
except:
traceback.print_exc()
ps_slice=[]
return "已终止所有切割进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1a=[]
def open1a(inp_text,inp_wav_dir,exp_name,gpu_numbers,bert_pretrained_dir):
global ps1a
inp_text = my_utils.clean_path(inp_text)
inp_wav_dir = my_utils.clean_path(inp_wav_dir)
if (ps1a == []):
opt_dir="%s/%s"%(exp_root,exp_name)
config={
"inp_text":inp_text,
"inp_wav_dir":inp_wav_dir,
"exp_name":exp_name,
"opt_dir":opt_dir,
"bert_pretrained_dir":bert_pretrained_dir,
}
gpu_names=gpu_numbers.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
"is_half": str(is_half)
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/1-get-text.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1a.append(p)
yield "文本进程执行中", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1a:
p.wait()
opt = []
for i_part in range(all_parts):
txt_path = "%s/2-name2text-%s.txt" % (opt_dir, i_part)
with open(txt_path, "r", encoding="utf8") as f:
opt += f.read().strip("\n").split("\n")
os.remove(txt_path)
path_text = "%s/2-name2text.txt" % opt_dir
with open(path_text, "w", encoding="utf8") as f:
f.write("\n".join(opt) + "\n")
ps1a=[]
yield "文本进程结束",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的文本任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
def close1a():
global ps1a
if (ps1a != []):
for p1a in ps1a:
try:
kill_process(p1a.pid)
except:
traceback.print_exc()
ps1a=[]
return "已终止所有1a进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1b=[]
def open1b(inp_text,inp_wav_dir,exp_name,gpu_numbers,ssl_pretrained_dir):
global ps1b
inp_text = my_utils.clean_path(inp_text)
inp_wav_dir = my_utils.clean_path(inp_wav_dir)
if (ps1b == []):
config={
"inp_text":inp_text,
"inp_wav_dir":inp_wav_dir,
"exp_name":exp_name,
"opt_dir":"%s/%s"%(exp_root,exp_name),
"cnhubert_base_dir":ssl_pretrained_dir,
"is_half": str(is_half)
}
gpu_names=gpu_numbers.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/2-get-hubert-wav32k.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1b.append(p)
yield "SSL提取进程执行中", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1b:
p.wait()
ps1b=[]
yield "SSL提取进程结束",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的SSL提取任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
def close1b():
global ps1b
if (ps1b != []):
for p1b in ps1b:
try:
kill_process(p1b.pid)
except:
traceback.print_exc()
ps1b=[]
return "已终止所有1b进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
ps1c=[]
def open1c(inp_text,exp_name,gpu_numbers,pretrained_s2G_path):
global ps1c
inp_text = my_utils.clean_path(inp_text)
if (ps1c == []):
opt_dir="%s/%s"%(exp_root,exp_name)
config={
"inp_text":inp_text,
"exp_name":exp_name,
"opt_dir":opt_dir,
"pretrained_s2G":pretrained_s2G_path,
"s2config_path":"GPT_SoVITS/configs/s2.json",
"is_half": str(is_half)
}
gpu_names=gpu_numbers.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/3-get-semantic.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1c.append(p)
yield "语义token提取进程执行中", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1c:
p.wait()
opt = ["item_name\tsemantic_audio"]
path_semantic = "%s/6-name2semantic.tsv" % opt_dir
for i_part in range(all_parts):
semantic_path = "%s/6-name2semantic-%s.tsv" % (opt_dir, i_part)
with open(semantic_path, "r", encoding="utf8") as f:
opt += f.read().strip("\n").split("\n")
os.remove(semantic_path)
with open(path_semantic, "w", encoding="utf8") as f:
f.write("\n".join(opt) + "\n")
ps1c=[]
yield "语义token提取进程结束",{"__type__":"update","visible":True},{"__type__":"update","visible":False}
else:
yield "已有正在进行的语义token提取任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
def close1c():
global ps1c
if (ps1c != []):
for p1c in ps1c:
try:
kill_process(p1c.pid)
except:
traceback.print_exc()
ps1c=[]
return "已终止所有语义token进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
#####inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,cnhubert_base_dir,pretrained_s2G
ps1abc=[]
def open1abc(inp_text,inp_wav_dir,exp_name,gpu_numbers1a,gpu_numbers1Ba,gpu_numbers1c,bert_pretrained_dir,ssl_pretrained_dir,pretrained_s2G_path):
global ps1abc
inp_text = my_utils.clean_path(inp_text)
inp_wav_dir = my_utils.clean_path(inp_wav_dir)
if (ps1abc == []):
opt_dir="%s/%s"%(exp_root,exp_name)
try:
#############################1a
path_text="%s/2-name2text.txt" % opt_dir
if(os.path.exists(path_text)==False or (os.path.exists(path_text)==True and len(open(path_text,"r",encoding="utf8").read().strip("\n").split("\n"))<2)):
config={
"inp_text":inp_text,
"inp_wav_dir":inp_wav_dir,
"exp_name":exp_name,
"opt_dir":opt_dir,
"bert_pretrained_dir":bert_pretrained_dir,
"is_half": str(is_half)
}
gpu_names=gpu_numbers1a.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/1-get-text.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1abc.append(p)
yield "进度:1a-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1abc:p.wait()
opt = []
for i_part in range(all_parts):#txt_path="%s/2-name2text-%s.txt"%(opt_dir,i_part)
txt_path = "%s/2-name2text-%s.txt" % (opt_dir, i_part)
with open(txt_path, "r",encoding="utf8") as f:
opt += f.read().strip("\n").split("\n")
os.remove(txt_path)
with open(path_text, "w",encoding="utf8") as f:
f.write("\n".join(opt) + "\n")
yield "进度:1a-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
ps1abc=[]
#############################1b
config={
"inp_text":inp_text,
"inp_wav_dir":inp_wav_dir,
"exp_name":exp_name,
"opt_dir":opt_dir,
"cnhubert_base_dir":ssl_pretrained_dir,
}
gpu_names=gpu_numbers1Ba.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/2-get-hubert-wav32k.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1abc.append(p)
yield "进度:1a-done, 1b-ing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1abc:p.wait()
yield "进度:1a1b-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
ps1abc=[]
#############################1c
path_semantic = "%s/6-name2semantic.tsv" % opt_dir
if(os.path.exists(path_semantic)==False or (os.path.exists(path_semantic)==True and os.path.getsize(path_semantic)<31)):
config={
"inp_text":inp_text,
"exp_name":exp_name,
"opt_dir":opt_dir,
"pretrained_s2G":pretrained_s2G_path,
"s2config_path":"GPT_SoVITS/configs/s2.json",
}
gpu_names=gpu_numbers1c.split("-")
all_parts=len(gpu_names)
for i_part in range(all_parts):
config.update(
{
"i_part": str(i_part),
"all_parts": str(all_parts),
"_CUDA_VISIBLE_DEVICES": gpu_names[i_part],
}
)
os.environ.update(config)
cmd = '"%s" GPT_SoVITS/prepare_datasets/3-get-semantic.py'%python_exec
print(cmd)
p = Popen(cmd, shell=True)
ps1abc.append(p)
yield "进度:1a1b-done, 1cing", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
for p in ps1abc:p.wait()
opt = ["item_name\tsemantic_audio"]
for i_part in range(all_parts):
semantic_path = "%s/6-name2semantic-%s.tsv" % (opt_dir, i_part)
with open(semantic_path, "r",encoding="utf8") as f:
opt += f.read().strip("\n").split("\n")
os.remove(semantic_path)
with open(path_semantic, "w",encoding="utf8") as f:
f.write("\n".join(opt) + "\n")
yield "进度:all-done", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
ps1abc = []
yield "一键三连进程结束", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
except:
traceback.print_exc()
close1abc()
yield "一键三连中途报错", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
else:
yield "已有正在进行的一键三连任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True}
def close1abc():
global ps1abc
if (ps1abc != []):
for p1abc in ps1abc:
try:
kill_process(p1abc.pid)
except:
traceback.print_exc()
ps1abc=[]
return "已终止所有一键三连进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}
with gr.Blocks(title="GPT-SoVITS WebUI") as app:
gr.Markdown("# <center>🌊💕🎶 XTTS 微调:2分钟语音,开启中日英16种语言真实拟声</center>")
gr.Markdown("## <center>🌟 只需2分钟的语音,一键在线微调 最强多语种模型</center>")
gr.Markdown("### <center>🤗 更多精彩,尽在[滔滔AI](https://www.talktalkai.com/);滔滔AI,为爱滔滔!💕</center>")
with gr.Tabs():
with gr.TabItem(i18n("1 - 制作数据集")):#提前随机切片防止uvr5爆内存->uvr5->slicer->asr->打标
#gr.Markdown(value=i18n("0a-UVR5人声伴奏分离&去混响去延迟工具"))
with gr.Row():
if_uvr5 = gr.Checkbox(label=i18n("是否开启UVR5-WebUI"),show_label=True, visible=False)
uvr5_info = gr.Textbox(label=i18n("UVR5进程输出信息"), visible=False)
gr.Markdown(value=i18n("1a-语音切分工具"))
with gr.Row():
with gr.Row():
slice_inp_path=gr.Textbox(label=i18n("音频自动切分输入路径,可文件可文件夹"),info="您需要先在GPT-SoVITS-v2文件夹中上传训练音频,如jay.wav;音频时长建议大于2分钟",value="",placeholder="jay.wav")
slice_opt_root=gr.Textbox(label=i18n("切分后的子音频的输出根目录"),value="output/slicer_opt")
threshold=gr.Textbox(label=i18n("threshold:音量小于这个值视作静音的备选切割点"),value="-34")
min_length=gr.Textbox(label=i18n("min_length:每段最小多长,如果第一段太短一直和后面段连起来直到超过这个值"),value="4000")
min_interval=gr.Textbox(label=i18n("min_interval:最短切割间隔"),value="300")
hop_size=gr.Textbox(label=i18n("hop_size:怎么算音量曲线,越小精度越大计算量越高(不是精度越大效果越好)"),value="10")
max_sil_kept=gr.Textbox(label=i18n("max_sil_kept:切完后静音最多留多长"),value="500")
with gr.Row():
open_slicer_button=gr.Button(i18n("1. 开启语音切割"), variant="primary",visible=True)
close_slicer_button=gr.Button(i18n("终止语音切割"), variant="primary",visible=False)
_max=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("max:归一化后最大值多少"),value=0.9,interactive=True)
alpha=gr.Slider(minimum=0,maximum=1,step=0.05,label=i18n("alpha_mix:混多少比例归一化后音频进来"),value=0.25,interactive=True)
n_process=gr.Slider(minimum=1,maximum=n_cpu,step=1,label=i18n("切割使用的进程数"),value=4,interactive=True)
slicer_info = gr.Textbox(label=i18n("语音切割进程输出信息"))
#gr.Markdown(value=i18n("0bb-语音降噪工具"))
with gr.Row():
open_denoise_button = gr.Button(i18n("开启语音降噪"), visible=False)
close_denoise_button = gr.Button(i18n("终止语音降噪进程"), variant="primary",visible=False)
denoise_input_dir=gr.Textbox(label=i18n("降噪音频文件输入文件夹"),value="", visible=False)
denoise_output_dir=gr.Textbox(label=i18n("降噪结果输出文件夹"),value="output/denoise_opt", visible=False)
denoise_info = gr.Textbox(label=i18n("语音降噪进程输出信息"), visible=False)
gr.Markdown(value=i18n("1b-批量语音识别"))
with gr.Row():
open_asr_button = gr.Button(i18n("2. 开启离线批量ASR"), variant="primary",visible=True)
close_asr_button = gr.Button(i18n("终止ASR进程"), variant="primary",visible=False)
with gr.Column():
with gr.Row():
asr_inp_dir = gr.Textbox(
label=i18n("输入文件夹路径"),
value="output/slicer_opt",
interactive=True,
)
asr_opt_dir = gr.Textbox(
label = i18n("输出文件夹路径"),
value = "output/asr_opt",
interactive = True,
)
with gr.Row():
asr_model = gr.Dropdown(
label = i18n("ASR 模型"),
choices = list(asr_dict.keys()),
interactive = True,
value="达摩 ASR (中文)"
)
asr_size = gr.Dropdown(
label = i18n("ASR 模型尺寸"),
choices = ["large"],
interactive = True,
value="large"
)
asr_lang = gr.Dropdown(
label = i18n("ASR 语言设置"),
choices = ["zh"],
interactive = True,
value="zh"
)
lang = asr_lang
with gr.Row():
asr_info = gr.Textbox(label=i18n("ASR进程输出信息"))
def change_lang_choices(key): #根据选择的模型修改可选的语言
# return gr.Dropdown(choices=asr_dict[key]['lang'])
return {"__type__": "update", "choices": asr_dict[key]['lang'],"value":asr_dict[key]['lang'][0]}
def change_size_choices(key): # 根据选择的模型修改可选的模型尺寸
# return gr.Dropdown(choices=asr_dict[key]['size'])
return {"__type__": "update", "choices": asr_dict[key]['size']}
asr_model.change(change_lang_choices, [asr_model], [asr_lang])
asr_model.change(change_size_choices, [asr_model], [asr_size])
gr.Markdown(value=i18n("1c-语音文本校对标注工具"))
with gr.Row():
if_label = gr.Checkbox(label=i18n("是否开启打标WebUI"),show_label=True)
path_list = gr.Textbox(
label=i18n(".list标注文件的路径"),
value="output/asr_opt/slicer_opt.list",
interactive=True,
)
label_info = gr.Textbox(label=i18n("打标工具进程输出信息"))
if_label.change(change_label, [if_label,path_list], [label_info])
if_uvr5.change(change_uvr5, [if_uvr5], [uvr5_info])
open_asr_button.click(open_asr, [asr_inp_dir, asr_opt_dir, asr_model, asr_size, asr_lang], [asr_info,open_asr_button,close_asr_button])
close_asr_button.click(close_asr, [], [asr_info,open_asr_button,close_asr_button])
open_slicer_button.click(open_slice, [slice_inp_path,slice_opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_process], [slicer_info,open_slicer_button,close_slicer_button])
close_slicer_button.click(close_slice, [], [slicer_info,open_slicer_button,close_slicer_button])
open_denoise_button.click(open_denoise, [denoise_input_dir,denoise_output_dir], [denoise_info,open_denoise_button,close_denoise_button])
close_denoise_button.click(close_denoise, [], [denoise_info,open_denoise_button,close_denoise_button])
with gr.Tab("2 - XTTS模型微调"):
inp_list_path_value = str(Path.cwd() / "output/asr_opt/slicer_opt.list")
out_csv_path_value = str(Path.cwd() / "output.csv")
inp_list_path = gr.Textbox(value=inp_list_path_value, label=".list文件地址")
out_csv_path = gr.Textbox(value=out_csv_path_value, label=".csv文件地址")
list_to_csv = gr.Button("3. 准备训练csv文件", variant="primary")
train_csv = gr.Textbox(
label="训练数据集csv文件",
)
eval_csv = gr.Textbox(
label="评价数据集csv文件",
)
list_to_csv.click(convert_list_to_csv, [inp_list_path, out_csv_path], [train_csv, eval_csv])
out_path_value = str(Path.cwd() / "finetune_models")
out_path = gr.Textbox(value=out_path_value, label="XTTS微调模型的文件夹")
num_epochs = gr.Slider(
label="训练步数 Number of epochs:",
minimum=1,
maximum=100,
step=1,
value=6,
)
batch_size = gr.Slider(
label="Batch size:",
minimum=2,
maximum=512,
step=1,
value=2,
)
grad_acumm = gr.Slider(
label="Grad accumulation steps:",
minimum=1,
maximum=128,
step=1,
value=1,
)
max_audio_length = gr.Slider(
label="Max permitted audio size in seconds:",
minimum=2,
maximum=20,
step=1,
value=11,
visible=False,
)
progress_train = gr.Label(
label="训练进程"
)
logs_tts_train = gr.Textbox(
label="训练详细信息",
interactive=False,
)
app.load(read_logs, None, logs_tts_train, every=1)
train_btn = gr.Button(value="4. 开始模型训练", variant="primary")
def train_model(language, train_csv, eval_csv, num_epochs, batch_size, grad_acumm, output_path, max_audio_length):
clear_gpu_cache()
if not train_csv or not eval_csv:
return "You need to run the data processing step or manually set `Train CSV` and `Eval CSV` fields !", "", "", "", ""
try:
# convert seconds to waveform frames
max_audio_length = int(max_audio_length * 22050)
config_path, original_xtts_checkpoint, vocab_file, exp_path, speaker_wav = train_gpt(language, num_epochs, batch_size, grad_acumm, train_csv, eval_csv, output_path=output_path, max_audio_length=max_audio_length)
except:
traceback.print_exc()
error = traceback.format_exc()
return f"The training was interrupted due an error !! Please check the console to check the full error message! \n Error summary: {error}", "", "", "", ""
# copy original files to avoid parameters changes issues
os.system(f"cp {config_path} {exp_path}")
os.system(f"cp {vocab_file} {exp_path}")
ft_xtts_checkpoint = os.path.join(exp_path, "best_model.pth")
print("Model training done!")
clear_gpu_cache()
return "Model training done!", config_path, vocab_file, ft_xtts_checkpoint, speaker_wav
with gr.Tab("3 - XTTS语音合成"):
with gr.Row():
with gr.Column() as col1:
xtts_checkpoint = gr.Textbox(
label="XTTS checkpoint 路径",
value="",
)
xtts_config = gr.Textbox(
label="XTTS config 路径",
value="",
)
xtts_vocab = gr.Textbox(
label="XTTS vocab 路径",
value="",
)
progress_load = gr.Label(
label="模型加载进程"
)
load_btn = gr.Button(value="5. 加载已训练好的模型", variant="primary")
with gr.Column() as col2:
ref_audio_names = os.listdir("output/slicer_opt")
ref_audio_list = [os.path.join("output/slicer_opt", ref_audio_name) for ref_audio_name in ref_audio_names]
speaker_reference_audio = gr.Dropdown(
label="请选择一条参考音频",
info="不同参考音频对应的合成效果不同,您可以多次尝试",
value=ref_audio_list[0],
choices = ref_audio_list
)
tts_language = gr.Dropdown(
label="语音合成的语言",
value="zh",
choices=[
"en",
"es",
"fr",
"de",
"it",
"pt",
"pl",
"tr",
"ru",
"nl",
"cs",
"ar",
"zh",
"hu",
"ko",
"ja",
]
)
tts_text = gr.Textbox(
label="请填写语音合成的文本.",
placeholder="想说却还没说的,还很多",
)
tts_btn = gr.Button(value="6. 开启AI语音之旅吧💕", variant="primary")
with gr.Column() as col3:
progress_gen = gr.Label(
label="语音合成进程"
)
tts_output_audio = gr.Audio(label="为您合成的专属音频.")
reference_audio = gr.Audio(label="您使用的参考音频")
train_btn.click(
fn=train_model,
inputs=[
lang,
train_csv,
eval_csv,
num_epochs,
batch_size,
grad_acumm,
out_path,
max_audio_length,
],
outputs=[progress_train, xtts_config, xtts_vocab, xtts_checkpoint, speaker_reference_audio],
)
load_btn.click(
fn=load_model,
inputs=[
xtts_checkpoint,
xtts_config,
xtts_vocab
],
outputs=[progress_load],
)
tts_btn.click(
fn=run_tts,
inputs=[
tts_language,
tts_text,
speaker_reference_audio,
],
outputs=[progress_gen, tts_output_audio, reference_audio],
)
gr.Markdown("### <center>注意❗:请不要生成会对个人以及组织造成侵害的内容,此程序仅供科研、学习及个人娱乐使用。请自觉合规使用此程序,程序开发者不负有任何责任。</center>")
gr.HTML('''
<div class="footer">
<p>🌊🏞️🎶 - 江水东流急,滔滔无尽声。 明·顾璘
</p>
</div>
''')
app.queue().launch(
share=True,
show_error=True,
)