|
import spaces |
|
import gradio as gr |
|
import edge_tts |
|
import asyncio |
|
import tempfile |
|
import os |
|
import re |
|
from pathlib import Path |
|
from pydub import AudioSegment |
|
import librosa |
|
import soundfile as sf |
|
import numpy as np |
|
|
|
|
|
VOICE_MAP = { |
|
"1F": "en-GB-SoniaNeural", |
|
"2M": "en-GB-RyanNeural", |
|
"3M": "en-US-BrianMultilingualNeural", |
|
"2F": "en-US-JennyNeural", |
|
"1M": "en-AU-WilliamNeural", |
|
"3F": "en-HK-YanNeural", |
|
"4M": "en-GB-ThomasNeural", |
|
"4F": "en-US-EmmaNeural", |
|
"1O": "en-GB-RyanNeural", |
|
"1C": "en-GB-MaisieNeural", |
|
"1V": "vi-VN-HoaiMyNeural", |
|
"2V": "vi-VN-NamMinhNeural", |
|
"3V": "vi-VN-HoaiMyNeural", |
|
"4V": "vi-VN-NamMinhNeural", |
|
} |
|
|
|
def get_silence(duration_ms=1000): |
|
"""Creates a silent AudioSegment.""" |
|
return AudioSegment.silent( |
|
duration=duration_ms, |
|
frame_rate=24000, |
|
sample_width=4, |
|
channels=1 |
|
) |
|
|
|
async def get_voices(): |
|
"""Lists available Edge TTS voices.""" |
|
try: |
|
voices = await edge_tts.list_voices() |
|
return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices} |
|
except Exception as e: |
|
print(f"Error listing voices: {e}") |
|
return {} |
|
|
|
async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, target_duration_ms=None, speed_adjustment_factor=1.0): |
|
"""Generates audio for a text segment, handling voice prefixes and adjusting rate for duration.""" |
|
processed_text = text_segment.strip() |
|
current_voice_short = default_voice.split(" - ")[0] if default_voice else "" |
|
current_rate = rate |
|
current_pitch = pitch |
|
|
|
for prefix, voice_short in VOICE_MAP.items(): |
|
if processed_text.startswith(prefix): |
|
current_voice_short = voice_short |
|
if prefix in ["1F", "3F", "1V", "3V"]: |
|
current_pitch = 25 |
|
elif prefix in ["1O", "4V"]: |
|
current_pitch = -20 |
|
current_rate = -10 |
|
processed_text = processed_text[len(prefix):].strip() |
|
break |
|
|
|
match = re.search(r'([A-Za-z]+)-?(\d+)', processed_text) |
|
if match and match.group(1) in VOICE_MAP: |
|
pitch_adjustment = int(match.group(2)) |
|
current_pitch += pitch_adjustment |
|
processed_text = re.sub(r'[A-Za-z]+-?\d+', '', processed_text, count=1).strip() |
|
elif any(processed_text.startswith(prefix) for prefix in VOICE_MAP): |
|
processed_text = re.sub(r'^[A-Za-z]{1,2}', '', processed_text).lstrip('-').strip() |
|
|
|
if processed_text: |
|
rate_str = f"{current_rate:+d}%" |
|
pitch_str = f"{current_pitch:+d}Hz" |
|
try: |
|
communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str) |
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: |
|
audio_path = tmp_file.name |
|
await communicate.save(audio_path) |
|
|
|
if target_duration_ms is not None and os.path.exists(audio_path) and target_duration_ms > 0: |
|
audio = AudioSegment.from_mp3(audio_path) |
|
audio_duration_ms = len(audio) |
|
if audio_duration_ms > target_duration_ms: |
|
speed_factor = (audio_duration_ms / target_duration_ms) * speed_adjustment_factor |
|
if speed_factor > 0 and speed_factor >= 1.0: |
|
y, sr = librosa.load(audio_path, sr=None) |
|
y_stretched = librosa.effects.time_stretch(y, rate=speed_factor) |
|
sf.write(audio_path, y_stretched, sr) |
|
return audio_path |
|
except Exception as e: |
|
print(f"Edge TTS error processing '{processed_text}': {e}") |
|
return None |
|
return None |
|
|
|
async def process_transcript_line(line, default_voice, rate, pitch, speed_adjustment_factor): |
|
"""Processes a single transcript line with timestamp and potential voice changes.""" |
|
match = re.match(r'(\d{2}:\d{2}:\d{2},\d{3})\s+-\s+(\d{2}:\d{2}:\d{2},\d{3})\s+(.*)', line) |
|
if match: |
|
start_time_str, end_time_str, text_parts = match.groups() |
|
|
|
def time_str_to_ms(time_str): |
|
h, m, s_ms = time_str.split(':') |
|
s, ms = s_ms.split(',') |
|
return int(h) * 3600000 + int(m) * 60000 + int(s) * 1000 + int(ms) |
|
|
|
start_time_ms = time_str_to_ms(start_time_str) |
|
end_time_ms = time_str_to_ms(end_time_str) |
|
duration_ms = end_time_ms - start_time_ms |
|
|
|
audio_segments = [] |
|
parts = re.split(r'([“”"])', text_parts) |
|
in_quote = False |
|
for part in parts: |
|
if part == '"': |
|
in_quote = not in_quote |
|
continue |
|
if part.strip(): |
|
audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, duration_ms, speed_adjustment_factor if in_quote else 1.0) |
|
if audio_path: |
|
audio_segments.append(audio_path) |
|
return start_time_ms, audio_segments, duration_ms |
|
return None, None, None |
|
|
|
async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor): |
|
"""Converts a timestamped transcript with voice changes to a single audio file.""" |
|
if not transcript_text.strip(): |
|
return None, gr.Warning("Please enter transcript text.") |
|
if not voice: |
|
return None, gr.Warning("Please select a voice.") |
|
|
|
lines = transcript_text.strip().split('\n') |
|
timed_audio_segments = [] |
|
max_end_time_ms = 0 |
|
|
|
with tempfile.TemporaryDirectory() as tmpdir: |
|
for line in lines: |
|
start_time, audio_paths, duration = await process_transcript_line(line, voice, rate, pitch, speed_adjustment_factor) |
|
if start_time is not None and audio_paths: |
|
combined_line_audio = AudioSegment.empty() |
|
for path in audio_paths: |
|
if path and os.path.exists(path): |
|
try: |
|
audio = AudioSegment.from_mp3(path) |
|
combined_line_audio += audio |
|
except FileNotFoundError: |
|
print(f"Warning: Audio file not found: {path}") |
|
finally: |
|
try: |
|
os.remove(path) |
|
except OSError: |
|
print(f"Warning: Could not remove temporary file: {path}") |
|
if combined_line_audio: |
|
timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio}) |
|
max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio)) |
|
elif audio_paths: |
|
for path in audio_paths: |
|
if path: |
|
try: |
|
os.remove(path) |
|
except FileNotFoundError: |
|
pass |
|
|
|
if not timed_audio_segments: |
|
return None, "No processable audio segments found." |
|
|
|
final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000) |
|
for segment in timed_audio_segments: |
|
final_audio = final_audio.overlay(segment['audio'], position=segment['start']) |
|
|
|
combined_audio_path = Path(tmpdir) / "combined_audio.mp3" |
|
final_audio.export(str(combined_audio_path), format="mp3") |
|
return str(combined_audio_path), None |
|
|
|
@spaces.GPU |
|
def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor): |
|
"""Gradio interface function for TTS.""" |
|
audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor)) |
|
return audio, warning |
|
|
|
async def create_demo(): |
|
"""Creates the Gradio demo interface.""" |
|
voices = await get_voices() |
|
default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)" |
|
description = """ |
|
Process timestamped text (HH:MM:SS,milliseconds - HH:MM:SS,milliseconds) with voice changes within quotes. |
|
The duration specified in the timestamp will be used to adjust the speech rate so the generated audio fits within that time. |
|
You can control the intensity of the speed adjustment using the "Speed Adjustment Factor" slider. |
|
Format: `HH:MM:SS,milliseconds - HH:MM:SS,milliseconds "VoicePrefix Text" more text "AnotherVoicePrefix More Text"` |
|
Example: |
|
``` |
|
00:00:00,000 - 00:00:05,000 "This is the default voice." more default. "1F Now a female voice." and back to default. |
|
00:00:05,500 - 00:00:10,250 "1C Yes," said the child, "it is fun!" |
|
``` |
|
*************************************************************************************************** |
|
1M = en-AU-WilliamNeural - en-AU (Male) |
|
1F = en-GB-SoniaNeural - en-GB (Female) |
|
2M = en-GB-RyanNeural - en-GB (Male) |
|
2F = en-US-JennyNeural - en-US (Female) |
|
3M = en-US-BrianMultilingualNeural - en-US (Male) |
|
3F = en-HK-YanNeural - en-HK (Female) |
|
4M = en-GB-ThomasNeural - en-GB (Male) |
|
4F = en-US-EmmaNeural - en-US (Female) |
|
1O = en-GB-RyanNeural - en-GB (Male) # Old Man |
|
1C = en-GB-MaisieNeural - en-GB (Female) # Child |
|
1V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female) |
|
2V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male) |
|
3V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female) |
|
4V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male) |
|
**************************************************************************************************** |
|
""" |
|
demo = gr.Interface( |
|
fn=tts_interface, |
|
inputs=[ |
|
gr.Textbox(label="Timestamped Text with Voice Changes and Duration", lines=10, placeholder='00:00:00,000 - 00:00:05,000 "Text" more text "1F Different Voice"'), |
|
gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice), |
|
gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1), |
|
gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1), |
|
gr.Slider(minimum=0.5, maximum=1.5, value=1.0, step=0.05, label="Speed Adjustment Factor") |
|
], |
|
outputs=[ |
|
gr.Audio(label="Generated Audio", type="filepath"), |
|
gr.Markdown(label="Warning", visible=False) |
|
], |
|
title="TTS with Duration-Aware Speed Adjustment and In-Quote Voice Switching", |
|
description=description, |
|
analytics_enabled=False, |
|
allow_flagging=False |
|
) |
|
return demo |
|
|
|
if __name__ == "__main__": |
|
demo = asyncio.run(create_demo()) |
|
demo.launch() |