import logging
import os
import random

import gradio as gr
import soundfile as sf
import torch
from kokoro import KModel, KPipeline

# Voice cloning is optional. When resemblyzer cannot be imported, `encoder`
# is None and generate_first() skips the cloning step; `preprocess_wav` is
# also bound so the name always exists at module level.
try:
    from resemblyzer import VoiceEncoder, preprocess_wav

    encoder = VoiceEncoder()
except ImportError:
    encoder = None
    preprocess_wav = None

# Filesystem locations and the default sample sentence.
# The two directories may be overridden with the KOKORO_VOICE_DIR and
# KOKORO_OUTPUT_DIR environment variables; without them the original
# hard-coded paths are used unchanged.
VOICE_DIR = os.environ.get("KOKORO_VOICE_DIR", r"D:\New folder (2)\model\voices")
OUTPUT_DIR = os.environ.get("KOKORO_OUTPUT_DIR", r"D:\New folder (2)\output_audio")
TEXT = "Hello, this is a test of the Kokoro TTS system."

# Module-wide logger, configured at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pick the compute device once; CUDA_AVAILABLE is reused below as the default
# for every `use_gpu` parameter and for the hardware dropdown.
CUDA_AVAILABLE = torch.cuda.is_available()
device = "cuda" if CUDA_AVAILABLE else "cpu"
logger.info("Using hardware: %s", device)

# One model instance per usable device, keyed by "is this the GPU copy?":
# models[False] always exists (CPU); models[True] exists only with CUDA.
models = {False: KModel("hexgrad/Kokoro-82M").to("cpu").eval()}
if CUDA_AVAILABLE:
    models[True] = KModel("hexgrad/Kokoro-82M").to("cuda").eval()

# One pipeline per language code. The key is also the first character of a
# voice file name ("af_...", "bm_..."), which is how the functions below pick
# a pipeline. Both start on the CPU model; callers swap `pipeline.model`.
pipelines = {
    lang_code: KPipeline(model=models[False], lang_code=lang_code, device="cpu")
    for lang_code in ("a", "b")
}

# Register a custom pronunciation for the word "kokoro" in both lexicons.
# The phoneme strings are written with escapes so that they survive a file
# being re-saved in the wrong encoding (the previous literals were mojibake).
try:
    pipelines["a"].g2p.lexicon.golds["kokoro"] = "k\u02c8Ok\u0259\u0279O"  # kˈOkəɹO
    pipelines["b"].g2p.lexicon.golds["kokoro"] = "k\u02c8Qk\u0259\u0279Q"  # kˈQkəɹQ
except AttributeError as e:
    # Best effort: a pipeline without a g2p lexicon just keeps its defaults.
    logger.warning("Could not set custom pronunciations: %s", e)

def forward_gpu(text, voice_path, speed):
    """Synthesize the first chunk of *text* on the GPU model.

    Args:
        text: Text to synthesize.
        voice_path: Path (or bare file name) of the voice ``.pt`` file. The
            first character of the file name selects the pipeline.
        speed: Speed value forwarded to the pipeline.

    Returns:
        The audio of the first chunk the pipeline yields, or ``None`` when it
        yields nothing.

    Raises:
        KeyError: If the file name's first character is not a key of
            ``pipelines``, or no GPU model is loaded (``models[True]``).
    """
    # Look at the file name, not the whole path: indexing the path itself
    # would pick up the drive letter / directory instead of the language code.
    lang_code = os.path.basename(voice_path)[0]
    pipeline = pipelines[lang_code]

    # Side effect: the shared pipeline keeps pointing at the GPU model.
    pipeline.model = models[True]
    for _, _, audio in pipeline(text, voice=voice_path, speed=speed):
        return audio
    return None

def _first_chunk(pipeline, model, text, voice_path, speed):
    """Run *pipeline* with *model* and return ``(audio, tokens)`` of the first chunk.

    Returns ``None`` when the pipeline yields nothing.
    """
    pipeline.model = model
    for _, ps, audio in pipeline(text, voice=voice_path, speed=speed):
        return audio, ps
    return None


def generate_first(
    text,
    voice="af_bella.pt",
    speed=1,
    use_gpu=CUDA_AVAILABLE,
    clone_voice_file=None,
):
    """Synthesize the first chunk of *text* and return its audio and tokens.

    Args:
        text: Text to synthesize.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the pipeline.
        speed: Speed value forwarded to the pipeline.
        use_gpu: Request the GPU model; ignored when CUDA is unavailable.
        clone_voice_file: Optional path to an audio sample. Only used when the
            optional voice encoder was imported successfully.

    Returns:
        ``((24000, samples), tokens)`` for the first chunk, shaped for the
        ``gr.Audio`` / ``gr.Textbox`` outputs, or ``(None, "")`` when the
        pipeline yields nothing.

    Raises:
        FileNotFoundError: If the voice file does not exist.
        KeyError: If ``voice[0]`` is not a key of ``pipelines``.
        gr.Error: If generation fails on the CPU model.
    """
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]

    if clone_voice_file is not None and encoder is not None:
        try:
            wav = preprocess_wav(clone_voice_file)
            cloned_voice = torch.tensor(
                encoder.embed_utterance(wav), device=device
            ).unsqueeze(0)
            # NOTE(review): this stores a speaker-encoder embedding as a voice
            # file. Whether the pipeline accepts a tensor of this shape, and
            # whether it re-reads a path it has already loaded, cannot be
            # confirmed from this file -- verify before relying on cloning.
            temp_voice_path = os.path.join(VOICE_DIR, "cloned_voice.pt")
            torch.save(cloned_voice, temp_voice_path)
            voice_path = temp_voice_path
        except Exception as e:
            # Best effort: fall back to the selected stock voice.
            logger.error("Error cloning voice: %s", e)
            voice_path = os.path.join(VOICE_DIR, voice)

    use_gpu = bool(use_gpu) and CUDA_AVAILABLE
    try:
        # The same helper serves both devices, so the GPU path returns its
        # audio and tokens as well (it previously fell through to None, "").
        result = _first_chunk(pipeline, models[use_gpu], text, voice_path, speed)
    except gr.exceptions.Error as e:
        if not use_gpu:
            raise gr.Error(e) from e
        gr.Warning(str(e))
        gr.Info("Retrying with CPU. To avoid this error, change Hardware to CPU.")
        result = _first_chunk(pipeline, models[False], text, voice_path, speed)

    if result is None:
        return None, ""
    audio, ps = result
    return (24000, audio.numpy()), ps

def predict(text, voice="af_bella.pt", speed=1):
    """Generate on the CPU model and return only the audio part of the result.

    Thin wrapper around ``generate_first(..., use_gpu=False)`` that drops the
    token string; the return value is ``None`` when nothing was generated.
    """
    audio, _tokens = generate_first(text, voice, speed, use_gpu=False)
    return audio

def tokenize_first(text, voice="af_bella.pt"):
    """Return the token string of the first chunk the pipeline yields for *text*.

    Args:
        text: Text to tokenize.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the pipeline.

    Returns:
        The tokens of the first chunk, or ``""`` when the pipeline yields
        nothing.

    Raises:
        FileNotFoundError: If the voice file does not exist.
        KeyError: If ``voice[0]`` is not a key of ``pipelines``.
    """
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]
    # The pipeline runs with whichever model was assigned to it last.
    for _, ps, _ in pipeline(text, voice=voice_path):
        return ps
    return ""

def generate_all(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    """Yield ``(24000, samples)`` for every chunk the pipeline produces.

    This is a generator, so the checks below run on the first ``next()``.

    Args:
        text: Text to synthesize.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the pipeline.
        speed: Speed value forwarded to the pipeline.
        use_gpu: Request the GPU model; ignored when CUDA is unavailable.

    Yields:
        One ``(24000, samples)`` pair per chunk, plus a single one-sample
        silent pair right after the first chunk.

    Raises:
        FileNotFoundError: If the voice file does not exist.
        KeyError: If ``voice[0]`` is not a key of ``pipelines``.
    """
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]
    use_gpu = use_gpu and CUDA_AVAILABLE
    pipeline.model = models[True] if use_gpu else models[False]

    first = True
    for _, _, audio in pipeline(text, voice=voice_path, speed=speed):
        yield 24000, audio.numpy()
        if first:
            first = False
            # Extra one-sample chunk after the first real one; presumably a
            # workaround for the first-click streaming issue mentioned in the
            # Stream tab note -- confirm before removing.
            yield 24000, torch.zeros(1).numpy()

# Quotes offered by the "Random Quote" button: one per non-blank line of
# en.txt. A missing or empty file falls back to a single built-in sentence so
# that random.choice() always has something to pick from.
try:
    with open("en.txt", "r", encoding="utf-8") as r:
        random_quotes = [quote for quote in (line.strip() for line in r) if quote]
except FileNotFoundError:
    random_quotes = []

if not random_quotes:
    random_quotes = ["Hello, this is a test of the Kokoro TTS system."]

def get_random_quote():
    """Return one entry of the module-level ``random_quotes`` list, picked at random."""
    return random.choice(random_quotes)

def get_gatsby():
    """Return the stripped contents of ``gatsby5k.md`` from the working directory.

    The file is read as UTF-8 rather than the platform default encoding.

    Returns:
        The file text without leading/trailing whitespace, or the message
        ``"The Great Gatsby text not found."`` when the file does not exist.
    """
    try:
        with open("gatsby5k.md", "r", encoding="utf-8") as r:
            return r.read().strip()
    except FileNotFoundError:
        return "The Great Gatsby text not found."

def get_frankenstein():
    """Return the stripped contents of ``frankenstein5k.md`` from the working directory.

    The file is read as UTF-8 rather than the platform default encoding.

    Returns:
        The file text without leading/trailing whitespace, or the message
        ``"Frankenstein text not found."`` when the file does not exist.
    """
    try:
        with open("frankenstein5k.md", "r", encoding="utf-8") as r:
            return r.read().strip()
    except FileNotFoundError:
        return "Frankenstein text not found."

def load_voice_choices(voice_dir=None):
    """Build the ``{label: file name}`` mapping for the voice dropdown.

    Args:
        voice_dir: Directory to scan for ``.pt`` files. Defaults to the
            module-level ``VOICE_DIR``.

    Returns:
        A dict mapping a display label to the voice file name, in sorted
        file-name order. File names starting with ``af``/``am``/``bf``/``bm``
        get a flag-and-gender label followed by the capitalized voice name;
        anything else is labelled ``"Unknown <stem>"``. A missing directory
        yields an empty dict instead of an exception.
    """
    if voice_dir is None:
        voice_dir = VOICE_DIR

    try:
        # Sorted so the dropdown order does not depend on the filesystem.
        voice_files = sorted(f for f in os.listdir(voice_dir) if f.endswith(".pt"))
    except (FileNotFoundError, NotADirectoryError):
        logging.getLogger(__name__).warning(
            "Voice directory not found: %s", voice_dir
        )
        return {}

    prefix_labels = {
        "af": "🇺🇸 🚺",
        "am": "🇺🇸 🚹",
        "bf": "🇬🇧 🚺",
        "bm": "🇬🇧 🚹",
    }

    choices = {}
    for voice_file in voice_files:
        tag = prefix_labels.get(voice_file[:2])
        if tag is None:
            label = f"Unknown {voice_file[:-3]}"
        else:
            # [3:-3] drops the "xx_" prefix and the ".pt" suffix.
            label = f"{tag} {voice_file[3:-3].capitalize()}"
        choices[label] = voice_file
    return choices

# Voice dropdown contents: display label -> voice file name.
CHOICES = load_voice_choices()

# Log every discovered voice, warning about any file that has disappeared
# since the directory was listed.
for label, voice_path in CHOICES.items():
    full_path = os.path.join(VOICE_DIR, voice_path)
    if os.path.exists(full_path):
        logger.info("Loaded voice: %s (%s)", label, voice_path)
    else:
        logger.warning("Voice file not found: %s", full_path)

# Keep the dropdown usable even when no voice file was found.
if not CHOICES:
    logger.warning("No voice files found in VOICE_DIR. Adding a placeholder.")
    CHOICES = {"🇺🇸 🚺 Bella 🔥": "af_bella.pt"}

# Markdown help text shown under the token output. The syntax examples are
# wrapped in code spans so Markdown displays them verbatim instead of turning
# them into links.
TOKEN_NOTE = '''
💡 Customize pronunciation with Markdown link syntax and /slashes/ like `[Kokoro](/kˈOkəɹO/)`

💬 To adjust intonation, try punctuation `;:,.!?—…"()“”` or stress `ˈ` and `ˌ`

⬇️ Lower stress `[1 level](-1)` or `[2 levels](-2)`

⬆️ Raise stress 1 level `[or](+2)` 2 levels (only works on less stressed, usually short words)
'''

# "Generate" tab: one-shot audio output plus the tokens that produced it.
with gr.Blocks() as generate_tab:
    out_audio = gr.Audio(
        label="Output Audio",
        interactive=False,
        streaming=False,
        autoplay=True,
    )
    generate_btn = gr.Button("Generate", variant="primary")
    with gr.Accordion("Output Tokens", open=True):
        out_ps = gr.Textbox(
            interactive=False,
            show_label=False,
            info="Tokens used to generate the audio, up to 510 context length.",
        )
        tokenize_btn = gr.Button("Tokenize", variant="secondary")
        gr.Markdown(TOKEN_NOTE)
        # Hidden button: it is wired to predict() in the main app layout.
        predict_btn = gr.Button("Predict", variant="secondary", visible=False)

# "Stream" tab: streaming audio output with start/stop controls.
with gr.Blocks() as stream_tab:
    out_stream = gr.Audio(
        label="Output Audio Stream",
        interactive=False,
        streaming=True,
        autoplay=True,
    )
    with gr.Row():
        stream_btn = gr.Button("Stream", variant="primary")
        stop_btn = gr.Button("Stop", variant="stop")
    with gr.Accordion("Note", open=True):
        gr.Markdown(
            "⚠️ There is an unknown Gradio bug that might yield no audio "
            "the first time you click Stream."
        )
        gr.DuplicateButton()

# Whether the queue and the API page are exposed; also read by the launcher.
API_OPEN = True

# Main layout: inputs in the left column, the two output tabs on the right,
# followed by the event wiring for every button.
with gr.Blocks() as app:
    with gr.Row():
        with gr.Column():
            text = gr.Textbox(
                label="Input Text",
                info="Arbitrarily many characters supported",
            )
            with gr.Row():
                # Prefer af_bella.pt when present, otherwise the first voice.
                default_voice = (
                    "af_bella.pt"
                    if "af_bella.pt" in CHOICES.values()
                    else list(CHOICES.values())[0]
                )
                voice = gr.Dropdown(
                    list(CHOICES.items()),
                    value=default_voice,
                    label="Voice",
                    info="Quality and availability vary by language",
                )
                use_gpu = gr.Dropdown(
                    [("GPU 🚀", True), ("CPU 🐌", False)],
                    value=CUDA_AVAILABLE,
                    label="Hardware",
                    info="GPU is usually faster, but may require CUDA support",
                    # Without CUDA the choice is locked to CPU.
                    interactive=CUDA_AVAILABLE,
                )
            speed = gr.Slider(minimum=0.5, maximum=2, value=1, step=0.1, label="Speed")
            clone_voice_file = gr.File(
                label="Clone Voice Sample (Optional)",
                file_count="single",
                type="filepath",
            )
            random_btn = gr.Button("🎲 Random Quote 💬", variant="secondary")
            with gr.Row():
                gatsby_btn = gr.Button("🥂 Gatsby 📕", variant="secondary")
                frankenstein_btn = gr.Button("💀 Frankenstein 📗", variant="secondary")
        with gr.Column():
            gr.TabbedInterface([generate_tab, stream_tab], ["Generate", "Stream"])

    # Text helpers fill the input box.
    random_btn.click(fn=get_random_quote, inputs=[], outputs=[text])
    gatsby_btn.click(fn=get_gatsby, inputs=[], outputs=[text])
    frankenstein_btn.click(fn=get_frankenstein, inputs=[], outputs=[text])

    # One-shot generation and tokenization.
    generate_btn.click(
        fn=generate_first,
        inputs=[text, voice, speed, use_gpu, clone_voice_file],
        outputs=[out_audio, out_ps],
    )
    tokenize_btn.click(fn=tokenize_first, inputs=[text, voice], outputs=[out_ps])

    # Streaming; the Stop button cancels the running stream event.
    stream_event = stream_btn.click(
        fn=generate_all,
        inputs=[text, voice, speed, use_gpu],
        outputs=[out_stream],
    )
    stop_btn.click(fn=None, cancels=[stream_event])

    predict_btn.click(fn=predict, inputs=[text, voice, speed], outputs=[out_audio])

# Script entry point: serve the app on localhost:40001 and open a browser tab.
if __name__ == "__main__":
    app.queue(api_open=API_OPEN).launch(
        server_name="127.0.0.1",
        server_port=40001,
        show_api=API_OPEN,
        inbrowser=True,
    )