|
import gradio as gr |
|
import edge_tts |
|
import asyncio |
|
import tempfile |
|
import os |
|
import json |
|
import datetime |
|
import re |
|
import io |
|
|
|
|
|
async def get_voices():
    """Fetch the Edge TTS voice catalog.

    Returns a mapping from a human-readable label
    ("ShortName - Locale (Gender)") to the voice's ShortName identifier.
    """
    catalog = await edge_tts.list_voices()
    labels = {}
    for voice in catalog:
        label = f"{voice['ShortName']} - {voice['Locale']} ({voice['Gender']})"
        labels[label] = voice["ShortName"]
    return labels
|
|
|
|
|
def format_time(milliseconds):
    """Convert milliseconds to SRT time format (HH:MM:SS,mmm)."""
    total_ms = int(milliseconds)
    ms = total_ms % 1000
    total_seconds = total_ms // 1000
    hh = total_seconds // 3600
    mm = (total_seconds // 60) % 60
    ss = total_seconds % 60
    return f"{hh:02d}:{mm:02d}:{ss:02d},{ms:03d}"
|
|
|
|
|
def time_to_ms(time_str):
    """Convert SRT time format (HH:MM:SS,mmm) to total milliseconds."""
    clock_part, millis_part = time_str.split(',')
    h, m, s = (int(piece) for piece in clock_part.split(':'))
    return ((h * 60 + m) * 60 + s) * 1000 + int(millis_part)
|
|
|
|
|
def parse_srt_content(content):
    """Parse SRT file content and extract text and timing data.

    Args:
        content: Raw text of an SRT subtitle file.

    Returns:
        A tuple ``(text, timing_data)`` where ``text`` is all subtitle text
        joined with single spaces, and ``timing_data`` is a list of dicts
        with keys 'text', 'start' and 'end' (start/end in milliseconds).
        Malformed entries are skipped.
    """
    lines = content.split('\n')
    timing_data = []
    text_only = []

    i = 0
    while i < len(lines):
        # Skip blank separator lines between entries.
        if not lines[i].strip():
            i += 1
            continue

        # A purely numeric line marks the start of a subtitle entry.
        if lines[i].strip().isdigit():
            i += 1  # the sequence number itself is not needed (was an unused local)
            if i >= len(lines):
                break

            timestamp_match = re.search(r'(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})', lines[i])
            if timestamp_match:
                start_ms = time_to_ms(timestamp_match.group(1))
                end_ms = time_to_ms(timestamp_match.group(2))

                # Collect the (possibly multi-line) subtitle text that follows
                # the timestamp line, up to the next blank line.
                i += 1
                subtitle_text = ""
                while i < len(lines) and lines[i].strip():
                    subtitle_text += lines[i] + " "
                    i += 1

                subtitle_text = subtitle_text.strip()
                text_only.append(subtitle_text)
                timing_data.append({
                    'text': subtitle_text,
                    'start': start_ms,
                    'end': end_ms
                })
        else:
            # Non-blank, non-numeric line outside an entry: ignore it.
            i += 1

    return " ".join(text_only), timing_data
|
|
|
|
|
async def process_uploaded_file(file):
    """Read an uploaded file and detect whether it is SRT or plain text.

    Returns a 4-tuple ``(text, timing_data, is_subtitle, raw_content)``.
    On failure the first element is an error message and the last is None.
    """
    if file is None:
        return None, None, False, None

    try:
        # Gradio may hand us a file-like object (with .name) or a bare path.
        path = getattr(file, 'name', file)
        extension = os.path.splitext(path)[1].lower()

        with open(path, 'r', encoding='utf-8') as handle:
            raw = handle.read()

        # SRT heuristic: an index line immediately followed by a timestamp line.
        srt_marker = re.search(
            r'^\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}',
            raw,
            re.MULTILINE,
        )

        if extension == '.srt' or srt_marker is not None:
            parsed_text, timings = parse_srt_content(raw)
            return parsed_text, timings, True, raw

        # Plain text: hand the content back untouched, no timing data.
        return raw, None, False, raw
    except Exception as e:
        return f"Error processing file: {str(e)}", None, False, None
|
|
|
|
|
async def update_text_from_file(file):
    """Gradio callback: mirror an uploaded file's raw contents into the textbox."""
    if file is None:
        return "", None

    # Only the raw content matters here; parsing results are discarded so the
    # user sees the file exactly as uploaded (SRT markup included).
    _, _, _, raw_content = await process_uploaded_file(file)
    if raw_content is None:
        return "", gr.Warning("Failed to process the file")
    return raw_content, None
|
|
|
|
|
async def text_to_speech(text, voice, rate, pitch, generate_subtitles=False, uploaded_file=None):
    """Convert text to speech, handling both direct text input and uploaded files.

    Returns a tuple ``(audio_path, subtitle_path, error_message)``:
    error_message is None on success; subtitle_path is None unless
    subtitles were produced.
    """
    # Guard clauses: need some input and a selected voice.
    if not text.strip() and uploaded_file is None:
        return None, None, "Please enter text or upload a file to convert."
    if not voice:
        return None, None, "Please select a voice."

    # Detect SRT pasted directly into the textbox: an index line followed by
    # a "HH:MM:SS,mmm --> HH:MM:SS,mmm" timestamp line.
    is_srt_format = bool(re.search(r'^\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}', text, re.MULTILINE))

    if is_srt_format:
        text_content, timing_data = parse_srt_content(text)
        is_subtitle = True
    else:
        timing_data = None
        is_subtitle = False

    # An uploaded SRT file takes precedence over the textbox content.
    # NOTE(review): a plain-text upload is ignored here (only the subtitle
    # branch assigns `text`); presumably the textbox already mirrors it via
    # update_text_from_file — confirm.
    if uploaded_file is not None:
        file_text, file_timing_data, file_is_subtitle, _ = await process_uploaded_file(uploaded_file)
        if isinstance(file_text, str) and file_text.strip():
            if file_is_subtitle:
                text = file_text
                timing_data = file_timing_data
                is_subtitle = file_is_subtitle

    # Dropdown labels look like "ShortName - Locale (Gender)".
    voice_short_name = voice.split(" - ")[0]
    rate_str = f"{rate:+d}%"    # edge-tts expects signed values, e.g. "+10%"
    pitch_str = f"{pitch:+d}Hz"

    # Reserve an output path; the actual audio bytes are written below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        audio_path = tmp_file.name

    subtitle_path = None

    if is_srt_format or (is_subtitle and timing_data):
        # Timed path: synthesize every subtitle entry on its own, then place
        # each clip at its SRT start time over a silent base track so speech
        # stays synchronized with the original timings.
        with tempfile.TemporaryDirectory() as temp_dir:
            audio_segments = []
            max_end_time = 0  # latest subtitle end seen, in milliseconds

            # Safety net: recover timings if they were not parsed above.
            if not timing_data and is_srt_format:
                _, timing_data = parse_srt_content(text)

            for i, entry in enumerate(timing_data):
                segment_text = entry['text']
                start_time = entry['start']
                end_time = entry['end']
                max_end_time = max(max_end_time, end_time)

                segment_file = os.path.join(temp_dir, f"segment_{i}.mp3")

                # One synthesis request per subtitle entry.
                communicate = edge_tts.Communicate(segment_text, voice_short_name, rate=rate_str, pitch=pitch_str)
                await communicate.save(segment_file)

                audio_segments.append({
                    'file': segment_file,
                    'start': start_time,
                    'end': end_time,
                    'text': segment_text
                })

            # NOTE(review): `wave` and `audioop` are imported but never used
            # (audioop was removed in Python 3.13); only pydub is needed here.
            import wave
            import audioop
            from pydub import AudioSegment

            # Silent base track long enough for the last entry plus a 1 s tail.
            final_audio = AudioSegment.silent(duration=max_end_time + 1000)

            # Overlay each synthesized clip at its subtitle start offset (ms).
            for segment in audio_segments:
                segment_audio = AudioSegment.from_file(segment['file'])
                final_audio = final_audio.overlay(segment_audio, position=segment['start'])

            final_audio.export(audio_path, format="mp3")

            # Re-emit the timings as a fresh .srt file if requested.
            if generate_subtitles:
                with tempfile.NamedTemporaryFile(delete=False, suffix=".srt") as srt_file:
                    subtitle_path = srt_file.name
                with open(subtitle_path, "w", encoding="utf-8") as f:
                    for i, entry in enumerate(timing_data):
                        f.write(f"{i+1}\n")
                        f.write(f"{format_time(entry['start'])} --> {format_time(entry['end'])}\n")
                        f.write(f"{entry['text']}\n\n")
    else:
        # Plain-text path: a single synthesis request for the whole text.
        communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)

        if generate_subtitles:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".srt") as srt_file:
                subtitle_path = srt_file.name

            async def process_audio():
                # Stream the synthesis: append audio chunks to the output file
                # and collect WordBoundary events for subtitle timing.
                word_boundaries = []
                async for chunk in communicate.stream():
                    if chunk["type"] == "audio":
                        with open(audio_path, "ab") as audio_file:
                            audio_file.write(chunk["data"])
                    elif chunk["type"] == "WordBoundary":
                        word_boundaries.append(chunk)
                return word_boundaries

            word_boundaries = await process_audio()

            # Group word boundaries into subtitle phrases. Offsets/durations
            # arrive in 100 ns ticks; dividing by 10000 converts to ms.
            phrases = []
            current_phrase = []   # boundaries of the phrase being built
            current_text = ""     # accumulated display text for that phrase
            phrase_start = 0

            for i, boundary in enumerate(word_boundaries):
                word = boundary["text"]
                start_time = boundary["offset"] / 10000
                duration = boundary["duration"] / 10000
                end_time = start_time + duration

                # First word of a new phrase fixes the phrase start time.
                if not current_phrase:
                    phrase_start = start_time

                current_phrase.append(boundary)

                # Attach punctuation tokens to the preceding word (no space).
                if word in ['.', ',', '!', '?', ':', ';'] or word.startswith(('.', ',', '!', '?', ':', ';')):
                    current_text = current_text.rstrip() + word + " "
                else:
                    current_text += word + " "

                should_break = False

                # Break on trailing punctuation or at the last word...
                if word.endswith(('.', '!', '?', ':', ';', ',')) or i == len(word_boundaries) - 1:
                    should_break = True
                # ...or cap phrase length at 5 words...
                elif len(current_phrase) >= 5:
                    should_break = True
                # ...or break when the gap before the next word exceeds 300 ms.
                elif i < len(word_boundaries) - 1:
                    next_start = word_boundaries[i + 1]["offset"] / 10000
                    if next_start - end_time > 300:
                        should_break = True

                if should_break or i == len(word_boundaries) - 1:
                    if current_phrase:
                        # Phrase ends where its last word's audio ends.
                        last_boundary = current_phrase[-1]
                        phrase_end = (last_boundary["offset"] + last_boundary["duration"]) / 10000
                        phrases.append({
                            "text": current_text.strip(),
                            "start": phrase_start,
                            "end": phrase_end
                        })
                        current_phrase = []
                        current_text = ""

            # Write the grouped phrases out in SRT format.
            with open(subtitle_path, "w", encoding="utf-8") as srt_file:
                for i, phrase in enumerate(phrases):
                    srt_file.write(f"{i+1}\n")
                    srt_file.write(f"{format_time(phrase['start'])} --> {format_time(phrase['end'])}\n")
                    srt_file.write(f"{phrase['text']}\n\n")
        # NOTE(review): when generate_subtitles is False this branch never
        # consumes `communicate`, so audio_path appears to stay an empty file —
        # confirm intended; an `else: await communicate.save(audio_path)`
        # looks missing.

    return audio_path, subtitle_path, None
|
|
|
|
|
async def tts_interface(text, voice, rate, pitch, generate_subtitles, uploaded_file=None):
    """Gradio callback: run the TTS pipeline and surface any warning to the UI."""
    audio, subtitle, warning = await text_to_speech(
        text, voice, rate, pitch, generate_subtitles, uploaded_file
    )
    warning_component = gr.Warning(warning) if warning else None
    return audio, subtitle, warning_component
|
|
|
|
|
async def create_demo():
    """Build and return the Gradio Blocks interface for the TTS converter."""
    voices = await get_voices()

    description = """
    Convert text to speech using Microsoft Edge TTS. Adjust speech rate and pitch: 0 is default, positive values increase, negative values decrease.
    You can also generate subtitle files (.srt) along with the audio.

    **Note:** Edge TTS is a cloud-based service and requires an active internet connection."""

    features = """
    ## ✨ Latest Features
    - **SRT Subtitle Support**: Upload SRT files or input SRT format text to generate perfectly synchronized speech
    - **SRT Generation**: Create subtitle files alongside your audio for perfect timing
    - **File Upload**: Easily upload TXT or SRT files for conversion
    - **Smart Format Detection**: Automatically detects plain text or SRT subtitle format
    """

    with gr.Blocks(title="Edge TTS Text-to-Speech", analytics_enabled=False) as demo:
        gr.Markdown("# Edge TTS Text-to-Speech Converter")
        gr.Markdown(description)
        gr.Markdown(features)

        with gr.Row():
            # Left column: text entry and optional file upload.
            with gr.Column(scale=3):
                text_input = gr.Textbox(label="Input Text", lines=5, value="Hello, how are you doing!")
                file_input = gr.File(label="Or upload a TXT/SRT file", file_types=[".txt", ".srt"])

            # Right column: voice selection and synthesis parameters.
            with gr.Column(scale=2):
                voice_dropdown = gr.Dropdown(
                    choices=[""] + list(voices.keys()),
                    label="Select Voice",
                    value=list(voices.keys())[0] if voices else "",
                )
                rate_slider = gr.Slider(
                    minimum=-50,
                    maximum=50,
                    value=0,
                    label="Speech Rate Adjustment (%)",
                    step=1,
                )
                pitch_slider = gr.Slider(
                    minimum=-20, maximum=20, value=0, label="Pitch Adjustment (Hz)", step=1
                )
                subtitle_checkbox = gr.Checkbox(label="Generate Subtitles (.srt)", value=False)

        submit_btn = gr.Button("Convert to Speech", variant="primary")
        # Hidden markdown slot used to surface warnings from the callbacks.
        warning_md = gr.Markdown(visible=False)

        outputs = [
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.File(label="Generated Subtitles"),
            warning_md
        ]

        # Mirror an uploaded file's raw contents into the textbox.
        file_input.change(
            fn=update_text_from_file,
            inputs=[file_input],
            outputs=[text_input, warning_md]
        )

        # Main conversion action; also exposed as the "predict" API endpoint.
        submit_btn.click(
            fn=tts_interface,
            api_name="predict",
            inputs=[text_input, voice_dropdown, rate_slider, pitch_slider, subtitle_checkbox, file_input],
            outputs=outputs
        )

        gr.Markdown("Experience the power of Edge TTS for text-to-speech conversion, and explore our advanced Text-to-Video Converter for even more creative possibilities!")

    return demo
|
|
|
|
|
async def main():
    """Build the Gradio app, enable request queuing, and start the server."""
    app = await create_demo()
    app.queue(default_concurrency_limit=50)
    app.launch(show_api=True, show_error=True)
|
|
|
|
|
# Script entry point: drive the async setup/launch with asyncio.
if __name__ == "__main__":
    asyncio.run(main())
|
|