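"""Gradio app for text-to-speech conversion using Microsoft Edge TTS.

Supports single-speaker and multi-speaker input, plain-text and SRT file
uploads, and optional generation of synchronized .srt subtitle files.
Edge TTS is a cloud-based service, so an active internet connection is
required.
"""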
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re

from pydub import AudioSegment

async def get_voices():
    """Return a mapping of display names ("ShortName - Locale (Gender)") to voice short names."""
    voices = await edge_tts.list_voices()
    return {
        f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v["ShortName"]
        for v in voices
    }

def format_time(milliseconds):
    """Convert milliseconds to SRT time format (HH:MM:SS,mmm)."""
    # Ensure milliseconds is an integer
    milliseconds = int(milliseconds)
    seconds, milliseconds = divmod(milliseconds, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"

def time_to_ms(time_str):
    """Convert SRT time format (HH:MM:SS,mmm) to milliseconds."""
    hours, minutes, rest = time_str.split(':')
    seconds, milliseconds = rest.split(',')
    return int(hours) * 3600000 + int(minutes) * 60000 + int(seconds) * 1000 + int(milliseconds)
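
# Worked example: time_to_ms("01:02:03,004") returns
# 1*3600000 + 2*60000 + 3*1000 + 4 == 3723004, the inverse of format_time.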

def parse_srt_content(content):
    """Parse SRT file content and extract text and timing data."""
    lines = content.split('\n')
    timing_data = []
    text_only = []
    i = 0
    while i < len(lines):
        if not lines[i].strip():
            i += 1
            continue
        # Check if this is a subtitle index line (the value itself is not needed,
        # since entries are renumbered on output)
        if lines[i].strip().isdigit():
            i += 1
            if i >= len(lines):
                break
            # Parse timestamp line
            timestamp_match = re.search(r'(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})', lines[i])
            if timestamp_match:
                start_time = timestamp_match.group(1)
                end_time = timestamp_match.group(2)
                # Convert to milliseconds
                start_ms = time_to_ms(start_time)
                end_ms = time_to_ms(end_time)
                i += 1
                subtitle_text = ""
                # Collect all text lines until an empty line or end of file
                while i < len(lines) and lines[i].strip():
                    subtitle_text += lines[i] + " "
                    i += 1
                subtitle_text = subtitle_text.strip()
                text_only.append(subtitle_text)
                timing_data.append({
                    'text': subtitle_text,
                    'start': start_ms,
                    'end': end_ms
                })
        else:
            i += 1
    return " ".join(text_only), timing_data

async def process_uploaded_file(file):
    """Process an uploaded file and detect whether it is SRT or plain text."""
    if file is None:
        return None, None, False, None
    try:
        file_path = file.name if hasattr(file, 'name') else file
        file_extension = os.path.splitext(file_path)[1].lower()
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Check if it's an SRT file (by extension or a leading "index + timestamp" pattern)
        is_subtitle = False
        timing_data = None
        if file_extension == '.srt' or re.search(r'^\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}', content, re.MULTILINE):
            is_subtitle = True
            text_content, timing_data = parse_srt_content(content)
            # Return original content for display
            return text_content, timing_data, is_subtitle, content
        else:
            # Treat as plain text
            text_content = content
            return text_content, timing_data, is_subtitle, content
    except Exception as e:
        return f"Error processing file: {str(e)}", None, False, None

async def update_text_from_file(file):
    """Callback to update the text area when a file is uploaded."""
    if file is None:
        return "", None
    text_content, timing_data, is_subtitle, original_content = await process_uploaded_file(file)
    if original_content is not None:
        # Return the original content to preserve formatting
        return original_content, None
    return "", gr.Warning("Failed to process the file")

async def text_to_speech(text, voice, rate, pitch, generate_subtitles=False, uploaded_file=None):
    """Convert text to speech, handling both direct text input and uploaded files."""
    if not text.strip() and uploaded_file is None:
        return None, None, "Please enter text or upload a file to convert."
    if not voice:
        return None, None, "Please select a voice."

    # First, determine whether the text itself is in SRT format
    is_srt_format = bool(re.search(r'^\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}', text, re.MULTILINE))
    if is_srt_format:
        # The text is SRT: parse it directly
        text_content, timing_data = parse_srt_content(text)
        is_subtitle = True
    else:
        # Otherwise fall back to the uploaded file, if one was provided
        timing_data = None
        is_subtitle = False
        if uploaded_file is not None:
            file_text, file_timing_data, file_is_subtitle, _ = await process_uploaded_file(uploaded_file)
            if isinstance(file_text, str) and file_text.strip():
                if file_is_subtitle:
                    text = file_text
                    timing_data = file_timing_data
                    is_subtitle = file_is_subtitle

    voice_short_name = voice.split(" - ")[0]
    # Gradio sliders may deliver floats, so coerce before the signed-integer format
    rate_str = f"{int(rate):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"

    # Create temporary file for audio
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        audio_path = tmp_file.name
    subtitle_path = None
    # Handle SRT-formatted text or subtitle files differently for audio generation
    if is_srt_format or (is_subtitle and timing_data):
        # Create a separate audio file for each subtitle entry, then combine them
        with tempfile.TemporaryDirectory() as temp_dir:
            audio_segments = []
            max_end_time = 0
            # If we don't have timing data but the text is SRT, parse it now
            if not timing_data and is_srt_format:
                _, timing_data = parse_srt_content(text)
            # Process each subtitle entry separately
            for i, entry in enumerate(timing_data):
                segment_text = entry['text']
                start_time = entry['start']
                end_time = entry['end']
                max_end_time = max(max_end_time, end_time)
                # Generate audio for this segment into its own temporary file
                segment_file = os.path.join(temp_dir, f"segment_{i}.mp3")
                communicate = edge_tts.Communicate(segment_text, voice_short_name, rate=rate_str, pitch=pitch_str)
                await communicate.save(segment_file)
                audio_segments.append({
                    'file': segment_file,
                    'start': start_time,
                    'end': end_time,
                    'text': segment_text
                })
            # Combine segments with proper timing: start from silence spanning the
            # full duration (plus a 1-second buffer) and overlay each segment at
            # its subtitle start time
            final_audio = AudioSegment.silent(duration=max_end_time + 1000)
            for segment in audio_segments:
                segment_audio = AudioSegment.from_file(segment['file'])
                final_audio = final_audio.overlay(segment_audio, position=segment['start'])
            # Export the combined audio
            final_audio.export(audio_path, format="mp3")

        # Generate subtitles if requested (entries are renumbered sequentially)
        if generate_subtitles:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".srt") as srt_file:
                subtitle_path = srt_file.name
            with open(subtitle_path, "w", encoding="utf-8") as f:
                for i, entry in enumerate(timing_data):
                    f.write(f"{i+1}\n")
                    f.write(f"{format_time(entry['start'])} --> {format_time(entry['end'])}\n")
                    f.write(f"{entry['text']}\n\n")
    else:
        # Regular text: a single Communicate call for the whole input
        communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
        if not generate_subtitles:
            await communicate.save(audio_path)
        if generate_subtitles:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".srt") as srt_file:
                subtitle_path = srt_file.name

            # Generate audio and collect word boundary data
            async def process_audio():
                word_boundaries = []
                async for chunk in communicate.stream():
                    if chunk["type"] == "audio":
                        with open(audio_path, "ab") as audio_file:
                            audio_file.write(chunk["data"])
                    elif chunk["type"] == "WordBoundary":
                        word_boundaries.append(chunk)
                return word_boundaries

            word_boundaries = await process_audio()

            # Group words into sensible phrases/sentences for subtitles.
            # Edge TTS reports offsets/durations in 100-nanosecond units,
            # so dividing by 10,000 converts them to milliseconds.
            phrases = []
            current_phrase = []
            current_text = ""
            phrase_start = 0
            for i, boundary in enumerate(word_boundaries):
                word = boundary["text"]
                start_time = boundary["offset"] / 10000
                duration = boundary["duration"] / 10000
                end_time = start_time + duration
                if not current_phrase:
                    phrase_start = start_time
                current_phrase.append(boundary)
                if word in ['.', ',', '!', '?', ':', ';'] or word.startswith(('.', ',', '!', '?', ':', ';')):
                    current_text = current_text.rstrip() + word + " "
                else:
                    current_text += word + " "
                # Decide whether to end this phrase and start a new one
                should_break = False
                # Break on punctuation or at the last word
                if word.endswith(('.', '!', '?', ':', ';', ',')) or i == len(word_boundaries) - 1:
                    should_break = True
                # Break after a certain number of words (4-5 is typical for subtitles)
                elif len(current_phrase) >= 5:
                    should_break = True
                # Break on a long pause (more than 300 ms between words)
                elif i < len(word_boundaries) - 1:
                    next_start = word_boundaries[i + 1]["offset"] / 10000
                    if next_start - end_time > 300:
                        should_break = True
                if should_break or i == len(word_boundaries) - 1:
                    if current_phrase:
                        last_boundary = current_phrase[-1]
                        phrase_end = (last_boundary["offset"] + last_boundary["duration"]) / 10000
                        phrases.append({
                            "text": current_text.strip(),
                            "start": phrase_start,
                            "end": phrase_end
                        })
                    current_phrase = []
                    current_text = ""

            # Write phrases to the SRT file
            with open(subtitle_path, "w", encoding="utf-8") as srt_file:
                for i, phrase in enumerate(phrases):
                    srt_file.write(f"{i+1}\n")
                    srt_file.write(f"{format_time(phrase['start'])} --> {format_time(phrase['end'])}\n")
                    srt_file.write(f"{phrase['text']}\n\n")

    return audio_path, subtitle_path, None
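
# Hypothetical usage sketch (not part of the UI wiring): voice labels follow
# the "ShortName - Locale (Gender)" format produced by get_voices(), e.g.
#   audio_path, srt_path, error = await text_to_speech(
#       "Hello, world", "en-US-AriaNeural - en-US (Female)",
#       rate=0, pitch=0, generate_subtitles=True)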

async def tts_interface(text, voice, rate, pitch, generate_subtitles, uploaded_file=None):
    """Thin wrapper that surfaces text_to_speech errors as Gradio warnings."""
    audio, subtitle, warning = await text_to_speech(text, voice, rate, pitch, generate_subtitles, uploaded_file)
    if warning:
        return audio, subtitle, gr.Warning(warning)
    return audio, subtitle, None

async def parse_multi_speaker_text(text):
    """Parse text containing speaker designations like 'Speaker1: Hello'."""
    lines = text.split('\n')
    speaker_segments = []
    current_speaker = None
    current_text = []
    speaker_pattern = re.compile(r'^(Speaker\s*\d+|S\d+)\s*:\s*(.*)$', re.IGNORECASE)
    for line in lines:
        match = speaker_pattern.match(line.strip())
        if match:
            # If collecting text for a previous speaker, save it
            if current_speaker and current_text:
                speaker_segments.append({
                    'speaker': current_speaker,
                    'text': ' '.join(current_text).strip()
                })
                current_text = []
            # Set the new current speaker and start collecting their text
            current_speaker = match.group(1).strip()
            if match.group(2).strip():  # Text after the speaker designation
                current_text.append(match.group(2).strip())
        elif line.strip() and current_speaker:  # Continue with the current speaker
            current_text.append(line.strip())
    # Add the last speaker's text, if any
    if current_speaker and current_text:
        speaker_segments.append({
            'speaker': current_speaker,
            'text': ' '.join(current_text).strip()
        })
    return speaker_segments
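
# Worked example: awaiting parse_multi_speaker_text("Speaker1: Hi there\nS2: Hello")
# yields [{'speaker': 'Speaker1', 'text': 'Hi there'},
#         {'speaker': 'S2', 'text': 'Hello'}].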

async def multi_speaker_tts(text, speaker_settings, generate_subtitles=False):
    """Process multi-speaker text and generate audio with per-speaker voices and settings."""
    if not text.strip():
        return None, None, "Please enter text to convert."

    # Parse the multi-speaker text
    speaker_segments = await parse_multi_speaker_text(text)
    if not speaker_segments:
        return None, None, "No valid speaker segments found in the text."

    # Create temporary file for the final audio
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        final_audio_path = tmp_file.name
    subtitle_path = None
    if generate_subtitles:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".srt") as srt_file:
            subtitle_path = srt_file.name

    # Process each speaker segment with the corresponding voice
    with tempfile.TemporaryDirectory() as temp_dir:
        audio_segments = []
        subtitle_entries = []
        current_offset = 0  # Time offset of the next segment, in milliseconds
        for i, segment in enumerate(speaker_segments):
            speaker = segment['speaker']
            segment_text = segment['text']
            # Map the speaker label (e.g. "Speaker2") to its settings entry
            speaker_num = int(re.search(r'\d+', speaker).group()) if re.search(r'\d+', speaker) else 1
            speaker_idx = min(speaker_num - 1, len(speaker_settings) - 1)  # Clamp to the configured speakers
            if speaker_idx < 0 or speaker_idx >= len(speaker_settings) or not speaker_settings[speaker_idx]['voice']:
                return None, None, f"No voice selected for {speaker}."
            # Voice, rate, and pitch for this speaker (sliders may deliver floats)
            voice_short_name = speaker_settings[speaker_idx]['voice'].split(" - ")[0]
            rate_str = f"{int(speaker_settings[speaker_idx]['rate']):+d}%"
            pitch_str = f"{int(speaker_settings[speaker_idx]['pitch']):+d}Hz"
            # Generate audio for this segment with speaker-specific settings
            segment_file = os.path.join(temp_dir, f"segment_{i}.mp3")
            communicate = edge_tts.Communicate(segment_text, voice_short_name, rate=rate_str, pitch=pitch_str)
            # For subtitle generation we also need word boundaries
            if generate_subtitles:
                word_boundaries = []
                async for chunk in communicate.stream():
                    if chunk["type"] == "audio":
                        with open(segment_file, "ab") as audio_file:
                            audio_file.write(chunk["data"])
                    elif chunk["type"] == "WordBoundary":
                        # Shift offsets so they are relative to the combined audio
                        adjusted_chunk = chunk.copy()
                        adjusted_chunk["offset"] += current_offset * 10000  # ms -> 100 ns units
                        word_boundaries.append(adjusted_chunk)
                # Group the word boundaries into subtitle phrases (same heuristics
                # as the single-speaker path), prefixing each with its speaker label
                if word_boundaries:
                    phrases = []
                    current_phrase = []
                    current_text = ""
                    phrase_start = 0
                    for j, boundary in enumerate(word_boundaries):
                        word = boundary["text"]
                        start_time = boundary["offset"] / 10000
                        duration = boundary["duration"] / 10000
                        end_time = start_time + duration
                        if not current_phrase:
                            phrase_start = start_time
                        current_phrase.append(boundary)
                        if word in ['.', ',', '!', '?', ':', ';'] or word.startswith(('.', ',', '!', '?', ':', ';')):
                            current_text = current_text.rstrip() + word + " "
                        else:
                            current_text += word + " "
                        # Decide whether to end this phrase
                        should_break = False
                        if word.endswith(('.', '!', '?', ':', ';', ',')) or j == len(word_boundaries) - 1:
                            should_break = True
                        elif len(current_phrase) >= 5:
                            should_break = True
                        elif j < len(word_boundaries) - 1:
                            next_start = word_boundaries[j + 1]["offset"] / 10000
                            if next_start - end_time > 300:
                                should_break = True
                        if should_break or j == len(word_boundaries) - 1:
                            if current_phrase:
                                last_boundary = current_phrase[-1]
                                phrase_end = (last_boundary["offset"] + last_boundary["duration"]) / 10000
                                phrases.append({
                                    "text": f"[{speaker}] {current_text.strip()}",
                                    "start": phrase_start,
                                    "end": phrase_end
                                })
                            current_phrase = []
                            current_text = ""
                    # Collect this segment's phrases once, after the loop
                    # (extending inside the loop would duplicate entries)
                    subtitle_entries.extend(phrases)
            else:
                # Simple audio generation without subtitles
                await communicate.save(segment_file)

            # Measure the generated audio so the next segment's word boundaries
            # and the final concatenation line up
            audio = AudioSegment.from_file(segment_file)
            duration = len(audio)
            audio_segments.append({
                'file': segment_file,
                'duration': duration
            })
            # Update the running offset for the next segment
            current_offset += duration
        # Concatenate all audio segments back to back
        combined = AudioSegment.empty()
        for segment in audio_segments:
            audio = AudioSegment.from_file(segment['file'])
            combined += audio
        combined.export(final_audio_path, format="mp3")

    # Write the subtitle file if requested
    if generate_subtitles and subtitle_path:
        with open(subtitle_path, "w", encoding="utf-8") as f:
            for i, entry in enumerate(subtitle_entries):
                f.write(f"{i+1}\n")
                f.write(f"{format_time(entry['start'])} --> {format_time(entry['end'])}\n")
                f.write(f"{entry['text']}\n\n")

    return final_audio_path, subtitle_path, None

async def multi_speaker_interface(text, generate_subtitles, speaker1_voice, speaker1_rate, speaker1_pitch,
                                  speaker2_voice, speaker2_rate, speaker2_pitch):
    """Interface function for multi-speaker TTS."""
    # Build the speaker settings list from the individual UI components,
    # including each speaker only if a voice is selected
    speaker_settings = []
    if speaker1_voice:
        speaker_settings.append({
            'voice': speaker1_voice,
            'rate': speaker1_rate,
            'pitch': speaker1_pitch
        })
    if speaker2_voice:
        speaker_settings.append({
            'voice': speaker2_voice,
            'rate': speaker2_rate,
            'pitch': speaker2_pitch
        })
    if not speaker_settings:
        return None, None, gr.Warning("Please select at least one speaker voice.")

    audio, subtitle, warning = await multi_speaker_tts(text, speaker_settings, generate_subtitles)
    if warning:
        return audio, subtitle, gr.Warning(warning)
    return audio, subtitle, None

async def create_demo():
    voices = await get_voices()

    description = """
Convert text to speech using Microsoft Edge TTS. Adjust the speech rate and pitch: 0 is the default, positive values increase them, and negative values decrease them.
You can also generate subtitle files (.srt) along with the audio.

**Note:** Edge TTS is a cloud-based service and requires an active internet connection."""

    features = """
## ✨ Latest Features

- **Single & Multi-Speaker Support**: Choose between single-speaker and multi-speaker modes
- **SRT Subtitle Support**: Upload SRT files or enter SRT-formatted text to generate perfectly synchronized speech
- **SRT Generation**: Create subtitle files alongside your audio for perfect timing
- **File Upload**: Easily upload TXT or SRT files for conversion
- **Smart Format Detection**: Automatically detects plain text or SRT subtitle format
"""

    with gr.Blocks(title="Edge TTS Text-to-Speech", analytics_enabled=False) as demo:
        gr.Markdown("# Edge TTS Text-to-Speech Converter")
        gr.Markdown(description)
        gr.Markdown(features)
        with gr.Tabs() as tabs:
            with gr.Tab("Single Speaker"):
                with gr.Row():
                    with gr.Column(scale=3):
                        text_input = gr.Textbox(label="Input Text", lines=5, value="Hello, how are you doing!")
                        file_input = gr.File(label="Or upload a TXT/SRT file", file_types=[".txt", ".srt"])
                    with gr.Column(scale=2):
                        voice_dropdown = gr.Dropdown(
                            choices=[""] + list(voices.keys()),
                            label="Select Voice",
                            value=list(voices.keys())[0] if voices else "",
                        )
                        rate_slider = gr.Slider(
                            minimum=-50,
                            maximum=50,
                            value=0,
                            label="Speech Rate Adjustment (%)",
                            step=1,
                        )
                        pitch_slider = gr.Slider(
                            minimum=-20, maximum=20, value=0, label="Pitch Adjustment (Hz)", step=1
                        )
                        subtitle_checkbox = gr.Checkbox(label="Generate Subtitles (.srt)", value=False)
                gr.Markdown("""
**📝 Subtitle Timing Tip:**

When creating SRT files for continuous speech, avoid exactly matching timestamps between segments.

**For smoother speech flow:**
```
1
00:00:00,112 --> 00:00:01,647
Hello how are you doing

2
00:00:01,617 --> 00:00:02,000
I'm fine
```
✅ Create a small overlap (20-30 ms) between segments to prevent pauses

❌ Avoid exactly matching timestamps (where the end time equals the next start time) unless you want a pause
""")
                submit_single_btn = gr.Button("Convert to Speech", variant="primary")
                warning_single_md = gr.Markdown(visible=False)
                single_outputs = [
                    gr.Audio(label="Generated Audio", type="filepath"),
                    gr.File(label="Generated Subtitles"),
                    warning_single_md
                ]

                # Update the text area whenever a file is uploaded
                file_input.change(
                    fn=update_text_from_file,
                    inputs=[file_input],
                    outputs=[text_input, warning_single_md]
                )

                # Handle the submit button for single speaker
                submit_single_btn.click(
                    fn=tts_interface,
                    api_name="predict",
                    inputs=[text_input, voice_dropdown, rate_slider, pitch_slider, subtitle_checkbox, file_input],
                    outputs=single_outputs
                )
with gr.Tab("Multi Speaker"):
with gr.Column():
multi_text_input = gr.Textbox(
label="Multi-Speaker Text (Format: 'Speaker1: text' or 'S1: text')",
lines=8,
value="Speaker1: Hello, this is the first speaker.\nSpeaker2: And I'm the second speaker!"
)
multi_subtitle_checkbox = gr.Checkbox(label="Generate Subtitles (.srt)", value=False)
with gr.Row():
with gr.Column():
speaker1_voice = gr.Dropdown(
choices=[""] + list(voices.keys()),
label="Speaker 1 Voice",
value=list(voices.keys())[0] if voices else "",
)
speaker1_rate = gr.Slider(
minimum=-50,
maximum=50,
value=0,
label="Speaker 1 Rate (%)",
step=1,
)
speaker1_pitch = gr.Slider(
minimum=-20,
maximum=20,
value=0,
label="Speaker 1 Pitch (Hz)",
step=1,
)
with gr.Column():
speaker2_voice = gr.Dropdown(
choices=[""] + list(voices.keys()),
label="Speaker 2 Voice",
value=list(voices.keys())[10] if len(voices) > 10 else "",
)
speaker2_rate = gr.Slider(
minimum=-50,
maximum=50,
value=0,
label="Speaker 2 Rate (%)",
step=1,
)
speaker2_pitch = gr.Slider(
minimum=-20,
maximum=20,
value=0,
label="Speaker 2 Pitch (Hz)",
step=1,
)
submit_multi_btn = gr.Button("Convert Multi-Speaker to Speech", variant="primary")
warning_multi_md = gr.Markdown(visible=False)
multi_outputs = [
gr.Audio(label="Generated Audio", type="filepath"),
gr.File(label="Generated Subtitles"),
warning_multi_md
]
# Correctly pass the individual Gradio components to the click function
submit_multi_btn.click(
fn=multi_speaker_interface,
api_name="predict_multi",
inputs=[
multi_text_input,
multi_subtitle_checkbox,
speaker1_voice,
speaker1_rate,
speaker1_pitch,
speaker2_voice,
speaker2_rate,
speaker2_pitch
],
outputs=multi_outputs
)
gr.Markdown("Experience the power of Edge TTS for text-to-speech conversion with support for both single speaker and multi-speaker scenarios!")
return demo

async def main():
    demo = await create_demo()
    demo.queue(default_concurrency_limit=50)
    demo.launch(show_api=True, show_error=True)


if __name__ == "__main__":
    asyncio.run(main())