import gradio as gr import numpy as np import logging import warnings import torch import re import time from kokoro import KPipeline import os # import shutil import soundfile as sf # You need to pip install soundfile AUDIO_DIR = "audio_exports" AUDIO_FILE_PATH = None # Configure logging and suppress warnings logging.basicConfig(level=logging.INFO) warnings.filterwarnings("ignore", category=UserWarning, module="torch.nn.modules.rnn") warnings.filterwarnings( "ignore", category=FutureWarning, module="torch.nn.utils.weight_norm" ) # Create output directory if it doesn't exist os.makedirs(AUDIO_DIR, exist_ok=True) # Initialize global variables LANG_CODE = "a" # Default to American English PIPELINE = None CURRENT_VOICE = "af_bella" # Default voice # Timing metrics PIPELINE_LOAD_TIME = 0 AUDIO_GEN_TIME = 0 loading_time_box = None # Mapping from human-readable to Kokoro language codes LANG_MAP = { "American English (en-us)": "a", "British English (en-gb)": "b", "Spanish (es)": "e", "French (fr-fr)": "f", "Hindi (hi)": "h", "Italian (it)": "i", "Japanese (ja)": "j", "Brazilian Portuguese (pt-br)": "p", "Mandarin Chinese (zh)": "z", } # Reverse mapping for display CODE_TO_LANG = {v: k for k, v in LANG_MAP.items()} # Complete list of all voices by language ALL_VOICES = { "a": [ "af_heart", "af_alloy", "af_aoede", "af_bella", "af_jessica", "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky", "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael", "am_onyx", "am_puck", "am_santa", ], "b": [ "bf_alice", "bf_emma", "bf_isabella", "bf_lily", "bm_daniel", "bm_fable", "bm_george", "bm_lewis", ], "e": ["ef_dora", "em_alex", "em_santa"], "f": ["ff_siwis"], "h": ["hf_alpha", "hf_beta", "hm_omega", "hm_psi"], "i": ["if_sara", "im_nicola"], "j": ["jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo"], "p": ["pf_dora", "pm_alex", "pm_santa"], "z": [ "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang", ], } # Voice ratings (A, B, C, etc.) for voice recommendation VOICE_RATINGS = { "af_heart": "A", "af_bella": "A-", "af_nicole": "B-", "bf_emma": "B-", "ff_siwis": "B-", } # Add generic ratings for all other voices for lang_code, voices in ALL_VOICES.items(): for voice in voices: if voice not in VOICE_RATINGS: if voice.startswith(lang_code + "f_"): # Female voices generally better VOICE_RATINGS[voice] = "C+" else: VOICE_RATINGS[voice] = "C" # Split pattern presets SPLIT_PATTERNS = { "Paragraphs (one or more newlines)": r"\n+", "Sentences (periods, question marks, exclamation points)": r"(?<=[.!?])\s+", "Commas and semicolons": r"[,;]\s+", "No splitting (process as one chunk)": r"$^", # Pattern that won't match anything "Custom": "custom", } # Flatten all voices list for full selection ALL_VOICES_FLAT = [] for voices in ALL_VOICES.values(): ALL_VOICES_FLAT.extend(voices) # Initialize pipeline def init_pipeline(lang_code="a"): """ Initialize or reload the Kokoro pipeline for a specific language """ global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME print(f"Initializing pipeline for language code: {lang_code}") # Track loading time start_time = time.time() # Load the pipeline LANG_CODE = lang_code PIPELINE = KPipeline(lang_code=lang_code, repo_id="hexgrad/Kokoro-82M") # Calculate loading time PIPELINE_LOAD_TIME = time.time() - start_time # Log language change lang_name = CODE_TO_LANG.get(lang_code, f"Unknown ({lang_code})") print(f"Pipeline loaded for {lang_name} in {PIPELINE_LOAD_TIME:.6f} seconds") return PIPELINE, PIPELINE_LOAD_TIME # Initialize the default pipeline PIPELINE, PIPELINE_LOAD_TIME = init_pipeline(LANG_CODE) def preview_text_splitting(text, split_pattern): """ Preview how text will be split based on the pattern """ try: if split_pattern == "$^": # Special case for no splitting return [text] chunks = re.split(split_pattern, text) # Filter out empty chunks chunks = [chunk.strip() for chunk in chunks if chunk.strip()] return chunks except Exception as e: return [f"Error previewing split: {e}"] def generate_audio(text, voice, split_pattern=r"\n+", speed=1.0, output_dir=AUDIO_DIR): """ Generate audio using pure Kokoro with support for splitting Args: text: Text to synthesize voice: Voice to use split_pattern: Pattern to split text into chunks speed: Speech speed output_dir: Directory to save audio files Returns: Tuple of (audio_tuple, phonemes, split_info, timing_info) """ global PIPELINE, CURRENT_VOICE, AUDIO_GEN_TIME # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Update current voice if voice != CURRENT_VOICE: print(f"Voice changed from {CURRENT_VOICE} to {voice}") CURRENT_VOICE = voice # Handle "No splitting" special case actual_split_pattern = split_pattern if split_pattern == "$^": print("Using no-split mode (processing as one chunk)") # Preview how text will be split chunks_preview = preview_text_splitting(text, actual_split_pattern) split_info = f"Text split into {len(chunks_preview)} chunks using pattern: '{actual_split_pattern}'" print(split_info) # Process text all_audio = [] all_phonemes = [] sample_rate = 24000 # Kokoro's sample rate # Timing metrics chunk_gen_times = [] chunk_save_times = [] generator_init_time = 0 # Measure generator creation time too generator_start_time = time.time() generator = PIPELINE( text, voice=voice, speed=speed, split_pattern=actual_split_pattern ) generator_init_time = time.time() - generator_start_time print(f"Generator initialization: {generator_init_time:.6f}s") # Start tracking overall generation time and iteration time gen_start_time = time.time() iter_start_time = time.time() for i, (gs, ps, audio) in enumerate(generator): # Track time for this chunk chunk_start_time = time.time() # Save the phonemes for each chunk all_phonemes.append(f"Chunk {i + 1}: {ps}") # Convert PyTorch tensor to NumPy array if needed if isinstance(audio, torch.Tensor): audio_chunk = audio.detach().cpu().numpy() else: audio_chunk = audio all_audio.append(audio_chunk) # Calculate chunk generation time chunk_gen_time = time.time() - chunk_start_time chunk_gen_times.append(chunk_gen_time) print(f"Chunk {i + 1} generated in {chunk_gen_time:.6f}s") # Save individual chunk to file save_start_time = time.time() chunk_filename = os.path.join(output_dir, f"chunk_{i + 1}_{voice}.wav") sf.write(chunk_filename, audio_chunk, sample_rate) chunk_save_time = time.time() - save_start_time chunk_save_times.append(chunk_save_time) print(f"Chunk {i + 1} saved to {chunk_filename} in {chunk_save_time:.6f}s") # Calculate iteration time (includes Kokoro processing) iter_total_time = time.time() - iter_start_time print(f"Total iteration time: {iter_total_time:.6f}s") # Calculate the "hidden" Kokoro processing time by subtracting our measured components sum_chunk_processing = sum(chunk_gen_times) + sum(chunk_save_times) kokoro_processing_time = iter_total_time - sum_chunk_processing # Time to combine chunks combine_start_time = time.time() if len(all_audio) > 1: audio_data = np.concatenate(all_audio) combine_time = time.time() - combine_start_time print(f"Combined {len(all_audio)} chunks in {combine_time:.6f}s") else: audio_data = all_audio[0] if all_audio else np.array([]) combine_time = 0 # Time to save combined file save_combined_start = time.time() combined_filename = os.path.join(output_dir, f"combined_{voice}.wav") sf.write(combined_filename, audio_data, sample_rate) save_combined_time = time.time() - save_combined_start print(f"Combined audio saved to {combined_filename} in {save_combined_time:.6f}s") # Total time AUDIO_GEN_TIME = time.time() - gen_start_time # Create detailed timing info chunks_count = len(all_audio) timing_lines = [] # Add Kokoro processing time timing_lines.append(f"Kokoro processing time: {kokoro_processing_time:.6f}s") # Per-chunk timing if chunks_count > 1: timing_lines.append("\nChunk details:") for i, (t, s) in enumerate(zip(chunk_gen_times, chunk_save_times)): timing_lines.append(f" Chunk {i + 1}: Gen {t:.6f}s, Save {s:.6f}s") # Post-processing timing if chunks_count > 1: timing_lines.append(f"\nCombine chunks: {combine_time:.6f}s") timing_lines.append(f"Save combined: {save_combined_time:.6f}s") # Overall timing post_processing = ( sum(chunk_gen_times) + sum(chunk_save_times) + combine_time + save_combined_time ) timing_lines.append(f"\nTotal Kokoro time: {kokoro_processing_time:.6f}s") timing_lines.append(f"Total post-processing: {post_processing:.6f}s") timing_lines.append(f"Total processing time: {AUDIO_GEN_TIME:.6f}s") # Format timing info for display timing_info = "\n".join(timing_lines) # Combine phonemes phonemes = "\n\n".join(all_phonemes) # Update split info if chunks_count > 1: split_info = ( f"Text was split into {chunks_count} chunks and saved to {output_dir}" ) else: split_info = f"Text processed as a single chunk and saved to {output_dir}" return (sample_rate, audio_data), phonemes, split_info, timing_info def on_language_change(language_display): """ Handle language change by reloading the pipeline """ global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME # Get language code from display name new_lang_code = LANG_MAP.get(language_display, "a") # Only reload if language changed if new_lang_code != LANG_CODE: print( f"Language changed from {LANG_CODE} to {new_lang_code}. Reloading pipeline..." ) PIPELINE, PIPELINE_LOAD_TIME = init_pipeline(new_lang_code) # Recommend voices for this language recommended_voices = [] # Find the top-rated voices for this language for voice in ALL_VOICES.get(new_lang_code, []): if voice in VOICE_RATINGS and VOICE_RATINGS[voice] in ["A", "A-", "B", "B-"]: recommended_voices.append(f"{voice} ({VOICE_RATINGS[voice]})") # If no high-rated voices, just take the first few if not recommended_voices and new_lang_code in ALL_VOICES: recommended_voices = [f"{v}" for v in ALL_VOICES[new_lang_code][:3]] recommendation_text = f"Language changed to {language_display}. Pipeline loaded in {PIPELINE_LOAD_TIME:.6f} seconds." if recommended_voices: recommendation_text += f"\nRecommended voices: {', '.join(recommended_voices)}" return recommendation_text, f"{PIPELINE_LOAD_TIME:.6f}s" def on_split_pattern_change(pattern_name, custom_pattern): """ Handle changes to the split pattern selection """ if pattern_name == "Custom": return custom_pattern, gr.update(visible=True) else: return SPLIT_PATTERNS[pattern_name], gr.update(visible=False) def preview_splits(text, pattern): """ Preview how text will be split based on the pattern """ chunks = preview_text_splitting(text, pattern) if len(chunks) == 1 and pattern == "$^": return "Text will be processed as a single chunk (no splitting)" result = f"Text will be split into {len(chunks)} chunks:\n\n" for i, chunk in enumerate(chunks): # Truncate very long chunks in the preview display_chunk = chunk[:100] + "..." if len(chunk) > 100 else chunk result += f"Chunk {i + 1}: {display_chunk}\n\n" return result def create_app(): global loading_time_box, PIPELINE_LOAD_TIME with gr.Blocks(theme=gr.themes.Soft(font=[gr.themes.GoogleFont("Lato"), gr.themes.GoogleFont("Roboto"), "system-ui", "sans-serif"])) as ui: # Title gr.Markdown("# Kokoro TTS Demo") gr.Markdown("#### Pure Kokoro Implementation with Enhanced Text Splitting") # Status message for language/voice changes status_message = gr.Markdown("") # Input controls with gr.Row(): with gr.Column(scale=1): text_input = gr.TextArea( label="Input Text", value="Hello!\n\nThis is a multi-paragraph test.\nWith multiple lines.\n\nKokoro can split on paragraphs, sentences, or other patterns.", lines=8, ) # Information about split patterns with gr.Accordion("About Text Splitting in Kokoro", open=False): gr.Markdown(""" ### Understanding Text Splitting The splitting pattern controls how Kokoro breaks your text into manageable chunks for processing. **Common patterns:** - `\\n+`: Split on one or more newlines (paragraphs) - `(?<=[.!?])\\s+`: Split after periods, question marks, and exclamation points (sentences) - `[,;]\\s+`: Split after commas and semicolons - `$^`: Special pattern that won't match anything (processes the entire text as one chunk) **Benefits of splitting:** - Better phrasing and natural pauses - Improved handling of longer texts - More consistent pronunciation across chunks **When to use different patterns:** - Paragraph splits: Good for clearly separated content - Sentence splits: Maintains sentence integrity but creates more natural breaks - No splitting: Best for very short texts or when you want continuous flow The preview feature lets you see exactly how your text will be divided before generating audio. """) # Split Pattern Selection split_pattern_dropdown = gr.Dropdown( label="Split Text Using", value="Paragraphs (one or more newlines)", choices=list(SPLIT_PATTERNS.keys()), info="Select how to split your text into chunks", ) custom_pattern_input = gr.Textbox( label="Custom Split Pattern (Regular Expression)", value=r"\n+", visible=False, info="Enter a custom regex pattern for splitting text", ) preview_button = gr.Button("Preview Text Splitting") split_preview = gr.Textbox( label="Split Preview", value="Click 'Preview Text Splitting' to see how your text will be divided", lines=5, ) with gr.Column(scale=1): # Language selection language_input = gr.Dropdown( label="Language", value="American English (en-us)", choices=list(LANG_MAP.keys()), info="Select the language for text processing", ) # loading_time_box = gr.Textbox(label="Model Loading time", lines=1) loading_time_box = gr.Label( label="Lang loaded in", value=f"{PIPELINE_LOAD_TIME:.6f}s" ) # Voice selection with grouping with gr.Accordion("Voice Selection", open=True): voice_input = gr.Dropdown( label="Voice", value="af_bella", choices=sorted(ALL_VOICES_FLAT), info="Select voice for synthesis", ) gr.Markdown(""" **Voice naming convention**: - First letter = language: a=American, b=British, f=French, etc. - Second letter = gender: f=female, m=male - After underscore = voice name """) # Speed slider speed_input = gr.Slider( label="Speech Speed", minimum=0.5, maximum=1.5, value=1.0, step=0.1, info="Adjust speaking rate", ) with gr.Column(scale=1): # Generate button submit_button = gr.Button("Generate Audio", variant="primary") # Outputs audio_output = gr.Audio( label="Generated Audio", format="wav", show_download_button=True ) audio_gen_timing_output = gr.Textbox( label="Performance Metrics", lines=12 ) phonemes_output = gr.Textbox(label="Phoneme Representation", lines=10) split_info_output = gr.Textbox(label="Processing Information", lines=5) # Handle language change language_input.change( fn=on_language_change, inputs=[language_input], outputs=[status_message, loading_time_box], ) # Handle split pattern change split_pattern_dropdown.change( fn=on_split_pattern_change, inputs=[split_pattern_dropdown, custom_pattern_input], outputs=[custom_pattern_input, custom_pattern_input], ) # Preview splitting button preview_button.click( fn=preview_splits, inputs=[text_input, custom_pattern_input], outputs=[split_preview], ) # Button click handler # def on_generate(text, language_display, voice, split_pattern, speed): # # Generate the audio # audio_tuple, phonemes, split_info, timing_info = generate_audio( # text, voice, split_pattern=split_pattern, speed=speed # ) # # Return results # return audio_tuple, timing_info, phonemes, split_info def on_generate(text, language_display, voice, split_pattern, speed): # Generate the audio with output directory audio_tuple, phonemes, split_info, timing_info = generate_audio( text, voice, split_pattern=split_pattern, speed=speed, output_dir=AUDIO_DIR, # Add this parameter ) # Return results return audio_tuple, timing_info, phonemes, split_info submit_button.click( fn=on_generate, inputs=[ text_input, language_input, voice_input, custom_pattern_input, speed_input, ], outputs=[ audio_output, audio_gen_timing_output, phonemes_output, split_info_output, ], ) return ui # Create and launch the app ui = create_app() ui.launch( debug=True, server_name="0.0.0.0", # Make accessible externally server_port=7860, # Choose your port share=True, # Set to True if you want a public link )