|
import re, io, os, stat |
|
import tempfile, subprocess |
|
import requests |
|
import torch |
|
import traceback |
|
import numpy as np |
|
import scipy |
|
from importlib import import_module |
|
from flask import Flask, Blueprint, request, jsonify, send_file |
|
|
|
import torch |
|
import torchaudio |
|
|
|
# Flask application object; all routes below are registered on it.
app = Flask(__name__)
|
|
|
|
|
|
|
from qili import upload_bytes |
|
|
|
|
|
|
|
|
|
|
|
|
|
# Directory where cached speaker-conditioning latents (.pt files) are stored.
# Overridable via XTTS_SAMPLE_DIR; defaults to ./samples under the CWD.
sample_root = os.environ.get('XTTS_SAMPLE_DIR')
if sample_root is None:
    sample_root = f'{os.getcwd()}/samples'
# exist_ok avoids the check-then-create race of the exists()/makedirs() pair.
os.makedirs(sample_root, exist_ok=True)

# Fallback (speaker wav path, latent cache path) used when no sample is
# given or downloading one fails.
default_sample = f'{os.path.dirname(os.path.abspath(__file__))}/sample.wav', f'{sample_root}/sample.pt'
|
|
|
# Bundled ffmpeg binary shipped next to this module; mark it executable.
ffmpeg = f'{os.path.dirname(os.path.abspath(__file__))}/ffmpeg'
try:
    st = os.stat(ffmpeg)
    os.chmod(ffmpeg, st.st_mode | stat.S_IEXEC)
except OSError:
    # Best-effort: missing binary or read-only FS is logged, not fatal
    # (trim_sample_audio degrades gracefully if ffmpeg cannot run).
    traceback.print_exc()
|
|
|
# Lazily-initialized globals: ``tts`` is the TTS.api wrapper and ``model``
# the underlying XTTS model. Both are created on the first /convert request.
tts=None
model=None
|
@app.route("/convert")
def predict():
    """Synthesize speech for the ``text`` query parameter with XTTS.

    Query parameters:
        text     -- required; returns a 400 JSON error when missing.
        sample   -- optional URL of a reference speaker wav.
        language -- optional language code (defaults to "zh").

    Returns the public URL of the uploaded wav on success. On failure the
    exception message is returned as the body (status 200 — kept as-is for
    backward compatibility with /play, which embeds the return value).
    """
    global tts
    global model
    text = request.args.get('text')
    sample = request.args.get('sample')
    language = request.args.get('language')

    if text is None:
        return jsonify({'error': 'text is missing'}), 400

    # Double sentence-ending punctuation (adds a pause-like repetition).
    # Raw strings fix the invalid "\w"/"\。" escape-sequence warnings the
    # original non-raw literal produced; the regex behavior is unchanged.
    text = re.sub(r"([^\x00-\x7F]|\w)(\.|\。|\?)", r"\1 \2\2", text)

    try:
        if tts is None:
            # Lazy-load on first request: the model is large and slow to init.
            TTS = import_module("TTS.api").TTS
            model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
            tts = TTS(model_name=model_name)
            model = tts.synthesizer.tts_model

            # Keep the original latent computation, then install the caching
            # wrapper defined below (get_conditioning_latents).
            model.__get_conditioning_latents = model.get_conditioning_latents
            model.get_conditioning_latents = get_conditioning_latents

        wav = tts.tts(
            text,
            language=language if language is not None else "zh",
            speaker_wav=sample if sample is not None else default_sample[0],
        )

        with io.BytesIO() as wav_buffer:
            if torch.is_tensor(wav):
                wav = wav.cpu().numpy()
            if isinstance(wav, list):
                wav = np.array(wav)
            # Peak-normalize into int16 range; the 0.01 floor prevents a
            # divide-by-zero on (near-)silent output.
            wav_norm = wav * (32767 / max(0.01, np.max(np.abs(wav))))
            wav_norm = wav_norm.astype(np.int16)
            scipy.io.wavfile.write(wav_buffer, tts.synthesizer.output_sample_rate, wav_norm)
            wav_bytes = wav_buffer.getvalue()
            url = upload_bytes(wav_bytes, ext=".wav")
            print(f'wav is at {url}')
            return url
    except Exception as e:
        traceback.print_exc()
        return str(e)
|
|
|
@app.route("/play")
def play():
    """Run predict() and wrap the resulting wav URL in a minimal HTML page
    containing an autoplaying <audio> element."""
    audio_url = predict()
    page = f'''
    <html>
    <body>
        <audio controls autoplay>
            <source src="{audio_url}" type="audio/wav">
            Your browser does not support the audio element.
        </audio>
    </body>
    </html>
    '''
    return page
|
|
|
def get_conditioning_latents(audio_path, **others):
    """Caching replacement for the model's get_conditioning_latents.

    Installed onto ``model`` in predict(). Resolves *audio_path* via
    download(), then loads the (gpt_cond_latent, speaker_embedding) pair from
    the ``.pt`` cache file when present; otherwise computes it with the
    original (saved) implementation and writes the cache.

    Returns the (gpt_cond_latent, speaker_embedding) tuple.
    """
    global model
    speaker_wav, pt_file = download(audio_path)

    latents = None
    if pt_file is not None:
        try:
            latents = torch.load(pt_file)
            print(f'sample wav info loaded from {pt_file}')
        except Exception:
            # Missing or corrupt cache file: fall through and recompute.
            latents = None

    if latents is None:
        # ``__get_conditioning_latents`` holds the original implementation,
        # stashed on the model instance in predict().
        latents = model.__get_conditioning_latents(audio_path=speaker_wav, **others)
        if pt_file is not None:
            torch.save(latents, pt_file)
            print(f'sample wav info saved to {pt_file}')

    return latents
|
|
|
def download(url):
    """Fetch a speaker sample from *url*.

    Returns a ``(speaker_wav_path, latent_cache_path)`` tuple:
      - when the latents for this content (keyed by the ETag header) are
        already cached, the wav path is "" since no audio is needed;
      - otherwise the body is written to a temp file, trimmed, and the
        trimmed path returned;
      - on any failure (network error, non-200 status, missing ETag) the
        module-level ``default_sample`` pair is returned.
    """
    try:
        response = requests.get(url)
        if response.status_code != 200:
            # Original fell through and implicitly returned None here,
            # crashing the caller's tuple unpack.
            return default_sample
        pt_path = f'{sample_root}/{response.headers["etag"]}.pt'.replace('"', '')
        if os.path.exists(pt_path):
            return "", pt_path
        # suffix=".wav" is required: trim_sample_audio derives its output
        # name via .replace(".wav", ...), which was a no-op on the
        # suffix-less default temp name (ffmpeg then read/wrote one path).
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".wav", delete=True) as temp_file:
            temp_file.write(response.content)
            temp_file.flush()  # make the bytes visible to ffmpeg before trimming
            return trim_sample_audio(os.path.abspath(temp_file.name)), pt_path
    except Exception:
        traceback.print_exc()
        return default_sample
|
|
|
def trim_sample_audio(speaker_wav):
    """Filter and silence-trim *speaker_wav* with the bundled ffmpeg.

    Applies a lowpass/highpass band filter plus leading/trailing silence
    removal, writing to ``<name>_trimed.wav``. Returns the trimmed file's
    path, or the original path unchanged if ffmpeg fails for any reason.
    """
    global ffmpeg
    try:
        lowpass_highpass = "lowpass=8000,highpass=75,"
        trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"
        out_filename = speaker_wav.replace(".wav", "_trimed.wav")
        # Build argv as a list instead of splitting a formatted string on
        # spaces, so paths containing spaces are passed intact.
        subprocess.run(
            [ffmpeg, "-y", "-i", speaker_wav, "-af", f"{lowpass_highpass}{trim_silence}", out_filename],
            capture_output=False,
            text=True,
            check=True,
        )
        return out_filename
    except Exception:
        # Best-effort: on any failure fall back to the untrimmed sample.
        traceback.print_exc()
        return speaker_wav
|
|
|
|
|
@app.route("/")
def hello():
    """Health-check endpoint confirming the service is up."""
    greeting = "hello xtts"
    return greeting
|
|
|
|