import csv
import datetime
import gc
import os
import re
import time
import uuid
from io import StringIO

import cv2
import gradio as gr
import spaces
import torch
import torchaudio
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from langchain.llms import HuggingFacePipeline
from moviepy.editor import AudioFileClip, ImageSequenceClip
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from vinorm import TTSnorm

from components import caption_chain, pexels, tag_chain, utils
from content_generation import create_content

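# unidic supplies the dictionary used by XTTS's Japanese tokenizer; fetch it once at startup.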
os.system("python -m unidic download")

HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)

print("Downloading viXTTS checkpoint if not already present")
checkpoint_dir = "model/"
repo_id = "capleaf/viXTTS"
use_deepspeed = False

os.makedirs(checkpoint_dir, exist_ok=True)
required_files = ["model.pth", "config.json", "vocab.json", "speakers_xtts.pth"]
files_in_dir = os.listdir(checkpoint_dir)
if not all(file in files_in_dir for file in required_files):
    snapshot_download(
        repo_id=repo_id,
        repo_type="model",
        local_dir=checkpoint_dir,
    )
    hf_hub_download(
        repo_id="coqui/XTTS-v2",
        filename="speakers_xtts.pth",
        local_dir=checkpoint_dir,
    )

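# Load the XTTS config and restore the model weights from the local checkpoint.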
xtts_config = os.path.join(checkpoint_dir, "config.json")
config = XttsConfig()
config.load_json(xtts_config)
MODEL = Xtts.init_from_config(config)
MODEL.load_checkpoint(config, checkpoint_dir=checkpoint_dir, use_deepspeed=use_deepspeed)
if torch.cuda.is_available():
    MODEL.cuda()

supported_languages = config.languages
if "vi" not in supported_languages:
    supported_languages.append("vi")

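# flan-t5-xl backs the caption and tag chains used for the Pexels footage search.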
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-xl")
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-xl")
pipe = pipeline(
    "text2text-generation",
    model=model,
    tokenizer=tokenizer,
    max_length=1024,
)
local_llm = HuggingFacePipeline(pipeline=pipe)
llm_chain = caption_chain.chain(llm=local_llm)
sum_llm_chain = tag_chain.chain(llm=local_llm)

pexels_api_key = os.getenv("pexels_api_key")


def normalize_vietnamese_text(text):
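    """Normalize Vietnamese text with TTSnorm, then patch punctuation and spoken forms."""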
    text = (
        TTSnorm(text, unknown=False, lower=False, rule=True)
        .replace("..", ".")
        .replace("!.", "!")
        .replace("?.", "?")
        .replace(" .", ".")
        .replace(" ,", ",")
        .replace('"', "")
        .replace("'", "")
        .replace("AI", "Ây Ai")
        .replace("A.I", "Ây Ai")
        .replace("%", "phần trăm")
    )
    return text


def calculate_keep_len(text, lang):
    """Simple hack for short sentences: return the number of audio samples to keep, or -1 for all."""
    if lang in ["ja", "zh-cn"]:
        return -1
    word_count = len(text.split())
    num_punct = text.count(".") + text.count("!") + text.count("?") + text.count(",")
    if word_count < 5:
        return 15000 * word_count + 2000 * num_punct
    elif word_count < 10:
        return 13000 * word_count + 2000 * num_punct
    return -1


def create_video_from_audio(audio_path, images, output_path):
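    """Stitch image frames into a video that exactly spans the given audio clip.

    Note: predict() currently builds its video via pexels.generate_videos; this
    helper covers the image-sequence path.
    """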
    audio_clip = AudioFileClip(audio_path)
    duration = audio_clip.duration

    frame_rate = len(images) / duration

    video_clip = ImageSequenceClip(images, fps=frame_rate)
    final_clip = video_clip.set_audio(audio_clip)
    final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")

    audio_clip.close()
    video_clip.close()
    final_clip.close()


def truncate_prompt(prompt, tokenizer, max_length=512):
    """Truncate prompt to fit within the maximum token length."""
    tokens = tokenizer.tokenize(prompt)
    if len(tokens) > max_length:
        tokens = tokens[:max_length]
        prompt = tokenizer.convert_tokens_to_string(tokens)
    return prompt


@spaces.GPU
def predict(
    prompt,
    language,
    audio_file_pth,
    normalize_text=True,
    use_llm=False,
    content_type="Theo yêu cầu",
):
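    """Synthesize speech from text with viXTTS and optionally generate a matching video.

    Returns a tuple of (audio_path, video_path, metrics_text); on failure the first
    two entries are None and metrics_text carries a Gradio warning.
    """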
    if use_llm:
        print("I: Generating text with LLM...")
        generated_text = create_content(prompt, content_type, language)
        print(f"Generated text: {generated_text}")
        prompt = generated_text

    if language not in supported_languages:
        metrics_text = gr.Warning(
            f"The language '{language}' is not supported; please choose one from the dropdown."
        )
        return (None, None, metrics_text)

    speaker_wav = audio_file_pth
    if len(prompt) < 2:
        metrics_text = gr.Warning("Please provide a longer prompt text.")
        return (None, None, metrics_text)

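    # Main synthesis path; failures are caught below and surfaced as Gradio warnings.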
    try:
        metrics_text = ""
        t_latent = time.time()
        try:
            (
                gpt_cond_latent,
                speaker_embedding,
            ) = MODEL.get_conditioning_latents(
                audio_path=speaker_wav,
                gpt_cond_len=30,
                gpt_cond_chunk_len=4,
                max_ref_length=60,
            )
        except Exception as e:
            print("Speaker encoding error", str(e))
            metrics_text = gr.Warning(
                "Something appears to be wrong with the reference audio. Did you unmute your microphone?"
            )
            return (None, None, metrics_text)

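        # Double sentence-final punctuation after words/non-ASCII characters (helps XTTS's text splitting).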
        prompt = re.sub(r"([^\x00-\x7F]|\w)(\.|。|\?)", r"\1 \2\2", prompt)
        if normalize_text and language == "vi":
            prompt = normalize_vietnamese_text(prompt)

        prompt = truncate_prompt(prompt, tokenizer, max_length=512)

        print("I: Generating new audio...")
        t0 = time.time()
        out = MODEL.inference(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=5.0,
            temperature=0.75,
            enable_text_splitting=True,
        )
        inference_time = time.time() - t0
        print(f"I: Time to generate audio: {round(inference_time * 1000)} milliseconds")
        metrics_text += (
            f"Time to generate audio: {round(inference_time * 1000)} milliseconds\n"
        )
        real_time_factor = (time.time() - t0) / out["wav"].shape[-1] * 24000
        print(f"Real-time factor (RTF): {real_time_factor}")
        metrics_text += f"Real-time factor (RTF): {real_time_factor:.2f}\n"

        keep_len = calculate_keep_len(prompt, language)
        out["wav"] = out["wav"][:keep_len]
        torchaudio.save("output.wav", torch.tensor(out["wav"]).unsqueeze(0), 24000)

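        # Build a matching video. Scratch directories are created here, but note that
        # pexels.generate_videos returns its own working folder, which supersedes them.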
        print("I: Generating video from audio...")
        folder_name = f"video_{uuid.uuid4().hex}"
        os.makedirs(folder_name, exist_ok=True)
        folder_path = os.path.join(folder_name, "images")
        os.makedirs(folder_path, exist_ok=True)

        folder_name, sentences = pexels.generate_videos(
            prompt, pexels_api_key, "landscape", 1080, 1920, llm_chain, sum_llm_chain
        )
        utils.combine_videos(folder_name)
        video_path = os.path.join(folder_name, "Final_Ad_Video.mp4")

        print(f"I: Video generated at {video_path}")
        metrics_text += f"Video generated at {video_path}\n"

        return ("output.wav", video_path, metrics_text)
    except RuntimeError as e:
        if "device-side assert" in str(e):
            print(
                f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}",
                flush=True,
            )
            gr.Warning("Unhandled exception encountered, please retry in a minute")
            print("CUDA device-side assert encountered; a restart is required")
            error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
            error_data = [
                error_time,
                prompt,
                language,
                audio_file_pth,
            ]
            error_data = [item if isinstance(item, str) else str(item) for item in error_data]
            print(error_data)
            print(speaker_wav)
            write_io = StringIO()
            csv.writer(write_io).writerows([error_data])
            csv_upload = write_io.getvalue().encode()
            filename = error_time + "_" + str(uuid.uuid4()) + ".csv"
            print("Writing error csv")
            error_api = HfApi()
            error_api.upload_file(
                path_or_fileobj=csv_upload,
                path_in_repo=filename,
                repo_id="coqui/xtts-flagged-dataset",
                repo_type="dataset",
            )

            print("Writing error reference audio")
            speaker_filename = error_time + "_reference_" + str(uuid.uuid4()) + ".wav"
            error_api = HfApi()
            error_api.upload_file(
                path_or_fileobj=speaker_wav,
                path_in_repo=speaker_filename,
                repo_id="coqui/xtts-flagged-dataset",
                repo_type="dataset",
            )

            space = api.get_space_runtime(repo_id=repo_id)
            if space.stage != "BUILDING":
                api.restart_space(repo_id=repo_id)
            else:
                print("TRIED TO RESTART but space is building")
        else:
            if "Failed to decode" in str(e):
                print("Speaker encoding error", str(e))
                metrics_text = gr.Warning(
                    "Something appears to be wrong with the reference audio. Did you unmute your microphone?"
                )
            else:
                print("RuntimeError: non device-side assert error:", str(e))
                metrics_text = gr.Warning(
                    "Something unexpected happened, please retry."
                )
        return (None, None, metrics_text)
    except Exception as e:
        print("Unexpected error:", str(e))
        metrics_text = gr.Warning(
            "An unexpected error occurred. Please try again later."
        )
        return (None, None, metrics_text)
    return ("output.wav", None, metrics_text)

with gr.Blocks(analytics_enabled=False) as demo:
    with gr.Row():
        with gr.Column():
            gr.Markdown(
                """
                # tts@TDNM ✨ https://www.tdn-m.com
                """
            )
        with gr.Column():
            pass

    with gr.Row():
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Text Prompt (Văn bản cần đọc)",
                info="Mỗi câu nên từ 10 từ trở lên.",
                value="Xin chào, tôi là một mô hình chuyển đổi văn bản thành giọng nói tiếng Việt.",
            )
            language_gr = gr.Dropdown(
                label="Language (Ngôn ngữ)",
                choices=[
                    "vi",
                    "en",
                    "es",
                    "fr",
                    "de",
                    "it",
                    "pt",
                    "pl",
                    "tr",
                    "ru",
                    "nl",
                    "cs",
                    "ar",
                    "zh-cn",
                    "ja",
                    "ko",
                    "hu",
                    "hi",
                ],
                max_choices=1,
                value="vi",
            )
            normalize_text = gr.Checkbox(
                label="Chuẩn hóa văn bản tiếng Việt",
                info="Normalize Vietnamese text",
                value=True,
            )
            use_llm_checkbox = gr.Checkbox(
                label="Sử dụng LLM để tạo nội dung",
                info="Use LLM to generate content",
                value=False,
            )
            content_type_dropdown = gr.Dropdown(
                label="Loại nội dung",
                choices=["triết lý sống", "Theo yêu cầu"],
                value="Theo yêu cầu",
            )
            ref_gr = gr.Audio(
                label="Reference Audio (Giọng mẫu)",
                type="filepath",
                value="nam-tai-lieu.wav",
            )
            tts_button = gr.Button(
                "Đọc 🗣️🔥",
                elem_id="send-btn",
                visible=True,
                variant="primary",
            )

        with gr.Column():
            audio_gr = gr.Audio(label="Synthesised Audio", autoplay=True)
            video_gr = gr.Video(label="Generated Video")
            out_text_gr = gr.Text(label="Metrics")

    tts_button.click(
        predict,
        inputs=[
            input_text_gr,
            language_gr,
            ref_gr,
            normalize_text,
            use_llm_checkbox,
            content_type_dropdown,
        ],
        outputs=[audio_gr, video_gr, out_text_gr],
        api_name="predict",
    )

demo.queue()
demo.launch(debug=True, show_api=True, share=True)