# NOTE: scraped page header ("Spaces: Paused") -- not part of the program.
import sys | |
import io, os, stat | |
import subprocess | |
import random | |
from zipfile import ZipFile | |
import uuid | |
import time | |
import torch | |
import torchaudio | |
import gradio as gr | |
import shutil | |
# mp4 to wav and denoising | |
import ffmpeg | |
import urllib.request | |
# --- UVR5 setup (mp4 to wav and denoising) ------------------------------------
# Fetch the two UVR5 weight files and wrap each in an AudioPre instance.
# convert_from_srt() picks one of the two instances by model name.
weight_uvr5_root = "uvr5/uvr_model"
_UVR5_WEIGHT_BASE_URL = (
    "https://download.openxlab.org.cn/models/Kevin676/rvc-models/weight/"
)


def _fetch_uvr5_weight(file_name):
    """Download ``file_name`` into ``weight_uvr5_root`` unless it already exists.

    The data is first written to ``<target>.part`` and then renamed, so an
    interrupted download never leaves a truncated ``.pth`` file behind.

    Returns:
        The local path of the weight file.
    """
    target = os.path.join(weight_uvr5_root, file_name)
    if os.path.isfile(target):
        return target
    os.makedirs(weight_uvr5_root, exist_ok=True)
    partial = target + ".part"
    urllib.request.urlretrieve(_UVR5_WEIGHT_BASE_URL + file_name, partial)
    os.replace(partial, target)
    return target


for _weight_file in ("UVR-HP2.pth", "UVR-HP5.pth"):
    _fetch_uvr5_weight(_weight_file)

from uvr5.vr import AudioPre

# Names (without the ".pth" suffix) of every model file found in the weight
# directory.
uvr5_names = [
    name.replace(".pth", "")
    for name in os.listdir(weight_uvr5_root)
    if name.endswith(".pth") or "onnx" in name
]

func = AudioPre

# NOTE(review): agg=10 is presumably the separation aggressiveness, and
# convert_from_srt() reads a file named "vocal_audio_full.wav_10.wav" that
# looks derived from it -- confirm against uvr5.vr.AudioPre before changing.
pre_fun_hp2 = func(
    agg=10,
    model_path=os.path.join(weight_uvr5_root, "UVR-HP2.pth"),
    device="cuda",
    is_half=True,
)
pre_fun_hp5 = func(
    agg=10,
    model_path=os.path.join(weight_uvr5_root, "UVR-HP5.pth"),
    device="cuda",
    is_half=True,
)
# mp4 to wav and denoising ending
# --- XTTS setup -----------------------------------------------------------------
# Download the UniDic dictionary for MeCab.  Run it with the interpreter that is
# executing this script so the dictionary lands in the same environment; the
# exit status is ignored (best effort), as before.
subprocess.run(
    [sys.executable or "python", "-m", "unidic", "download"],
    check=False,
)

# By using XTTS you agree to CPML license https://coqui.ai/cpml
# This must be set before the TTS package is imported below.
os.environ["COQUI_TOS_AGREED"] = "1"

# langid is used to detect the language of longer text.  predict() in this file
# hard-codes no_lang_auto_detect = True, so a mismatch is only printed.
import langid
import base64
import csv
from io import StringIO
import datetime
import re
import gradio as gr
from scipy.io.wavfile import write
from pydub import AudioSegment
from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir

HF_TOKEN = os.environ.get("HF_TOKEN")
from huggingface_hub import HfApi

# The API client is used to restart the Space on an unrecoverable error.
api = HfApi(token=HF_TOKEN)
# Restart the Space this script is actually running in.  SPACE_ID is expected
# to be provided by the Spaces runtime as "<owner>/<space>"; fall back to the
# upstream Space id when it is absent (e.g. when run locally).
repo_id = os.environ.get("SPACE_ID", "coqui/xtts")

# Use newer ffmpeg binary for Ubuntu20 to use denoising for microphone input
print("Export newer ffmpeg binary for denoise filter")
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()
print("Make ffmpeg binary executable")
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)

# This will trigger downloading model
print("Downloading if not downloaded Coqui XTTS V2")
from TTS.utils.manage import ModelManager

model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
ModelManager().download_model(model_name)
model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
print("XTTS downloaded")

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))
model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
    use_deepspeed=True,
)
model.cuda()

# This is for debugging purposes only: predict() records here the prompt and
# language that triggered a CUDA device-side assert.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = None
DEVICE_ASSERT_LANG = None

supported_languages = config.languages
def _restart_space_unless_building():
    """Ask the Hub to restart the Space ``repo_id`` unless it is building."""
    space = api.get_space_runtime(repo_id=repo_id)
    if space.stage != "BUILDING":
        api.restart_space(repo_id=repo_id)
    else:
        print("TRIED TO RESTART but space is building")


def _filter_reference_audio(speaker_wav):
    """Band-limit and silence-trim ``speaker_wav`` with the bundled ``./ffmpeg``.

    Returns the path of the filtered copy, or ``speaker_wav`` unchanged when
    ffmpeg exits with a non-zero status.
    """
    lowpass_highpass = "lowpass=8000,highpass=75,"
    # Remove silence at the beginning and at the end of the recording.
    trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"
    # The ".wav" suffix lets ffmpeg infer the output format.
    out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"
    # Build argv as a list so paths containing spaces stay a single argument.
    command = [
        "./ffmpeg",
        "-y",
        "-i",
        speaker_wav,
        "-af",
        lowpass_highpass + trim_silence,
        out_filename,
    ]
    try:
        subprocess.run(command, capture_output=False, text=True, check=True)
    except subprocess.CalledProcessError:
        # There was an error - command exited with non-zero code
        print("Error: failed filtering, use original microphone input")
        return speaker_wav
    print("Filtered microphone input")
    return out_filename


def _upload_device_assert_report(error_data, speaker_wav):
    """Upload a one-row CSV and the reference audio for a device-side assert.

    ``error_data`` is a list of strings whose first item is the timestamp used
    in the uploaded file names.  The upload is best effort: any failure is
    printed and swallowed so the caller can still restart the Space.
    """
    error_time = error_data[0]
    print(error_data)
    print(speaker_wav)
    write_io = StringIO()
    csv.writer(write_io).writerows([error_data])
    csv_upload = write_io.getvalue().encode()
    filename = error_time + "_" + str(uuid.uuid4()) + ".csv"
    speaker_filename = error_time + "_reference_" + str(uuid.uuid4()) + ".wav"
    try:
        error_api = HfApi()
        print("Writing error csv")
        error_api.upload_file(
            path_or_fileobj=csv_upload,
            path_in_repo=filename,
            repo_id="coqui/xtts-flagged-dataset",
            repo_type="dataset",
        )
        print("Writing error reference audio")
        error_api.upload_file(
            path_or_fileobj=speaker_wav,
            path_in_repo=speaker_filename,
            repo_id="coqui/xtts-flagged-dataset",
            repo_type="dataset",
        )
    except Exception as upload_error:
        print("Could not upload error report:", str(upload_error))


def predict(
    prompt,
    language,
    audio_file_pth,
    save_path
):
    """Synthesize ``prompt`` in the voice of a reference clip and save it as WAV.

    Args:
        prompt: Text to speak; must be 2 to 500 characters long.
        language: Language code; must be in ``supported_languages``.
        audio_file_pth: Path of the reference audio used for voice cloning.
        save_path: File stem; the audio is written to ``output/{save_path}.wav``.

    Returns:
        The string ``f"output/{save_path}.wav"`` on success.  On a validation
        or inference failure a ``gr.Warning`` is issued and the tuple
        ``(None, None, None, None)`` is returned.  ``None`` is returned if the
        terms are not agreed to, which cannot happen while ``agree`` is
        hard-coded to ``True`` below.

    A ``RuntimeError`` mentioning "device-side assert" is treated as
    unrecoverable: the inputs are reported and a Space restart is requested.
    """
    global DEVICE_ASSERT_DETECTED, DEVICE_ASSERT_PROMPT, DEVICE_ASSERT_LANG

    # These options are fixed in this script; they are also written to the
    # error report below.
    voice_cleanup = False
    mic_file_path = None
    use_mic = False
    agree = True
    no_lang_auto_detect = True

    failure = (None, None, None, None)

    if not agree:
        gr.Warning("Please accept the Terms & Condition!")
        return None

    if language not in supported_languages:
        gr.Warning(
            f"Language you put {language} in is not in is not in our Supported Languages, please choose from dropdown"
        )
        return failure

    # strip need as there is space at end!
    language_predicted = langid.classify(prompt)[0].strip()
    # tts expects chinese as zh-cn
    if language_predicted == "zh":
        language_predicted = "zh-cn"
    print(f"Detected language:{language_predicted}, Chosen language:{language}")

    # Language detection only counts for text longer than 15 characters, and
    # only when auto-detection has not been disabled.
    if (
        len(prompt) > 15
        and language_predicted != language
        and not no_lang_auto_detect
    ):
        gr.Warning(
            "It looks like your text isn’t the language you chose , if you’re sure the text is the same language you chose, please check disable language auto-detection checkbox"
        )
        return failure

    if use_mic:
        if mic_file_path is None:
            gr.Warning(
                "Please record your voice with Microphone, or uncheck Use Microphone to use reference audios"
            )
            return failure
        speaker_wav = mic_file_path
    else:
        speaker_wav = audio_file_pth

    # Fast (not perfect) filtering of the reference: band-limit + trim silence.
    if voice_cleanup:
        speaker_wav = _filter_reference_audio(speaker_wav)

    if len(prompt) < 2:
        gr.Warning("Please give a longer prompt text")
        return failure
    if len(prompt) > 500:
        gr.Warning(
            "Text length limited to 500 characters for this demo, please try shorter text. You can clone this space and edit code for your own usage"
        )
        return failure

    if DEVICE_ASSERT_DETECTED:
        # It will likely never come here as we restart space on first unrecoverable error now
        print(
            f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}"
        )
        # HF Space specific.. This error is unrecoverable need to restart space
        _restart_space_unless_building()

    try:
        try:
            (
                gpt_cond_latent,
                speaker_embedding,
            ) = model.get_conditioning_latents(
                audio_path=speaker_wav,
                gpt_cond_len=30,
                gpt_cond_chunk_len=4,
                max_ref_length=60,
            )
        except Exception as e:
            print("Speaker encoding error", str(e))
            gr.Warning(
                "It appears something wrong with reference, did you unmute your microphone?"
            )
            return failure

        # temporary comma fix: "x." / "x。" / "x?" -> "x .." etc.
        # Raw string so the regex escapes are not (invalid) Python escapes.
        prompt = re.sub(r"([^\x00-\x7F]|\w)(\.|\。|\?)", r"\1 \2\2", prompt)

        # Direct (non-streaming) mode.
        print("I: Generating new audio...")
        t0 = time.time()
        out = model.inference(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=5.0,
            temperature=0.75,
        )
        inference_time = time.time() - t0
        print(f"I: Time to generate audio: {round(inference_time*1000)} milliseconds")
        # Generation time per second of audio at the 24000 Hz output rate.
        real_time_factor = inference_time / out['wav'].shape[-1] * 24000
        print(f"Real-time factor (RTF): {real_time_factor}")

        # Make sure the target directory exists even when the caller did not
        # create it.
        os.makedirs("output", exist_ok=True)
        torchaudio.save(
            f"output/{save_path}.wav",
            torch.tensor(out["wav"]).unsqueeze(0),
            24000,
        )
    except RuntimeError as e:
        if "device-side assert" in str(e):
            # cannot do anything on cuda device side error, need to restart
            print(
                f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}",
                flush=True,
            )
            gr.Warning("Unhandled Exception encounter, please retry in a minute")
            print("Cuda device-assert Runtime encountered need restart")
            if not DEVICE_ASSERT_DETECTED:
                DEVICE_ASSERT_DETECTED = 1
                DEVICE_ASSERT_PROMPT = prompt
                DEVICE_ASSERT_LANG = language

            # Just before restarting, save what caused the issue so it can be
            # handled in future.
            error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
            error_data = [
                str(field)
                for field in (
                    error_time,
                    prompt,
                    language,
                    audio_file_pth,
                    mic_file_path,
                    use_mic,
                    voice_cleanup,
                    no_lang_auto_detect,
                    agree,
                )
            ]
            _upload_device_assert_report(error_data, speaker_wav)

            # HF Space specific.. This error is unrecoverable need to restart space
            _restart_space_unless_building()
        elif "Failed to decode" in str(e):
            print("Speaker encoding error", str(e))
            gr.Warning(
                "It appears something wrong with reference, did you unmute your microphone?"
            )
        else:
            print("RuntimeError: non device-side assert error:", str(e))
            gr.Warning("Something unexpected happened please retry again.")
        return failure

    return f"output/{save_path}.wav"
class subtitle:
    """A single subtitle cue: index, start time, end time and text.

    ``start_time`` and ``end_time`` are stored exactly as passed in;
    ``normalize`` replaces timestamp strings with a number of seconds.
    """

    def __init__(self, index: int, start_time, end_time, text: str):
        self.index, self.text = int(index), text.strip()
        self.start_time, self.end_time = start_time, end_time

    def normalize(self, ntype: str, fps=30):
        """Convert both timestamps from strings to seconds.

        ``ntype`` selects the input format:
          * ``"prcsv"`` -- ``H:M:S:F`` (``;`` is accepted in place of ``:``),
            where ``F`` is a frame count divided by ``fps``;
          * ``"srt"``   -- ``H:M:S,mmm`` (comma or dot as decimal mark).

        Raises:
            ValueError: for any other ``ntype`` or an unparsable timestamp.
        """
        if ntype == "prcsv":

            def to_seconds(stamp):
                hours, minutes, seconds, frames = stamp.replace(';', ':').split(":")
                return (
                    int(hours) * 3600
                    + int(minutes) * 60
                    + int(seconds)
                    + round(int(frames) / fps, 2)
                )

        elif ntype == "srt":

            def to_seconds(stamp):
                hours, minutes, seconds = stamp.split(":")
                seconds = seconds.replace(",", ".")
                return int(hours) * 3600 + int(minutes) * 60 + round(float(seconds), 2)

        else:
            raise ValueError

        self.start_time = to_seconds(self.start_time)
        self.end_time = to_seconds(self.end_time)

    def add_offset(self, offset=0):
        """Shift both times by ``offset``, clamping negative results to 0."""
        self.start_time = max(self.start_time + offset, 0)
        self.end_time = max(self.end_time + offset, 0)

    def __str__(self) -> str:
        return f'id:{self.index},start:{self.start_time},end:{self.end_time},text:{self.text}'
def read_srt(uploaded_file):
    """Parse an SRT file into a list of ``subtitle`` objects (times in seconds).

    Args:
        uploaded_file: Object whose ``name`` attribute is the path of a UTF-8
            encoded ``.srt`` file.

    Returns:
        One ``subtitle`` per cue, in file order; an empty list when the file
        contains no cue.

    A line counts as a cue's timing row when it contains ``" --> "`` and the
    line directly above it is a non-empty run of ASCII digits (a leading BOM
    is ignored).  The cue text is everything between the timing row and the
    next cue's index line (or the end of the file); surrounding blank lines are
    stripped by ``subtitle``.
    """
    offset = 0
    with open(uploaded_file.name, "r", encoding="utf-8") as f:
        lines = f.readlines()

    def cue_number_text(line_no):
        # Cue index as text, without surrounding whitespace or a BOM.
        return lines[line_no].strip().replace("\ufeff", "")

    def is_cue_number(line_no):
        text = cue_number_text(line_no)
        return text.isascii() and text.isdigit()

    # Start at 1: a timing row on the very first line has no index line above.
    timing_rows = [
        row
        for row in range(1, len(lines))
        if " --> " in lines[row] and is_cue_number(row - 1)
    ]

    subtitle_list = []
    following_rows = timing_rows[1:] + [None]
    for row, next_row in zip(timing_rows, following_rows):
        start, end = lines[row].split(" --> ")
        # Stop right before the next cue's index line; the last cue runs to EOF.
        text_end = len(lines) if next_row is None else next_row - 1
        text = "".join(lines[row + 1:text_end])
        cue = subtitle(int(cue_number_text(row - 1)), start, end, text)
        cue.normalize(ntype="srt")
        cue.add_offset(offset=offset)
        subtitle_list.append(cue)
    return subtitle_list
from pydub import AudioSegment | |
def trim_audio(intervals, input_file_path, output_file_path):
    """Export one WAV file per ``(start, end)`` interval of an audio file.

    Interval ``i`` is written to ``f"{output_file_path}_{i}.wav"``.  Bounds are
    multiplied by 1000 before slicing (pydub slices are indexed in
    milliseconds, so they are presumably given in seconds).
    """
    source_audio = AudioSegment.from_file(input_file_path)
    for clip_no, bounds in enumerate(intervals):
        begin, finish = bounds
        clip = source_audio[begin * 1000:finish * 1000]
        clip.export(f"{output_file_path}_{clip_no}.wav", format='wav')
import re | |
def sort_key(file_name):
    """Return the last run of digits in ``file_name`` as an int, or -1 if none.

    Returning -1 places names without any number first when sorting.
    """
    digit_runs = re.findall(r'\d+', file_name)
    return int(digit_runs[-1]) if digit_runs else -1
def merge_audios(folder_path):
    """Concatenate all ``.wav`` files of ``folder_path`` into ``AI配音版.wav``.

    Files are joined in ascending ``sort_key`` order (the last number in each
    file name).  Returns the name of the merged file, which is written to the
    current working directory.
    """
    output_file = "AI配音版.wav"

    wav_names = [entry for entry in os.listdir(folder_path) if entry.endswith('.wav')]
    wav_names.sort(key=sort_key)

    combined = AudioSegment.empty()
    for wav_name in wav_names:
        combined += AudioSegment.from_wav(os.path.join(folder_path, wav_name))
        print(f"Merged: {wav_name}")

    combined.export(output_file, format="wav")
    return output_file
def convert_from_srt(filename, video_full, language, split_model, multilingual):
    """Dub a video from its SRT file and return the path of the merged audio.

    Steps: parse the SRT, extract the video's audio to ``audio_full.wav``,
    separate the vocals with the selected UVR5 model, then for every cue cut
    the matching vocal slice and call ``predict`` with it as the reference
    voice.  Finally all clips in ``output/`` are merged.

    Args:
        filename: Uploaded SRT file object (passed to ``read_srt``).
        video_full: Path of the video containing the original voice track.
        language: Language code passed on to ``predict``.
        split_model: ``"UVR-HP2"`` selects ``pre_fun_hp2``; any other value
            selects ``pre_fun_hp5``.
        multilingual: When truthy, only the second line of each cue is spoken;
            a cue with a single line is spoken as is.

    Returns:
        The value returned by ``merge_audios("output")``.
    """
    subtitle_list = read_srt(filename)

    if os.path.exists("audio_full.wav"):
        os.remove("audio_full.wav")
    ffmpeg.input(video_full).output("audio_full.wav", ac=2, ar=44100).run()

    pre_fun = pre_fun_hp2 if split_model == "UVR-HP2" else pre_fun_hp5
    denoised_dir = f"./denoised/{split_model}/output/"
    pre_fun._path_audio_("audio_full.wav", denoised_dir, denoised_dir, "wav")
    # NOTE(review): this is the vocal-track file name the code has always
    # expected from _path_audio_; presumably the "_10" is the agg value the
    # AudioPre instances were built with -- confirm before changing either.
    vocal_track = f"{denoised_dir}vocal_audio_full.wav_10.wav"

    # Start from an empty output directory.  Creating it up front also lets an
    # SRT file without cues reach merge_audios() without an error.
    if os.path.isdir("output"):
        shutil.rmtree("output")
    os.makedirs("output", exist_ok=True)

    for cue in subtitle_list:
        if multilingual:
            cue_lines = cue.text.splitlines()
            # Fall back to the whole text when the cue has no second line.
            spoken = cue_lines[1] if len(cue_lines) > 1 else cue.text
        else:
            spoken = cue.text

        slice_prefix = f"sliced_audio_{cue.index}"
        trim_audio([[cue.start_time, cue.end_time]], vocal_track, slice_prefix)
        print(f"正在合成第{cue.index}条语音")
        print(f"语音内容:{spoken}")

        # The clip name embeds the text; path separators would change the
        # target directory, so replace them.  The trailing index is what
        # sort_key() orders the clips by.
        safe_text = spoken.replace("/", "_").replace("\\", "_")
        predict(spoken, language, f"{slice_prefix}_0.wav", f"{safe_text} {cue.index}")

    return merge_audios("output")
# Language codes offered in the SRT-language dropdown.
_SRT_LANGUAGE_CHOICES = [
    "en", "es", "fr", "de", "it", "pt", "pl", "tr", "ru",
    "nl", "cs", "ar", "zh-cn", "ja", "ko", "hu", "hi",
]

# Gradio UI: SRT file + video + options on the left, merged dubbing on the right.
with gr.Blocks() as app:
    gr.Markdown("# <center>🌊💕🎶 XTTS - SRT文件一键AI配音</center>")
    gr.Markdown("### <center>🌟 只需上传SRT文件和原版配音文件即可,每次一集视频AI自动配音!Developed by Kevin Wang </center>")

    with gr.Row():
        with gr.Column():
            inp1 = gr.File(file_count="single", label="请上传一集视频对应的SRT文件")
            inp2 = gr.Video(label="请上传一集包含原声配音的视频", info="需要是.mp4视频文件")
            inp3 = gr.Dropdown(
                label="请选择SRT文件对应的语言",
                info="各种语言的简写代码请参考:https://www.science.co.il/language/Codes.php",
                choices=_SRT_LANGUAGE_CHOICES,
                max_choices=1,
                value="en",
            )
            inp4 = gr.Dropdown(
                label="请选择用于分离伴奏的模型",
                info="UVR-HP5去除背景音乐效果更好,但会对人声造成一定的损伤",
                choices=["UVR-HP2", "UVR-HP5"],
                value="UVR-HP5",
            )
            inp5 = gr.Checkbox(
                label="SRT文件是否为双语字幕",
                info="若为双语字幕,请打勾选择(SRT文件中需要先出现中文字幕,后英文字幕;中英字幕各占一行)",
            )
            btn = gr.Button("一键开启AI配音吧💕", variant="primary")
        with gr.Column():
            out1 = gr.Audio(label="为您生成的AI完整配音")

        # The five inputs map, in order, onto convert_from_srt's parameters.
        btn.click(
            fn=convert_from_srt,
            inputs=[inp1, inp2, inp3, inp4, inp5],
            outputs=[out1],
        )

    gr.Markdown("### <center>注意❗:请勿生成会对任何个人或组织造成侵害的内容,请尊重他人的著作权和知识产权。用户对此程序的任何使用行为与程序开发者无关。</center>")
    gr.HTML('''
<div class="footer">
<p>🌊🏞️🎶 - 江水东流急,滔滔无尽声。 明·顾璘
</p>
</div>
''')

app.launch(share=True, show_error=True)