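# event_handlers.py
"""Gradio event handlers for the dialogue TTS app: script parsing, per-speaker voice
configuration, OpenAI TTS synthesis, cost estimation, and merging/zipping of the audio."""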
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
from functools import partial
from utils.script_parser import parse_dialogue_script, calculate_cost
from utils.openai_tts import synthesize_speech_line
from utils.merge_audio import merge_mp3_files
# Import constants from ui_layout to avoid circular dependencies if they were in app.py
from ui_layout import APP_AVAILABLE_VOICES, DEFAULT_VIBE, VIBE_CHOICES, PREDEFINED_VIBES
# Ensure a default voice if APP_AVAILABLE_VOICES is empty (shouldn't happen with new ui_layout)
DEFAULT_FALLBACK_VOICE = APP_AVAILABLE_VOICES[0] if APP_AVAILABLE_VOICES else "alloy"
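# Shape of the per-speaker config state that the handlers below read and write
# (the speaker name "Narrator" is only an example):
#   {"Narrator": {"voice": "alloy", "speed": 1.0, "vibe": DEFAULT_VIBE, "custom_instructions": ""}}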
def get_speakers_from_script(script_text: str):
    """Return unique speaker names in order of first appearance; empty list if the script is empty or unparsable."""
    if not script_text.strip():
        return []
    try:
        parsed_lines, _ = parse_dialogue_script(script_text)
        seen_speakers = set()
        ordered_unique_speakers = []
        for p in parsed_lines:
            if p["speaker"] not in seen_speakers:
                ordered_unique_speakers.append(p["speaker"])
                seen_speakers.add(p["speaker"])
        return ordered_unique_speakers
    except ValueError:
        return []
def handle_dynamic_input_change(new_value, current_configs_state_dict: dict, speaker_name: str, config_key: str, tts_model: str):
    """Handles changes from dynamically generated UI elements for per-speaker settings."""
    # print(f"Dynamic change for {speaker_name}, key {config_key}: {new_value}. State: {current_configs_state_dict}")
    if current_configs_state_dict is None:  # Should ideally be initialized by Gradio's gr.State
        current_configs_state_dict = {}
    if speaker_name not in current_configs_state_dict:
        current_configs_state_dict[speaker_name] = {}
    current_configs_state_dict[speaker_name][config_key] = new_value
    return current_configs_state_dict
def load_refresh_per_speaker_ui(script_text: str, current_configs_state_dict: dict, tts_model: str, speaker_configs_state_component: gr.State):
    """
    Generates or refreshes the dynamic UI components (accordions) for each speaker.
    Returns a list of Gradio components to populate the dynamic UI area and the updated state.

    NOTE: Currently stubbed out for testing: it returns hardcoded debug content and an
    empty state dict. The original logic is commented out below.
    """
    print("DEBUG: load_refresh_per_speaker_ui CALLED - HARDCODED RETURN")
    debug_markdown = gr.Markdown("## !! Dynamic Area Test Content Loaded !!")
    # Return this simple component and an empty dict for state for now.
    return [debug_markdown], {}
# print(f"Load/Refresh UI called. TTS Model: {tts_model}") # Debug
# unique_speakers = get_speakers_from_script(script_text)
# new_ui_components = []
# if current_configs_state_dict is None:
# current_configs_state_dict = {}
# # Ensure a default voice for safety
# safe_default_voice = APP_AVAILABLE_VOICES[0] if APP_AVAILABLE_VOICES else "alloy"
# for speaker_name in unique_speakers:
# if speaker_name not in current_configs_state_dict:
# current_configs_state_dict[speaker_name] = {
# "voice": safe_default_voice, "speed": 1.0,
# "vibe": DEFAULT_VIBE, "custom_instructions": ""
# }
# # Ensure all keys exist with defaults
# current_configs_state_dict[speaker_name].setdefault("voice", safe_default_voice)
# current_configs_state_dict[speaker_name].setdefault("speed", 1.0)
# current_configs_state_dict[speaker_name].setdefault("vibe", DEFAULT_VIBE)
# current_configs_state_dict[speaker_name].setdefault("custom_instructions", "")
# if not unique_speakers:
# print("No unique speakers found, returning markdown.") # Debug
# new_ui_components.append(gr.Markdown("No speakers detected in the script, or script is empty. Type a script and click 'Load/Refresh' again, or change the script content."))
# return new_ui_components, current_configs_state_dict
# print(f"Found speakers: {unique_speakers}. Building UI...") # Debug
# for speaker_name in unique_speakers:
# speaker_cfg = current_configs_state_dict[speaker_name]
# speed_interactive = tts_model in ["tts-1", "tts-1-hd"]
# instructions_relevant = tts_model == "gpt-4o-mini-tts"
# # Use a unique elem_id for each accordion to help Gradio differentiate if needed
# accordion_elem_id = f"accordion_speaker_{speaker_name.replace(' ', '_')}"
# with gr.Accordion(label=f"Settings for: {speaker_name}", open=False, elem_id=accordion_elem_id) as speaker_accordion:
# # Voice Dropdown
# voice_dd = gr.Dropdown(
# label="Voice", choices=APP_AVAILABLE_VOICES, value=speaker_cfg.get("voice", safe_default_voice), interactive=True
# )
# voice_dd.change(
# fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="voice", tts_model=tts_model),
# inputs=[voice_dd, speaker_configs_state_component],
# outputs=[speaker_configs_state_component]
# )
# # Speed Slider
# speed_slider_label = "Speech Speed" + (" (Active for tts-1/hd)" if speed_interactive else " (N/A for this model)")
# speed_slider = gr.Slider(
# label=speed_slider_label, minimum=0.25, maximum=4.0, value=float(speaker_cfg.get("speed", 1.0)),
# step=0.05, interactive=speed_interactive
# )
# if speed_interactive:
# speed_slider.release(
# fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="speed", tts_model=tts_model),
# inputs=[speed_slider, speaker_configs_state_component],
# outputs=[speaker_configs_state_component]
# )
# # Vibe Dropdown
# vibe_label = "Vibe/Emotion Preset" + (" (For gpt-4o-mini-tts)" if instructions_relevant else " (Less impact on other models)")
# vibe_dd = gr.Dropdown(
# label=vibe_label, choices=VIBE_CHOICES, value=speaker_cfg.get("vibe", DEFAULT_VIBE), interactive=True
# )
# vibe_dd.change(
# fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="vibe", tts_model=tts_model),
# inputs=[vibe_dd, speaker_configs_state_component],
# outputs=[speaker_configs_state_component]
# )
# # Custom Instructions Textbox
# custom_instr_label = "Custom Instructions"
# custom_instr_placeholder = "Used if Vibe is 'Custom...'. Overrides Vibe preset."
# custom_instr_tb = gr.Textbox(
# label=custom_instr_label,
# value=speaker_cfg.get("custom_instructions", ""),
# placeholder=custom_instr_placeholder,
# lines=2, interactive=True
# )
# custom_instr_tb.input(
# fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="custom_instructions", tts_model=tts_model),
# inputs=[custom_instr_tb, speaker_configs_state_component],
# outputs=[speaker_configs_state_component]
# )
# new_ui_components.append(speaker_accordion)
# print(f"Returning {len(new_ui_components)} UI components for dynamic area.") # Debug
# return new_ui_components, current_configs_state_dict
async def handle_script_processing(
    openai_api_key: str, async_openai_client, nsfw_api_url_template: str,  # Passed from app.py
    dialogue_script: str, tts_model: str, pause_ms: int,
    speaker_config_method: str, global_voice_selection: str,
    speaker_configs_state_dict: dict,
    global_speed: float,
    global_instructions: str,
    progress=gr.Progress(track_tqdm=True)
):
    """Synthesizes each parsed dialogue line with OpenAI TTS, zips the per-line MP3s,
    merges them with pauses, and returns (zip_path, merged_audio_path, status_message)."""
    if not openai_api_key or not async_openai_client:
        return None, None, "Error: OpenAI API Key or client is not configured."
    if not dialogue_script.strip():
        return None, None, "Error: Script is empty."
    job_audio_path_prefix = os.path.join(tempfile.gettempdir(), f"dialogue_tts_job_{random.randint(10000, 99999)}")
    if os.path.exists(job_audio_path_prefix):
        shutil.rmtree(job_audio_path_prefix)
    os.makedirs(job_audio_path_prefix, exist_ok=True)
    try:
        parsed_lines, _ = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            shutil.rmtree(job_audio_path_prefix)
            return None, None, "Error: No valid lines found in script."
    except ValueError as e:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, f"Script parsing error: {str(e)}"
    if speaker_configs_state_dict is None:
        speaker_configs_state_dict = {}
    # Ensure a default voice for safety
    safe_default_global_voice = global_voice_selection if global_voice_selection in APP_AVAILABLE_VOICES else DEFAULT_FALLBACK_VOICE
    speaker_voice_map = {}
    if speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
        unique_script_speakers_for_map = get_speakers_from_script(dialogue_script)
        temp_voices_pool = APP_AVAILABLE_VOICES.copy()
        if not temp_voices_pool:
            temp_voices_pool = [DEFAULT_FALLBACK_VOICE]  # Ensure pool isn't empty
        if speaker_config_method == "Random per Speaker":
            for spk_name in unique_script_speakers_for_map:
                speaker_voice_map[spk_name] = random.choice(temp_voices_pool)
        elif speaker_config_method == "A/B Round Robin":
            for i, spk_name in enumerate(unique_script_speakers_for_map):
                speaker_voice_map[spk_name] = temp_voices_pool[i % len(temp_voices_pool)]
    tasks = []
    # line_audio_files_map stores results keyed by original line ID for correct ordering
    line_audio_files_map = {}
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        line_voice = safe_default_global_voice
        line_speed = global_speed
        line_instructions = global_instructions if global_instructions and global_instructions.strip() else None
        if speaker_config_method == "Detailed Configuration (Per Speaker UI)":
            spk_cfg = speaker_configs_state_dict.get(speaker_name, {})
            line_voice = spk_cfg.get("voice", safe_default_global_voice)
            if tts_model in ["tts-1", "tts-1-hd"]:
                line_speed = float(spk_cfg.get("speed", global_speed))
            if tts_model == "gpt-4o-mini-tts":
                vibe = spk_cfg.get("vibe", DEFAULT_VIBE)
                custom_instr = spk_cfg.get("custom_instructions", "").strip()
                if vibe == "Custom..." and custom_instr:
                    line_instructions = custom_instr
                elif vibe != "None" and vibe != "Custom...":
                    line_instructions = PREDEFINED_VIBES.get(vibe, "")
                if not line_instructions and global_instructions and global_instructions.strip():
                    line_instructions = global_instructions
                elif not line_instructions:
                    line_instructions = None
        elif speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
            line_voice = speaker_voice_map.get(speaker_name, safe_default_global_voice)
        if tts_model not in ["tts-1", "tts-1-hd"]:
            line_speed = 1.0
        out_fn = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}_{speaker_name.replace(' ','_')}.mp3")
        progress(i / len(parsed_lines), desc=f"Synthesizing: Line {i+1}/{len(parsed_lines)} ({speaker_name})")
        tasks.append(synthesize_speech_line(
            client=async_openai_client, text=line_data["text"], voice=line_voice,
            output_path=out_fn, model=tts_model, speed=line_speed,
            instructions=line_instructions, nsfw_api_url_template=nsfw_api_url_template,
            line_index=line_data['id']
        ))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, res_path_or_exc in enumerate(results):
        original_line_id = parsed_lines[idx]['id']  # Get original ID from the parsed line
        if isinstance(res_path_or_exc, Exception):
            print(f"Error synthesizing line ID {original_line_id} ({parsed_lines[idx]['speaker']}): {res_path_or_exc}")
            line_audio_files_map[original_line_id] = None
        elif res_path_or_exc is None:
            print(f"Skipped or failed synthesizing line ID {original_line_id} ({parsed_lines[idx]['speaker']})")
            line_audio_files_map[original_line_id] = None
        else:
            line_audio_files_map[original_line_id] = res_path_or_exc
    # Reconstruct ordered list of files for merging, using original line IDs
    ordered_files_for_merge_and_zip = []
    for p_line in parsed_lines:
        file_path = line_audio_files_map.get(p_line['id'])
        if file_path and os.path.exists(file_path) and os.path.getsize(file_path) > 0:
            ordered_files_for_merge_and_zip.append(file_path)
        else:
            ordered_files_for_merge_and_zip.append(None)  # Keep placeholder for failed lines for merge logic
    valid_files_for_zip = [f for f in ordered_files_for_merge_and_zip if f]
    if not valid_files_for_zip:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio was successfully synthesized."
    zip_fn = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_fn, 'w') as zf:
        for f_path in valid_files_for_zip:
            zf.write(f_path, os.path.basename(f_path))
    merged_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    # For merge_mp3_files, pass only the list of existing files in order
    files_to_actually_merge = [f for f in ordered_files_for_merge_and_zip if f]
    merged_path = merge_mp3_files(files_to_actually_merge, merged_fn, pause_ms)
    status = f"Successfully processed {len(valid_files_for_zip)} out of {len(parsed_lines)} lines. "
    if len(valid_files_for_zip) < len(parsed_lines):
        status += "Some lines may have failed. "
    if not merged_path and len(valid_files_for_zip) > 0:
        status += "Merging audio failed. "
    elif not merged_path:
        status = "No audio to merge."
    else:
        status += "Merged audio generated."
    return (zip_fn if os.path.exists(zip_fn) else None,
            merged_path if merged_path and os.path.exists(merged_path) else None,
            status)
def handle_calculate_cost(dialogue_script: str, tts_model: str):
    """Returns an estimated cost string for synthesizing the current script with the selected TTS model."""
    if not dialogue_script.strip():
        return "Cost: $0.00 (Script is empty)"
    try:
        parsed, chars = parse_dialogue_script(dialogue_script)
        if not parsed:
            return "Cost: $0.00 (No valid lines in script)"
        cost = calculate_cost(chars, len(parsed), tts_model)
        return f"Estimated Cost for {len(parsed)} lines ({chars} chars): ${cost:.6f}"
    except ValueError as e:
        return f"Cost calculation error: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred during cost calculation: {str(e)}"
def update_model_controls_visibility(selected_model: str, script_text_for_refresh: str, current_speaker_configs_for_refresh: dict, speaker_configs_state_comp: gr.State):
    """Updates visibility of global controls and refreshes per-speaker UI when the TTS model changes."""
    print(f"Model changed to: {selected_model}. Refreshing dynamic UI and controls.")  # Debug
    try:
        # load_refresh_per_speaker_ui might return components or markdown.
        # It now takes speaker_configs_state_comp as an argument to wire up .change() correctly.
        dynamic_ui_output, updated_state = load_refresh_per_speaker_ui(
            script_text_for_refresh, current_speaker_configs_for_refresh, selected_model, speaker_configs_state_comp
        )
    except Exception as e:
        print(f"Error in load_refresh_per_speaker_ui called from model_controls_visibility: {e}")
        # Fallback: clear dynamic UI and keep state as is, or return an error message component
        dynamic_ui_output = [gr.Markdown(f"Error refreshing per-speaker UI: {e}")]
        updated_state = current_speaker_configs_for_refresh  # or {} to reset
    is_tts1_family = selected_model in ["tts-1", "tts-1-hd"]
    is_gpt_mini_tts = selected_model == "gpt-4o-mini-tts"
    # The keys in this dictionary must match the Gradio components passed in the `outputs` list
    # of the .change() event.
    updates = {
        "global_speed_input": gr.update(visible=is_tts1_family, interactive=is_tts1_family),
        "global_instructions_input": gr.update(visible=is_gpt_mini_tts, interactive=is_gpt_mini_tts),
        "dynamic_speaker_ui_area": dynamic_ui_output,  # This directly provides the new children for the Column
        "speaker_configs_state": updated_state
    }
    return updates["global_speed_input"], updates["global_instructions_input"], updates["dynamic_speaker_ui_area"], updates["speaker_configs_state"]
def update_speaker_config_method_visibility(method: str):
    """Updates visibility of UI groups based on the selected speaker configuration method."""
    is_single = (method == "Single Voice (Global)")
    is_detailed_per_speaker = (method == "Detailed Configuration (Per Speaker UI)")
    # Keys here must match the Gradio components in the .change() event's `outputs` list.
    return {
        "single_voice_group": gr.update(visible=is_single),
        "detailed_per_speaker_ui_group": gr.update(visible=is_detailed_per_speaker),
    }
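# The sketch below illustrates how app.py might wire these handlers to Gradio events.
# It is only an illustration: the component and constant names used here (script_input,
# tts_model_dropdown, cost_btn, generate_btn, OPENAI_API_KEY, etc.) are assumptions,
# not the actual names defined in ui_layout.py / app.py.
#
#   cost_btn.click(
#       fn=handle_calculate_cost,
#       inputs=[script_input, tts_model_dropdown],
#       outputs=[cost_output],
#   )
#   generate_btn.click(
#       fn=partial(handle_script_processing, OPENAI_API_KEY, async_openai_client, NSFW_API_URL_TEMPLATE),
#       inputs=[script_input, tts_model_dropdown, pause_ms_input, speaker_config_method_radio,
#               global_voice_dropdown, speaker_configs_state, global_speed_input, global_instructions_input],
#       outputs=[zip_output, merged_audio_output, status_output],
#   )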