import re, io, os, logging
import tempfile, subprocess
import traceback

import requests
import numpy as np
import scipy.io.wavfile
import torch

from TTS.api import TTS
from flask import Flask, request, jsonify

from qili import upload_bytes

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)

# Lazily-initialized TTS engine and its underlying XTTS model
tts = None
model = None

# Directory for cached speaker samples; override via XTTS_SAMPLE_DIR
sample_root = os.environ.get('XTTS_SAMPLE_DIR')
if sample_root is None:
    sample_root = f'{os.getcwd()}/samples'
if not os.path.exists(sample_root):
    os.makedirs(sample_root)

# Fallback (wav path, latent-cache path) used when no speaker sample is given
default_sample = f'{os.path.dirname(os.path.abspath(__file__))}/sample.wav', f'{sample_root}/sample.pt'
ffmpeg = f'{os.path.dirname(os.path.abspath(__file__))}/ffmpeg'
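# NOTE: the ffmpeg path above assumes a binary shipped next to this script;
# point it at a system install (e.g. "/usr/bin/ffmpeg") if that fits your setup.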

def predict(text, sample=None, language="zh"):
    global tts
    global model
    try:
        # Load the XTTS model once, on first request
        if tts is None:
            model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
            logging.info(f"loading model {model_name} ...")
            tts = TTS(model_name, progress_bar=False)
            model = tts.synthesizer.tts_model
            # Swap in the caching wrapper for speaker-latent extraction
            model.__get_conditioning_latents = model.get_conditioning_latents
            model.get_conditioning_latents = get_conditioning_latents
            logging.info("model is ready")
        # Double sentence-ending punctuation so XTTS pauses more naturally
        text = re.sub(r"([^\x00-\x7F]|\w)(\.|。|\?)", r"\1 \2\2", text)
        wav = tts.tts(
            text,
            language=language if language is not None else "zh",
            speaker_wav=sample if sample is not None else default_sample[0],
        )
        # Normalize to 16-bit PCM and serialize as an in-memory WAV
        with io.BytesIO() as wav_buffer:
            if torch.is_tensor(wav):
                wav = wav.cpu().numpy()
            if isinstance(wav, list):
                wav = np.array(wav)
            wav_norm = wav * (32767 / max(0.01, np.max(np.abs(wav))))
            wav_norm = wav_norm.astype(np.int16)
            scipy.io.wavfile.write(wav_buffer, tts.synthesizer.output_sample_rate, wav_norm)
            wav_bytes = wav_buffer.getvalue()
        url = upload_bytes(wav_bytes, ext=".wav")
        logging.debug(f'wav is at {url}')
        return url
    except Exception as e:
        traceback.print_exc()
        return str(e)

@app.route("/")
def convert():
    text = request.args.get('text')
    if text is None:
        return jsonify({'error': 'text is missing'}), 400
    sample = request.args.get('sample')
    language = request.args.get('language')
    return predict(text, sample, language)
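
# Example request, assuming the Flask dev server on localhost:5000
# (host/port are deployment choices, not fixed by this file):
#   curl 'http://localhost:5000/?text=Hello%20world&language=en'
# On success the response body is the URL of the uploaded wav.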

@app.route("/play")
def play():
    # Accept the same query parameters as "/", then wrap the wav in a player
    text = request.args.get('text')
    if text is None:
        return jsonify({'error': 'text is missing'}), 400
    url = predict(text, request.args.get('sample'), request.args.get('language'))
    return f'''
    <html>
    <body>
        <audio controls autoplay>
            <source src="{url}" type="audio/wav">
            Your browser does not support the audio element.
        </audio>
    </body>
    </html>
    '''

def get_conditioning_latents(audio_path, **others):
    # Cache wrapper around the model's latent extraction: load precomputed
    # latents from a .pt file when available, otherwise compute and save them
    global model
    speaker_wav, pt_file = download(audio_path)
    try:
        (
            gpt_cond_latent,
            speaker_embedding,
        ) = torch.load(pt_file)
        logging.debug(f'sample wav info loaded from {pt_file}')
    except Exception:
        (
            gpt_cond_latent,
            speaker_embedding,
        ) = model.__get_conditioning_latents(audio_path=speaker_wav, **others)
        torch.save((gpt_cond_latent, speaker_embedding), pt_file)
        logging.debug(f'sample wav info saved to {pt_file}')
    return gpt_cond_latent, speaker_embedding

def download(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            # Latents are cached per remote file, keyed by its ETag
            pt_file = f'{sample_root}/{response.headers["etag"]}.pt'.replace('"', '')
            if os.path.exists(pt_file):
                return "", pt_file
            # The .wav suffix matters: trim_sample_audio derives its output name from it
            with tempfile.NamedTemporaryFile(mode="wb", suffix=".wav", delete=True) as temp_file:
                temp_file.write(response.content)
                temp_file.flush()
                return trim_sample_audio(os.path.abspath(temp_file.name)), pt_file
        # Non-200 responses fall back to the bundled default sample
        return default_sample
    except Exception:
        return default_sample

def trim_sample_audio(speaker_wav):
    global ffmpeg
    try:
        # Band-limit the voice sample, then strip silence from both ends
        # (the double areverse trims the tail as well as the head)
        lowpass_highpass = "lowpass=8000,highpass=75,"
        trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
        out_filename = speaker_wav.replace(".wav", "_trimmed.wav")
        shell_command = f"{ffmpeg} -y -i {speaker_wav} -af {lowpass_highpass}{trim_silence} {out_filename}".split(" ")
        subprocess.run(
            shell_command,
            capture_output=False,
            text=True,
            check=True,
        )
        return out_filename
    except Exception:
        traceback.print_exc()
        return speaker_wav

logging.info("xtts is ready")
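
# Minimal local entry point; a sketch assuming the built-in Flask dev server
# and port 5000 (production setups would typically use a WSGI server instead).
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)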