enhanced_accessibility = False #@param {type:"boolean"}
#@markdown ---
#@markdown #### Please select your language:
#lang_select = "English" #@param ["English", "Spanish"]
#if lang_select == "English":
#    lang = "en"
#elif lang_select == "Spanish":
#    lang = "es"
#else:
#    raise Exception("Language not supported.")
#@markdown ---
lang = "en"
use_gpu = False #@param {type:"boolean"}
import logging

from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

# Configure logging
logging.basicConfig(level=logging.DEBUG)

app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Mount a directory to serve static files (e.g., CSS and JavaScript);
# the app must exist before anything is mounted on it.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Maps generated file IDs to audio file paths for the /download route
files = {}
# Mock data for your interface
data = {
    "speaker_options": ["Speaker 1", "Speaker 2", "Speaker 3"],
    "default_speaker": "Speaker 1",
}

@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    return templates.TemplateResponse("interface.html", {"request": request, "data": data})
import configparser
import glob
import json
import math
import os
import sys
import tempfile
import uuid
from enum import Enum
from pathlib import Path
from typing import Iterable, List, Optional, Union

import ipywidgets as widgets
import numpy as np
import onnxruntime
import soundfile as sf
from IPython.display import display, Audio, Markdown, clear_output
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run
from pydub import AudioSegment

_LOGGER = logging.getLogger("piper_train.infer_onnx")

#if not os.path.exists("./content/piper/src/python/lng"):
#    import subprocess
#    command = "cp -r ./content/piper/notebooks/lng ./content/piper/src/python/lng"
#    subprocess.run(command, shell=True)

#sys.path.append('/content/piper/notebooks')
sys.path.append('./content/piper/src/python')
class Translator:
    def __init__(self):
        self.configs = {}

    def load_language(self, language_name):
        if language_name not in self.configs:
            config = configparser.ConfigParser()
            config.read(os.path.join(os.getcwd(), "lng", f"{language_name}.lang"))
            self.configs[language_name] = config

    def translate(self, language_name, string):
        if language_name == "en":
            return string
        elif language_name not in self.configs:
            self.load_language(language_name)
        config = self.configs[language_name]
        try:
            return config.get("Strings", string)
        except (configparser.NoOptionError, configparser.NoSectionError):
            if string:
                # Fall back to the untranslated string when no entry exists
                return string
            raise Exception("language engine error: This translation is corrupt!")

#from translator import *
lan = Translator()
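# A minimal sketch of the .lang file format Translator assumes: an INI file
# read by configparser with a [Strings] section mapping the English source
# string to its translation. The file name and entries below are illustrative,
# not files shipped with this app.
#
#   lng/es.lang:
#     [Strings]
#     Select speaker = Seleccionar hablante
#     Load it! = ¡Cárgalo!
#
#   lan.translate("es", "Select speaker")  ->  "Seleccionar hablante"
#   lan.translate("es", "Anything else")   ->  "Anything else" (fallback)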
def detect_onnx_models(path):
    onnx_models = glob.glob(path + '/*.onnx')
    if len(onnx_models) > 1:
        return onnx_models
    elif len(onnx_models) == 1:
        return onnx_models[0]
    else:
        return None
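# detect_onnx_models intentionally returns three different shapes: a list when
# several voice packages exist, a single path string for exactly one, and None
# for none. An uncalled sketch of handling all three cases (the path is the
# same default used in the route below):
def _example_handle_models(path="./content/piper/src/python"):
    found = detect_onnx_models(path)
    if found is None:
        print("no voice packages installed")
    elif isinstance(found, str):
        print("single voice package:", found)
    else:
        print(f"{len(found)} voice packages:", found)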
renamed_audio_file = None
| #@app.post("/synthesize") | |
| #@app.post("/", response_class=FileResponse) | |
| async def main( | |
| request: Request, | |
| text_input: str = Form(...), | |
| speaker: str = Form(...), | |
| speed_slider: float = Form(1.0), | |
| noise_scale_slider: float = Form(0.667), | |
| noise_scale_w_slider: float = Form(1.0), | |
| play: bool = Form(True) | |
| ): | |
| """Main entry point""" | |
| sys.path.append('./content/piper/src/python') | |
| models_path = "./content/piper/src/python" | |
| logging.basicConfig(level=logging.DEBUG) | |
| providers = [ | |
| "CPUExecutionProvider" | |
| if use_gpu is False | |
| else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"}) | |
| ] | |
| sess_options = onnxruntime.SessionOptions() | |
| model = None | |
| onnx_models = detect_onnx_models(models_path) | |
    speaker_selection = widgets.Dropdown(
        options=[],
        description=f'{lan.translate(lang, "Select speaker")}:',
        layout={'visibility': 'hidden'}
    )
    if onnx_models is None:
        if enhanced_accessibility:
            # playaudio() comes from the accessibility helpers and is only
            # reached when enhanced_accessibility is enabled
            playaudio("novoices")
        raise Exception(lan.translate(lang, "No downloaded voice packages!"))
    elif isinstance(onnx_models, str):
        onnx_model = onnx_models
        model, config = load_onnx(onnx_model, sess_options, providers)
        print("number of speakers = ", config["num_speakers"])
        print("speaker", speaker)
        # rate = speed_slider.value
        # noise_scale = noise_scale_slider.value
        # noise_scale_w = noise_scale_w_slider.value
        auto_play = play
        audio = inferencing(model, config, 0, text_input, speed_slider, noise_scale_slider, noise_scale_w_slider, auto_play)
        # Create a temporary directory to store the audio files
        temp_dir = tempfile.mkdtemp()
        # Export the audio to an MP3 file in the temporary directory,
        # named after the text input
        renamed_audio_file = os.path.join(temp_dir, f"{text_input}.mp3")
        audio.export(renamed_audio_file, format="mp3")
        # Save the generated audio as a temporary file
        filepath = renamed_audio_file
        # Generate a unique file ID
        file_id = str(uuid.uuid4())
        # Store the file path with the generated file ID
        files[file_id] = filepath
        # Create a URL to download the file
        file_url = f'/download?fileId={file_id}'
        # Earlier approach kept for reference:
        # audio_file_path = "path/to/your/audio.mp3"
        # if not os.path.exists(audio_file_path):
        #     return {"detail": "Audio file not found"}
        # temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        # audio.export(temp_audio_file.name, format="mp3")
        # global renamed_audio_file
        # renamed_audio_file = os.path.join(tempfile.gettempdir(), f"{text_input}.mp3")
        # os.rename(temp_audio_file.name, renamed_audio_file)
| if config["num_speakers"] > 1: | |
| speaker_selection.options = config["speaker_id_map"].values() | |
| speaker_selection.layout.visibility = 'visible' | |
| preview_sid = 0 | |
| else: | |
| speaker_selection.layout.visibility = 'hidden' | |
| preview_sid = None | |
| else: | |
| voice_model_names = [] | |
| for current in onnx_models: | |
| voice_struct = current.split("/")[5] | |
| voice_model_names.append(voice_struct) | |
        # if enhanced_accessibility:
        #     playaudio("selectmodel")
        # selection = widgets.Dropdown(
        #     options=voice_model_names,
        #     description=f'{lan.translate(lang, "Select voice package")}:',
        # )
        # load_btn = widgets.Button(
        #     description=lan.translate(lang, "Load it!")
        # )
        # config = None
        # def load_model(button):
        #     nonlocal config
        #     global onnx_model
        #     nonlocal model
        #     nonlocal models_path
        #     selected_voice = selection.value
        #     onnx_model = f"{models_path}/{selected_voice}"
        #     model, config = load_onnx(onnx_model, sess_options, providers)
        #     if enhanced_accessibility:
        #         playaudio("loaded")
        #     if config["num_speakers"] > 1:
        #         speaker_selection.options = config["speaker_id_map"].values()
        #         speaker_selection.layout.visibility = 'visible'
        #         if enhanced_accessibility:
        #             playaudio("multispeaker")
        #     else:
        #         speaker_selection.layout.visibility = 'hidden'
        # load_btn.on_click(load_model)
        # display(selection, load_btn)
        # display(speaker_selection)
    return templates.TemplateResponse("interface.html", {"request": request, "file_url": file_url, "data": data})
    # Serve the audio file with the correct media type
    # return FileResponse(renamed_audio_file)
    # return {"message": f"Text to synthesize: {text_input}, Speed: {speed_slider}, Play: {play}"}
@app.get("/download")
async def download_file(fileId: str):
    # Retrieve the file path from the dictionary using the file ID
    filepath = files.get(fileId)
    if filepath:
        # Create a FileResponse to serve the file for download
        return FileResponse(filepath, headers={"Content-Disposition": "attachment"})
    else:
        return {"error": "File not found"}
def load_onnx(model, sess_options, providers = ["CPUExecutionProvider"]):
    _LOGGER.debug("Loading model from %s", model)
    config = load_config(model)
    session = onnxruntime.InferenceSession(
        str(model),
        sess_options=sess_options,
        providers=providers
    )
    _LOGGER.info("Loaded model from %s", model)
    return session, config

def load_config(model):
    # The voice config lives next to the model as <name>.onnx.json
    with open(f"{model}.json", "r") as file:
        config = json.load(file)
    return config
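# A sketch of the voice-config fields this script actually reads; the values
# below are illustrative, not taken from a real voice package:
#
# {
#     "num_speakers": 1,
#     "phoneme_type": "espeak",
#     "espeak": {"voice": "en-us"},
#     "phoneme_id_map": {"_": [0], "^": [1], "$": [2], "a": [3]},
#     "speaker_id_map": {"speaker0": 0},
#     "audio": {"sample_rate": 22050}
# }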
| PAD = "_" # padding (0) | |
| BOS = "^" # beginning of sentence | |
| EOS = "$" # end of sentence | |
| class PhonemeType(str, Enum): | |
| ESPEAK = "espeak" | |
| TEXT = "text" | |
def phonemize(config, text: str) -> List[List[str]]:
    """Text to phonemes grouped by sentence."""
    if config["phoneme_type"] == PhonemeType.ESPEAK:
        if config["espeak"]["voice"] == "ar":
            # Arabic diacritization
            # https://github.com/mush42/libtashkeel/
            text = tashkeel_run(text)
        return phonemize_espeak(text, config["espeak"]["voice"])
    if config["phoneme_type"] == PhonemeType.TEXT:
        return phonemize_codepoints(text)
    raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}')
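# phonemize returns one phoneme list per sentence; e.g. with an espeak voice,
# "Hi. Bye." comes back roughly as [['h', 'ˈa', 'ɪ'], ['b', 'ˈa', 'ɪ']].
# The exact symbols depend on the voice, so treat this as illustrative only.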
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
    """Phonemes to ids."""
    id_map = config["phoneme_id_map"]
    ids: List[int] = list(id_map[BOS])
    for phoneme in phonemes:
        if phoneme not in id_map:
            _LOGGER.warning("Missing phoneme from id map: %s", phoneme)
            continue
        ids.extend(id_map[phoneme])
        ids.extend(id_map[PAD])
    ids.extend(id_map[EOS])
    return ids
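# A worked example of the interleaving above, using the illustrative id map
# from the load_config sketch ({"_": [0], "^": [1], "$": [2], "a": [3]}):
# phonemes ["a", "a"] produce
#     [1,  3, 0,  3, 0,  2]
#      ^   a  _   a  _   $
# i.e. the BOS ids, then each phoneme's ids followed by PAD ids, then EOS.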
def audio_float_to_int16(
    audio: np.ndarray, max_wav_value: float = 32767.0
) -> np.ndarray:
    """Normalize audio and convert to int16 range"""
    audio_norm = audio * (max_wav_value / max(0.01, np.max(np.abs(audio))))
    audio_norm = np.clip(audio_norm, -max_wav_value, max_wav_value)
    audio_norm = audio_norm.astype("int16")
    return audio_norm
def inferencing(model, config, sid, line, length_scale = 1, noise_scale = 0.667, noise_scale_w = 0.8, auto_play=True):
    audios = []
    if config["phoneme_type"] == "PhonemeType.ESPEAK":
        # Some configs serialize the enum repr instead of its value; normalize it
        config["phoneme_type"] = "espeak"
    sentence_phonemes = phonemize(config, line)
    for phonemes in sentence_phonemes:
        phoneme_ids = phonemes_to_ids(config, phonemes)
        num_speakers = config["num_speakers"]
        if num_speakers == 1:
            speaker_id = None  # for now
        else:
            speaker_id = sid
        text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        text_lengths = np.array([text.shape[1]], dtype=np.int64)
        scales = np.array(
            [noise_scale, length_scale, noise_scale_w],
            dtype=np.float32,
        )
        # Use a separate name for the speaker-id array so the sid parameter
        # is not clobbered between sentences
        sid_array = None
        if speaker_id is not None:
            sid_array = np.array([speaker_id], dtype=np.int64)
        audio = model.run(
            None,
            {
                "input": text,
                "input_lengths": text_lengths,
                "scales": scales,
                "sid": sid_array,
            },
        )[0].squeeze((0, 1))
        audio = audio_float_to_int16(audio.squeeze())
        audios.append(audio)
    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]
    # Write the merged audio as WAV, then load it with pydub so the caller
    # can export it as MP3
    temp_audio_path = os.path.join(tempfile.gettempdir(), "generated_audio.wav")
    sf.write(temp_audio_path, merged_audio, sample_rate)
    audio = AudioSegment.from_wav(temp_audio_path)
    return audio
    # return FileResponse(temp_audio_path)
    # display(Markdown(f"{line}"))
    # display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play))
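# An uncalled sketch of driving inferencing() directly, outside the FastAPI
# route; "voice.onnx" is a placeholder for a real voice package path:
def _example_synthesize():
    sess_options = onnxruntime.SessionOptions()
    model, config = load_onnx("voice.onnx", sess_options)
    segment = inferencing(model, config, 0, "Hello from Piper!")
    segment.export("hello.mp3", format="mp3")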
def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    audio_spec, audio_angles = transform(audio)
    a = bias_spec.shape[-1]
    b = audio_spec.shape[-1]
    repeats = max(1, math.ceil(b / a))
    bias_spec_repeat = np.repeat(bias_spec, repeats, axis=-1)[..., :b]
    audio_spec_denoised = audio_spec - (bias_spec_repeat * denoiser_strength)
    audio_spec_denoised = np.clip(audio_spec_denoised, a_min=0.0, a_max=None)
    audio_denoised = inverse(audio_spec_denoised, audio_angles)
    return audio_denoised
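# denoise() expects bias_spec to be a magnitude spectrogram of the model's
# background noise. One common recipe (an assumption, not something this app
# does) is to synthesize near-silence and keep a single spectrogram frame:
def _example_bias_spec(background: np.ndarray) -> np.ndarray:
    # background: shape (1, num_samples), audio generated from neutral input
    bias_spec, _ = transform(background)
    return bias_spec[:, :, :1]  # one frame; denoise() repeats it as needed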
def stft(x, fft_size, hopsamp):
    """Compute and return the STFT of the supplied time domain signal x.
    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): FFT size. Should be a power of 2, otherwise DFT will be used.
        hopsamp (int): The hop size, in samples.
    Returns:
        The STFT. The rows are the time slices and columns are the frequency bins.
    """
    window = np.hanning(fft_size)
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    return np.array(
        [
            np.fft.rfft(window * x[i : i + fft_size])
            for i in range(0, len(x) - fft_size, hopsamp)
        ]
    )
def istft(X, fft_size, hopsamp):
    """Invert a STFT into a time domain signal.
    Args:
        X (2-dim Numpy array): Input spectrogram. The rows are the time slices and columns are the frequency bins.
        fft_size (int): FFT size, in samples.
        hopsamp (int): The hop size, in samples.
    Returns:
        The inverse STFT.
    """
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    window = np.hanning(fft_size)
    time_slices = X.shape[0]
    len_samples = int(time_slices * hopsamp + fft_size)
    x = np.zeros(len_samples)
    for n, i in enumerate(range(0, len(x) - fft_size, hopsamp)):
        x[i : i + fft_size] += window * np.real(np.fft.irfft(X[n]))
    return x
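# A quick self-check of the stft/istft pair on a synthetic tone; the
# reconstruction matches the input up to the window overlap-add scaling:
def _example_stft_roundtrip():
    t = np.linspace(0, 1, 22050, endpoint=False)
    x = np.sin(2 * np.pi * 440 * t).astype(np.float32)
    spec = stft(x, fft_size=1024, hopsamp=256)   # rows: time slices, cols: bins
    y = istft(spec, fft_size=1024, hopsamp=256)
    return x, y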
def inverse(magnitude, phase):
    # Recombine magnitude and phase into stacked real/imaginary parts,
    # then run the inverse STFT per batch item
    recombine_magnitude_phase = np.concatenate(
        [magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1
    )
    x_org = recombine_magnitude_phase
    n_b, n_f, n_t = x_org.shape  # pylint: disable=unpacking-non-sequence
    x = np.empty([n_b, n_f // 2, n_t], dtype=np.complex64)
    x.real = x_org[:, : n_f // 2]
    x.imag = x_org[:, n_f // 2 :]
    inverse_transform = []
    for y in x:
        y_ = istft(y.T, fft_size=1024, hopsamp=256)
        inverse_transform.append(y_[None, :])
    inverse_transform = np.concatenate(inverse_transform, 0)
    return inverse_transform
def transform(input_data):
    x = input_data
    real_part = []
    imag_part = []
    for y in x:
        y_ = stft(y, fft_size=1024, hopsamp=256).T
        real_part.append(y_.real[None, :, :])  # pylint: disable=unsubscriptable-object
        imag_part.append(y_.imag[None, :, :])  # pylint: disable=unsubscriptable-object
    real_part = np.concatenate(real_part, 0)
    imag_part = np.concatenate(imag_part, 0)
    magnitude = np.sqrt(real_part**2 + imag_part**2)
    phase = np.arctan2(imag_part, real_part)
    return magnitude, phase
| #@app.get("/") | |
| #async def read_root(request: Request): | |
| # return templates.TemplateResponse("interface.html", {"request": request}) | |
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)