# Scraped page header (not code): "abocha's picture" / commit message "fix" / commit 954cec7 / "raw" / "history blame" / 18 kB
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
import json
from openai import AsyncOpenAI
from utils.script_parser import parse_dialogue_script, calculate_cost, MAX_SCRIPT_LENGTH
from utils.openai_tts import synthesize_speech_line, OPENAI_VOICES as ALL_TTS_VOICES
from utils.merge_audio import merge_mp3_files
# --- Configuration ---
# Settings come from environment variables first.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NSFW_API_URL_TEMPLATE = os.getenv("NSFW_API_URL_TEMPLATE")
MODEL_DEFAULT = os.getenv("MODEL_DEFAULT", "tts-1-hd")

# No key in the environment: make a best-effort attempt to read the settings
# through huggingface_hub. Any failure here is printed, never raised.
if not OPENAI_API_KEY:
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        space_id = os.getenv("SPACE_ID")
        if space_id:
            secrets = api.get_space_secrets(repo_id=space_id)
            OPENAI_API_KEY = secrets.get("OPENAI_API_KEY")
            NSFW_API_URL_TEMPLATE = secrets.get(
                "NSFW_API_URL_TEMPLATE", NSFW_API_URL_TEMPLATE
            )
            MODEL_DEFAULT = secrets.get("MODEL_DEFAULT", MODEL_DEFAULT)
    except Exception as hub_error:
        print(f"Could not retrieve secrets from Hugging Face Hub: {hub_error}")

# The async client only exists when a key was found; handlers check for None.
if OPENAI_API_KEY:
    async_openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
else:
    async_openai_client = None
    print("ERROR: OPENAI_API_KEY secret is not set. The application will not function properly.")

# Models offered in the UI; an unrecognised configured default falls back to tts-1-hd.
TTS_MODELS_AVAILABLE = ["tts-1", "tts-1-hd", "gpt-4o-mini-tts"]
MODEL_DEFAULT = MODEL_DEFAULT if MODEL_DEFAULT in TTS_MODELS_AVAILABLE else "tts-1-hd"

# Strategies for mapping script speakers to voices (see get_config_for_speaker).
SPEAKER_CONFIG_METHODS = [
    "Single Voice (Global)",
    "Random per Speaker",
    "A/B Round Robin",
    "Detailed Configuration (JSON)",
]
DEFAULT_SPEAKER_CONFIG_METHOD = "Random per Speaker"

# Private copy of the voice list so the imported one is never modified here.
APP_AVAILABLE_VOICES = ALL_TTS_VOICES.copy()

# Per-run speaker -> voice assignments, filled in by get_config_for_speaker.
_speaker_config_cache = {}
def parse_detailed_speaker_config(json_text, parsed_script_lines):
    """Parse the "Detailed Configuration (JSON)" text into a per-speaker map.

    Args:
        json_text: JSON text holding a list of objects. Each object needs a
            "speaker" and a "voice" key and may carry "speed" and
            "instructions". ``None`` or blank text means "no configuration".
        parsed_script_lines: Accepted for interface compatibility; not used.

    Returns:
        dict mapping each speaker to ``{"voice", "speed", "instructions"}``.
        ``speed`` and ``instructions`` are ``None`` when absent or invalid.
        Malformed items are skipped with a printed message.

    Raises:
        ValueError: If the text is not valid JSON, is not a JSON list, or any
            other unexpected error occurs while reading the items.
    """
    # A missing value is the same as an empty editor: nothing configured.
    if json_text is None:
        return {}

    config_map = {}
    try:
        if not json_text.strip():
            return {}
        config_list = json.loads(json_text)
        if not isinstance(config_list, list):
            raise ValueError("JSON config must be a list of speaker objects.")

        for item in config_list:
            if not isinstance(item, dict) or "speaker" not in item or "voice" not in item:
                print(f"Skipping malformed item in JSON config: {item}")
                continue

            speaker = item["speaker"]
            voice = item["voice"]
            if voice not in APP_AVAILABLE_VOICES:
                # The fallback voice is looked up only when it is needed.
                default_voice = APP_AVAILABLE_VOICES[0]
                print(f"Warning: Voice '{voice}' for speaker '{speaker}' not recognized. Falling back to '{default_voice}'.")
                voice = default_voice

            speed = None
            raw_speed = item.get("speed")
            if raw_speed is not None:
                try:
                    speed = float(raw_speed)
                except (TypeError, ValueError):
                    # TypeError covers non-scalar JSON values such as lists or
                    # objects; one bad speed must not reject the whole config.
                    print(f"Warning: Invalid speed value for speaker '{speaker}'. Using default.")
                    speed = None
                else:
                    if not (0.25 <= speed <= 4.0):
                        print(f"Warning: Speed for speaker '{speaker}' out of range (0.25-4.0). Clamping.")
                        speed = max(0.25, min(speed, 4.0))

            instructions = item.get("instructions")
            if instructions is not None and not isinstance(instructions, str):
                # Callers call .strip() on this value, so only text is kept.
                print(f"Warning: Instructions for speaker '{speaker}' must be text. Ignoring.")
                instructions = None

            config_map[speaker] = {
                "voice": voice,
                "speed": speed,
                "instructions": instructions,
            }
        return config_map
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in Detailed Speaker Configuration: {e}") from e
    except ValueError:
        # Already a user-facing message (e.g. "must be a list"); pass through.
        raise
    except Exception as e:
        raise ValueError(f"Error parsing Detailed Speaker Configuration: {e}") from e
def get_config_for_speaker(speaker_name, speaker_config_method, unique_script_speakers,
                           global_selected_voice, detailed_config_map):
    """Resolve the voice, speed and instructions to use for one speaker.

    Args:
        speaker_name: Speaker whose settings are requested.
        speaker_config_method: One of the SPEAKER_CONFIG_METHODS strings.
        unique_script_speakers: Ordered sequence of every speaker in the
            script; its order drives the voice assignment.
        global_selected_voice: Voice used by "Single Voice (Global)"; an
            unknown voice falls back to the first available voice.
        detailed_config_map: Mapping produced by parse_detailed_speaker_config;
            only read for "Detailed Configuration (JSON)". ``None`` is treated
            as an empty mapping.

    Returns:
        dict with keys "voice", "speed" and "instructions". Only the detailed
        JSON method can return non-None speed or instructions.
    """
    global _speaker_config_cache

    # Start from an empty cache whenever the method or the speaker set changes.
    speakers_key = frozenset(unique_script_speakers)
    if (_speaker_config_cache.get("__method") != speaker_config_method
            or _speaker_config_cache.get("__speakers_set") != speakers_key):
        _speaker_config_cache = {"__method": speaker_config_method, "__speakers_set": speakers_key}

    default_voice = APP_AVAILABLE_VOICES[0]

    if speaker_config_method == "Single Voice (Global)":
        voice = global_selected_voice if global_selected_voice in APP_AVAILABLE_VOICES else default_voice
        return {"voice": voice, "speed": None, "instructions": None}

    if speaker_config_method == "Detailed Configuration (JSON)":
        speaker_specific = (detailed_config_map or {}).get(speaker_name)
        if speaker_specific is None:
            print(f"Warning: Speaker '{speaker_name}' not found in Detailed JSON. Using default voice '{default_voice}'.")
            return {"voice": default_voice, "speed": None, "instructions": None}
        return {
            "voice": speaker_specific.get("voice", default_voice),
            "speed": speaker_specific.get("speed"),
            "instructions": speaker_specific.get("instructions"),
        }

    # Speaker entries live in their own sub-dict, so a speaker whose name
    # equals a bookkeeping key (e.g. "__method") cannot collide with it.
    assigned = _speaker_config_cache.setdefault("__voices", {})

    if speaker_name not in assigned:
        voice_count = len(APP_AVAILABLE_VOICES)
        if speaker_config_method == "Random per Speaker":
            if not _speaker_config_cache.get("__all_assigned_random"):
                # One shuffle for the whole script: speakers get distinct
                # voices until the voice list is exhausted, then it wraps.
                shuffled_voices = random.sample(APP_AVAILABLE_VOICES, voice_count)
                for i, spk_unique in enumerate(unique_script_speakers):
                    assigned.setdefault(spk_unique, {"voice": shuffled_voices[i % voice_count]})
                _speaker_config_cache["__all_assigned_random"] = True
            if speaker_name not in assigned:
                # Speaker absent from unique_script_speakers: pick independently.
                assigned[speaker_name] = {"voice": random.choice(APP_AVAILABLE_VOICES)}
        elif speaker_config_method == "A/B Round Robin":
            if not _speaker_config_cache.get("__all_assigned_ab"):
                # Voices are handed out in list order, wrapping around.
                for i, spk_unique in enumerate(unique_script_speakers):
                    assigned.setdefault(spk_unique, {"voice": APP_AVAILABLE_VOICES[i % voice_count]})
                _speaker_config_cache["__all_assigned_ab"] = True
            if speaker_name not in assigned:
                # Only reachable for a speaker outside unique_script_speakers.
                assigned[speaker_name] = {"voice": default_voice}
        else:
            # Unknown method: use the first available voice.
            assigned[speaker_name] = {"voice": default_voice}

    return {
        "voice": assigned[speaker_name].get("voice", default_voice),
        "speed": None,
        "instructions": None,
    }
async def handle_script_processing(
        dialogue_script: str, tts_model: str, pause_ms: int,
        speaker_config_method: str, global_voice_selection: str,
        detailed_speaker_json: str, global_speed: float,
        global_instructions: str, progress=gr.Progress(track_tqdm=True)):
    """Synthesize every script line and package the results.

    Args:
        dialogue_script: Raw dialogue script text.
        tts_model: Model name passed to synthesize_speech_line.
        pause_ms: Pause passed to merge_mp3_files.
        speaker_config_method: One of the SPEAKER_CONFIG_METHODS strings.
        global_voice_selection: Voice for the "Single Voice (Global)" method.
        detailed_speaker_json: JSON text for the detailed method.
        global_speed: Speed used unless a speaker defines its own.
        global_instructions: Instructions used unless a speaker defines its own.
        progress: Gradio progress reporter.

    Returns:
        Tuple ``(zip_path_or_None, merged_mp3_path_or_None, status_message)``.
        Configuration and parsing problems are reported through the status
        message rather than raised.
    """
    global _speaker_config_cache
    _speaker_config_cache = {}  # every run starts with fresh voice assignments

    if not OPENAI_API_KEY or not async_openai_client:
        return None, None, "Error: OPENAI_API_KEY is not configured."
    if not dialogue_script or not dialogue_script.strip():
        return None, None, "Error: Script is empty."

    # The job directory has a fixed path and is wiped at the start of a run.
    # NOTE(review): concurrent runs would share (and delete) this directory --
    # confirm the app is only ever served with one job at a time.
    job_audio_path_prefix = os.path.join(tempfile.gettempdir(), "current_job_audio")
    if os.path.exists(job_audio_path_prefix):
        shutil.rmtree(job_audio_path_prefix)
    os.makedirs(job_audio_path_prefix, exist_ok=True)

    try:
        parsed_lines, _ = parse_dialogue_script(dialogue_script)
    except ValueError as e:
        return None, None, f"Script parsing error: {str(e)}"
    if not parsed_lines:
        return None, None, "Error: No valid dialogue lines found."

    unique_speakers = sorted({p["speaker"] for p in parsed_lines})

    parsed_detailed_config_map = {}
    if speaker_config_method == "Detailed Configuration (JSON)":
        try:
            parsed_detailed_config_map = parse_detailed_speaker_config(detailed_speaker_json, parsed_lines)
        except ValueError as e:
            return None, None, f"Configuration Error: {str(e)}"

    global_instructions_or_none = (
        global_instructions if global_instructions and global_instructions.strip() else None
    )

    total_lines = len(parsed_lines)
    tasks = []
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        speaker_base_cfg = get_config_for_speaker(
            speaker_name, speaker_config_method, unique_speakers,
            global_voice_selection, parsed_detailed_config_map)
        line_voice = speaker_base_cfg["voice"]

        # Per-speaker settings, when present, override the global ones.
        effective_speed = global_speed
        if speaker_base_cfg.get("speed") is not None:
            effective_speed = speaker_base_cfg["speed"]

        effective_instructions = global_instructions_or_none
        speaker_instructions = speaker_base_cfg.get("instructions")
        # isinstance guard: a non-text value must not crash on .strip().
        if isinstance(speaker_instructions, str) and speaker_instructions.strip():
            effective_instructions = speaker_instructions

        output_filename = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}.mp3")
        progress(i / total_lines, desc=f"Synthesizing line {i+1}/{total_lines} ({speaker_name} w/ {line_voice})")
        tasks.append(synthesize_speech_line(
            client=async_openai_client, text=line_data["text"], voice=line_voice,
            output_path=output_filename, model=tts_model, speed=effective_speed,
            instructions=effective_instructions, nsfw_api_url_template=NSFW_API_URL_TEMPLATE,
            line_index=line_data['id']))

    # return_exceptions=True keeps one failed line from aborting the others.
    synthesis_results = await asyncio.gather(*tasks, return_exceptions=True)

    line_audio_files = [None] * total_lines
    for idx, result in enumerate(synthesis_results):
        # BaseException also covers cancellation results returned by gather.
        if isinstance(result, BaseException):
            print(f"Error for line {parsed_lines[idx]['id']}: {result}")
        elif result is None:
            print(f"Skipped/failed line {parsed_lines[idx]['id']}")
        else:
            line_audio_files[idx] = result

    # Keep only files that really exist and are non-empty, in script order.
    valid_audio_files = [f for f in line_audio_files if f and os.path.exists(f) and os.path.getsize(f) > 0]
    if not valid_audio_files:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio files successfully synthesized."

    zip_filename = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zf:
        for pth in valid_audio_files:
            zf.write(pth, os.path.basename(pth))

    # Merge exactly the validated files so the merged track matches both the
    # ZIP contents and the count reported in the status line.
    merged_mp3_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    merged_out_path = merge_mp3_files(valid_audio_files, merged_mp3_fn, pause_ms)

    status = f"Processed {len(valid_audio_files)}/{total_lines} lines. "
    if len(valid_audio_files) < total_lines:
        status += "Some lines failed/skipped. "
    if not merged_out_path:
        status += "Merged audio failed. "
    else:
        status += "Outputs generated."

    return (zip_filename if os.path.exists(zip_filename) else None,
            merged_out_path if merged_out_path and os.path.exists(merged_out_path) else None,
            status)
def handle_calculate_cost(dialogue_script: str, tts_model: str):
    """Return a human-readable cost estimate for synthesizing the script.

    Args:
        dialogue_script: Raw dialogue script text; ``None`` or blank text is
            reported as an empty script.
        tts_model: Model name passed to calculate_cost.

    Returns:
        A status string. Errors are reported in the string, never raised.
    """
    # Checked before the try so a missing script never reaches the parser.
    if not dialogue_script or not dialogue_script.strip():
        return "Cost: $0.000000 (Empty script)"
    try:
        parsed, chars = parse_dialogue_script(dialogue_script)
        if not parsed:
            return "Cost: $0.000000 (No valid lines)"
        cost = calculate_cost(chars, len(parsed), tts_model)
        return f"Est. Cost: ${cost:.6f} ({chars} chars, {len(parsed)} lines)"
    except ValueError as e:
        return f"Error: {str(e)}"
    except Exception as e:
        # Top-level UI boundary: show the problem instead of crashing the app.
        return f"Cost calc error: {str(e)}"
# NOTE(review): the layout nesting below is reconstructed from the order of
# the calls -- confirm rows/columns against the running app.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Dialogue Script to Speech Converter (Enhanced)")
    gr.Markdown("Convert scripts with control over voices, speed, and instructions.")
    if not OPENAI_API_KEY or not async_openai_client:
        gr.Markdown("<h3 style='color:red;'>Warning: OPENAI_API_KEY not set. Synthesis will fail.</h3>")

    # --- Script text and model-level controls ---
    with gr.Row():
        with gr.Column(scale=2):
            script_input = gr.TextArea(
                label="Dialogue Script",
                placeholder="[S1] Hi!\n[S2] Hello!",
                lines=10,
                info=f"Max {MAX_SCRIPT_LENGTH} chars.",
            )
        with gr.Column(scale=1):
            tts_model_dropdown = gr.Dropdown(
                TTS_MODELS_AVAILABLE,
                label="TTS Model",
                value=MODEL_DEFAULT,
                info="Affects controls below.",
            )
            pause_input = gr.Number(
                label="Pause (ms)",
                value=500,
                minimum=0,
                maximum=5000,
                step=50,
                info="Between merged lines.",
            )
            # Speed is shown for the tts-1 family, instructions for gpt-4o-mini-tts.
            global_speed_input = gr.Slider(
                minimum=0.25,
                maximum=4.0,
                value=1.0,
                step=0.05,
                label="Global Speed (tts-1/tts-1-hd)",
                visible=(MODEL_DEFAULT in ["tts-1", "tts-1-hd"]),
                interactive=True,
            )
            global_instructions_input = gr.Textbox(
                label="Global Instructions (gpt-4o-mini-tts)",
                placeholder="e.g., Speak calmly.",
                visible=(MODEL_DEFAULT == "gpt-4o-mini-tts"),
                interactive=True,
                lines=2,
            )

    # --- Speaker configuration ---
    gr.Markdown("### Speaker Configuration")
    with gr.Row():
        speaker_config_method_dropdown = gr.Dropdown(
            SPEAKER_CONFIG_METHODS,
            label="Speaker Config Method",
            value=DEFAULT_SPEAKER_CONFIG_METHOD,
        )
        global_voice_dropdown = gr.Dropdown(
            APP_AVAILABLE_VOICES,
            label="Global Voice ('Single Voice' method)",
            value=APP_AVAILABLE_VOICES[0],
            visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Single Voice (Global)"),
            interactive=True,
        )

    initial_json_config_example = """[
{"speaker": "Alice", "voice": "nova", "speed": 1.1, "instructions": "sound excited"},
{"speaker": "Bob", "voice": "echo"},
{"speaker": "Narrator", "voice": "shimmer", "instructions": "be very serious"}
]""".strip()
    detailed_speaker_config_input = gr.Code(
        label="Detailed Speaker Configuration (JSON)",
        language="json",
        lines=7,
        value=initial_json_config_example,
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)"),
    )
    # Help text for the JSON editor; shown and hidden together with it.
    detailed_config_info_md = gr.Markdown(
        "<small>Define voice per speaker. Optionally, `speed` (0.25-4.0) for `tts-1`/`tts-1-hd` models, "
        "and `instructions` (text) for `gpt-4o-mini-tts`.</small>",
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)"),
    )

    # --- Actions and outputs ---
    with gr.Row():
        calculate_cost_button = gr.Button("Calculate Cost")
        generate_button = gr.Button("Generate Audio", variant="primary")
    cost_output = gr.Textbox(label="Estimated Cost", interactive=False)
    with gr.Row():
        individual_lines_zip_output = gr.File(label="Download ZIP")
        merged_dialogue_mp3_output = gr.Audio(label="Merged MP3", type="filepath")
    status_output = gr.Textbox(label="Status", interactive=False, lines=2)

    def update_model_controls(selected_model):
        """Show only the global control that applies to the selected model."""
        speed_applies = selected_model in ("tts-1", "tts-1-hd")
        instructions_apply = selected_model == "gpt-4o-mini-tts"
        return {
            global_speed_input: gr.update(visible=speed_applies, interactive=speed_applies),
            global_instructions_input: gr.update(visible=instructions_apply, interactive=instructions_apply),
        }

    tts_model_dropdown.change(
        fn=update_model_controls,
        inputs=[tts_model_dropdown],
        outputs=[global_speed_input, global_instructions_input],
    )

    def update_speaker_controls(method):
        """Show the widgets that belong to the chosen speaker-config method."""
        single_voice = method == "Single Voice (Global)"
        detailed_json = method == "Detailed Configuration (JSON)"
        return {
            global_voice_dropdown: gr.update(visible=single_voice, interactive=single_voice),
            detailed_speaker_config_input: gr.update(visible=detailed_json, interactive=detailed_json),
            detailed_config_info_md: gr.update(visible=detailed_json),
        }

    speaker_config_method_dropdown.change(
        fn=update_speaker_controls,
        inputs=[speaker_config_method_dropdown],
        outputs=[global_voice_dropdown, detailed_speaker_config_input, detailed_config_info_md],
    )

    calculate_cost_button.click(
        fn=handle_calculate_cost,
        inputs=[script_input, tts_model_dropdown],
        outputs=[cost_output],
    )

    # The Generate button and the examples feed the same handler, so the
    # component lists are written once, in the handler's parameter order.
    synthesis_inputs = [
        script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown,
        global_voice_dropdown, detailed_speaker_config_input, global_speed_input,
        global_instructions_input,
    ]
    synthesis_outputs = [individual_lines_zip_output, merged_dialogue_mp3_output, status_output]

    generate_button.click(
        fn=handle_script_processing,
        inputs=synthesis_inputs,
        outputs=synthesis_outputs,
    )

    gr.Markdown("## Examples")
    ex_script1 = "[A] Hi B.\n[B] Hey A.\n[A] What's up?"
    # "instr..." is expanded to "instructions" to keep the literal short.
    ex_json1 = '[{"speaker": "A", "voice": "nova", "instr...": "curious"}, {"speaker": "B", "voice": "echo", "speed": 0.9}]'.replace("instr...", "instructions")
    ex_script2 = "[Cpt] Status?\n[Comp] Nominal. Slow.\n[Cpt] Good."
    ex_json2 = '[{"speaker": "Cpt", "voice": "alloy"}, {"speaker": "Comp", "voice": "onyx", "speed": 0.8, "instr...": "robotic"}]'.replace("instr...", "instructions")
    gr.Examples(
        examples=[
            [ex_script1, "gpt-4o-mini-tts", 250, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json1, 1.0, "Speak naturally."],
            [ex_script2, "tts-1-hd", 300, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json2, 1.1, ""],
            ["[N] Single line.", "tts-1", 0, "Single Voice (Global)", "fable", "", 1.2, ""],
        ],
        inputs=synthesis_inputs,
        outputs=synthesis_outputs,
        fn=handle_script_processing,
        cache_examples=False,
    )
if __name__ == "__main__":
    # On Windows, switch asyncio to the selector event loop policy before launch.
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    demo.launch()