# esl-dialogue-tts / event_handlers.py
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
from functools import partial
from utils.script_parser import parse_dialogue_script, calculate_cost
from utils.openai_tts import synthesize_speech_line
from utils.merge_audio import merge_mp3_files
# Import constants from ui_layout to avoid circular dependencies if they were in app.py
from ui_layout import APP_AVAILABLE_VOICES, DEFAULT_VIBE, VIBE_CHOICES, PREDEFINED_VIBES
# Ensure a default voice if APP_AVAILABLE_VOICES is empty (shouldn't happen with new ui_layout)
DEFAULT_FALLBACK_VOICE = APP_AVAILABLE_VOICES[0] if APP_AVAILABLE_VOICES else "alloy"
def get_speakers_from_script(script_text: str):
    """Return unique speaker names in order of first appearance; empty list if the script is empty or unparsable."""
    if not script_text.strip():
        return []
try:
parsed_lines, _ = parse_dialogue_script(script_text)
seen_speakers = set()
ordered_unique_speakers = []
for p in parsed_lines:
if p["speaker"] not in seen_speakers:
ordered_unique_speakers.append(p["speaker"])
seen_speakers.add(p["speaker"])
return ordered_unique_speakers
except ValueError:
return []
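
# Illustrative usage of get_speakers_from_script (the "Speaker: line text" script format is an
# assumption about what parse_dialogue_script accepts):
#   get_speakers_from_script("Anna: Hi there!\nBen: Hello.\nAnna: How are you?")
#   -> ["Anna", "Ben"]
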
def handle_dynamic_input_change(new_value, current_configs_state_dict: dict, speaker_name: str, config_key: str, tts_model: str):
"""Handles changes from dynamically generated UI elements for per-speaker settings."""
# print(f"Dynamic change for {speaker_name}, key {config_key}: {new_value}. State: {current_configs_state_dict}")
if current_configs_state_dict is None: # Should ideally be initialized by Gradio's gr.State
current_configs_state_dict = {}
if speaker_name not in current_configs_state_dict:
current_configs_state_dict[speaker_name] = {}
current_configs_state_dict[speaker_name][config_key] = new_value
return current_configs_state_dict
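
# After a few edits, the per-speaker state dict maintained by handle_dynamic_input_change looks
# roughly like this (the voice value is illustrative):
#   {"Anna": {"voice": "alloy", "speed": 1.0, "vibe": DEFAULT_VIBE, "custom_instructions": ""}}
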
def load_refresh_per_speaker_ui(script_text: str, current_configs_state_dict: dict, tts_model: str, speaker_configs_state_component: gr.State):
"""
Generates or refreshes the dynamic UI components (accordions) for each speaker.
Returns a list of Gradio components to populate the dynamic UI area and the updated state.
"""
    print(f"Load/Refresh UI called. TTS Model: {tts_model}")  # Debug
    unique_speakers = get_speakers_from_script(script_text)
    new_ui_components = []
    if current_configs_state_dict is None:
        current_configs_state_dict = {}
    # Ensure a default voice for safety
    safe_default_voice = DEFAULT_FALLBACK_VOICE
    for speaker_name in unique_speakers:
        if speaker_name not in current_configs_state_dict:
            current_configs_state_dict[speaker_name] = {
                "voice": safe_default_voice, "speed": 1.0,
                "vibe": DEFAULT_VIBE, "custom_instructions": ""
            }
        # Ensure all keys exist with defaults
        current_configs_state_dict[speaker_name].setdefault("voice", safe_default_voice)
        current_configs_state_dict[speaker_name].setdefault("speed", 1.0)
        current_configs_state_dict[speaker_name].setdefault("vibe", DEFAULT_VIBE)
        current_configs_state_dict[speaker_name].setdefault("custom_instructions", "")
    if not unique_speakers:
        print("No unique speakers found, returning markdown.")  # Debug
        new_ui_components.append(gr.Markdown("No speakers detected in the script, or the script is empty. Type a script and click 'Load/Refresh' again, or change the script content."))
        return new_ui_components, current_configs_state_dict
    print(f"Found speakers: {unique_speakers}. Building UI...")  # Debug
    for speaker_name in unique_speakers:
        speaker_cfg = current_configs_state_dict[speaker_name]
        speed_interactive = tts_model in ["tts-1", "tts-1-hd"]
        instructions_relevant = tts_model == "gpt-4o-mini-tts"
        # Use a unique elem_id for each accordion to help Gradio differentiate if needed
        accordion_elem_id = f"accordion_speaker_{speaker_name.replace(' ', '_')}"
        with gr.Accordion(label=f"Settings for: {speaker_name}", open=False, elem_id=accordion_elem_id) as speaker_accordion:
            # Voice Dropdown
            voice_dd = gr.Dropdown(
                label="Voice", choices=APP_AVAILABLE_VOICES, value=speaker_cfg.get("voice", safe_default_voice), interactive=True
            )
            voice_dd.change(
                fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="voice", tts_model=tts_model),
                inputs=[voice_dd, speaker_configs_state_component],
                outputs=[speaker_configs_state_component]
            )
            # Speed Slider (only honored by the tts-1 family)
            speed_slider_label = "Speech Speed" + (" (Active for tts-1/hd)" if speed_interactive else " (N/A for this model)")
            speed_slider = gr.Slider(
                label=speed_slider_label, minimum=0.25, maximum=4.0, value=float(speaker_cfg.get("speed", 1.0)),
                step=0.05, interactive=speed_interactive
            )
            if speed_interactive:
                speed_slider.release(
                    fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="speed", tts_model=tts_model),
                    inputs=[speed_slider, speaker_configs_state_component],
                    outputs=[speaker_configs_state_component]
                )
            # Vibe Dropdown
            vibe_label = "Vibe/Emotion Preset" + (" (For gpt-4o-mini-tts)" if instructions_relevant else " (Less impact on other models)")
            vibe_dd = gr.Dropdown(
                label=vibe_label, choices=VIBE_CHOICES, value=speaker_cfg.get("vibe", DEFAULT_VIBE), interactive=True
            )
            vibe_dd.change(
                fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="vibe", tts_model=tts_model),
                inputs=[vibe_dd, speaker_configs_state_component],
                outputs=[speaker_configs_state_component]
            )
            # Custom Instructions Textbox
            custom_instr_tb = gr.Textbox(
                label="Custom Instructions",
                value=speaker_cfg.get("custom_instructions", ""),
                placeholder="Used if Vibe is 'Custom...'. Overrides Vibe preset.",
                lines=2, interactive=True
            )
            custom_instr_tb.input(
                fn=partial(handle_dynamic_input_change, speaker_name=speaker_name, config_key="custom_instructions", tts_model=tts_model),
                inputs=[custom_instr_tb, speaker_configs_state_component],
                outputs=[speaker_configs_state_component]
            )
        new_ui_components.append(speaker_accordion)
    print(f"Returning {len(new_ui_components)} UI components for dynamic area.")  # Debug
    return new_ui_components, current_configs_state_dict
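
# Expected wiring in app.py (illustrative sketch; button/component variable names are assumptions):
#   load_refresh_btn.click(
#       fn=partial(load_refresh_per_speaker_ui, speaker_configs_state_component=speaker_configs_state),
#       inputs=[script_input, speaker_configs_state, tts_model_dropdown],
#       outputs=[dynamic_speaker_ui_area, speaker_configs_state],
#   )
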
async def handle_script_processing(
openai_api_key: str, async_openai_client, nsfw_api_url_template: str, # Passed from app.py
dialogue_script: str, tts_model: str, pause_ms: int,
speaker_config_method: str, global_voice_selection: str,
speaker_configs_state_dict: dict,
global_speed: float,
global_instructions: str,
progress=gr.Progress(track_tqdm=True)
):
    """Synthesize each parsed dialogue line with OpenAI TTS and return (zip_path, merged_mp3_path, status_message)."""
    if not openai_api_key or not async_openai_client:
        return None, None, "Error: OpenAI API Key or client is not configured."
if not dialogue_script.strip():
return None, None, "Error: Script is empty."
job_audio_path_prefix = os.path.join(tempfile.gettempdir(), f"dialogue_tts_job_{random.randint(10000, 99999)}")
if os.path.exists(job_audio_path_prefix): shutil.rmtree(job_audio_path_prefix)
os.makedirs(job_audio_path_prefix, exist_ok=True)
try:
parsed_lines, _ = parse_dialogue_script(dialogue_script)
if not parsed_lines:
shutil.rmtree(job_audio_path_prefix)
return None, None, "Error: No valid lines found in script."
except ValueError as e:
shutil.rmtree(job_audio_path_prefix)
return None, None, f"Script parsing error: {str(e)}"
if speaker_configs_state_dict is None: speaker_configs_state_dict = {}
# Ensure a default voice for safety
safe_default_global_voice = global_voice_selection if global_voice_selection in APP_AVAILABLE_VOICES else DEFAULT_FALLBACK_VOICE
speaker_voice_map = {}
if speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
unique_script_speakers_for_map = get_speakers_from_script(dialogue_script)
temp_voices_pool = APP_AVAILABLE_VOICES.copy()
if not temp_voices_pool: temp_voices_pool = [DEFAULT_FALLBACK_VOICE] # Ensure pool isn't empty
if speaker_config_method == "Random per Speaker":
for spk_name in unique_script_speakers_for_map:
speaker_voice_map[spk_name] = random.choice(temp_voices_pool)
elif speaker_config_method == "A/B Round Robin":
for i, spk_name in enumerate(unique_script_speakers_for_map):
speaker_voice_map[spk_name] = temp_voices_pool[i % len(temp_voices_pool)]
tasks = []
    # line_audio_files_map stores results keyed by original line ID so output order can be reconstructed.
line_audio_files_map = {}
for i, line_data in enumerate(parsed_lines):
speaker_name = line_data["speaker"]
line_voice = safe_default_global_voice
line_speed = global_speed
line_instructions = global_instructions if global_instructions and global_instructions.strip() else None
if speaker_config_method == "Detailed Configuration (Per Speaker UI)":
spk_cfg = speaker_configs_state_dict.get(speaker_name, {})
line_voice = spk_cfg.get("voice", safe_default_global_voice)
if tts_model in ["tts-1", "tts-1-hd"]:
line_speed = float(spk_cfg.get("speed", global_speed))
if tts_model == "gpt-4o-mini-tts":
vibe = spk_cfg.get("vibe", DEFAULT_VIBE)
custom_instr = spk_cfg.get("custom_instructions", "").strip()
if vibe == "Custom..." and custom_instr:
line_instructions = custom_instr
elif vibe != "None" and vibe != "Custom...":
line_instructions = PREDEFINED_VIBES.get(vibe, "")
if not line_instructions and global_instructions and global_instructions.strip():
line_instructions = global_instructions
elif not line_instructions:
line_instructions = None
elif speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
line_voice = speaker_voice_map.get(speaker_name, safe_default_global_voice)
if tts_model not in ["tts-1", "tts-1-hd"]:
line_speed = 1.0
out_fn = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}_{speaker_name.replace(' ','_')}.mp3")
        # Progress reflects task queueing only; synthesis itself runs concurrently in asyncio.gather below.
        progress(i / len(parsed_lines), desc=f"Queuing line {i+1}/{len(parsed_lines)} ({speaker_name})")
tasks.append(synthesize_speech_line(
client=async_openai_client, text=line_data["text"], voice=line_voice,
output_path=out_fn, model=tts_model, speed=line_speed,
instructions=line_instructions, nsfw_api_url_template=nsfw_api_url_template,
line_index=line_data['id']
))
results = await asyncio.gather(*tasks, return_exceptions=True)
for idx, res_path_or_exc in enumerate(results):
original_line_id = parsed_lines[idx]['id'] # Get original ID from the parsed line
if isinstance(res_path_or_exc, Exception):
print(f"Error synthesizing line ID {original_line_id} ({parsed_lines[idx]['speaker']}): {res_path_or_exc}")
line_audio_files_map[original_line_id] = None
elif res_path_or_exc is None:
print(f"Skipped or failed synthesizing line ID {original_line_id} ({parsed_lines[idx]['speaker']})")
line_audio_files_map[original_line_id] = None
else:
line_audio_files_map[original_line_id] = res_path_or_exc
# Reconstruct ordered list of files for merging, using original line IDs
ordered_files_for_merge_and_zip = []
for p_line in parsed_lines:
file_path = line_audio_files_map.get(p_line['id'])
if file_path and os.path.exists(file_path) and os.path.getsize(file_path) > 0:
ordered_files_for_merge_and_zip.append(file_path)
else:
ordered_files_for_merge_and_zip.append(None) # Keep placeholder for failed lines for merge logic
valid_files_for_zip = [f for f in ordered_files_for_merge_and_zip if f]
if not valid_files_for_zip:
shutil.rmtree(job_audio_path_prefix)
return None, None, "Error: No audio was successfully synthesized."
zip_fn = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
with zipfile.ZipFile(zip_fn, 'w') as zf:
for f_path in valid_files_for_zip:
zf.write(f_path, os.path.basename(f_path))
merged_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
# For merge_mp3_files, pass only the list of existing files in order
files_to_actually_merge = [f for f in ordered_files_for_merge_and_zip if f]
merged_path = merge_mp3_files(files_to_actually_merge, merged_fn, pause_ms)
status = f"Successfully processed {len(valid_files_for_zip)} out of {len(parsed_lines)} lines. "
if len(valid_files_for_zip) < len(parsed_lines): status += "Some lines may have failed. "
if not merged_path and len(valid_files_for_zip) > 0: status += "Merging audio failed. "
elif not merged_path: status = "No audio to merge."
else: status += "Merged audio generated."
return (zip_fn if os.path.exists(zip_fn) else None,
merged_path if merged_path and os.path.exists(merged_path) else None,
status)
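
# Illustrative wiring in app.py (component names and the partial-bound credentials are assumptions):
#   generate_btn.click(
#       fn=partial(handle_script_processing, OPENAI_API_KEY, async_openai_client, NSFW_API_URL_TEMPLATE),
#       inputs=[script_input, tts_model_dropdown, pause_ms_slider, speaker_config_method_radio,
#               global_voice_dropdown, speaker_configs_state, global_speed_input, global_instructions_input],
#       outputs=[zip_file_output, merged_audio_output, status_textbox],
#   )
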
def handle_calculate_cost(dialogue_script: str, tts_model: str):
    """Estimate the TTS cost of the current script for the selected model and return a display string."""
    if not dialogue_script.strip():
        return "Cost: $0.00 (Script is empty)"
try:
parsed, chars = parse_dialogue_script(dialogue_script)
if not parsed: return "Cost: $0.00 (No valid lines in script)"
cost = calculate_cost(chars, len(parsed), tts_model)
return f"Estimated Cost for {len(parsed)} lines ({chars} chars): ${cost:.6f}"
except ValueError as e:
return f"Cost calculation error: {str(e)}"
except Exception as e:
return f"An unexpected error occurred during cost calculation: {str(e)}"
def update_model_controls_visibility(selected_model: str, script_text_for_refresh: str, current_speaker_configs_for_refresh: dict, speaker_configs_state_comp: gr.State):
"""Updates visibility of global controls and refreshes per-speaker UI when TTS model changes."""
print(f"Model changed to: {selected_model}. Refreshing dynamic UI and controls.") # Debug
try:
# load_refresh_per_speaker_ui might return components or markdown
# It now takes speaker_configs_state_comp as an argument to wire up .change() correctly
dynamic_ui_output, updated_state = load_refresh_per_speaker_ui(
script_text_for_refresh, current_speaker_configs_for_refresh, selected_model, speaker_configs_state_comp
)
except Exception as e:
print(f"Error in load_refresh_per_speaker_ui called from model_controls_visibility: {e}")
# Fallback: clear dynamic UI and keep state as is, or return an error message component
dynamic_ui_output = [gr.Markdown(f"Error refreshing per-speaker UI: {e}")]
updated_state = current_speaker_configs_for_refresh # or {} to reset
is_tts1_family = selected_model in ["tts-1", "tts-1-hd"]
is_gpt_mini_tts = selected_model == "gpt-4o-mini-tts"
# The keys in this dictionary must match the Gradio components passed in the `outputs` list
# of the .change() event.
updates = {
"global_speed_input": gr.update(visible=is_tts1_family, interactive=is_tts1_family),
"global_instructions_input": gr.update(visible=is_gpt_mini_tts, interactive=is_gpt_mini_tts),
"dynamic_speaker_ui_area": dynamic_ui_output, # This directly provides the new children for the Column
"speaker_configs_state": updated_state
}
return updates["global_speed_input"], updates["global_instructions_input"], updates["dynamic_speaker_ui_area"], updates["speaker_configs_state"]
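
# Expected .change() wiring (illustrative; component names are assumptions). The `outputs` order
# must match the tuple returned above:
#   tts_model_dropdown.change(
#       fn=partial(update_model_controls_visibility, speaker_configs_state_comp=speaker_configs_state),
#       inputs=[tts_model_dropdown, script_input, speaker_configs_state],
#       outputs=[global_speed_input, global_instructions_input, dynamic_speaker_ui_area, speaker_configs_state],
#   )
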
def update_speaker_config_method_visibility(method: str):
"""Updates visibility of UI groups based on selected speaker configuration method."""
is_single = (method == "Single Voice (Global)")
is_detailed_per_speaker = (method == "Detailed Configuration (Per Speaker UI)")
    # Return order must match the Gradio components in the .change() event's `outputs` list
    # (Gradio cannot map string keys to components, so a tuple is returned instead of a dict).
    return (
        gr.update(visible=is_single),
        gr.update(visible=is_detailed_per_speaker),
    )
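
# Expected wiring (illustrative; component names are assumptions). The `outputs` order must match
# the tuple returned above:
#   speaker_config_method_radio.change(
#       fn=update_speaker_config_method_visibility,
#       inputs=[speaker_config_method_radio],
#       outputs=[single_voice_group, detailed_per_speaker_ui_group],
#   )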