File size: 8,029 Bytes
98c217d
ee617da
98c217d
 
fa5c1e1
1684657
1509739
799287a
98c217d
 
 
 
fa5c1e1
 
 
 
 
 
 
 
 
98c217d
 
 
 
 
92ccd47
 
 
 
 
 
 
 
 
 
 
 
98c217d
 
 
1509739
 
98c217d
 
1509739
98c217d
 
 
 
 
 
92ccd47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98c217d
92ccd47
98c217d
 
 
 
 
 
 
92ccd47
 
1509739
 
92ccd47
1509739
98c217d
 
 
92ccd47
1509739
98c217d
fa5c1e1
92ccd47
fa5c1e1
98c217d
 
 
 
fa5c1e1
 
 
 
 
 
 
 
 
 
 
 
92ccd47
fa5c1e1
 
 
 
 
 
92ccd47
1509739
92ccd47
fa5c1e1
 
 
 
 
 
92ccd47
98c217d
 
 
fa5c1e1
 
 
 
 
 
98c217d
 
92ccd47
fa5c1e1
 
92ccd47
fa5c1e1
 
 
 
92ccd47
fa5c1e1
 
92ccd47
fa5c1e1
 
 
 
92ccd47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa5c1e1
 
 
976f3b9
 
92ccd47
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import gradio as gr
import os
import torch
import logging
import soundfile as sf
import time
from kokoro import KModel, KPipeline

# Logging: INFO-level root configuration plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Every working directory lives next to this source file
_HERE = os.path.dirname(__file__)

# Default text shown in the input box
TEXT = "Hello, this is a test of the Kokoro TTS system."

# Voice packs are read from VOICE_DIR; OUTPUT_DIR is created alongside it
VOICE_DIR = os.path.join(_HERE, "voices")
OUTPUT_DIR = os.path.join(_HERE, "output_audio")
os.makedirs(VOICE_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Pick the compute device once at import time
CUDA_AVAILABLE = torch.cuda.is_available()
if CUDA_AVAILABLE:
    device = "cuda"
else:
    device = "cpu"
logger.info(f"Using hardware: {device}")

# Persistent directory handed to the model loader as its cache location
MODEL_CACHE_DIR = os.path.join(_HERE, "model_cache")
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)

# Load a single model instance; both pipelines below share it.
# NOTE(review): `cache_dir` is forwarded to KModel unchanged from the original code —
# confirm the installed kokoro release accepts that keyword (TODO verify KModel.__init__).
try:
    start_time = time.time()
    model = KModel("hexgrad/Kokoro-82M", cache_dir=MODEL_CACHE_DIR).to(device).eval()
    logger.info(f"Model loading time: {time.time() - start_time} seconds")
except Exception as e:
    # Log for the operator, then re-raise: the app cannot run without a model.
    logger.error(f"Failed to load model: {e}")
    raise

# One pipeline per language code: American ('a') and British ('b') English.
# The key is the first letter of a voice file name (e.g. "af_bella.pt" -> 'a').
pipelines = {
    'a': KPipeline(model=model, lang_code='a', device=device),  # American English
    'b': KPipeline(model=model, lang_code='b', device=device),  # British English
}

# Set custom pronunciations for "kokoro".
# The phoneme strings are written with escapes so they survive a bad re-encoding of
# this file (the previous literals had been corrupted into Thai-script mojibake):
#   U+02C8 = primary stress mark, U+0259 = schwa, U+0279 = turned r.
try:
    pipelines["a"].g2p.lexicon.golds["kokoro"] = "k\u02c8Ok\u0259\u0279O"
    pipelines["b"].g2p.lexicon.golds["kokoro"] = "k\u02c8Qk\u0259\u0279Q"
except AttributeError as e:
    # Best effort only: a g2p backend without `lexicon.golds` keeps its default reading.
    logger.warning(f"Could not set custom pronunciations: {e}")

# Cache voice choices to avoid repeated file scanning
VOICE_CHOICES = None
def load_voice_choices():
    """Return the mapping of dropdown label -> voice file name.

    VOICE_DIR is scanned for ``*.pt`` files on the first call only; later calls
    return the cached mapping stored in the module-level VOICE_CHOICES.

    Labels are derived from the file name: the first two characters select a
    flag/gender badge ("af", "am", "bf", "bm"), and the part between the third
    character and the ``.pt`` suffix is capitalized as the display name. Files
    with any other prefix are listed as ``Unknown <stem>``.

    Returns:
        dict mapping label to file name. If no voice file is found, a single
        placeholder entry for "af_bella.pt" is returned so the UI has a choice.
    """
    global VOICE_CHOICES
    if VOICE_CHOICES is not None:
        return VOICE_CHOICES

    # Two-letter file prefix -> "<flag> <gender sign>" badge shown in the dropdown
    badges = {
        'af': "🇺🇸 🚺",
        'am': "🇺🇸 🚹",
        'bf': "🇬🇧 🚺",
        'bm': "🇬🇧 🚹",
    }

    choices = {}
    # os.listdir() returns entries in arbitrary order; sort for a stable dropdown.
    for voice_file in sorted(f for f in os.listdir(VOICE_DIR) if f.endswith('.pt')):
        badge = badges.get(voice_file[:2])
        if badge is None:
            label = f"Unknown {voice_file[:-3]}"
        else:
            # [3:-3] drops the "xx_" prefix and the ".pt" suffix
            label = f"{badge} {voice_file[3:-3].capitalize()}"
        if label in choices:
            # Two files can collapse to the same label; the later one wins, as before.
            logger.warning(
                "Voice label %r maps to both %s and %s; keeping %s",
                label, choices[label], voice_file, voice_file,
            )
        choices[label] = voice_file

    if not choices:
        logger.warning("No voice files found in VOICE_DIR. Adding a placeholder.")
        choices = {"🇺🇸 🚺 Bella 🔥": "af_bella.pt"}
    VOICE_CHOICES = choices
    return choices

CHOICES = load_voice_choices()

# Report, for each dropdown entry, whether its backing file is present on disk
for label, voice_path in CHOICES.items():
    full_path = os.path.join(VOICE_DIR, voice_path)
    if os.path.exists(full_path):
        logger.info(f"Loaded voice: {label} ({voice_path})")
        continue
    logger.warning(f"Voice file not found: {full_path}")

def generate_first(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    """Synthesize the first segment the pipeline produces for ``text``.

    Args:
        text: Input text passed to the pipeline.
        voice: Voice file name inside VOICE_DIR; its first character selects
            the language pipeline ('a' or 'b').
        speed: Speed value forwarded to the pipeline.
        use_gpu: Run on CUDA when requested *and* CUDA is available.

    Returns:
        ``((24000, samples), phonemes)`` for the first segment, where
        ``samples`` is a NumPy array, or ``(None, "")`` when the pipeline
        yields nothing (24000 is presumably the sample rate in Hz expected by
        the gr.Audio output — confirm).

    Raises:
        FileNotFoundError: The voice file does not exist.
        gr.Error: No voice was given, the voice's language prefix has no
            pipeline, or synthesis failed on the CPU.
    """
    start_time = time.time()
    if not voice:
        raise gr.Error("No voice selected.")
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    # Voices whose prefix is not a configured language used to fail with a bare KeyError.
    pipeline = pipelines.get(voice[0])
    if pipeline is None:
        raise gr.Error(f"Unsupported voice {voice!r}: no pipeline for language code {voice[0]!r}.")

    use_gpu = bool(use_gpu) and CUDA_AVAILABLE

    def _first_segment(log_label):
        # Return as soon as the first segment is available; later segments are not generated.
        for _, ps, audio in pipeline(text, voice=voice_path, speed=speed):
            logger.info("%s: %s seconds", log_label, time.time() - start_time)
            # .cpu() is a no-op for CPU tensors and guards against a CUDA-resident result.
            return (24000, audio.cpu().numpy()), ps
        return None, ""

    try:
        # Put the shared model on the requested device. Previously it was only ever
        # moved to the CPU, so one CPU request silently disabled the GPU for good.
        target = "cuda" if use_gpu else "cpu"
        if model.device.type != target:
            model.to(target)
        return _first_segment("Generation time")
    except (gr.exceptions.Error, RuntimeError) as e:
        # RuntimeError covers torch/CUDA failures (e.g. out of memory) raised by the
        # direct pipeline call; catching only gr.Error left the CPU retry unreachable.
        if not use_gpu:
            raise gr.Error(str(e)) from e
        gr.Warning(str(e))
        gr.Info("Retrying with CPU.")
        model.to("cpu")
        return _first_segment("Generation time (CPU retry)")

def tokenize_first(text, voice="af_bella.pt"):
    """Return the phoneme string of the first segment produced for ``text``.

    Args:
        text: Input text passed to the pipeline.
        voice: Voice file name inside VOICE_DIR; its first character selects
            the language pipeline ('a' or 'b').

    Returns:
        The phonemes of the first segment, or ``""`` if the pipeline yields nothing.

    Raises:
        FileNotFoundError: The voice file does not exist.
        gr.Error: No voice was given, or the voice's language prefix has no pipeline.
    """
    if not voice:
        raise gr.Error("No voice selected.")
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    # Voices whose prefix is not a configured language used to fail with a bare KeyError.
    pipeline = pipelines.get(voice[0])
    if pipeline is None:
        raise gr.Error(f"Unsupported voice {voice!r}: no pipeline for language code {voice[0]!r}.")

    # NOTE(review): the pipelines are built with a model attached, so this call presumably
    # also synthesizes audio for the first segment — confirm whether that cost matters.
    for _, ps, _ in pipeline(text, voice=voice_path):
        return ps
    return ""

def generate_all(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    """Stream audio for ``text`` one pipeline segment at a time.

    Args:
        text: Input text passed to the pipeline.
        voice: Voice file name inside VOICE_DIR; its first character selects
            the language pipeline ('a' or 'b').
        speed: Speed value forwarded to the pipeline.
        use_gpu: Run on CUDA when requested *and* CUDA is available.

    Yields:
        ``(24000, samples)`` tuples, one per segment, with ``samples`` a NumPy
        array. A single-sample silent chunk is yielded right after the first
        segment.

    Raises:
        FileNotFoundError: The voice file does not exist.
        gr.Error: No voice was given, or the voice's language prefix has no pipeline.
        (Being a generator, these are raised on the first iteration.)
    """
    start_time = time.time()
    if not voice:
        raise gr.Error("No voice selected.")
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    # Voices whose prefix is not a configured language used to fail with a bare KeyError.
    pipeline = pipelines.get(voice[0])
    if pipeline is None:
        raise gr.Error(f"Unsupported voice {voice!r}: no pipeline for language code {voice[0]!r}.")

    # Put the shared model on the requested device. Previously it was only ever moved
    # to the CPU, so one CPU request silently disabled the GPU for later requests.
    use_gpu = bool(use_gpu) and CUDA_AVAILABLE
    target = "cuda" if use_gpu else "cpu"
    if model.device.type != target:
        model.to(target)

    first = True
    for _, _, audio in pipeline(text, voice=voice_path, speed=speed):
        # .cpu() is a no-op for CPU tensors and guards against a CUDA-resident result.
        yield 24000, audio.cpu().numpy()
        if first:
            first = False
            # Extra one-sample chunk after the first segment; presumably nudges the
            # streaming player into starting playback — confirm against the UI.
            yield 24000, torch.zeros(1).numpy()
    # Reached only when the stream runs to completion (not when it is cancelled).
    logger.info(f"Streaming generation time: {time.time() - start_time} seconds")

# Markdown help text rendered below the token output box.
# The non-ASCII characters (emoji, IPA stress marks, typographic punctuation) had been
# corrupted into mojibake; this file must be saved as UTF-8 to keep them intact.
TOKEN_NOTE = '''
💡 Customize pronunciation with Markdown link syntax and /slashes/ like [Kokoro](/kˈOkəɹO/)

💬 To adjust intonation, try punctuation ;:,.!?—…"()“” or stress ˈ and ˌ

⬇️ Lower stress [1 level](-1) or [2 levels](-2)

⬆️ Raise stress 1 level [or](+2) 2 levels (only works on less stressed, usually short words)
'''

# Gradio UI: inputs on the left, one-shot generation and streaming tabs on the right.
with gr.Blocks(theme="soft") as app:
    with gr.Row():
        # Left column: text, voice, hardware and speed controls
        with gr.Column():
            text = gr.Textbox(label="Input Text", value=TEXT, info="Arbitrarily many characters supported")
            with gr.Row():
                voice = gr.Dropdown(
                    list(CHOICES.items()),
                    # Prefer Bella when its file is listed, otherwise the first listed voice
                    value="af_bella.pt" if "af_bella.pt" in CHOICES.values() else next(iter(CHOICES.values())),
                    label="Voice",
                    info="Quality and availability vary by language",
                )
                use_gpu = gr.Dropdown(
                    [("GPU 🚀", True), ("CPU 🐌", False)],
                    value=CUDA_AVAILABLE,
                    label="Hardware",
                    info="GPU is faster but requires CUDA support",
                    # Without CUDA the choice is fixed to CPU, so the control is locked
                    interactive=CUDA_AVAILABLE,
                )
            speed = gr.Slider(minimum=0.5, maximum=2, value=1, step=0.1, label="Speed")
        # Right column: outputs
        with gr.Column():
            with gr.Tab(label="Generate"):
                out_audio = gr.Audio(label="Output Audio", interactive=False, streaming=False, autoplay=True)
                generate_btn = gr.Button("Generate", variant="primary")
                with gr.Accordion("Output Tokens", open=True):
                    out_ps = gr.Textbox(interactive=False, show_label=False,
                                        info="Tokens used to generate the audio, up to 510 context length.")
                    tokenize_btn = gr.Button("Tokenize", variant="secondary")
                    gr.Markdown(TOKEN_NOTE)
            with gr.Tab(label="Stream"):
                out_stream = gr.Audio(label="Output Audio Stream", interactive=False, streaming=True, autoplay=True)
                with gr.Row():
                    stream_btn = gr.Button("Stream", variant="primary")
                    stop_btn = gr.Button("Stop", variant="stop")
                gr.Markdown("⚠️ Streaming may have initial delays due to processing.")

    # Event wiring
    generate_btn.click(fn=generate_first, inputs=[text, voice, speed, use_gpu], outputs=[out_audio, out_ps])
    tokenize_btn.click(fn=tokenize_first, inputs=[text, voice], outputs=[out_ps])
    stream_event = stream_btn.click(fn=generate_all, inputs=[text, voice, speed, use_gpu], outputs=[out_stream])
    # "Stop" runs no function of its own; it only cancels the in-flight stream event.
    stop_btn.click(fn=None, cancels=[stream_event])

if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    # When run as a script, launch() blocks the main thread until the server shuts
    # down (default prevent_thread_lock=False), so the line after it is reached only
    # once the app is no longer serving. The old "started" message was misleading.
    app.launch()
    logger.info("Gradio app stopped.")