Spaces:
Sleeping
Sleeping
## Fix overlapping segments; strip silence but leave a tiny bit of it
## Simplified
## Appending 0 to a voice prefix (e.g. 1F0) makes that voice permanent
import spaces | |
import gradio as gr | |
import edge_tts | |
import asyncio | |
import tempfile | |
import os | |
import re | |
from pathlib import Path | |
from pydub.silence import detect_nonsilent | |
from pydub import AudioSegment | |
# Short name of the "sticky" voice. generate_audio_with_voice_prefix stores a
# voice here when a prefix is followed by 0, and transcript_to_speech resets it
# to "" when a job finishes. While it is "", the user-selected voice is used.
default_voice_short= ""
# Flag read by strip_silence (padding is forced to 0 while it is False) and by
# generate_audio_with_voice_prefix (selects the padding it requests).
# process_transcript_line resets it to False for each line and at each quote.
check1 = False
def strip_silence(audio: AudioSegment, silence_thresh=-40, min_silence_len=100, silence_padding_ms=100):
    """Trim leading and trailing silence from ``audio``.

    Args:
        audio: The segment to trim.
        silence_thresh: Level in dB passed to ``detect_nonsilent`` as the
            silence threshold.
        min_silence_len: Minimum silence length in ms passed to
            ``detect_nonsilent``.
        silence_padding_ms: Padding in ms to keep on each side of the voiced
            region. It is honoured only while the module-level ``check1`` flag
            is true; otherwise the padding is forced to 0.

    Returns:
        The slice of ``audio`` spanning the first to the last non-silent
        region (plus padding), or ``silence_padding_ms`` of pure silence when
        no non-silent region is found.
    """
    from pydub.silence import detect_nonsilent

    voiced_spans = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )
    if not voiced_spans:
        # Nothing audible: hand back a short silent clip rather than an empty one.
        return AudioSegment.silent(duration=silence_padding_ms)

    first_voiced_ms = voiced_spans[0][0]
    last_voiced_ms = voiced_spans[-1][1]

    # NOTE(review): in the code visible here check1 is only ever assigned
    # False, so the padding always ends up 0 - confirm that is intended.
    padding_ms = silence_padding_ms if check1 else 0

    # Clamp so the padded window never leaves the clip.
    cut_from = max(0, first_voiced_ms - padding_ms)
    cut_to = min(len(audio), last_voiced_ms + padding_ms)

    # Debug output
    for message in (
        f"Audio length: {len(audio)} ms",
        f"Silence threshold: {silence_thresh} dB",
        f"Minimum silence length: {min_silence_len} ms",
        f"Silence padding: {padding_ms} ms",
        f"Check1: {check1}**",
    ):
        print(message)

    return audio[cut_from:cut_to]
def get_silence(duration_ms=1000):
    """Write ``duration_ms`` of silence to a temporary MP3 file.

    The clip is created at 24 kHz, converted to mono with a 4-byte sample
    width, and exported through ffmpeg with the libmp3lame codec at 48k.

    Args:
        duration_ms: Length of the silent clip in milliseconds.

    Returns:
        Path of the created ``.mp3`` file. The file is not deleted
        automatically; the caller is responsible for removing it.
    """
    silent_audio = (
        AudioSegment.silent(duration=duration_ms, frame_rate=24000)  # 24kHz sampling rate
        .set_channels(1)       # Mono
        .set_sample_width(4)   # 32-bit (4 bytes per sample)
    )

    # Reserve a unique file name, then close the handle *before* exporting:
    # a NamedTemporaryFile that is still open cannot be reopened by name on
    # every platform (notably Windows), which made the export fail there.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        output_path = tmp_file.name

    # Export with specific bitrate and codec parameters
    silent_audio.export(
        output_path,
        format="mp3",
        bitrate="48k",
        parameters=[
            "-ac", "1",  # Mono
            "-ar", "24000",  # Sample rate
            "-sample_fmt", "s32",  # 32-bit samples
            "-codec:a", "libmp3lame",  # MP3 codec
        ],
    )
    return output_path
# Get all available voices | |
async def get_voices():
    """Return a dict mapping a display label to the Edge TTS short voice name.

    Labels have the form ``"<ShortName> - <Locale> (<Gender>)"`` and are built
    from the entries returned by ``edge_tts.list_voices()``.
    """
    labelled_voices = {}
    for voice in await edge_tts.list_voices():
        label = f"{voice['ShortName']} - {voice['Locale']} ({voice['Gender']})"
        labelled_voices[label] = voice['ShortName']
    return labelled_voices
def _discard_temp_file(path):
    """Best-effort removal of a temporary file; failures are only reported."""
    try:
        os.remove(path)
    except OSError as err:
        print(f"Warning: could not remove temporary file {path}: {err}")


def _export_to_temp_mp3(audio):
    """Export ``audio`` to a newly created temporary .mp3 file and return its path."""
    # mkstemp creates the file atomically (unlike the deprecated mktemp, which
    # only returns a name that another process could claim first).
    fd, path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)  # the exporter reopens the path itself
    try:
        audio.export(path, format="mp3")
    except Exception:
        _discard_temp_file(path)
        raise
    return path


async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch):
    """Generate audio for a text segment, handling voice prefixes, retries, and fallback.

    A segment may start with a two-character voice prefix from ``voice_map``
    (for example ``1F``). The prefix selects the voice and applies that
    voice's pitch/rate offsets. A signed integer glued directly to the prefix
    (``1F-20``, ``1F0``) is added to the pitch; the value ``0`` additionally
    stores the voice in the module-level ``default_voice_short`` so that later
    segments without a prefix keep using it.

    Args:
        text_segment: Raw text of the segment, optionally starting with a prefix.
        default_voice: Dropdown label of the form ``"<ShortName> - ..."``;
            used when no sticky voice is set and no prefix is present.
        rate: Base speech-rate adjustment in percent.
        pitch: Base pitch adjustment in Hz.

    Returns:
        Path of a temporary ``.mp3`` with the silence-trimmed speech, a path
        to 500 ms of silence if synthesis failed three times, or ``None``
        when there is no text left to speak.
    """
    global default_voice_short  # sticky voice shared between segments

    print(f"Text: {text_segment}") #Debug
    # prefix -> (voice short name, pitch offset, rate offset)
    voice_map = {
        "1F": ("en-GB-SoniaNeural", 25, 0),
        "2F": ("en-US-JennyNeural", 0, 0),
        "3F": ("en-HK-YanNeural", 0, 0),
        "4F": ("en-US-EmmaNeural", 0, 0),
        "1M": ("en-AU-WilliamNeural", 0, 0),
        "2M": ("en-GB-RyanNeural", 0, 0),
        "3M": ("en-US-BrianMultilingualNeural", 0, 0),
        "4M": ("en-GB-ThomasNeural", 0, 0),
        "1O": ("en-GB-RyanNeural", -20, -10),
        "1C": ("en-GB-MaisieNeural", 0, 0),
        "1V": ("vi-VN-HoaiMyNeural", 0, 0),
        "2V": ("vi-VN-NamMinhNeural", 0, 0),
        "3V": ("en-US-EmmaMultilingualNeural", 0, 0),
        "4V": ("en-US-BrianMultilingualNeural", 0, 0),
        "5V": ("en-US-AvaMultilingualNeural", 0, 0),
        "6V": ("en-US-AndrewMultilingualNeural", 0, 0),
        "7V": ("de-DE-SeraphinaMultilingualNeural", 0, 0),
        "8V": ("ko-KR-HyunsuMultilingualNeural", 0, 0),
    }

    if default_voice_short == "":
        current_voice_short = default_voice.split(" - ")[0] if default_voice else ""
    else:
        current_voice_short = default_voice_short
    current_rate = rate
    current_pitch = pitch
    processed_text = text_segment.strip()

    prefix = processed_text[:2]
    if prefix in voice_map:
        current_voice_short, pitch_adj, rate_adj = voice_map[prefix]
        current_pitch += pitch_adj
        current_rate += rate_adj
        processed_text = processed_text[2:]
        # Only a number glued directly to a recognised prefix counts as a
        # modifier. Searching the whole text (as before) also hit ordinary
        # words such as "B12" or "Covid19", shifting the pitch and chopping
        # characters off the spoken text.
        number_match = re.match(r'-?\d+', processed_text)
        if number_match:
            number = int(number_match.group())
            if number == 0:
                default_voice_short = current_voice_short
            current_pitch += number
            processed_text = processed_text[number_match.end():]
        processed_text = processed_text.strip()

    if not processed_text:
        return None

    # int() keeps the ':+d' format valid even if a slider delivers a float.
    rate_str = f"{int(current_rate):+d}%"
    pitch_str = f"{int(current_pitch):+d}Hz"

    # Retry logic
    for attempt in range(3):
        raw_path = None
        try:
            communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                raw_path = tmp_file.name
            await communicate.save(raw_path)
            audio = AudioSegment.from_mp3(raw_path)
            if not check1:
                print(f"not last part of sentence - SHORT silence")
                audio = strip_silence(audio, silence_thresh=-40, min_silence_len=50, silence_padding_ms=50)
            else:
                audio = strip_silence(audio, silence_thresh=-40, min_silence_len=50, silence_padding_ms=100)
                print(f"Last part of sentence - long silence")
            return _export_to_temp_mp3(audio)
        except Exception as e:
            print(f"Edge TTS Failed# {attempt}:: {e}") #Debug
            if attempt == 2:
                # Final failure: return 500ms of silence
                return _export_to_temp_mp3(AudioSegment.silent(duration=500))
            await asyncio.sleep(0.5)  # brief wait before retry
        finally:
            # The untrimmed download is only an intermediate file; remove it
            # on success and on failure so retries do not pile up temp files.
            if raw_path is not None:
                _discard_temp_file(raw_path)
    return None
async def process_transcript_line(line, default_voice, rate, pitch):
    """Turn one ``HH:MM:SS,mmm <text>`` transcript line into audio files.

    The text is split on double quotes and every non-blank piece, quoted or
    not, is passed to ``generate_audio_with_voice_prefix``.

    Returns:
        ``(start_time_ms, audio_paths)`` for a line that begins with a
        timestamp, where ``audio_paths`` may be empty; ``(None, None)`` for a
        line without one.
    """
    global check1  # shared flag, reset for every line and at every quote mark

    parsed = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
    if parsed is None:
        return None, None

    *clock_fields, text_parts = parsed.groups()
    hours, minutes, seconds, milliseconds = (int(field) for field in clock_fields)
    start_time_ms = ((hours * 60 + minutes) * 60 + seconds) * 1000 + milliseconds

    check1 = False
    audio_segments = []
    # The capturing group keeps the quote marks as separate list items.
    for part in re.split(r'(")', text_parts):
        if part == '"':
            check1 = False
            continue
        if not part.strip():
            continue
        audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch)
        if audio_path:
            audio_segments.append(audio_path)
    return start_time_ms, audio_segments
def _line_start_ms(line):
    """Return the start time in ms of a timestamped transcript line, else None."""
    stamp = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', line)
    if stamp is None:
        return None
    hours, minutes, seconds, millis = (int(group) for group in stamp.groups())
    return hours * 3600000 + minutes * 60000 + seconds * 1000 + millis


def _append_and_remove(combined, paths):
    """Append every MP3 in ``paths`` to ``combined``, deleting each file once read."""
    for path in paths:
        try:
            combined += AudioSegment.from_mp3(path)
            os.remove(path)
        except FileNotFoundError:
            print(f"Warning: Audio file not found: {path}")
    return combined


async def transcript_to_speech(transcript_text, voice, rate, pitch):
    """Render a timestamped transcript into a single MP3 file.

    Each line is synthesised by ``process_transcript_line`` and placed at its
    timestamp on a silent track. When a line's audio is longer than the gap
    to the next timestamp, the following lines are appended to it directly
    (instead of being placed at their own timestamps) until the combined
    audio fits before the next timestamp, so that speech never overlaps.

    Args:
        transcript_text: Transcript with one ``HH:MM:SS,mmm text`` entry per line.
        voice: Selected default voice label.
        rate: Speech-rate adjustment in percent.
        pitch: Pitch adjustment in Hz.

    Returns:
        ``(path, None)`` on success, or ``(None, message)`` when the input is
        empty, no voice is selected, or no line produced audio.
    """
    global default_voice_short  # sticky voice set by a "<prefix>0" marker

    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")

    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0

    try:
        i = 0
        while i < len(lines):
            start_time, audio_paths = await process_transcript_line(lines[i], voice, rate, pitch)
            if start_time is not None and audio_paths:
                combined_line_audio = _append_and_remove(AudioSegment.empty(), audio_paths)

                # Only a following line that carries a timestamp can be overrun.
                next_start_ms = _line_start_ms(lines[i + 1]) if i + 1 < len(lines) else None
                if next_start_ms is not None and len(combined_line_audio) > next_start_ms - start_time:
                    # Hold and append audio from subsequent lines
                    j = i + 1
                    while j < len(lines):
                        next_start_time, next_audio_paths = await process_transcript_line(lines[j], voice, rate, pitch)
                        if next_start_time is None or not next_audio_paths:
                            break
                        combined_line_audio = _append_and_remove(combined_line_audio, next_audio_paths)
                        if j + 1 < len(lines):
                            following_start_ms = _line_start_ms(lines[j + 1])
                            # Stop merging once the audio fits before the next
                            # timestamp, or when the next line has no timestamp.
                            if (following_start_ms is None
                                    or len(combined_line_audio) <= following_start_ms - start_time):
                                break
                        j += 1
                    i = j  # lines up to j are consumed; the i += 1 below moves past j

                timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
                max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))
            elif audio_paths:
                # Clean up even if no timestamp
                for path in audio_paths:
                    try:
                        os.remove(path)
                    except FileNotFoundError:
                        pass
            i += 1

        if not timed_audio_segments:
            return None, "No processable audio segments found."

        print(f"Combining Audio - final stage.")
        final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
        for segment in timed_audio_segments:
            final_audio = final_audio.overlay(segment['audio'], position=segment['start'])

        # mkstemp creates the file atomically; the deprecated mktemp only
        # returned a name that another process could claim first.
        fd, combined_audio_path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        final_audio.export(combined_audio_path, format="mp3")
        print(f"Job done! reset voice back to default.")
        return combined_audio_path, None
    finally:
        # Always clear the sticky voice, including on the early return above
        # and on errors, so it cannot leak into the next request.
        default_voice_short = ""
def tts_interface(transcript, voice, rate, pitch):
    """Synchronous Gradio callback: run ``transcript_to_speech`` to completion.

    Returns the ``(audio_path, warning)`` pair produced by the coroutine.
    """
    result = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch))
    audio_path, message = result
    return audio_path, message
async def create_demo():
    """Build the Gradio interface for the timestamped TTS tool.

    Fetches the voice list via ``get_voices`` to populate the voice dropdown
    and wires ``tts_interface`` as the callback.

    Returns:
        The configured ``gr.Interface`` (not yet launched).
    """
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
    description = """
Process timestamped text (HH:MM:SS,milliseconds) with voice changes within quotes.
Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "1F Different Voice"`
Example:
```
00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
00:00:05,000 "1C Yes," said the child, "it is fun!"
```
***************************************************************************************************
<b> 1F : en-GB-SoniaNeural
2F : en-US-JennyNeural
3F : en-HK-YanNeural
4F : en-US-EmmaNeural
1M : en-AU-WilliamNeural
2M : en-GB-RyanNeural
3M : en-US-BrianMultilingualNeural
4M : en-GB-ThomasNeural
1O : en-GB-RyanNeural
1C : en-GB-MaisieNeural
1V : vi-VN-HoaiMyNeural
2V : vi-VN-NamMinhNeural
3V : en-US-EmmaMultilingualNeural
4V : en-US-BrianMultilingualNeural
5V : en-US-AvaMultilingualNeural
6V : en-US-AndrewMultilingualNeural
7V : de-DE-SeraphinaMultilingualNeural
8V : ko-KR-HyunsuMultilingualNeural </b>
****************************************************************************************************
"""
    # Fall back to the blank choice if the preferred voice is missing from the
    # fetched list, so the dropdown never starts on a value outside its choices.
    initial_voice = default_voice if default_voice in voices else ""

    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(label="Timestamped Text with Voice Changes", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=initial_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1),
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False),
        ],
        title="TTS with HH:MM:SS,milliseconds and In-Quote Voice Switching",
        description=description,
        analytics_enabled=False,
        # Gradio expects the string "never" here rather than the boolean False.
        allow_flagging="never",
    )
    return demo
# Script entry point: create_demo is a coroutine, so it is driven to completion
# with asyncio.run before the resulting interface is launched.
if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()