Spaces:
Running
Running
File size: 18,031 Bytes
1190db4 5c85d81 1190db4 354d940 1190db4 5c85d81 1190db4 5c85d81 1190db4 5c85d81 354d940 5c85d81 1190db4 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 1190db4 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 1190db4 354d940 5c85d81 354d940 1190db4 354d940 1190db4 5c85d81 1190db4 5c85d81 1190db4 354d940 1190db4 5c85d81 354d940 5c85d81 1190db4 354d940 5c85d81 354d940 5c85d81 354d940 1190db4 354d940 1190db4 5c85d81 354d940 1190db4 354d940 1190db4 354d940 1190db4 354d940 1190db4 354d940 1190db4 354d940 1190db4 5c85d81 354d940 1190db4 354d940 1190db4 354d940 1190db4 354d940 1190db4 5c85d81 354d940 5c85d81 354d940 7a4548f 354d940 7a4548f 5c85d81 354d940 5c85d81 1190db4 354d940 1190db4 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 5c85d81 354d940 1190db4 354d940 1190db4 354d940 1190db4 354d940 5c85d81 354d940 1190db4 354d940 1190db4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
import gradio as gr
import os
import asyncio
import tempfile
import shutil
import zipfile
import random
import json
from openai import AsyncOpenAI
from utils.script_parser import parse_dialogue_script, calculate_cost, MAX_SCRIPT_LENGTH
from utils.openai_tts import synthesize_speech_line, OPENAI_VOICES as ALL_TTS_VOICES
from utils.merge_audio import merge_mp3_files
# --- Configuration ---
# Secrets/config are read from the environment first; on Hugging Face Spaces
# they may instead be fetched from the Hub below.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NSFW_API_URL_TEMPLATE = os.getenv("NSFW_API_URL_TEMPLATE")
MODEL_DEFAULT = os.getenv("MODEL_DEFAULT", "tts-1-hd")
if not OPENAI_API_KEY:
    # Fallback for Hugging Face Spaces: try to read the Space's secrets via
    # the Hub API (requires SPACE_ID, which the platform sets).
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        space_id = os.getenv("SPACE_ID")
        if space_id:
            secrets = api.get_space_secrets(repo_id=space_id)
            OPENAI_API_KEY = secrets.get("OPENAI_API_KEY")
            NSFW_API_URL_TEMPLATE = secrets.get("NSFW_API_URL_TEMPLATE", NSFW_API_URL_TEMPLATE)
            MODEL_DEFAULT = secrets.get("MODEL_DEFAULT", MODEL_DEFAULT)
    except Exception as e:
        # Best-effort: a Hub failure just leaves the env-derived values in place.
        print(f"Could not retrieve secrets from Hugging Face Hub: {e}")
# Async client is created only when a key is available; handlers check for it.
async_openai_client = None
if OPENAI_API_KEY:
    async_openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
else:
    print("ERROR: OPENAI_API_KEY secret is not set. The application will not function properly.")
# Models offered in the UI; MODEL_DEFAULT is coerced into this list.
TTS_MODELS_AVAILABLE = ["tts-1", "tts-1-hd", "gpt-4o-mini-tts"]
if MODEL_DEFAULT not in TTS_MODELS_AVAILABLE:
    MODEL_DEFAULT = "tts-1-hd"
# Strategies for mapping script speakers to voices (see get_config_for_speaker).
SPEAKER_CONFIG_METHODS = [
    "Single Voice (Global)",
    "Random per Speaker",
    "A/B Round Robin",
    "Detailed Configuration (JSON)"
]
DEFAULT_SPEAKER_CONFIG_METHOD = "Random per Speaker"
APP_AVAILABLE_VOICES = ALL_TTS_VOICES.copy()
# Per-job cache of speaker -> voice assignments; reset at the start of each job
# by handle_script_processing.
_speaker_config_cache = {}
def parse_detailed_speaker_config(json_text, parsed_script_lines):
    """Parse the 'Detailed Configuration (JSON)' editor content into a map.

    Args:
        json_text: JSON string — a list of objects with required keys
            "speaker" and "voice", plus optional "speed" and "instructions".
        parsed_script_lines: parsed script lines (currently unused; kept for
            interface compatibility with callers).

    Returns:
        dict mapping speaker name -> {"voice", "speed", "instructions"}.
        Returns an empty dict for empty/blank (or None) input.

    Raises:
        ValueError: if the JSON is invalid or not a list of speaker objects.
    """
    config_map = {}
    default_voice = APP_AVAILABLE_VOICES[0]
    try:
        # gr.Code may hand over None when the editor is hidden; treat as empty.
        if not json_text or not json_text.strip():
            return {}
        config_list = json.loads(json_text)
        if not isinstance(config_list, list):
            raise ValueError("JSON config must be a list of speaker objects.")
        for item in config_list:
            # Skip entries missing the required keys rather than failing the job.
            if not isinstance(item, dict) or "speaker" not in item or "voice" not in item:
                print(f"Skipping malformed item in JSON config: {item}")
                continue
            if item["voice"] not in APP_AVAILABLE_VOICES:
                print(f"Warning: Voice '{item['voice']}' for speaker '{item['speaker']}' not recognized. Falling back to '{default_voice}'.")
                item["voice"] = default_voice
            if "speed" in item:
                try:
                    item["speed"] = float(item["speed"])
                    if not (0.25 <= item["speed"] <= 4.0):
                        print(f"Warning: Speed for speaker '{item['speaker']}' out of range (0.25-4.0). Clamping.")
                        item["speed"] = max(0.25, min(item["speed"], 4.0))
                # TypeError too: float(None) / float([]) raise TypeError, not
                # ValueError, and previously escaped this handler entirely.
                except (TypeError, ValueError):
                    print(f"Warning: Invalid speed value for speaker '{item['speaker']}'. Using default.")
                    item.pop("speed", None)
            config_map[item["speaker"]] = {
                "voice": item["voice"],
                "speed": item.get("speed"),
                "instructions": item.get("instructions")
            }
        return config_map
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in Detailed Speaker Configuration: {e}")
    except ValueError:
        # Already a user-facing message; propagate unchanged.
        raise
    except Exception as e:
        raise ValueError(f"Error parsing Detailed Speaker Configuration: {e}")
def get_config_for_speaker(speaker_name, speaker_config_method, unique_script_speakers,
                           global_selected_voice, detailed_config_map):
    """Resolve the TTS config (voice/speed/instructions) for one speaker.

    Uses the module-level _speaker_config_cache so a speaker keeps the same
    voice across all of their lines in a job. Only the "Detailed
    Configuration (JSON)" method can return per-speaker speed/instructions;
    every other method returns voice only (speed/instructions stay None so
    the globals apply).
    """
    global _speaker_config_cache
    # Invalidate the cache when the method or the speaker set differs from
    # what the cache was built for.
    if _speaker_config_cache.get("__method") != speaker_config_method or \
            _speaker_config_cache.get("__speakers_set") != frozenset(unique_script_speakers):
        _speaker_config_cache = {"__method": speaker_config_method, "__speakers_set": frozenset(unique_script_speakers)}
    base_config = {"voice": APP_AVAILABLE_VOICES[0], "speed": None, "instructions": None}
    if speaker_config_method == "Single Voice (Global)":
        # One voice for everyone; fall back to the first voice if invalid.
        base_config["voice"] = global_selected_voice if global_selected_voice in APP_AVAILABLE_VOICES else APP_AVAILABLE_VOICES[0]
        return base_config
    if speaker_config_method == "Detailed Configuration (JSON)":
        if speaker_name in detailed_config_map:
            speaker_specific = detailed_config_map[speaker_name]
            return {
                "voice": speaker_specific.get("voice", base_config["voice"]),
                "speed": speaker_specific.get("speed"),
                "instructions": speaker_specific.get("instructions")
            }
        else:
            print(f"Warning: Speaker '{speaker_name}' not found in Detailed JSON. Using default voice '{base_config['voice']}'.")
            return base_config
    if speaker_name not in _speaker_config_cache:
        if speaker_config_method == "Random per Speaker":
            # Assign all speakers in one pass from a shuffled voice list so
            # voices only repeat once there are more speakers than voices.
            available_voices_shuffled = random.sample(APP_AVAILABLE_VOICES, len(APP_AVAILABLE_VOICES))
            if not _speaker_config_cache.get("__all_assigned_random"):
                for i, spk_unique in enumerate(unique_script_speakers):
                    if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": available_voices_shuffled[i % len(available_voices_shuffled)]}
                _speaker_config_cache["__all_assigned_random"] = True
            if speaker_name not in _speaker_config_cache:
                # Speaker missing from the pre-assigned set (unexpected):
                # give it any random voice.
                _speaker_config_cache[speaker_name] = {"voice": random.choice(APP_AVAILABLE_VOICES)}
        elif speaker_config_method == "A/B Round Robin":
            # Deterministic round-robin assignment following speaker order.
            if not _speaker_config_cache.get("__all_assigned_ab"):
                for i, spk_unique in enumerate(unique_script_speakers):
                    if spk_unique not in _speaker_config_cache:
                        _speaker_config_cache[spk_unique] = {"voice": APP_AVAILABLE_VOICES[i % len(APP_AVAILABLE_VOICES)]}
                _speaker_config_cache["__all_assigned_ab"] = True
            if speaker_name not in _speaker_config_cache:
                speaker_idx = unique_script_speakers.index(speaker_name) if speaker_name in unique_script_speakers else 0
                _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[speaker_idx % len(APP_AVAILABLE_VOICES)]}
        else:
            # Unknown method: fall back to the first available voice.
            _speaker_config_cache[speaker_name] = {"voice": APP_AVAILABLE_VOICES[0]}
    cached_entry = _speaker_config_cache.get(speaker_name, base_config.copy())
    # Cached methods carry only a voice; speed/instructions come from globals.
    return {"voice": cached_entry.get("voice", base_config["voice"]), "speed": None, "instructions": None}
async def handle_script_processing(
        dialogue_script: str, tts_model: str, pause_ms: int,
        speaker_config_method: str, global_voice_selection: str,
        detailed_speaker_json: str, global_speed: float,
        global_instructions: str, progress=gr.Progress(track_tqdm=True)):
    """Synthesize every dialogue line with OpenAI TTS, then zip and merge.

    Returns a 3-tuple matching the Gradio outputs:
        (zip_path_or_None, merged_mp3_path_or_None, status_text)
    """
    global _speaker_config_cache
    _speaker_config_cache = {}  # fresh speaker->voice assignments per job
    if not OPENAI_API_KEY or not async_openai_client:
        return None, None, "Error: OPENAI_API_KEY is not configured."
    if not dialogue_script.strip():
        return None, None, "Error: Script is empty."
    # Unique working directory per job. The previous fixed path
    # ("current_job_audio") was shared and rmtree'd on every request, so
    # concurrent jobs could delete each other's in-flight audio files.
    job_audio_path_prefix = tempfile.mkdtemp(prefix="dialogue_job_")
    try:
        parsed_lines, total_chars = parse_dialogue_script(dialogue_script)
        if not parsed_lines:
            return None, None, "Error: No valid dialogue lines found."
    except ValueError as e:
        return None, None, f"Script parsing error: {str(e)}"
    unique_speakers = sorted(list(set(p["speaker"] for p in parsed_lines)))
    parsed_detailed_config_map = {}
    if speaker_config_method == "Detailed Configuration (JSON)":
        try:
            parsed_detailed_config_map = parse_detailed_speaker_config(detailed_speaker_json, parsed_lines)
        except ValueError as e:
            return None, None, f"Configuration Error: {str(e)}"
    tasks, line_audio_files = [], [None] * len(parsed_lines)
    for i, line_data in enumerate(parsed_lines):
        speaker_name = line_data["speaker"]
        speaker_base_cfg = get_config_for_speaker(
            speaker_name, speaker_config_method, unique_speakers,
            global_voice_selection, parsed_detailed_config_map)
        line_voice = speaker_base_cfg["voice"]
        # Per-speaker overrides (from the JSON config) win over the globals.
        effective_speed = global_speed
        if speaker_base_cfg.get("speed") is not None:
            effective_speed = speaker_base_cfg["speed"]
        effective_instructions = global_instructions if global_instructions and global_instructions.strip() else None
        if speaker_base_cfg.get("instructions") is not None and speaker_base_cfg["instructions"].strip():
            effective_instructions = speaker_base_cfg["instructions"]
        output_filename = os.path.join(job_audio_path_prefix, f"line_{line_data['id']}.mp3")
        # NOTE(review): this progress call tracks task *queueing*; the actual
        # synthesis runs concurrently inside the gather() below.
        progress(i / len(parsed_lines), desc=f"Synthesizing line {i+1}/{len(parsed_lines)} ({speaker_name} w/ {line_voice})")
        tasks.append(synthesize_speech_line(
            client=async_openai_client, text=line_data["text"], voice=line_voice,
            output_path=output_filename, model=tts_model, speed=effective_speed,
            instructions=effective_instructions, nsfw_api_url_template=NSFW_API_URL_TEMPLATE,
            line_index=line_data['id']))
    # Run all line syntheses concurrently; exceptions are collected per line
    # so one failed line doesn't abort the whole job.
    synthesis_results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, result in enumerate(synthesis_results):
        if isinstance(result, Exception):
            print(f"Error for line {parsed_lines[idx]['id']}: {result}")
        elif result is None:
            print(f"Skipped/failed line {parsed_lines[idx]['id']}")
        else:
            line_audio_files[idx] = result
    # Keep only files that were actually produced and are non-empty.
    valid_audio_files = [f for f in line_audio_files if f and os.path.exists(f) and os.path.getsize(f) > 0]
    if not valid_audio_files:
        shutil.rmtree(job_audio_path_prefix)
        return None, None, "Error: No audio files successfully synthesized."
    zip_filename = os.path.join(job_audio_path_prefix, "dialogue_lines.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zf:
        for pth in valid_audio_files:
            zf.write(pth, os.path.basename(pth))
    merged_mp3_fn = os.path.join(job_audio_path_prefix, "merged_dialogue.mp3")
    merged_out_path = merge_mp3_files([f for f in line_audio_files if f], merged_mp3_fn, pause_ms)
    status = f"Processed {len(valid_audio_files)}/{len(parsed_lines)} lines. "
    if len(valid_audio_files) < len(parsed_lines):
        status += "Some lines failed/skipped. "
    if not merged_out_path and len(valid_audio_files) > 0:
        status += "Merged audio failed. "
    elif not merged_out_path and len(valid_audio_files) == 0:
        status += "No audio generated."
    else:
        status += "Outputs generated."
    return (zip_filename if os.path.exists(zip_filename) else None,
            merged_out_path if merged_out_path and os.path.exists(merged_out_path) else None,
            status)
def handle_calculate_cost(dialogue_script: str, tts_model: str):
    """Return a human-readable cost estimate for the script under the model."""
    if not dialogue_script.strip():
        return "Cost: $0.000000 (Empty script)"
    try:
        lines, char_count = parse_dialogue_script(dialogue_script)
        if not lines:
            return "Cost: $0.000000 (No valid lines)"
        estimate = calculate_cost(char_count, len(lines), tts_model)
        return f"Est. Cost: ${estimate:.6f} ({char_count} chars, {len(lines)} lines)"
    except ValueError as err:
        return f"Error: {str(err)}"
    except Exception as err:
        return f"Cost calc error: {str(err)}"
# --- Gradio UI definition ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Dialogue Script to Speech Converter (Enhanced)")
    gr.Markdown("Convert scripts with control over voices, speed, and instructions.")
    if not OPENAI_API_KEY or not async_openai_client:
        # Surface the missing-key problem prominently in the UI.
        gr.Markdown("<h3 style='color:red;'>Warning: OPENAI_API_KEY not set. Synthesis will fail.</h3>")
    with gr.Row():
        with gr.Column(scale=2):
            script_input = gr.TextArea(label="Dialogue Script", placeholder="[S1] Hi!\n[S2] Hello!", lines=10, info=f"Max {MAX_SCRIPT_LENGTH} chars.")
        with gr.Column(scale=1):
            tts_model_dropdown = gr.Dropdown(TTS_MODELS_AVAILABLE, label="TTS Model", value=MODEL_DEFAULT, info="Affects controls below.")
            pause_input = gr.Number(label="Pause (ms)", value=500, minimum=0, maximum=5000, step=50, info="Between merged lines.")
            # Speed applies to tts-1/tts-1-hd only; instructions to
            # gpt-4o-mini-tts only. Visibility is toggled by
            # update_model_controls below.
            global_speed_input = gr.Slider(minimum=0.25, maximum=4.0, value=1.0, step=0.05, label="Global Speed (tts-1/tts-1-hd)", visible=(MODEL_DEFAULT in ["tts-1", "tts-1-hd"]), interactive=True)
            global_instructions_input = gr.Textbox(label="Global Instructions (gpt-4o-mini-tts)", placeholder="e.g., Speak calmly.", visible=(MODEL_DEFAULT == "gpt-4o-mini-tts"), interactive=True, lines=2)
    gr.Markdown("### Speaker Configuration")
    with gr.Row():
        speaker_config_method_dropdown = gr.Dropdown(SPEAKER_CONFIG_METHODS, label="Speaker Config Method", value=DEFAULT_SPEAKER_CONFIG_METHOD)
        global_voice_dropdown = gr.Dropdown(APP_AVAILABLE_VOICES, label="Global Voice ('Single Voice' method)", value=APP_AVAILABLE_VOICES[0], visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Single Voice (Global)"), interactive=True)
    # Pre-filled example shown in the JSON configuration editor.
    initial_json_config_example = """[
    {"speaker": "Alice", "voice": "nova", "speed": 1.1, "instructions": "sound excited"},
    {"speaker": "Bob", "voice": "echo"},
    {"speaker": "Narrator", "voice": "shimmer", "instructions": "be very serious"}
]""".strip()
    detailed_speaker_config_input = gr.Code(
        label="Detailed Speaker Configuration (JSON)",
        language="json",
        lines=7,
        value=initial_json_config_example,
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )
    # New Markdown for info text, visibility tied to dropdown
    detailed_config_info_md = gr.Markdown(
        "<small>Define voice per speaker. Optionally, `speed` (0.25-4.0) for `tts-1`/`tts-1-hd` models, "
        "and `instructions` (text) for `gpt-4o-mini-tts`.</small>",
        visible=(DEFAULT_SPEAKER_CONFIG_METHOD == "Detailed Configuration (JSON)")
    )
    with gr.Row():
        calculate_cost_button = gr.Button("Calculate Cost")
        generate_button = gr.Button("Generate Audio", variant="primary")
    cost_output = gr.Textbox(label="Estimated Cost", interactive=False)
    with gr.Row():
        individual_lines_zip_output = gr.File(label="Download ZIP")
        merged_dialogue_mp3_output = gr.Audio(label="Merged MP3", type="filepath")
    status_output = gr.Textbox(label="Status", interactive=False, lines=2)
    def update_model_controls(selected_model):
        # Show the speed slider for tts-1 family models and the instructions
        # box for gpt-4o-mini-tts; hide each otherwise.
        is_tts1 = selected_model in ["tts-1", "tts-1-hd"]
        is_gpt_mini = selected_model == "gpt-4o-mini-tts"
        return {
            global_speed_input: gr.update(visible=is_tts1, interactive=is_tts1),
            global_instructions_input: gr.update(visible=is_gpt_mini, interactive=is_gpt_mini)
        }
    tts_model_dropdown.change(fn=update_model_controls, inputs=[tts_model_dropdown], outputs=[global_speed_input, global_instructions_input])
    def update_speaker_controls(method):
        # Toggle between the global-voice dropdown and the JSON editor (plus
        # its info text) depending on the chosen configuration method.
        is_single = (method == "Single Voice (Global)")
        is_detailed = (method == "Detailed Configuration (JSON)")
        return {
            global_voice_dropdown: gr.update(visible=is_single, interactive=is_single),
            detailed_speaker_config_input: gr.update(visible=is_detailed, interactive=is_detailed),
            detailed_config_info_md: gr.update(visible=is_detailed) # Control visibility of new Markdown
        }
    speaker_config_method_dropdown.change(fn=update_speaker_controls, inputs=[speaker_config_method_dropdown], outputs=[global_voice_dropdown, detailed_speaker_config_input, detailed_config_info_md])
    calculate_cost_button.click(fn=handle_calculate_cost, inputs=[script_input, tts_model_dropdown], outputs=[cost_output])
    generate_button.click(
        fn=handle_script_processing,
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown,
                global_voice_dropdown, detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output])
    gr.Markdown("## Examples")
    # Example scripts and JSON configs for gr.Examples below.
    ex_script1 = "[A] Hi B.\n[B] Hey A.\n[A] What's up?"
    ex_json1 = '[{"speaker": "A", "voice": "nova", "instr...": "curious"}, {"speaker": "B", "voice": "echo", "speed": 0.9}]'.replace("instr...", "instructions") # Hack for brevity
    ex_script2 = "[Cpt] Status?\n[Comp] Nominal. Slow.\n[Cpt] Good."
    ex_json2 = '[{"speaker": "Cpt", "voice": "alloy"}, {"speaker": "Comp", "voice": "onyx", "speed": 0.8, "instr...": "robotic"}]'.replace("instr...", "instructions")
    gr.Examples(
        examples=[
            [ex_script1, "gpt-4o-mini-tts", 250, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json1, 1.0, "Speak naturally."],
            [ex_script2, "tts-1-hd", 300, "Detailed Configuration (JSON)", APP_AVAILABLE_VOICES[0], ex_json2, 1.1, ""],
            ["[N] Single line.", "tts-1", 0, "Single Voice (Global)", "fable", "", 1.2, ""]],
        inputs=[script_input, tts_model_dropdown, pause_input, speaker_config_method_dropdown, global_voice_dropdown,
                detailed_speaker_config_input, global_speed_input, global_instructions_input],
        outputs=[individual_lines_zip_output, merged_dialogue_mp3_output, status_output],
        fn=handle_script_processing, cache_examples=False)
if __name__ == "__main__":
    # Windows' default event loop policy can misbehave with some async
    # libraries; switch to the selector-based policy there.
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    demo.launch()