# app.py: Kokoro TTS Gradio demo
import gradio as gr
import numpy as np
import logging
import warnings
import torch
import re
import time
from kokoro import KPipeline
import os
import soundfile as sf  # pip install soundfile
AUDIO_DIR = "audio_exports"
AUDIO_FILE_PATH = None
# Configure logging and suppress warnings
logging.basicConfig(level=logging.INFO)
warnings.filterwarnings("ignore", category=UserWarning, module="torch.nn.modules.rnn")
warnings.filterwarnings(
"ignore", category=FutureWarning, module="torch.nn.utils.weight_norm"
)
# Create output directory if it doesn't exist
os.makedirs(AUDIO_DIR, exist_ok=True)
# Initialize global variables
LANG_CODE = "a" # Default to American English
PIPELINE = None
CURRENT_VOICE = "af_bella" # Default voice
# Timing metrics
PIPELINE_LOAD_TIME = 0
AUDIO_GEN_TIME = 0
loading_time_box = None
# Mapping from human-readable to Kokoro language codes
LANG_MAP = {
"American English (en-us)": "a",
"British English (en-gb)": "b",
"Spanish (es)": "e",
"French (fr-fr)": "f",
"Hindi (hi)": "h",
"Italian (it)": "i",
"Japanese (ja)": "j",
"Brazilian Portuguese (pt-br)": "p",
"Mandarin Chinese (zh)": "z",
}
# Reverse mapping for display
CODE_TO_LANG = {v: k for k, v in LANG_MAP.items()}
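# e.g. LANG_MAP["French (fr-fr)"] -> "f"; CODE_TO_LANG["f"] -> "French (fr-fr)"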
# Complete list of all voices by language
ALL_VOICES = {
"a": [
"af_heart",
"af_alloy",
"af_aoede",
"af_bella",
"af_jessica",
"af_kore",
"af_nicole",
"af_nova",
"af_river",
"af_sarah",
"af_sky",
"am_adam",
"am_echo",
"am_eric",
"am_fenrir",
"am_liam",
"am_michael",
"am_onyx",
"am_puck",
"am_santa",
],
"b": [
"bf_alice",
"bf_emma",
"bf_isabella",
"bf_lily",
"bm_daniel",
"bm_fable",
"bm_george",
"bm_lewis",
],
"e": ["ef_dora", "em_alex", "em_santa"],
"f": ["ff_siwis"],
"h": ["hf_alpha", "hf_beta", "hm_omega", "hm_psi"],
"i": ["if_sara", "im_nicola"],
"j": ["jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo"],
"p": ["pf_dora", "pm_alex", "pm_santa"],
"z": [
"zf_xiaobei",
"zf_xiaoni",
"zf_xiaoxiao",
"zf_xiaoyi",
"zm_yunjian",
"zm_yunxi",
"zm_yunxia",
"zm_yunyang",
],
}
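# Voice IDs encode language and gender: first letter = language code, second
# letter = gender. E.g. "af_bella" is American English ("a"), female ("f");
# "bm_george" is British English ("b"), male ("m").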
# Voice ratings (A, B, C, etc.) for voice recommendation
VOICE_RATINGS = {
"af_heart": "A",
"af_bella": "A-",
"af_nicole": "B-",
"bf_emma": "B-",
"ff_siwis": "B-",
}
# Add generic ratings for all other voices
for lang_code, voices in ALL_VOICES.items():
for voice in voices:
if voice not in VOICE_RATINGS:
if voice.startswith(lang_code + "f_"): # Female voices generally better
VOICE_RATINGS[voice] = "C+"
else:
VOICE_RATINGS[voice] = "C"
# Split pattern presets
SPLIT_PATTERNS = {
"Paragraphs (one or more newlines)": r"\n+",
"Sentences (periods, question marks, exclamation points)": r"(?<=[.!?])\s+",
"Commas and semicolons": r"[,;]\s+",
"No splitting (process as one chunk)": r"$^", # Pattern that won't match anything
"Custom": "custom",
}
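# How these presets behave with re.split (comment sketch, not executed):
#   re.split(r"\n+", "Para one.\n\nPara two.")     -> ['Para one.', 'Para two.']
#   re.split(r"(?<=[.!?])\s+", "One. Two? Three!") -> ['One.', 'Two?', 'Three!']
#   re.split(r"$^", "Never split me.")             -> ['Never split me.']
# "$^" ($ followed by ^) can never match, so the text comes back as one chunk.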
# Flatten all voices list for full selection
ALL_VOICES_FLAT = []
for voices in ALL_VOICES.values():
ALL_VOICES_FLAT.extend(voices)
# Initialize pipeline
def init_pipeline(lang_code="a"):
"""
Initialize or reload the Kokoro pipeline for a specific language
"""
global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME
print(f"Initializing pipeline for language code: {lang_code}")
# Track loading time
start_time = time.time()
# Load the pipeline
LANG_CODE = lang_code
PIPELINE = KPipeline(lang_code=lang_code, repo_id="hexgrad/Kokoro-82M")
# Calculate loading time
PIPELINE_LOAD_TIME = time.time() - start_time
# Log language change
lang_name = CODE_TO_LANG.get(lang_code, f"Unknown ({lang_code})")
print(f"Pipeline loaded for {lang_name} in {PIPELINE_LOAD_TIME:.6f} seconds")
return PIPELINE, PIPELINE_LOAD_TIME
# Initialize the default pipeline
PIPELINE, PIPELINE_LOAD_TIME = init_pipeline(LANG_CODE)
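# Switching languages later just means reloading the pipeline, e.g.:
#   PIPELINE, PIPELINE_LOAD_TIME = init_pipeline("j")  # Japanese
# (on_language_change below does exactly this when the dropdown changes)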
def preview_text_splitting(text, split_pattern):
"""
Preview how text will be split based on the pattern
"""
try:
if split_pattern == "$^": # Special case for no splitting
return [text]
chunks = re.split(split_pattern, text)
# Filter out empty chunks
chunks = [chunk.strip() for chunk in chunks if chunk.strip()]
return chunks
except Exception as e:
return [f"Error previewing split: {e}"]
def generate_audio(text, voice, split_pattern=r"\n+", speed=1.0, output_dir=AUDIO_DIR):
"""
Generate audio using pure Kokoro with support for splitting
Args:
text: Text to synthesize
voice: Voice to use
split_pattern: Pattern to split text into chunks
speed: Speech speed
output_dir: Directory to save audio files
Returns:
Tuple of (audio_tuple, phonemes, split_info, timing_info)
"""
global PIPELINE, CURRENT_VOICE, AUDIO_GEN_TIME
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Update current voice
if voice != CURRENT_VOICE:
print(f"Voice changed from {CURRENT_VOICE} to {voice}")
CURRENT_VOICE = voice
# Handle "No splitting" special case
actual_split_pattern = split_pattern
if split_pattern == "$^":
print("Using no-split mode (processing as one chunk)")
# Preview how text will be split
chunks_preview = preview_text_splitting(text, actual_split_pattern)
split_info = f"Text split into {len(chunks_preview)} chunks using pattern: '{actual_split_pattern}'"
print(split_info)
# Process text
all_audio = []
all_phonemes = []
sample_rate = 24000 # Kokoro's sample rate
# Timing metrics
chunk_gen_times = []
chunk_save_times = []
    # Measure generator creation time as well
    generator_start_time = time.time()
generator = PIPELINE(
text, voice=voice, speed=speed, split_pattern=actual_split_pattern
)
generator_init_time = time.time() - generator_start_time
print(f"Generator initialization: {generator_init_time:.6f}s")
# Start tracking overall generation time and iteration time
gen_start_time = time.time()
iter_start_time = time.time()
for i, (gs, ps, audio) in enumerate(generator):
# Track time for this chunk
chunk_start_time = time.time()
# Save the phonemes for each chunk
all_phonemes.append(f"Chunk {i + 1}: {ps}")
# Convert PyTorch tensor to NumPy array if needed
if isinstance(audio, torch.Tensor):
audio_chunk = audio.detach().cpu().numpy()
else:
audio_chunk = audio
all_audio.append(audio_chunk)
        # Time spent handling this chunk in the loop body (tensor conversion,
        # bookkeeping); the synthesis itself runs inside the generator and is
        # accounted for separately below
        chunk_gen_time = time.time() - chunk_start_time
        chunk_gen_times.append(chunk_gen_time)
        print(f"Chunk {i + 1} handled in {chunk_gen_time:.6f}s")
# Save individual chunk to file
save_start_time = time.time()
chunk_filename = os.path.join(output_dir, f"chunk_{i + 1}_{voice}.wav")
sf.write(chunk_filename, audio_chunk, sample_rate)
chunk_save_time = time.time() - save_start_time
chunk_save_times.append(chunk_save_time)
print(f"Chunk {i + 1} saved to {chunk_filename} in {chunk_save_time:.6f}s")
# Calculate iteration time (includes Kokoro processing)
iter_total_time = time.time() - iter_start_time
print(f"Total iteration time: {iter_total_time:.6f}s")
# Calculate the "hidden" Kokoro processing time by subtracting our measured components
sum_chunk_processing = sum(chunk_gen_times) + sum(chunk_save_times)
kokoro_processing_time = iter_total_time - sum_chunk_processing
# Time to combine chunks
combine_start_time = time.time()
if len(all_audio) > 1:
audio_data = np.concatenate(all_audio)
combine_time = time.time() - combine_start_time
print(f"Combined {len(all_audio)} chunks in {combine_time:.6f}s")
else:
audio_data = all_audio[0] if all_audio else np.array([])
combine_time = 0
# Time to save combined file
save_combined_start = time.time()
combined_filename = os.path.join(output_dir, f"combined_{voice}.wav")
sf.write(combined_filename, audio_data, sample_rate)
save_combined_time = time.time() - save_combined_start
print(f"Combined audio saved to {combined_filename} in {save_combined_time:.6f}s")
# Total time
AUDIO_GEN_TIME = time.time() - gen_start_time
# Create detailed timing info
chunks_count = len(all_audio)
timing_lines = []
# Per-chunk timing
if chunks_count > 1:
timing_lines.append("\nChunk details:")
for i, (t, s) in enumerate(zip(chunk_gen_times, chunk_save_times)):
timing_lines.append(f" Chunk {i + 1}: Gen {t:.6f}s, Save {s:.6f}s")
    # Post-processing timing (the combined file is always saved)
    if chunks_count > 1:
        timing_lines.append(f"\nCombine chunks: {combine_time:.6f}s")
    timing_lines.append(f"Save combined: {save_combined_time:.6f}s")
# Overall timing
post_processing = (
sum(chunk_gen_times) + sum(chunk_save_times) + combine_time + save_combined_time
)
timing_lines.append(f"\nTotal Kokoro time: {kokoro_processing_time:.6f}s")
timing_lines.append(f"Total post-processing: {post_processing:.6f}s")
timing_lines.append(f"Total processing time: {AUDIO_GEN_TIME:.6f}s")
# Format timing info for display
timing_info = "\n".join(timing_lines)
# Combine phonemes
phonemes = "\n\n".join(all_phonemes)
# Update split info
if chunks_count > 1:
split_info = (
f"Text was split into {chunks_count} chunks and saved to {output_dir}"
)
else:
split_info = f"Text processed as a single chunk and saved to {output_dir}"
return (sample_rate, audio_data), phonemes, split_info, timing_info
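# Minimal usage sketch (not executed at import; assumes the default pipeline
# loaded above):
#   (sr, audio), phonemes, info, timing = generate_audio(
#       "Hello. World!", "af_bella", split_pattern=r"(?<=[.!?])\s+"
#   )
#   # sr == 24000; audio is a 1-D NumPy array ready for gr.Audio playback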
def on_language_change(language_display):
"""
Handle language change by reloading the pipeline
"""
global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME
# Get language code from display name
new_lang_code = LANG_MAP.get(language_display, "a")
# Only reload if language changed
if new_lang_code != LANG_CODE:
print(
f"Language changed from {LANG_CODE} to {new_lang_code}. Reloading pipeline..."
)
PIPELINE, PIPELINE_LOAD_TIME = init_pipeline(new_lang_code)
# Recommend voices for this language
recommended_voices = []
# Find the top-rated voices for this language
for voice in ALL_VOICES.get(new_lang_code, []):
if voice in VOICE_RATINGS and VOICE_RATINGS[voice] in ["A", "A-", "B", "B-"]:
recommended_voices.append(f"{voice} ({VOICE_RATINGS[voice]})")
    # If no high-rated voices, just take the first few
    if not recommended_voices and new_lang_code in ALL_VOICES:
        recommended_voices = list(ALL_VOICES[new_lang_code][:3])
recommendation_text = f"Language changed to {language_display}. Pipeline loaded in {PIPELINE_LOAD_TIME:.6f} seconds."
if recommended_voices:
recommendation_text += f"\nRecommended voices: {', '.join(recommended_voices)}"
return recommendation_text, f"{PIPELINE_LOAD_TIME:.6f}s"
def on_split_pattern_change(pattern_name, custom_pattern):
"""
Handle changes to the split pattern selection
"""
    if pattern_name == "Custom":
        # Keep the current custom value; just reveal the textbox
        return gr.update(visible=True)
    return gr.update(value=SPLIT_PATTERNS[pattern_name], visible=False)
def preview_splits(text, pattern):
"""
Preview how text will be split based on the pattern
"""
chunks = preview_text_splitting(text, pattern)
if len(chunks) == 1 and pattern == "$^":
return "Text will be processed as a single chunk (no splitting)"
result = f"Text will be split into {len(chunks)} chunks:\n\n"
for i, chunk in enumerate(chunks):
# Truncate very long chunks in the preview
display_chunk = chunk[:100] + "..." if len(chunk) > 100 else chunk
result += f"Chunk {i + 1}: {display_chunk}\n\n"
return result
def create_app():
global loading_time_box, PIPELINE_LOAD_TIME
    theme = gr.themes.Soft(
        font=[
            gr.themes.GoogleFont("Lato"),
            gr.themes.GoogleFont("Roboto"),
            "system-ui",
            "sans-serif",
        ]
    )
    with gr.Blocks(theme=theme) as ui:
# Title
gr.Markdown("# Kokoro TTS Demo")
gr.Markdown("#### Pure Kokoro Implementation with Enhanced Text Splitting")
# Status message for language/voice changes
status_message = gr.Markdown("")
# Input controls
with gr.Row():
with gr.Column(scale=1):
text_input = gr.TextArea(
label="Input Text",
value="Hello!\n\nThis is a multi-paragraph test.\nWith multiple lines.\n\nKokoro can split on paragraphs, sentences, or other patterns.",
lines=8,
)
# Information about split patterns
with gr.Accordion("About Text Splitting in Kokoro", open=False):
gr.Markdown("""
### Understanding Text Splitting
The splitting pattern controls how Kokoro breaks your text into manageable chunks for processing.
**Common patterns:**
- `\\n+`: Split on one or more newlines (paragraphs)
- `(?<=[.!?])\\s+`: Split after periods, question marks, and exclamation points (sentences)
- `[,;]\\s+`: Split after commas and semicolons
- `$^`: Special pattern that won't match anything (processes the entire text as one chunk)
**Benefits of splitting:**
- Better phrasing and natural pauses
- Improved handling of longer texts
- More consistent pronunciation across chunks
**When to use different patterns:**
- Paragraph splits: Good for clearly separated content
- Sentence splits: Maintains sentence integrity but creates more natural breaks
- No splitting: Best for very short texts or when you want continuous flow
The preview feature lets you see exactly how your text will be divided before generating audio.
""")
# Split Pattern Selection
split_pattern_dropdown = gr.Dropdown(
label="Split Text Using",
value="Paragraphs (one or more newlines)",
choices=list(SPLIT_PATTERNS.keys()),
info="Select how to split your text into chunks",
)
custom_pattern_input = gr.Textbox(
label="Custom Split Pattern (Regular Expression)",
value=r"\n+",
visible=False,
info="Enter a custom regex pattern for splitting text",
)
preview_button = gr.Button("Preview Text Splitting")
split_preview = gr.Textbox(
label="Split Preview",
value="Click 'Preview Text Splitting' to see how your text will be divided",
lines=5,
)
with gr.Column(scale=1):
# Language selection
language_input = gr.Dropdown(
label="Language",
value="American English (en-us)",
choices=list(LANG_MAP.keys()),
info="Select the language for text processing",
)
                loading_time_box = gr.Label(
                    label="Language loaded in", value=f"{PIPELINE_LOAD_TIME:.6f}s"
                )
# Voice selection with grouping
with gr.Accordion("Voice Selection", open=True):
voice_input = gr.Dropdown(
label="Voice",
value="af_bella",
choices=sorted(ALL_VOICES_FLAT),
info="Select voice for synthesis",
)
gr.Markdown("""
**Voice naming convention**:
- First letter = language: a=American, b=British, f=French, etc.
- Second letter = gender: f=female, m=male
- After underscore = voice name
""")
# Speed slider
speed_input = gr.Slider(
label="Speech Speed",
minimum=0.5,
maximum=1.5,
value=1.0,
step=0.1,
info="Adjust speaking rate",
)
with gr.Column(scale=1):
# Generate button
submit_button = gr.Button("Generate Audio", variant="primary")
# Outputs
audio_output = gr.Audio(
label="Generated Audio", format="wav", show_download_button=True
)
audio_gen_timing_output = gr.Textbox(
label="Performance Metrics", lines=12
)
phonemes_output = gr.Textbox(label="Phoneme Representation", lines=10)
split_info_output = gr.Textbox(label="Processing Information", lines=5)
# Handle language change
language_input.change(
fn=on_language_change,
inputs=[language_input],
outputs=[status_message, loading_time_box],
)
# Handle split pattern change
split_pattern_dropdown.change(
fn=on_split_pattern_change,
inputs=[split_pattern_dropdown, custom_pattern_input],
            outputs=[custom_pattern_input],
)
# Preview splitting button
preview_button.click(
fn=preview_splits,
inputs=[text_input, custom_pattern_input],
outputs=[split_preview],
)
# Button click handler
def on_generate(text, language_display, voice, split_pattern, speed):
# Generate the audio with output directory
audio_tuple, phonemes, split_info, timing_info = generate_audio(
text,
voice,
split_pattern=split_pattern,
speed=speed,
                output_dir=AUDIO_DIR,  # Write chunk and combined WAVs here
)
# Return results
return audio_tuple, timing_info, phonemes, split_info
submit_button.click(
fn=on_generate,
inputs=[
text_input,
language_input,
voice_input,
custom_pattern_input,
speed_input,
],
outputs=[
audio_output,
audio_gen_timing_output,
phonemes_output,
split_info_output,
],
)
return ui
# Create and launch the app
ui = create_app()
ui.launch(
    debug=True,
    server_name="0.0.0.0",  # Listen on all interfaces
    server_port=7860,  # Default Gradio port
    share=True,  # Also create a temporary public link
)