# File size: 10,812 Bytes
# 61d473c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
from kokoro import KModel, KPipeline
import gradio as gr
import os
import random
import torch
import logging
import soundfile as sf

# Optional: import Resemblyzer for voice cloning (install via pip install resemblyzer)
# When the package cannot be imported, `encoder` is set to None; generate_first()
# checks `encoder is not None` before it attempts to clone a voice.
try:
    from resemblyzer import VoiceEncoder, preprocess_wav
    encoder = VoiceEncoder()  # created once at import time and shared by all requests
except ImportError:
    encoder = None

# Configuration
# The defaults are the original hard-coded Windows locations. The optional
# KOKORO_VOICE_DIR / KOKORO_OUTPUT_DIR environment variables allow the app to
# run on another machine without editing this file.
VOICE_DIR = os.environ.get("KOKORO_VOICE_DIR", r"D:\New folder (2)\model\voices")
OUTPUT_DIR = os.environ.get("KOKORO_OUTPUT_DIR", r"D:\New folder (2)\output_audio")
# Sample sentence; also used verbatim as the fallback quote further down.
TEXT = "Hello, this is a test of the Kokoro TTS system."

# Module logger; the root logger is configured at INFO so the startup
# messages below are visible on the console.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Pick the torch device name once, based on whether CUDA is usable.
CUDA_AVAILABLE = torch.cuda.is_available()
if CUDA_AVAILABLE:
    device = "cuda"
else:
    device = "cpu"
logger.info("Using hardware: %s", device)

# One model instance per device, keyed by "is this the GPU copy?":
# models[False] always exists (CPU); models[True] exists only when CUDA is available.
models = {
    on_gpu: KModel("hexgrad/Kokoro-82M").to("cuda" if on_gpu else "cpu").eval()
    for on_gpu in ((False, True) if CUDA_AVAILABLE else (False,))
}

# One pipeline per language code: 'a' = American English, 'b' = British English.
# Both start out bound to the CPU model; the generate functions re-assign
# `pipeline.model` per request.
pipelines = {
    lang_code: KPipeline(model=models[False], lang_code=lang_code, device='cpu')
    for lang_code in ('a', 'b')
}

# Set custom pronunciations for "kokoro" in both American and British modes.
# The phoneme strings must contain the real IPA code points (ə U+0259,
# ɹ U+0279); a mis-decoded copy of those bytes is not a valid phoneme sequence.
# AttributeError is tolerated so that a pipeline without a g2p lexicon only
# loses the custom pronunciation instead of aborting start-up.
try:
    pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    pipelines["b"].g2p.lexicon.golds["kokoro"] = "kˈQkəɹQ"
except AttributeError as e:
    logger.warning(f"Could not set custom pronunciations: {e}")

def forward_gpu(text, voice_path, speed):
    """Synthesize the first audio chunk of ``text`` with the GPU model.

    Args:
        text: Text to synthesize.
        voice_path: Voice file name or full path to a voice ``.pt`` file. The
            first character of the *file name* selects the language pipeline
            ('a' or 'b').
        speed: Speed value forwarded to the pipeline.

    Returns:
        The audio of the first chunk the pipeline yields, or ``None`` when the
        pipeline yields nothing.

    Raises:
        KeyError: If the file name's first character has no pipeline, or no
            GPU model was loaded (``models`` has no ``True`` entry).
    """
    # Use the file name, not the whole path: a full path starts with the
    # directory (e.g. the drive letter "D"), which never matches a pipeline key.
    lang_code = os.path.basename(voice_path)[0]
    pipeline = pipelines[lang_code]
    # The pipelines are shared module-level objects, so this re-binding stays
    # in effect until another call assigns a different model.
    pipeline.model = models[True]
    for _, _, audio in pipeline(text, voice=voice_path, speed=speed):
        return audio
    return None

def _first_chunk(pipeline, model, text, voice_path, speed):
    """Bind ``model`` to ``pipeline`` and return ``(audio, tokens)`` of its first chunk, or ``None``."""
    pipeline.model = model
    for _, ps, audio in pipeline(text, voice=voice_path, speed=speed):
        return audio, ps
    return None


def generate_first(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE, clone_voice_file=None):
    """Synthesize the first chunk of ``text`` and return it with its tokens.

    Args:
        text: Text to synthesize.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the language pipeline ('a' or 'b').
        speed: Speed value forwarded to the pipeline.
        use_gpu: Request the GPU model; ignored when CUDA is unavailable.
        clone_voice_file: Optional path to an audio sample. When given and
            Resemblyzer is installed, an embedding of the sample is saved as
            ``cloned_voice.pt`` in ``VOICE_DIR`` and used as the voice.

    Returns:
        ``((24000, samples), tokens)`` for the first chunk, where ``samples``
        is a NumPy array, or ``(None, "")`` when the pipeline yields nothing.

    Raises:
        FileNotFoundError: If ``voice`` does not exist in ``VOICE_DIR``.
        KeyError: If ``voice[0]`` has no pipeline.
        gr.Error: If CPU synthesis raises a Gradio error.
    """
    voice_path = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")

    # Chosen from the voice *name*, so it stays valid even when voice_path is
    # later replaced by the cloned-voice file.
    pipeline = pipelines[voice[0]]

    # If a clone file is provided and the encoder is available, try to clone the voice
    if clone_voice_file is not None and encoder is not None:
        try:
            # clone_voice_file is a file path (string) in Gradio with type="filepath"
            wav = preprocess_wav(clone_voice_file)
            cloned_voice = torch.tensor(encoder.embed_utterance(wav), device=device).unsqueeze(0)
            temp_voice_path = os.path.join(VOICE_DIR, "cloned_voice.pt")
            torch.save(cloned_voice, temp_voice_path)
            voice_path = temp_voice_path
        except Exception as e:
            # Cloning is best-effort: fall back to the selected stock voice.
            logger.error(f"Error cloning voice: {e}")
            voice_path = os.path.join(VOICE_DIR, voice)

    use_gpu = use_gpu and CUDA_AVAILABLE
    try:
        chunk = _first_chunk(pipeline, models[True] if use_gpu else models[False],
                             text, voice_path, speed)
    except gr.exceptions.Error as e:
        if not use_gpu:
            raise gr.Error(e)
        gr.Warning(str(e))
        gr.Info("Retrying with CPU. To avoid this error, change Hardware to CPU.")
        chunk = _first_chunk(pipeline, models[False], text, voice_path, speed)

    if chunk is None:
        return None, ""
    audio, ps = chunk
    # .cpu() is a no-op for CPU tensors and makes a GPU result convertible to NumPy.
    return (24000, audio.cpu().numpy()), ps

def predict(text, voice="af_bella.pt", speed=1):
    """Synthesize ``text`` on the CPU and return only the audio part of generate_first()'s result."""
    audio, _tokens = generate_first(text, voice, speed, use_gpu=False)
    return audio

def tokenize_first(text, voice="af_bella.pt"):
    """Return the token string of the first chunk the pipeline produces for ``text``.

    Args:
        text: Text to run through the pipeline.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the language pipeline.

    Returns:
        The second element of the first item the pipeline yields, or ``""``
        when the pipeline yields nothing.

    Raises:
        FileNotFoundError: If the voice file does not exist in ``VOICE_DIR``.
    """
    voice_file = os.path.join(VOICE_DIR, voice)
    if os.path.exists(voice_file):
        chunks = pipelines[voice[0]](text, voice=voice_file)
        for _, tokens, _ in chunks:
            # Only the first chunk is of interest.
            return tokens
        return ""
    raise FileNotFoundError(f"Voice file not found: {voice_file}")

def generate_all(text, voice="af_bella.pt", speed=1, use_gpu=CUDA_AVAILABLE):
    """Yield ``(24000, samples)`` for every chunk the pipeline produces for ``text``.

    Directly after the first chunk, one extra item holding a single zero
    sample is yielded (presumably a workaround for the first-click streaming
    issue mentioned in the Stream tab note - TODO confirm).

    Args:
        text: Text to synthesize.
        voice: File name of a voice inside ``VOICE_DIR``; its first character
            selects the language pipeline.
        speed: Speed value forwarded to the pipeline.
        use_gpu: Request the GPU model; ignored when CUDA is unavailable.

    Raises:
        FileNotFoundError: On first iteration, if the voice file is missing.
    """
    voice_file = os.path.join(VOICE_DIR, voice)
    if not os.path.exists(voice_file):
        raise FileNotFoundError(f"Voice file not found: {voice_file}")

    tts = pipelines[voice[0]]
    # GPU model only when it was requested *and* CUDA is available.
    tts.model = models[True] if (use_gpu and CUDA_AVAILABLE) else models[False]

    for index, (_, _, audio) in enumerate(tts(text, voice=voice_file, speed=speed)):
        yield 24000, audio.numpy()
        if index == 0:
            yield 24000, torch.zeros(1).numpy()

# Load random quotes and sample texts
# en.txt is read as UTF-8 (the platform default encoding may differ), blank
# lines are skipped so an empty quote is never offered, and a missing or empty
# file falls back to a single built-in sentence so random.choice() always has
# something to pick from.
try:
    with open("en.txt", "r", encoding="utf-8") as quotes_file:
        random_quotes = [line.strip() for line in quotes_file if line.strip()]
except FileNotFoundError:
    random_quotes = []
if not random_quotes:
    random_quotes = ["Hello, this is a test of the Kokoro TTS system."]

def get_random_quote():
    """Return one entry picked at random from the module-level ``random_quotes`` list."""
    quote = random.choice(random_quotes)
    return quote

def get_gatsby():
    """Return the stripped contents of ``gatsby5k.md`` from the working directory.

    The file is decoded as UTF-8 rather than with the platform default
    encoding, so typographic quotes and dashes read the same on every OS.

    Returns:
        The file text without leading/trailing whitespace, or the message
        ``"The Great Gatsby text not found."`` when the file does not exist.
    """
    try:
        with open("gatsby5k.md", "r", encoding="utf-8") as sample_file:
            return sample_file.read().strip()
    except FileNotFoundError:
        return "The Great Gatsby text not found."

def get_frankenstein():
    """Return the stripped contents of ``frankenstein5k.md`` from the working directory.

    The file is decoded as UTF-8 rather than with the platform default
    encoding, so typographic quotes and dashes read the same on every OS.

    Returns:
        The file text without leading/trailing whitespace, or the message
        ``"Frankenstein text not found."`` when the file does not exist.
    """
    try:
        with open("frankenstein5k.md", "r", encoding="utf-8") as sample_file:
            return sample_file.read().strip()
    except FileNotFoundError:
        return "Frankenstein text not found."

# Dynamically load all .pt voice files from VOICE_DIR
def load_voice_choices():
    """Build the ``label -> file name`` mapping for the voice dropdown.

    Every ``*.pt`` file in ``VOICE_DIR`` becomes one entry. The two-letter
    file-name prefix selects a flag/gender badge (``af``/``am`` American
    female/male, ``bf``/``bm`` British female/male); the label text is the
    file name without its first three characters and the ``.pt`` suffix,
    capitalized. Files with any other prefix are labelled ``Unknown <stem>``.

    Returns:
        A dict mapping display label to voice file name, in sorted file-name
        order. Empty when ``VOICE_DIR`` does not exist or is not a directory,
        so the caller's "no voices found" fallback can take over instead of
        the import crashing.
    """
    prefix_badges = {
        'af': "🇺🇸 🚺",
        'am': "🇺🇸 🚹",
        'bf': "🇬🇧 🚺",
        'bm': "🇬🇧 🚹",
    }
    try:
        # os.listdir() order is arbitrary; sort for a stable dropdown order.
        voice_files = sorted(f for f in os.listdir(VOICE_DIR) if f.endswith('.pt'))
    except (FileNotFoundError, NotADirectoryError):
        return {}

    choices = {}
    for voice_file in voice_files:
        stem = voice_file[:-3]  # drop the ".pt" suffix
        badge = prefix_badges.get(voice_file[:2])
        if badge is None:
            label = f"Unknown {stem}"
        else:
            # stem[3:] skips the prefix and its separator, e.g. "af_".
            label = f"{badge} {stem[3:].capitalize()}"
        choices[label] = voice_file
    return choices

# Display label -> voice file name, used to populate the voice dropdown.
CHOICES = load_voice_choices()

# Log available voices
for label, voice_path in CHOICES.items():
    full_path = os.path.join(VOICE_DIR, voice_path)
    if not os.path.exists(full_path):
        logger.warning(f"Voice file not found: {full_path}")
    else:
        logger.info(f"Loaded voice: {label} ({voice_path})")

# If no voices are found, add a default fallback
# The placeholder only keeps the dropdown non-empty so the UI can be built;
# generate_first() still raises FileNotFoundError if af_bella.pt is absent.
if not CHOICES:
    logger.warning("No voice files found in VOICE_DIR. Adding a placeholder.")
    CHOICES = {"🇺🇸 🚺 Bella 🔥": "af_bella.pt"}

# Markdown help text shown under the token box. The syntax examples are wrapped
# in backticks so Markdown displays them literally instead of rendering
# "[text](target)" as a hyperlink.
TOKEN_NOTE = '''

💡 Customize pronunciation with Markdown link syntax and /slashes/ like `[Kokoro](/kˈOkəɹO/)`



💬 To adjust intonation, try punctuation `;:,.!?—…"()“”` or stress `ˈ` and `ˌ`



⬇️ Lower stress `[1 level](-1)` or `[2 levels](-2)`



⬆️ Raise stress 1 level `[or](+2)` 2 levels (only works on less stressed, usually short words)

'''

# "Generate" tab: one-shot synthesis output plus the token read-out.
# The components created here are wired to their callbacks later, inside the
# main `app` Blocks.
with gr.Blocks() as generate_tab:
    # Non-streaming player for the audio returned by generate_first()/predict().
    out_audio = gr.Audio(label="Output Audio", interactive=False, streaming=False, autoplay=True)
    generate_btn = gr.Button("Generate", variant="primary")
    with gr.Accordion("Output Tokens", open=True):
        # Read-only box filled by generate_first() and tokenize_first().
        out_ps = gr.Textbox(interactive=False, show_label=False,
                            info="Tokens used to generate the audio, up to 510 context length.")
        tokenize_btn = gr.Button("Tokenize", variant="secondary")
        gr.Markdown(TOKEN_NOTE)
        # Hidden button bound to predict(); presumably kept only so the
        # function is exposed as an API endpoint - TODO confirm.
        predict_btn = gr.Button("Predict", variant="secondary", visible=False)

# "Stream" tab: streaming player fed chunk by chunk by generate_all().
# The buttons are wired to their callbacks later, inside the main `app` Blocks.
with gr.Blocks() as stream_tab:
    out_stream = gr.Audio(label="Output Audio Stream", interactive=False, streaming=True, autoplay=True)
    with gr.Row():
        stream_btn = gr.Button("Stream", variant="primary")
        # Cancels the running stream event (see the stop_btn.click wiring).
        stop_btn = gr.Button("Stop", variant="stop")
    with gr.Accordion("Note", open=True):
        gr.Markdown("⚠️ There is an unknown Gradio bug that might yield no audio the first time you click Stream.")
        gr.DuplicateButton()

# Whether the queue API is open and the API docs are shown (used at launch).
API_OPEN = True

# Main layout: inputs in the left column, the Generate/Stream tabs in the right
# column, followed by the event wiring for every button.
with gr.Blocks() as app:
    with gr.Row():
        with gr.Column():
            text = gr.Textbox(label="Input Text", info="Arbitrarily many characters supported")
            with gr.Row():
                # Choices are (label, file name) pairs; default to af_bella.pt
                # when present, otherwise to the first available voice.
                voice = gr.Dropdown(
                    list(CHOICES.items()),
                    value="af_bella.pt" if "af_bella.pt" in CHOICES.values() else list(CHOICES.values())[0],
                    label="Voice",
                    info="Quality and availability vary by language",
                )
                # Locked to CPU (non-interactive) when CUDA is unavailable.
                use_gpu = gr.Dropdown(
                    [("GPU 🚀", True), ("CPU 🐌", False)],
                    value=CUDA_AVAILABLE,
                    label="Hardware",
                    info="GPU is usually faster, but may require CUDA support",
                    interactive=CUDA_AVAILABLE
                )
            speed = gr.Slider(minimum=0.5, maximum=2, value=1, step=0.1, label="Speed")
            # type="filepath" hands generate_first() a path string, not a file object.
            clone_voice_file = gr.File(label="Clone Voice Sample (Optional)", file_count="single", type="filepath")
            random_btn = gr.Button("🎲 Random Quote 💬", variant="secondary")
            with gr.Row():
                gatsby_btn = gr.Button("🥂 Gatsby 📕", variant="secondary")
                frankenstein_btn = gr.Button("💀 Frankenstein 📗", variant="secondary")
        with gr.Column():
            gr.TabbedInterface([generate_tab, stream_tab], ["Generate", "Stream"])

    # Sample-text buttons fill the input box.
    random_btn.click(fn=get_random_quote, inputs=[], outputs=[text])
    gatsby_btn.click(fn=get_gatsby, inputs=[], outputs=[text])
    frankenstein_btn.click(fn=get_frankenstein, inputs=[], outputs=[text])

    # Synthesis and tokenization.
    generate_btn.click(fn=generate_first, inputs=[text, voice, speed, use_gpu, clone_voice_file],
                       outputs=[out_audio, out_ps])
    tokenize_btn.click(fn=tokenize_first, inputs=[text, voice], outputs=[out_ps])

    # Streaming: keep the event handle so the Stop button can cancel it.
    stream_event = stream_btn.click(fn=generate_all, inputs=[text, voice, speed, use_gpu], outputs=[out_stream])
    stop_btn.click(fn=None, cancels=[stream_event])
    predict_btn.click(fn=predict, inputs=[text, voice, speed], outputs=[out_audio])

if __name__ == "__main__":
    # Enable the request queue first, then serve on localhost port 40001 and
    # open the UI in the default browser.
    queued_app = app.queue(api_open=API_OPEN)
    queued_app.launch(
        inbrowser=True,
        show_api=API_OPEN,
        server_port=40001,
        server_name="127.0.0.1",
    )