# Hugging Face Spaces status header (scraped page artifact): "Spaces: Running"
import asyncio
import json
import os
import random
import shutil
import tempfile
import zipfile

import gradio as gr
from openai import AsyncOpenAI

from utils.script_parser import parse_dialogue_script, calculate_cost, MAX_SCRIPT_LENGTH
from utils.openai_tts import synthesize_speech_line, OPENAI_VOICES as ALL_TTS_VOICES
from utils.merge_audio import merge_mp3_files
# --- Configuration ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NSFW_API_URL_TEMPLATE = os.getenv("NSFW_API_URL_TEMPLATE")
MODEL_DEFAULT = os.getenv("MODEL_DEFAULT", "tts-1-hd")

# Fallback for Spaces deployments where secrets are not exposed as env vars:
# read them directly from the Hub API using the SPACE_ID of this Space.
if not OPENAI_API_KEY:
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        space_id = os.getenv("SPACE_ID")
        if space_id:
            secrets = api.get_space_secrets(repo_id=space_id)
            OPENAI_API_KEY = secrets.get("OPENAI_API_KEY")
            NSFW_API_URL_TEMPLATE = secrets.get("NSFW_API_URL_TEMPLATE", NSFW_API_URL_TEMPLATE)
            MODEL_DEFAULT = secrets.get("MODEL_DEFAULT", MODEL_DEFAULT)
    except Exception as e:
        # Best-effort: the app still starts, but synthesis will be disabled.
        print(f"Could not retrieve secrets from Hugging Face Hub: {e}")

async_openai_client = None
if OPENAI_API_KEY:
    async_openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
else:
    print("ERROR: OPENAI_API_KEY secret is not set. The application will not function properly.")

TTS_MODELS_AVAILABLE = ["tts-1", "tts-1-hd", "gpt-4o-mini-tts"]
# Guard against an unknown model name arriving via env/secret.
if MODEL_DEFAULT not in TTS_MODELS_AVAILABLE:
    MODEL_DEFAULT = "tts-1-hd"

SPEAKER_CONFIG_METHODS = [
    "Single Voice (Global)",
    "Random per Speaker",
    "A/B Round Robin",
    "Detailed Configuration (JSON)"
]
DEFAULT_SPEAKER_CONFIG_METHOD = "Random per Speaker"
APP_AVAILABLE_VOICES = ALL_TTS_VOICES.copy()

# Per-job cache of speaker -> voice assignment plus "__"-prefixed meta keys;
# reset at the start of every synthesis job.
_speaker_config_cache = {}
def parse_detailed_speaker_config(json_text, parsed_script_lines):
    """Parse the 'Detailed Configuration (JSON)' textbox into a speaker map.

    Args:
        json_text: JSON string — a list of objects, each with required
            "speaker" and "voice" keys and optional "speed" / "instructions".
        parsed_script_lines: parsed script lines (currently unused; kept for
            interface compatibility with callers).

    Returns:
        dict mapping speaker name -> {"voice", "speed", "instructions"}.
        Returns an empty dict when json_text is blank.

    Raises:
        ValueError: on invalid JSON, a non-list top level, or any other
            parsing failure.
    """
    config_map = {}
    try:
        if not json_text.strip():
            return {}
        config_list = json.loads(json_text)
        if not isinstance(config_list, list):
            raise ValueError("JSON config must be a list of speaker objects.")
        for item in config_list:
            # Skip entries that are not objects with both required keys.
            if not isinstance(item, dict) or "speaker" not in item or "voice" not in item:
                print(f"Skipping malformed item in JSON config: {item}")
                continue
            voice = item["voice"]
            if voice not in APP_AVAILABLE_VOICES:
                default_voice = APP_AVAILABLE_VOICES[0]
                print(f"Warning: Voice '{item['voice']}' for speaker '{item['speaker']}' not recognized. Falling back to '{default_voice}'.")
                voice = default_voice
            speed = None
            if "speed" in item:
                try:
                    speed = float(item["speed"])
                    if not (0.25 <= speed <= 4.0):
                        # Clamp to the range accepted by the TTS API.
                        print(f"Warning: Speed for speaker '{item['speaker']}' out of range (0.25-4.0). Clamping.")
                        speed = max(0.25, min(speed, 4.0))
                except ValueError:
                    print(f"Warning: Invalid speed value for speaker '{item['speaker']}'. Using default.")
                    speed = None
            config_map[item["speaker"]] = {
                "voice": voice,
                "speed": speed,
                "instructions": item.get("instructions")
            }
        return config_map
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in Detailed Speaker Configuration: {e}")
    except ValueError:
        # Already a user-facing message (e.g. the non-list check above).
        raise
    except Exception as e:
        raise ValueError(f"Error parsing Detailed Speaker Configuration: {e}")
def get_config_for_speaker(speaker_name, speaker_config_method, unique_script_speakers,
                           global_selected_voice, detailed_config_map):
    """Resolve {"voice", "speed", "instructions"} for one speaker.

    Uses the module-level _speaker_config_cache so that random / round-robin
    voice assignments stay stable for the duration of a job.

    Args:
        speaker_name: speaker tag from the script line being synthesized.
        speaker_config_method: one of SPEAKER_CONFIG_METHODS.
        unique_script_speakers: sorted list of all speakers in the script.
        global_selected_voice: voice used by the "Single Voice (Global)" method.
        detailed_config_map: output of parse_detailed_speaker_config().

    Returns:
        dict with keys "voice", "speed", "instructions" (speed/instructions
        are only non-None for the Detailed JSON method).
    """
    global _speaker_config_cache
    # Invalidate the cache whenever the method or the speaker set changes.
    if _speaker_config_cache.get("__method") != speaker_config_method or \
       _speaker_config_cache.get("__speakers_set") != frozenset(unique_script_speakers):
        _speaker_config_cache = {"__method": speaker_config_method, "__speakers_set": frozenset(unique_script_speakers)}
    base_config = {"voice": APP_AVAILABLE_VOICES[0], "speed": None, "instructions": None}
    if speaker_config_method == "Single Voice (Global)":
        base_config["voice"] = global_selected_voice if global_selected_voice in APP_AVAILABLE_VOICES else APP_AVAILABLE_VOICES[0]
        return base_config
    if speaker_config_method == "Detailed Configuration (JSON)":
        if speaker_name in detailed_config_map:
            speaker_specific = detailed_config_map[speaker_name]
            return {
                "voice": speaker_specific.get("voice", base_config["voice"]),
                "speed": speaker_specific.get("speed"),
                "instructions": speaker_specific.get("instructions")
            }
        else:
            print(f"Warning: Speaker '{speaker_name}' not found in Detailed JSON. Using default voice '{base_config['voice']}'.")
            return base_config
    if speaker_name not in _speaker_config_cache:
        if speaker_config_method == "Random per Speaker":
            # Assign all speakers at once from a shuffled voice list so each
            # speaker gets a distinct voice while voices remain available.
            available_voices_shuffled = random.sample(APP_AVAILABLE_VOICES, len(APP_AVAILABLE_VOICES))
            if not _speaker_config_cache.get("__all_assigned_random"):
                for i, spk_unique in enumerate(unique_script_speakers):
                    if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": available_voices_shuffled[i % len(available_voices_shuffled)]}
                _speaker_config_cache["__all_assigned_random"] = True
            # Fallback for a speaker not present in unique_script_speakers.
            if speaker_name not in _speaker_config_cache:
                _speaker_config_cache[speaker_name] = {"voice": random.choice(APP_AVAILABLE_VOICES)}
        elif speaker_config_method == "A/B Round Robin":
            # Deterministic assignment: i-th unique speaker gets i-th voice.
            if not _speaker_config_cache.get("__all_assigned_ab"):
                for i, spk_unique in enumerate(unique_script_speakers):
                    if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": APP_AVAILABLE_VOICES[i % len(APP_AVAILABLE_VOICES)]}
                _speaker_config_cache["__all_assigned_ab"] = True
            if speaker_name not in _speaker_config_cache:
                speaker_idx = unique_script_speakers.index(speaker_name) if speaker_name in unique_script_speakers else 0
                _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[speaker_idx % len(APP_AVAILABLE_VOICES)]}
        else:
            # Unknown method: fall back to the first available voice.
            _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[0]}
    cached_entry = _speaker_config_cache.get(speaker_name, base_config.copy())
    # Cached methods only carry a voice; speed/instructions come from globals.
    return {"voice": cached_entry.get("voice", base_config["voice"]), "speed": None, "instructions": None}
async def handle_script_processing(
        dialogue_script: str, tts_model: str, pause_ms: int,
        speaker_config_method: str, global_voice_selection: str,
        detailed_speaker_json: str, global_speed: float,
        global_instructions: str, progress=gr.Progress(track_tqdm=True)):
    """Synthesize each script line concurrently, then zip and merge the audio.

    Returns:
        (zip_path_or_None, merged_mp3_path_or_None, status_message)
    """
    global _speaker_config_cache
    _speaker_config_cache = {}  # fresh voice assignments for this job

    if not OPENAI_API_KEY or not async_openai_client:
        return None, None, "Error: OPENAI_API_KEY is not configured."
    if not dialogue_script.strip():
        return None, None, "Error: Script is empty."

    # NOTE(review): a fixed directory means two concurrent jobs would clobber
    # each other's files — presumably acceptable for a single-user Space.
    job_audio_path_prefix = os.path.join(tempfile.gettempdir(), "current_job_audio")
    if os.path.exists(job_audio_path_prefix):
        shutil.rmtree(job_audio_path_prefix)
    os.makedirs(job_audio_path_prefix, exist_ok=True)

    try:
        parsed_lines, total_chars = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            return None, None, "Error: No valid dialogue lines found."
    except ValueError as e:
        return None, None, f"Script parsing error: {str(e)}"

    unique_speakers = sorted(list(set(p["speaker"] for p in parsed_lines)))

    parsed_detailed_config_map = {}
    if speaker_config_method == "Detailed Configuration (JSON)":
        try:
            parsed_detailed_config_map = parse_detailed_speaker_config(detailed_speaker_json, parsed_lines)
        except ValueError as e:
            return None, None, f"Configuration Error: {str(e)}"

    tasks, line_audio_files = [], [None] * len(parsed_lines)
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        speaker_base_cfg = get_config_for_speaker(
            speaker_name, speaker_config_method, unique_speakers,
            global_voice_selection, parsed_detailed_config_map)
        line_voice = speaker_base_cfg["voice"]
        # Per-speaker overrides (from the JSON config) win over global controls.
        effective_speed = global_speed
        if speaker_base_cfg.get("speed") is not None:
            effective_speed = speaker_base_cfg["speed"]
        effective_instructions = global_instructions if global_instructions and global_instructions.strip() else None
        if speaker_base_cfg.get("instructions") is not None and speaker_base_cfg["instructions"].strip():
            effective_instructions = speaker_base_cfg["instructions"]
        output_filename = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}.mp3")
        progress(i / len(parsed_lines), desc=f"Synthesizing line {i+1}/{len(parsed_lines)} ({speaker_name} w/ {line_voice})")
        tasks.append(synthesize_speech_line(
            client=async_openai_client, text=line_data["text"], voice=line_voice,
            output_path=output_filename, model=tts_model, speed=effective_speed,
            instructions=effective_instructions, nsfw_api_url_template=NSFW_API_URL_TEMPLATE,
            line_index=line_data['id']))

    # Run all syntheses concurrently; per-line failures are logged, not fatal.
    synthesis_results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, result in enumerate(synthesis_results):
        if isinstance(result, Exception):
            print(f"Error for line {parsed_lines[idx]['id']}: {result}")
        elif result is None:
            print(f"Skipped/failed line {parsed_lines[idx]['id']}")
        else:
            line_audio_files[idx] = result

    valid_audio_files = [f for f in line_audio_files if f and os.path.exists(f) and os.path.getsize(f) > 0]
    if not valid_audio_files:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio files successfully synthesized."

    zip_filename = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zf:
        for pth in valid_audio_files:
            zf.write(pth, os.path.basename(pth))

    merged_mp3_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    merged_out_path = merge_mp3_files([f for f in line_audio_files if f], merged_mp3_fn, pause_ms)

    status = f"Processed {len(valid_audio_files)}/{len(parsed_lines)} lines. "
    if len(valid_audio_files) < len(parsed_lines):
        status += "Some lines failed/skipped. "
    if not merged_out_path and len(valid_audio_files) > 0:
        status += "Merged audio failed. "
    elif not merged_out_path and len(valid_audio_files) == 0:
        status += "No audio generated."
    else:
        status += "Outputs generated."

    return (zip_filename if os.path.exists(zip_filename) else None,
            merged_out_path if merged_out_path and os.path.exists(merged_out_path) else None,
            status)
def handle_calculate_cost(dialogue_script: str, tts_model: str):
    """Estimate the TTS cost of the script without synthesizing anything.

    Returns a human-readable status string in every case (never raises).
    """
    if not dialogue_script.strip():
        return "Cost: $0.000000 (Empty script)"
    try:
        parsed, chars = parse_dialogue_script(dialogue_script)
        if not parsed:
            return "Cost: $0.000000 (No valid lines)"
        cost = calculate_cost(chars, len(parsed), tts_model)
        return f"Est. Cost: ${cost:.6f} ({chars} chars, {len(parsed)} lines)"
    except ValueError as e:
        # Parser/validation errors carry user-facing messages.
        return f"Error: {str(e)}"
    except Exception as e:
        return f"Cost calc error: {str(e)}"
# --- Gradio UI ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Dialogue Script to Speech Converter (Enhanced)")
    gr.Markdown("Convert scripts with control over voices, speed, and instructions.")
    if not OPENAI_API_KEY or not async_openai_client:
        gr.Markdown("<h3 style='color:red;'>Warning: OPENAI_API_KEY not set. Synthesis will fail.</h3>")

    with gr.Row():
        with gr.Column(scale=2):
            script_input = gr.TextArea(label="Dialogue Script", placeholder="[S1] Hi!\n[S2] Hello!", lines=10, info=f"Max {MAX_SCRIPT_LENGTH} chars.")
        with gr.Column(scale=1):
            tts_model_dropdown = gr.Dropdown(TTS_MODELS_AVAILABLE, label="TTS Model", value=MODEL_DEFAULT, info="Affects controls below.")
            pause_input = gr.Number(label="Pause (ms)", value=500, minimum=0, maximum=5000, step=50, info="Between merged lines.")
            # Speed applies to tts-1/tts-1-hd; instructions apply to gpt-4o-mini-tts.
            global_speed_input = gr.Slider(minimum=0.25, maximum=4.0, value=1.0, step=0.05, label="Global Speed (tts-1/tts-1-hd)", visible=(MODEL_DEFAULT in ["tts-1", "tts-1-hd"]), interactive=True)
            global_instructions_input = gr.Textbox(label="Global Instructions (gpt-4o-mini-tts)", placeholder="e.g., Speak calmly.", visible=(MODEL_DEFAULT == "gpt-4o-mini-tts"), interactive=True, lines=2)

    gr.Markdown("### Speaker Configuration")
    with gr.Row():
        speaker_config_method_dropdown = gr.Dropdown(SPEAKER_CONFIG_METHODS, label="Speaker Config Method", value=DEFAULT_SPEAKER_CONFIG_METHOD)
        global_voice_dropdown = gr.Dropdown(APP_AVAILABLE_VOICES, label="Global Voice ('Single Voice' method)", value=APP_AVAILABLE_VOICES[0], visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Single Voice (Global)"), interactive=True)

    initial_json_config_example = """[
{"speaker": "Alice", "voice": "nova", "speed": 1.1, "instructions": "sound excited"},
{"speaker": "Bob", "voice": "echo"},
{"speaker": "Narrator", "voice": "shimmer", "instructions": "be very serious"}
]""".strip()
    detailed_speaker_config_input = gr.Code(
        label="Detailed Speaker Configuration (JSON)",
        language="json",
        lines=7,
        value=initial_json_config_example,
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )
    # Info text whose visibility tracks the config-method dropdown.
    detailed_config_info_md = gr.Markdown(
        "<small>Define voice per speaker. Optionally, `speed` (0.25-4.0) for `tts-1`/`tts-1-hd` models, "
        "and `instructions` (text) for `gpt-4o-mini-tts`.</small>",
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )

    with gr.Row():
        calculate_cost_button = gr.Button("Calculate Cost")
        generate_button = gr.Button("Generate Audio", variant="primary")
    cost_output = gr.Textbox(label="Estimated Cost", interactive=False)
    with gr.Row():
        individual_lines_zip_output = gr.File(label="Download ZIP")
        merged_dialogue_mp3_output = gr.Audio(label="Merged MP3", type="filepath")
    status_output = gr.Textbox(label="Status", interactive=False, lines=2)

    def update_model_controls(selected_model):
        """Show the speed slider for tts-1 models, instructions for gpt-4o-mini-tts."""
        is_tts1 = selected_model in ["tts-1", "tts-1-hd"]
        is_gpt_mini = selected_model == "gpt-4o-mini-tts"
        return {
            global_speed_input: gr.update(visible=is_tts1, interactive=is_tts1),
            global_instructions_input: gr.update(visible=is_gpt_mini, interactive=is_gpt_mini)
        }

    tts_model_dropdown.change(fn=update_model_controls, inputs=[tts_model_dropdown], outputs=[global_speed_input, global_instructions_input])

    def update_speaker_controls(method):
        """Toggle the global-voice dropdown and the detailed-JSON editor/info."""
        is_single = (method == "Single Voice (Global)")
        is_detailed = (method == "Detailed Configuration (JSON)")
        return {
            global_voice_dropdown: gr.update(visible=is_single, interactive=is_single),
            detailed_speaker_config_input: gr.update(visible=is_detailed, interactive=is_detailed),
            detailed_config_info_md: gr.update(visible=is_detailed)
        }

    speaker_config_method_dropdown.change(fn=update_speaker_controls, inputs=[speaker_config_method_dropdown], outputs=[global_voice_dropdown, detailed_speaker_config_input, detailed_config_info_md])

    calculate_cost_button.click(fn=handle_calculate_cost, inputs=[script_input, tts_model_dropdown], outputs=[cost_output])
    generate_button.click(
        fn=handle_script_processing,
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown,
                global_voice_dropdown, detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output])

    gr.Markdown("## Examples")
    ex_script1 = "[A] Hi B.\n[B] Hey A.\n[A] What's up?"
    ex_json1 = '[{"speaker": "A", "voice": "nova", "instructions": "curious"}, {"speaker": "B", "voice": "echo", "speed": 0.9}]'
    ex_script2 = "[Cpt] Status?\n[Comp] Nominal. Slow.\n[Cpt] Good."
    ex_json2 = '[{"speaker": "Cpt", "voice": "alloy"}, {"speaker": "Comp", "voice": "onyx", "speed": 0.8, "instructions": "robotic"}]'
    gr.Examples(
        examples=[
            [ex_script1, "gpt-4o-mini-tts", 250, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json1, 1.0, "Speak naturally."],
            [ex_script2, "tts-1-hd", 300, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json2, 1.1, ""],
            ["[N] Single line.", "tts-1", 0, "Single Voice (Global)", "fable", "", 1.2, ""]],
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown, global_voice_dropdown,
                detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output],
        fn=handle_script_processing, cache_examples=False)
if __name__ == "__main__":
    # On Windows, switch to the selector event loop — presumably to avoid
    # Proactor-loop incompatibilities with the async HTTP stack; confirm if
    # the deployment target changes.
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    demo.launch()