# FILE: event_handlers.py
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
from functools import partial
import datetime

from utils.script_parser import parse_dialogue_script, calculate_cost
from utils.openai_tts import synthesize_speech_line
from utils.merge_audio import merge_mp3_files
from ui_layout import APP_AVAILABLE_VOICES, DEFAULT_VIBE, VIBE_CHOICES, PREDEFINED_VIBES, DEFAULT_GLOBAL_VOICE


def get_speakers_from_script(script_text: str) -> list:
    """Extracts unique speaker names from the script, in order of first appearance."""
    if not script_text or not script_text.strip():
        return []
    try:
        parsed_lines, _ = parse_dialogue_script(script_text)  # Assuming this returns (list_of_dicts, total_chars)
        if not parsed_lines:
            return []
        seen_speakers = set()
        ordered_unique_speakers = []
        for line_data in parsed_lines:
            speaker = line_data.get("speaker")
            if speaker and speaker not in seen_speakers:  # Skip None/empty speakers and duplicates
                ordered_unique_speakers.append(speaker)
                seen_speakers.add(speaker)
        return ordered_unique_speakers
    except ValueError:
        print("ValueError during script parsing in get_speakers_from_script.")
        return []
    except Exception as e:
        print(f"Unexpected error in get_speakers_from_script: {e}")
        return []


def handle_dynamic_accordion_input_change(
    new_value,                      # Value from the changed component (e.g., a voice dropdown)
    current_speaker_configs: dict,  # Current value of speaker_configs_state
    speaker_name: str,              # Bound via functools.partial by the event listener
    config_key: str                 # Bound via functools.partial by the event listener
):
    """Updates speaker_configs_state when a dynamic UI element inside a speaker accordion changes."""
    if not isinstance(current_speaker_configs, dict):
        print(f"Warning: current_speaker_configs was not a dict in handle_dynamic_accordion_input_change. Type: {type(current_speaker_configs)}. Re-initializing.")
        current_speaker_configs = {}
    updated_configs = current_speaker_configs.copy()
    if speaker_name not in updated_configs:
        updated_configs[speaker_name] = {}
    updated_configs[speaker_name][config_key] = new_value
    updated_configs["_last_dynamic_update_details"] = (
        f"Speaker: {speaker_name}, Key: {config_key}, Val: {str(new_value)[:20]}, "
        f"TS: {datetime.datetime.now().isoformat()}"
    )
    print(f"DEBUG (dynamic_input_change): Speaker '{speaker_name}' config '{config_key}' to '{str(new_value)[:50]}'. New state hint: {updated_configs.get('_last_dynamic_update_details')}")
    return updated_configs
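
# This handler is typically bound with functools.partial so the speaker name and config key are
# fixed at wiring time and Gradio only supplies the component value and the shared state.
# A minimal wiring sketch (the component variable names below are illustrative assumptions, not
# the actual names used in ui_layout.py):
#
#     voice_dropdown.change(
#         fn=partial(handle_dynamic_accordion_input_change,
#                    speaker_name=speaker, config_key="voice"),
#         inputs=[voice_dropdown, speaker_configs_state],
#         outputs=[speaker_configs_state],
#     )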


async def handle_script_processing(
    openai_api_key: str, async_openai_client, nsfw_api_url_template: str,
    dialogue_script: str, tts_model: str, pause_ms: int,
    speaker_config_method: str, global_voice_selection: str,
    speaker_configs_state_dict: dict,
    global_speed: float,
    global_instructions: str,
    progress=gr.Progress(track_tqdm=True)
):
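    """
    Synthesizes each parsed script line to MP3, zips the per-line files, and merges them into one
    MP3 with `pause_ms` of pause between lines.

    Returns a (zip_path, merged_audio_path, status_message) tuple; a path is None when that
    artifact could not be produced.
    """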
    if not openai_api_key or not async_openai_client:
        return None, None, "Error: OpenAI API Key or client is not configured."
    if not dialogue_script or not dialogue_script.strip():
        return None, None, "Error: Script is empty."

    job_audio_path_prefix = os.path.join(tempfile.gettempdir(), f"dialogue_tts_job_{random.randint(10000, 99999)}")
    if os.path.exists(job_audio_path_prefix):
        shutil.rmtree(job_audio_path_prefix)
    os.makedirs(job_audio_path_prefix, exist_ok=True)

    try:
        parsed_lines, _ = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            shutil.rmtree(job_audio_path_prefix)
            return None, None, "Error: No valid lines found in script."
    except ValueError as e:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, f"Script parsing error: {str(e)}"

    if not isinstance(speaker_configs_state_dict, dict):
        print(f"Warning: speaker_configs_state_dict was not a dict in handle_script_processing. Re-initializing. Type: {type(speaker_configs_state_dict)}")
        speaker_configs_state_dict = {}

    safe_default_global_voice = global_voice_selection if global_voice_selection in APP_AVAILABLE_VOICES else (APP_AVAILABLE_VOICES[0] if APP_AVAILABLE_VOICES else "alloy")

    speaker_voice_map = {}
    if speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
        unique_script_speakers_for_map = get_speakers_from_script(dialogue_script)
        temp_voices_pool = APP_AVAILABLE_VOICES.copy()
        if not temp_voices_pool:
            temp_voices_pool = [safe_default_global_voice]
        if speaker_config_method == "Random per Speaker":
            for spk_name in unique_script_speakers_for_map:
                speaker_voice_map[spk_name] = random.choice(temp_voices_pool)
        elif speaker_config_method == "A/B Round Robin" and temp_voices_pool:
            for i, spk_name in enumerate(unique_script_speakers_for_map):
                speaker_voice_map[spk_name] = temp_voices_pool[i % len(temp_voices_pool)]

    task_info_list = []
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        line_text = line_data["text"]
        line_id = line_data["id"]
        line_voice = safe_default_global_voice
        line_speed = global_speed
        line_instructions = global_instructions.strip() if global_instructions and global_instructions.strip() else None
        if speaker_config_method == "Detailed Configuration (Per Speaker UI)":
            spk_cfg = speaker_configs_state_dict.get(speaker_name, {})
            line_voice = spk_cfg.get("voice", safe_default_global_voice)
            if tts_model in ["tts-1", "tts-1-hd"]:
                line_speed = float(spk_cfg.get("speed", global_speed))
            else:
                line_speed = 1.0
            if tts_model == "gpt-4o-mini-tts":
                vibe = spk_cfg.get("vibe", DEFAULT_VIBE)
                custom_instr_raw = spk_cfg.get("custom_instructions", "")
                custom_instr = custom_instr_raw.strip() if custom_instr_raw else ""
                current_line_specific_instructions = None
                if vibe == "Custom..." and custom_instr:
                    current_line_specific_instructions = custom_instr
                elif vibe != "None" and vibe != "Custom..." and PREDEFINED_VIBES.get(vibe):
                    current_line_specific_instructions = PREDEFINED_VIBES[vibe]
                # Per-speaker instructions take precedence; otherwise keep the global instructions.
                line_instructions = current_line_specific_instructions if current_line_specific_instructions is not None else line_instructions
            # tts-1 / tts-1-hd ignore per-speaker vibe/custom instructions and keep the global instructions.
        elif speaker_config_method in ["Random per Speaker", "A/B Round Robin"]:
            line_voice = speaker_voice_map.get(speaker_name, safe_default_global_voice)
            # For these methods, speed and instructions remain global.

        # Only the tts-1 family supports a speed parameter; every other model runs at 1.0.
        if tts_model not in ["tts-1", "tts-1-hd"]:
            line_speed = 1.0
        out_fn = os.path.join(job_audio_path_prefix, f"line_{line_id}_{speaker_name.replace(' ', '_')}.mp3")
        progress(i / len(parsed_lines), desc=f"Synthesizing: Line {i+1}/{len(parsed_lines)} ({speaker_name})")
        current_task = synthesize_speech_line(
            client=async_openai_client, text=line_text, voice=line_voice,
            output_path=out_fn, model=tts_model, speed=line_speed,
            instructions=line_instructions, nsfw_api_url_template=nsfw_api_url_template,
            line_index=line_id
        )
        task_info_list.append({"id": line_id, "speaker": speaker_name, "task": current_task, "out_fn": out_fn})
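
    # Note: the synthesize_speech_line(...) calls above are created without awaiting, so each
    # task_info_list entry holds a pending awaitable. The loop below awaits them one at a time,
    # i.e. lines are synthesized sequentially in script order.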
    processed_results_map = {}  # Keyed by line_id for easier lookup
    for info in task_info_list:
        try:
            result_path = await info['task']
            processed_results_map[info['id']] = {"path": result_path, "speaker": info['speaker']}
        except Exception as e:
            print(f"Error synthesizing line ID {info['id']} ({info['speaker']}): {e}")
            processed_results_map[info['id']] = {"path": None, "error": e, "speaker": info['speaker']}

    ordered_files_for_merge_and_zip = []
    for p_line in parsed_lines:  # Iterate over the original parsed lines to preserve script order
        line_id = p_line['id']
        res = processed_results_map.get(line_id)
        if res and res.get("path") and os.path.exists(res["path"]) and os.path.getsize(res["path"]) > 0:
            ordered_files_for_merge_and_zip.append(res["path"])
        else:
            # The file was not created successfully, or no result exists; keep a None placeholder.
            ordered_files_for_merge_and_zip.append(None)
            if res:
                print(f"Skipped or failed synthesizing line ID {line_id} ({res.get('speaker', 'Unknown')}) for merge/zip.")
            else:
                print(f"Result for line ID {line_id} not found in processed_results_map.")

    valid_files_for_zip = [f for f in ordered_files_for_merge_and_zip if f]
    if not valid_files_for_zip:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio was successfully synthesized for any line."

    zip_fn = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_fn, 'w') as zf:
        for f_path in valid_files_for_zip:
            zf.write(f_path, os.path.basename(f_path))

    files_to_actually_merge = valid_files_for_zip  # Already ordered and filtered
    merged_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    merged_path = merge_mp3_files(files_to_actually_merge, merged_fn, pause_ms)

    status_msg = f"Successfully processed {len(valid_files_for_zip)} out of {len(parsed_lines)} lines. "
    if len(valid_files_for_zip) < len(parsed_lines):
        status_msg += "Some lines may have failed. "
    if not merged_path and len(valid_files_for_zip) > 0:
        status_msg += "Merging audio failed. "
    elif not merged_path:
        status_msg = "No audio to merge (all lines failed or were skipped)."
    else:
        status_msg += "Merged audio generated."

    return (zip_fn if os.path.exists(zip_fn) else None,
            merged_path if merged_path and os.path.exists(merged_path) else None,
            status_msg)


def handle_calculate_cost(dialogue_script: str, tts_model: str):
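    """Returns a human-readable cost estimate for synthesizing the current script with the selected TTS model."""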
    if not dialogue_script or not dialogue_script.strip():
        return "Cost: $0.00 (Script is empty)"
    try:
        parsed_lines, total_chars = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            return "Cost: $0.00 (No valid lines in script)"
        cost = calculate_cost(total_chars, len(parsed_lines), tts_model)
        return f"Estimated Cost for {len(parsed_lines)} lines ({total_chars} chars): ${cost:.6f}"
    except ValueError as e:
        return f"Cost calculation error: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"


def handle_load_refresh_per_speaker_ui_trigger(script_text: str, current_speaker_configs: dict, tts_model: str):
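    """
    Handler for the per-speaker Load/Refresh trigger: returns a copy of speaker_configs_state
    stamped with an action source and timestamp so downstream listeners can react to the refresh.
    """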
    print(f"DEBUG (Load/Refresh Trigger): Script: '{script_text[:30]}...', Model: {tts_model}, Current State Keys: {list(current_speaker_configs.keys()) if isinstance(current_speaker_configs, dict) else 'Not a dict'}")
    if not isinstance(current_speaker_configs, dict):
        current_speaker_configs = {}
    updated_configs = current_speaker_configs.copy()
    # Optionally, you might want to pre-populate settings for new speakers here:
    # unique_speakers_in_script = get_speakers_from_script(script_text)
    # for speaker in unique_speakers_in_script:
    #     if speaker not in updated_configs:
    #         updated_configs[speaker] = {"voice": DEFAULT_GLOBAL_VOICE}  # Default init
    #         if tts_model in ["tts-1", "tts-1-hd"]:
    #             updated_configs[speaker]["speed"] = 1.0
    #         elif tts_model == "gpt-4o-mini-tts":
    #             updated_configs[speaker]["vibe"] = DEFAULT_VIBE
    updated_configs["_last_action_source"] = "load_refresh_button"
    updated_configs["_last_action_timestamp"] = datetime.datetime.now().isoformat()
    return updated_configs


def handle_tts_model_change(selected_model: str, current_speaker_configs: dict):
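    """
    Adjusts per-speaker settings when the TTS model changes: drops 'speed' for gpt-4o-mini-tts,
    drops 'vibe'/'custom_instructions' for the tts-1 family, and fills in defaults where missing.
    Returns three outputs: a gr.update for the tts-1-family-only control(s), a gr.update for the
    gpt-4o-mini-tts-only control(s), and the updated speaker configs dict.
    """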
    print(f"DEBUG (TTS Model Change): Model: {selected_model}, Current State Keys: {list(current_speaker_configs.keys()) if isinstance(current_speaker_configs, dict) else 'Not a dict'}")
    if not isinstance(current_speaker_configs, dict):
        current_speaker_configs = {}
    updated_configs = current_speaker_configs.copy()
    # When the model changes, reset or adjust model-specific settings for all speakers.
    # For example, 'speed' is for tts-1, 'vibe' for gpt-4o-mini-tts.
    for speaker_name_key in list(updated_configs.keys()):  # Iterate over a copy of the keys while modifying the dict
        if isinstance(updated_configs[speaker_name_key], dict):  # Only touch speaker config dicts
            if selected_model == "gpt-4o-mini-tts":
                updated_configs[speaker_name_key].pop("speed", None)
                if "vibe" not in updated_configs[speaker_name_key]:
                    updated_configs[speaker_name_key]["vibe"] = DEFAULT_VIBE
            elif selected_model in ["tts-1", "tts-1-hd"]:
                updated_configs[speaker_name_key].pop("vibe", None)
                updated_configs[speaker_name_key].pop("custom_instructions", None)
                if "speed" not in updated_configs[speaker_name_key]:
                    updated_configs[speaker_name_key]["speed"] = 1.0
            # Add other model-specific adjustments here if needed.
    updated_configs["_last_action_source"] = "tts_model_change"
    updated_configs["_last_action_timestamp"] = datetime.datetime.now().isoformat()

    is_tts1_family = selected_model in ["tts-1", "tts-1-hd"]
    is_gpt_mini_tts = selected_model == "gpt-4o-mini-tts"
    return (
        gr.update(visible=is_tts1_family, interactive=is_tts1_family),
        gr.update(visible=is_gpt_mini_tts, interactive=is_gpt_mini_tts),
        updated_configs
    )


def handle_speaker_config_method_visibility_change(method: str):
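    """Toggles visibility of the global single-voice control and the detailed per-speaker container based on the chosen method."""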
    print(f"DEBUG (Config Method Change): Method: {method}")
    is_single_voice_visible = (method == "Single Voice (Global)")
    is_detailed_per_speaker_container_visible = (method == "Detailed Configuration (Per Speaker UI)")
    return (
        gr.update(visible=is_single_voice_visible),
        gr.update(visible=is_detailed_per_speaker_container_visible)
    )