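"""Dialogue Script to Speech Converter (Gradio app).

Turns a dialogue script with one utterance per line, e.g.

    [S1] Hi!
    [S2] Hello!

into per-line MP3s (downloadable as a ZIP) plus a single merged MP3, using the
OpenAI TTS API (tts-1, tts-1-hd, or gpt-4o-mini-tts). Configuration comes from the
environment (or, when running as a Hugging Face Space, from the Space secrets):
OPENAI_API_KEY, NSFW_API_URL_TEMPLATE, and MODEL_DEFAULT.
"""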
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
import json
from openai import AsyncOpenAI

from utils.script_parser import parse_dialogue_script, calculate_cost, MAX_SCRIPT_LENGTH
from utils.openai_tts import synthesize_speech_line, OPENAI_VOICES as ALL_TTS_VOICES
from utils.merge_audio import merge_mp3_files

# --- Configuration ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NSFW_API_URL_TEMPLATE = os.getenv("NSFW_API_URL_TEMPLATE")
MODEL_DEFAULT = os.getenv("MODEL_DEFAULT", "tts-1-hd")

if not OPENAI_API_KEY:
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        space_id = os.getenv("SPACE_ID")
        if space_id:
            secrets = api.get_space_secrets(repo_id=space_id)
            OPENAI_API_KEY = secrets.get("OPENAI_API_KEY")
            NSFW_API_URL_TEMPLATE = secrets.get("NSFW_API_URL_TEMPLATE", NSFW_API_URL_TEMPLATE)
            MODEL_DEFAULT = secrets.get("MODEL_DEFAULT", MODEL_DEFAULT)
    except Exception as e:
        print(f"Could not retrieve secrets from Hugging Face Hub: {e}")

async_openai_client = None
if OPENAI_API_KEY:
    async_openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
else:
    print("ERROR: OPENAI_API_KEY secret is not set. The application will not function properly.")

TTS_MODELS_AVAILABLE = ["tts-1", "tts-1-hd", "gpt-4o-mini-tts"]
if MODEL_DEFAULT not in TTS_MODELS_AVAILABLE:
    MODEL_DEFAULT = "tts-1-hd"

SPEAKER_CONFIG_METHODS = [
    "Single Voice (Global)", 
    "Random per Speaker", 
    "A/B Round Robin", 
    "Detailed Configuration (JSON)"
]
DEFAULT_SPEAKER_CONFIG_METHOD = "Random per Speaker"
APP_AVAILABLE_VOICES = ALL_TTS_VOICES.copy()
_speaker_config_cache = {}

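# --- Speaker configuration helpers ---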
def parse_detailed_speaker_config(json_text, parsed_script_lines):
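    """Parse the 'Detailed Configuration (JSON)' textbox into a per-speaker config map.

    Expects a JSON list of objects such as
    {"speaker": "Alice", "voice": "nova", "speed": 1.1, "instructions": "sound excited"}.
    Unrecognised voices fall back to the first available voice and speeds are clamped
    to the 0.25-4.0 range. Returns {speaker: {"voice", "speed", "instructions"}};
    raises ValueError for malformed JSON or a non-list top level.
    (parsed_script_lines is accepted but currently unused.)
    """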
    config_map = {}
    default_voice = APP_AVAILABLE_VOICES[0]
    try:
        if not json_text.strip():
            return {}
        
        config_list = json.loads(json_text)
        if not isinstance(config_list, list):
            raise ValueError("JSON config must be a list of speaker objects.")
            
        for item in config_list:
            if not isinstance(item, dict) or "speaker" not in item or "voice" not in item:
                print(f"Skipping malformed item in JSON config: {item}")
                continue
            if item["voice"] not in APP_AVAILABLE_VOICES:
                print(f"Warning: Voice '{item['voice']}' for speaker '{item['speaker']}' not recognized. Falling back to '{default_voice}'.")
                item["voice"] = default_voice

            if "speed" in item:
                try:
                    item["speed"] = float(item["speed"])
                    if not (0.25 <= item["speed"] <= 4.0):
                        print(f"Warning: Speed for speaker '{item['speaker']}' out of range (0.25-4.0). Clamping.")
                        item["speed"] = max(0.25, min(item["speed"], 4.0))
                except ValueError:
                    print(f"Warning: Invalid speed value for speaker '{item['speaker']}'. Using default.")
                    item.pop("speed", None)

            config_map[item["speaker"]] = {
                "voice": item["voice"],
                "speed": item.get("speed"),
                "instructions": item.get("instructions")
            }
        return config_map
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in Detailed Speaker Configuration: {e}")
    except ValueError as e:
        raise e
    except Exception as e:
        raise ValueError(f"Error parsing Detailed Speaker Configuration: {e}")

def get_config_for_speaker(speaker_name, speaker_config_method, unique_script_speakers, 
                           global_selected_voice, detailed_config_map):
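    """Resolve voice/speed/instructions for one speaker under the selected method.

    "Random per Speaker" and "A/B Round Robin" assignments are memoised in the
    module-level _speaker_config_cache so every line from the same speaker keeps the
    same voice; the cache is invalidated when the method or the speaker set changes.
    Per-speaker speed/instructions are only honoured for the detailed JSON method.
    """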
    global _speaker_config_cache
    if _speaker_config_cache.get("__method") != speaker_config_method or \
       _speaker_config_cache.get("__speakers_set") != frozenset(unique_script_speakers):
        _speaker_config_cache = {"__method": speaker_config_method, "__speakers_set": frozenset(unique_script_speakers)}

    base_config = {"voice": APP_AVAILABLE_VOICES[0], "speed": None, "instructions": None}

    if speaker_config_method == "Single Voice (Global)":
        base_config["voice"] = global_selected_voice if global_selected_voice in APP_AVAILABLE_VOICES else APP_AVAILABLE_VOICES[0]
        return base_config
    
    if speaker_config_method == "Detailed Configuration (JSON)":
        if speaker_name in detailed_config_map:
            speaker_specific = detailed_config_map[speaker_name]
            return {
                "voice": speaker_specific.get("voice", base_config["voice"]),
                "speed": speaker_specific.get("speed"),
                "instructions": speaker_specific.get("instructions")
            }
        else:
            print(f"Warning: Speaker '{speaker_name}' not found in Detailed JSON. Using default voice '{base_config['voice']}'.")
            return base_config

    if speaker_name not in _speaker_config_cache:
        if speaker_config_method == "Random per Speaker":
            available_voices_shuffled = random.sample(APP_AVAILABLE_VOICES, len(APP_AVAILABLE_VOICES))
            if not _speaker_config_cache.get("__all_assigned_random"):
                 for i, spk_unique in enumerate(unique_script_speakers):
                     if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": available_voices_shuffled[i % len(available_voices_shuffled)]}
                 _speaker_config_cache["__all_assigned_random"] = True
            if speaker_name not in _speaker_config_cache:
                 _speaker_config_cache[speaker_name] = {"voice": random.choice(APP_AVAILABLE_VOICES)}
        elif speaker_config_method == "A/B Round Robin":
            if not _speaker_config_cache.get("__all_assigned_ab"):
                for i, spk_unique in enumerate(unique_script_speakers):
                     if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": APP_AVAILABLE_VOICES[i % len(APP_AVAILABLE_VOICES)]}
                _speaker_config_cache["__all_assigned_ab"] = True
            if speaker_name not in _speaker_config_cache:
                speaker_idx = unique_script_speakers.index(speaker_name) if speaker_name in unique_script_speakers else 0
                _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[speaker_idx % len(APP_AVAILABLE_VOICES)]}
        else:
             _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[0]}

    cached_entry = _speaker_config_cache.get(speaker_name, base_config.copy())
    return {"voice": cached_entry.get("voice", base_config["voice"]), "speed": None, "instructions": None}

async def handle_script_processing(
    dialogue_script: str, tts_model: str, pause_ms: int, 
    speaker_config_method: str, global_voice_selection: str, 
    detailed_speaker_json: str, global_speed: float, 
    global_instructions: str, progress=gr.Progress(track_tqdm=True)):
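    """Synthesize the whole script and return (zip_path, merged_mp3_path, status_message).

    Parses the script, resolves a voice for each speaker, synthesizes every line
    concurrently with synthesize_speech_line, zips the per-line MP3s, and merges them
    into a single MP3 with pause_ms of silence between lines. The speaker-assignment
    cache is reset at the start of each job so assignments do not leak between runs.
    """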
    global _speaker_config_cache
    _speaker_config_cache = {}

    if not OPENAI_API_KEY or not async_openai_client:
        return None, None, "Error: OPENAI_API_KEY is not configured."
    if not dialogue_script.strip():
        return None, None, "Error: Script is empty."

    job_audio_path_prefix = os.path.join(tempfile.gettempdir(), "current_job_audio")
    if os.path.exists(job_audio_path_prefix):
        shutil.rmtree(job_audio_path_prefix)
    os.makedirs(job_audio_path_prefix, exist_ok=True)
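    # NOTE: the working directory name is fixed, so concurrent generation requests
    # would share (and clobber) each other's files.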

    try:
        parsed_lines, total_chars = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            return None, None, "Error: No valid dialogue lines found."
    except ValueError as e:
        return None, None, f"Script parsing error: {str(e)}"

    unique_speakers = sorted(list(set(p["speaker"] for p in parsed_lines)))
    parsed_detailed_config_map = {}
    if speaker_config_method == "Detailed Configuration (JSON)":
        try:
            parsed_detailed_config_map = parse_detailed_speaker_config(detailed_speaker_json, parsed_lines)
        except ValueError as e:
            return None, None, f"Configuration Error: {str(e)}"

    tasks, line_audio_files = [], [None] * len(parsed_lines)
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        speaker_base_cfg = get_config_for_speaker(
            speaker_name, speaker_config_method, unique_speakers, 
            global_voice_selection, parsed_detailed_config_map)
        line_voice = speaker_base_cfg["voice"]
        
        effective_speed = global_speed
        if speaker_base_cfg.get("speed") is not None:
            effective_speed = speaker_base_cfg["speed"]
        
        effective_instructions = global_instructions if global_instructions and global_instructions.strip() else None
        if speaker_base_cfg.get("instructions") is not None and speaker_base_cfg["instructions"].strip():
            effective_instructions = speaker_base_cfg["instructions"]

        output_filename = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}.mp3")
        progress(i / len(parsed_lines), desc=f"Queuing line {i+1}/{len(parsed_lines)} ({speaker_name} w/ {line_voice})")
        tasks.append(synthesize_speech_line(
            client=async_openai_client, text=line_data["text"], voice=line_voice,
            output_path=output_filename, model=tts_model, speed=effective_speed,
            instructions=effective_instructions, nsfw_api_url_template=NSFW_API_URL_TEMPLATE,
            line_index=line_data['id']))

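    # All lines are synthesized concurrently; return_exceptions=True keeps results in
    # input order and lets individual failures surface without aborting the whole job.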
    synthesis_results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, result in enumerate(synthesis_results):
        if isinstance(result, Exception): print(f"Error for line {parsed_lines[idx]['id']}: {result}")
        elif result is None: print(f"Skipped/failed line {parsed_lines[idx]['id']}")
        else: line_audio_files[idx] = result 

    valid_audio_files = [f for f in line_audio_files if f and os.path.exists(f) and os.path.getsize(f) > 0]
    if not valid_audio_files:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio files successfully synthesized."

    zip_filename = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zf:
        for pth in valid_audio_files: zf.write(pth, os.path.basename(pth))
    
    merged_mp3_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    merged_out_path = merge_mp3_files([f for f in line_audio_files if f], merged_mp3_fn, pause_ms)

    status = f"Processed {len(valid_audio_files)}/{len(parsed_lines)} lines. "
    if len(valid_audio_files) < len(parsed_lines): status += "Some lines failed/skipped. "
    if not merged_out_path and len(valid_audio_files) > 0: status += "Merged audio failed. "
    elif not merged_out_path and len(valid_audio_files) == 0: status += "No audio generated."
    else: status += "Outputs generated."
        
    return (zip_filename if os.path.exists(zip_filename) else None,
           merged_out_path if merged_out_path and os.path.exists(merged_out_path) else None,
           status)

def handle_calculate_cost(dialogue_script: str, tts_model: str):
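    """Return a human-readable cost estimate for the given script and TTS model."""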
    if not dialogue_script.strip(): return "Cost: $0.000000 (Empty script)"
    try:
        parsed, chars = parse_dialogue_script(dialogue_script)
        if not parsed: return "Cost: $0.000000 (No valid lines)"
        cost = calculate_cost(chars, len(parsed), tts_model)
        return f"Est. Cost: ${cost:.6f} ({chars} chars, {len(parsed)} lines)"
    except ValueError as e: return f"Error: {str(e)}"
    except Exception as e: return f"Cost calc error: {str(e)}"

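# --- Gradio UI ---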
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Dialogue Script to Speech Converter (Enhanced)")
    gr.Markdown("Convert scripts with control over voices, speed, and instructions.")
    if not OPENAI_API_KEY or not async_openai_client:
        gr.Markdown("<h3 style='color:red;'>Warning: OPENAI_API_KEY not set. Synthesis will fail.</h3>")

    with gr.Row():
        with gr.Column(scale=2):
            script_input = gr.TextArea(label="Dialogue Script", placeholder="[S1] Hi!\n[S2] Hello!", lines=10, info=f"Max {MAX_SCRIPT_LENGTH} chars.")
        with gr.Column(scale=1):
            tts_model_dropdown = gr.Dropdown(TTS_MODELS_AVAILABLE, label="TTS Model", value=MODEL_DEFAULT, info="Affects controls below.")
            pause_input = gr.Number(label="Pause (ms)", value=500, minimum=0, maximum=5000, step=50, info="Between merged lines.")
            global_speed_input = gr.Slider(minimum=0.25, maximum=4.0, value=1.0, step=0.05, label="Global Speed (tts-1/tts-1-hd)", visible=(MODEL_DEFAULT in ["tts-1", "tts-1-hd"]), interactive=True)
            global_instructions_input = gr.Textbox(label="Global Instructions (gpt-4o-mini-tts)", placeholder="e.g., Speak calmly.", visible=(MODEL_DEFAULT == "gpt-4o-mini-tts"), interactive=True, lines=2)

    gr.Markdown("### Speaker Configuration")
    with gr.Row():
        speaker_config_method_dropdown = gr.Dropdown(SPEAKER_CONFIG_METHODS, label="Speaker Config Method", value=DEFAULT_SPEAKER_CONFIG_METHOD)
        global_voice_dropdown = gr.Dropdown(APP_AVAILABLE_VOICES, label="Global Voice ('Single Voice' method)", value=APP_AVAILABLE_VOICES[0], visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Single Voice (Global)"), interactive=True)
    
    initial_json_config_example = """[
  {"speaker": "Alice", "voice": "nova", "speed": 1.1, "instructions": "sound excited"},
  {"speaker": "Bob", "voice": "echo"},
  {"speaker": "Narrator", "voice": "shimmer", "instructions": "be very serious"}
]""".strip()

    detailed_speaker_config_input = gr.Code(
        label="Detailed Speaker Configuration (JSON)",
        language="json",
        lines=7,
        value=initial_json_config_example,
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )
    # Info text for the detailed JSON method; visibility tied to the method dropdown
    detailed_config_info_md = gr.Markdown(
        "<small>Define voice per speaker. Optionally, `speed` (0.25-4.0) for `tts-1`/`tts-1-hd` models, "
        "and `instructions` (text) for `gpt-4o-mini-tts`.</small>",
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )
    
    with gr.Row():
        calculate_cost_button = gr.Button("Calculate Cost")
        generate_button = gr.Button("Generate Audio", variant="primary")
    
    cost_output = gr.Textbox(label="Estimated Cost", interactive=False)
    with gr.Row():
        individual_lines_zip_output = gr.File(label="Download ZIP")
        merged_dialogue_mp3_output = gr.Audio(label="Merged MP3", type="filepath")
    status_output = gr.Textbox(label="Status", interactive=False, lines=2)

    def update_model_controls(selected_model):
        is_tts1 = selected_model in ["tts-1", "tts-1-hd"]
        is_gpt_mini = selected_model == "gpt-4o-mini-tts"
        return {
            global_speed_input: gr.update(visible=is_tts1, interactive=is_tts1),
            global_instructions_input: gr.update(visible=is_gpt_mini, interactive=is_gpt_mini)
        }
    tts_model_dropdown.change(fn=update_model_controls, inputs=[tts_model_dropdown], outputs=[global_speed_input, global_instructions_input])

    def update_speaker_controls(method):
        is_single = (method == "Single Voice (Global)")
        is_detailed = (method == "Detailed Configuration (JSON)")
        return {
            global_voice_dropdown: gr.update(visible=is_single, interactive=is_single),
            detailed_speaker_config_input: gr.update(visible=is_detailed, interactive=is_detailed),
            detailed_config_info_md: gr.update(visible=is_detailed) # Control visibility of new Markdown
        }
    speaker_config_method_dropdown.change(fn=update_speaker_controls, inputs=[speaker_config_method_dropdown], outputs=[global_voice_dropdown, detailed_speaker_config_input, detailed_config_info_md])
    
    calculate_cost_button.click(fn=handle_calculate_cost, inputs=[script_input, tts_model_dropdown], outputs=[cost_output])
    generate_button.click(
        fn=handle_script_processing,
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown, 
                global_voice_dropdown, detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output])

    gr.Markdown("## Examples")
    ex_script1 = "[A] Hi B.\n[B] Hey A.\n[A] What's up?"
    ex_json1 = '[{"speaker": "A", "voice": "nova", "instr...": "curious"}, {"speaker": "B", "voice": "echo", "speed": 0.9}]'.replace("instr...", "instructions") # Hack for brevity
    ex_script2 = "[Cpt] Status?\n[Comp] Nominal. Slow.\n[Cpt] Good."
    ex_json2 = '[{"speaker": "Cpt", "voice": "alloy"}, {"speaker": "Comp", "voice": "onyx", "speed": 0.8, "instr...": "robotic"}]'.replace("instr...", "instructions")
    gr.Examples(
        examples=[
            [ex_script1, "gpt-4o-mini-tts", 250, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json1, 1.0, "Speak naturally."],
            [ex_script2, "tts-1-hd", 300, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json2, 1.1, ""],
            ["[N] Single line.", "tts-1", 0, "Single Voice (Global)", "fable", "", 1.2, ""]],
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown, global_voice_dropdown, 
                detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output],
        fn=handle_script_processing, cache_examples=False)

if __name__ == "__main__":
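    # The default Proactor event loop on Windows can cause issues with some async
    # libraries; falling back to the selector-based policy is a common workaround.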
    if os.name == 'nt': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    demo.launch()