File size: 7,065 Bytes
98c217d
ee617da
98c217d
 
fa5c1e1
1509739
799287a
98c217d
 
 
 
fa5c1e1
 
 
 
 
 
 
 
 
98c217d
 
 
 
 
5dda1a8
db34aa6
98c217d
 
 
1509739
 
98c217d
 
1509739
98c217d
 
 
 
 
 
 
 
 
db34aa6
98c217d
 
 
 
1509739
 
 
98c217d
 
 
db34aa6
1509739
98c217d
fa5c1e1
 
98c217d
 
 
 
fa5c1e1
 
 
db34aa6
fa5c1e1
 
 
 
 
 
 
 
 
 
db34aa6
fa5c1e1
 
 
92ccd47
db34aa6
 
fa5c1e1
 
 
 
 
 
98c217d
db34aa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98c217d
db34aa6
21977f5
db34aa6
 
 
 
 
 
 
21977f5
db34aa6
 
 
 
21977f5
db34aa6
 
21977f5
db34aa6
21977f5
db34aa6
92ccd47
db34aa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa5c1e1
976f3b9
 
db34aa6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import gradio as gr
import os
import torch
import logging
import soundfile as sf
from kokoro import KModel, KPipeline

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
VOICE_DIR = os.path.join(os.path.dirname(__file__), "voices")
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "output_audio")
TEXT = "Hello, this is a test of the Kokoro TTS system."

# Ensure directories exist
os.makedirs(VOICE_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Device setup
CUDA_AVAILABLE = torch.cuda.is_available()
device = "cuda" if CUDA_AVAILABLE else "cpu"
logger.info(f"Using hardware: {device}")

# Load a single model instance
model = KModel("hexgrad/Kokoro-82M").to(device).eval()

# Define pipelines for American ('a') and British ('b') English
pipelines = {
    'a': KPipeline(model=model, lang_code='a', device=device),  # American English
    'b': KPipeline(model=model, lang_code='b', device=device)   # British English
}

# Set custom pronunciations for "kokoro"
try:
    pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    pipelines["b"].g2p.lexicon.golds["kokoro"] = "kˈQkəɹQ"
except AttributeError as e:
    logger.warning(f"Could not set custom pronunciations: {e}")

def generate_first(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]
    use_gpu = use_gpu and CUDA_AVAILABLE
    try:
        generator = pipeline(text, voice=voice_path, speed=speed)
        for _, ps, audio in generator:
            return (24000, audio.numpy()), ps
    except gr.exceptions.Error as e:
        if use_gpu:
            gr.Warning(str(e))
            gr.Info("Retrying with CPU. To avoid this error, change Hardware to CPU.")
            model.to("cpu")
            generator = pipeline(text, voice=voice_path, speed=speed)
            for _, ps, audio in generator:
                return (24000, audio.numpy()), ps
        else:
            raise gr.Error(e)
    return None, ""

def tokenize_first(text, voice="af_bella.pt"):
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]
    generator = pipeline(text, voice=voice_path)
    for _, ps, _ in generator:
        return ps
    return ""

def generate_all(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    pipeline = pipelines[voice[0]]
    use_gpu = use_gpu and CUDA_AVAILABLE
    first = True
    if not use_gpu:
        model.to("cpu")
    generator = pipeline(text, voice=voice_path, speed=speed)
    for _, _, audio in generator:
        yield 24000, audio.numpy()
        if first:
            first = False
            yield 24000, torch.zeros(1).numpy()

# Dynamically load .pt voice files from VOICE_DIR
def load_voice_choices():
    voice_files = [f for f in os.listdir(VOICE_DIR) if f.endswith('.pt')]
    choices = {}
    for voice_file in voice_files:
        prefix = voice_file[:2]
        if prefix == 'af':
            label = f"🇺🇸 🚺 {voice_file[3:-3].capitalize()}"
        elif prefix == 'am':
            label = f"🇺🇸 🚹 {voice_file[3:-3].capitalize()}"
        elif prefix == 'bf':
            label = f"🇬🇧 🚺 {voice_file[3:-3].capitalize()}"
        elif prefix == 'bm':
            label = f"🇬🇧 🚹 {voice_file[3:-3].capitalize()}"
        else:
            label = f"Unknown {voice_file[:-3]}"
        choices[label] = voice_file
    return choices

CHOICES = load_voice_choices()

# Log available voices
for label, voice_path in CHOICES.items():
    full_path = os.path.join(VOICE_DIR, voice_path)
    if not os.path.exists(full_path):
        logger.warning(f"Voice file not found: {full_path}")
    else:
        logger.info(f"Loaded voice: {label} ({voice_path})")

# If no voices are found, add a default fallback
if not CHOICES:
    logger.warning("No voice files found in VOICE_DIR. Adding a placeholder.")
    CHOICES = {"🇺🇸 🚺 Bella 🔥": "af_bella.pt"}

TOKEN_NOTE = '''
💡 Customize pronunciation with Markdown link syntax and /slashes/ like [Kokoro](/kˈOkəɹO/)

💬 To adjust intonation, try punctuation ;:,.!?—…"()“” or stress ˈ and ˌ

⬇️ Lower stress [1 level](-1) or [2 levels](-2)

⬆️ Raise stress 1 level [or](+2) 2 levels (only works on less stressed, usually short words)
'''

with gr.Blocks() as generate_tab:
    out_audio = gr.Audio(label="Output Audio", interactive=False, streaming=False, autoplay=True)
    generate_btn = gr.Button("Generate", variant="primary")
    with gr.Accordion("Output Tokens", open=True):
        out_ps = gr.Textbox(interactive=False, show_label=False,
                            info="Tokens used to generate the audio, up to 510 context length.")
        tokenize_btn = gr.Button("Tokenize", variant="secondary")
        gr.Markdown(TOKEN_NOTE)

with gr.Blocks() as stream_tab:
    out_stream = gr.Audio(label="Output Audio Stream", interactive=False, streaming=True, autoplay=True)
    with gr.Row():
        stream_btn = gr.Button("Stream", variant="primary")
        stop_btn = gr.Button("Stop", variant="stop")
    with gr.Accordion("Note", open=True):
        gr.Markdown("⚠️ There may be delays in streaming audio due to processing limitations.")

with gr.Blocks() as app:
    with gr.Row():
        with gr.Column():
            text = gr.Textbox(label="Input Text", info="Arbitrarily many characters supported")
            with gr.Row():
                voice = gr.Dropdown(list(CHOICES.items()), value="af_bella.pt" if "af_bella.pt" in CHOICES.values() else list(CHOICES.values())[0], label="Voice",
                                    info="Quality and availability vary by language")
                use_gpu = gr.Dropdown(
                    [("GPU �-held", True), ("CPU 🐌", False)],
                    value=CUDA_AVAILABLE,
                    label="Hardware",
                    info="GPU is usually faster, but may require CUDA support",
                    interactive=CUDA_AVAILABLE
                )
            speed = gr.Slider(minimum=0.5, maximum=2, value=1, step=0.1, label="Speed")
        with gr.Column():
            gr.TabbedInterface([generate_tab, stream_tab], ["Generate", "Stream"])
    generate_btn.click(fn=generate_first, inputs=[text, voice, speed, use_gpu],
                       outputs=[out_audio, out_ps])
    tokenize_btn.click(fn=tokenize_first, inputs=[text, voice], outputs=[out_ps])
    stream_event = stream_btn.click(fn=generate_all, inputs=[text, voice, speed, use_gpu], outputs=[out_stream])
    stop_btn.click(fn=None, cancels=[stream_event])

if __name__ == "__main__":
    app.queue().launch()