import spaces
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re
import numpy as np
from pydub import AudioSegment
from scipy.signal import butter, sosfiltfilt
def apply_low_pass_filter(audio_segment, cutoff_freq, order=6):
    """
    Applies a low-pass Butterworth filter to a (mono) AudioSegment.

    Args:
        audio_segment: The AudioSegment to filter.
        cutoff_freq: The cutoff frequency in Hz.
        order: The order of the Butterworth filter.

    Returns:
        A new AudioSegment with the filtered audio.
    """
    segment_array = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)
    frame_rate = audio_segment.frame_rate
    nyquist_freq = 0.5 * frame_rate
    normalized_cutoff = cutoff_freq / nyquist_freq
    sos = butter(order, normalized_cutoff, btype='low', output='sos')
    filtered_array = sosfiltfilt(sos, segment_array)
    sample_width = audio_segment.sample_width
    if sample_width == 1:
        dtype = np.int8
    elif sample_width == 2:
        dtype = np.int16
    elif sample_width in (3, 4):  # pydub widens 24-bit audio to 32-bit samples
        dtype = np.int32
    else:
        raise ValueError(f"Unsupported sample width: {sample_width}")
    # Clip to the target integer range, then hand the raw bytes back to pydub
    filtered_array = np.clip(filtered_array, np.iinfo(dtype).min, np.iinfo(dtype).max)
    return audio_segment._spawn(filtered_array.astype(dtype).tobytes())
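# Minimal usage sketch (file names are illustrative): the filter works on the
# raw sample array, so it assumes a mono segment, which is what the edge-tts
# output below provides.
#
#   seg = AudioSegment.from_mp3("line.mp3")
#   softened = apply_low_pass_filter(seg, 7000.0)  # roll off content above 7 kHz
#   softened.export("line_filtered.mp3", format="mp3")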
def get_silence(duration_ms=1000):
    # Create a silent audio segment with the project's standard parameters
    silent_audio = AudioSegment.silent(
        duration=duration_ms,
        frame_rate=24000  # 24 kHz sampling rate
    )
    silent_audio = silent_audio.set_channels(1)      # Mono
    silent_audio = silent_audio.set_sample_width(4)  # 32-bit (4 bytes per sample)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        # Export with explicit bitrate and codec parameters. MP3 does not store
        # 32-bit integer samples, so the sample format is left to the encoder.
        silent_audio.export(
            tmp_file.name,
            format="mp3",
            bitrate="48k",
            parameters=[
                "-ac", "1",               # Mono
                "-ar", "24000",           # Sample rate
                "-codec:a", "libmp3lame"  # MP3 codec
            ]
        )
        return tmp_file.name
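# Minimal usage sketch: half a second of 24 kHz mono silence as a temp file.
# The caller owns the file and should remove it when done.
#
#   gap_path = get_silence(500)
#   gap = AudioSegment.from_mp3(gap_path)
#   os.remove(gap_path)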
# Get all available voices
async def get_voices():
    try:
        voices = await edge_tts.list_voices()
        return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
    except Exception as e:
        print(f"Error listing voices: {e}")
        return {}
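# The returned dict maps a display label to the ShortName edge-tts expects, e.g.
#   {"en-GB-SoniaNeural - en-GB (Female)": "en-GB-SoniaNeural", ...}
# so the dropdown can show a friendly label while TTS receives the short name.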
async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, overall_target_duration_ms=None, speed_adjustment_factor=1.0):
    """Generates audio for a text segment, handling voice and pitch prefixes.

    The duration arguments are accepted for interface symmetry; the actual
    speed fitting happens per line in transcript_to_speech().
    """
    current_voice_full = default_voice
    current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else ""
    current_rate = rate
    current_pitch = pitch
    processed_text = text_segment.strip()
    #print(f"Processing this text segment: '{processed_text}'")  # Debug
    voice_map = {
        "1F": "en-GB-SoniaNeural",
        "2M": "en-GB-RyanNeural",
        "3M": "en-US-BrianMultilingualNeural",
        "2F": "en-US-JennyNeural",
        "1M": "en-AU-WilliamNeural",
        "3F": "en-HK-YanNeural",
        "4M": "en-GB-ThomasNeural",
        "4F": "en-US-EmmaNeural",
        "1O": "en-GB-RyanNeural",     # Old Man
        "1C": "en-GB-MaisieNeural",   # Child
        "1V": "vi-VN-HoaiMyNeural",   # Vietnamese (Female)
        "2V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
        "3V": "vi-VN-HoaiMyNeural",   # Vietnamese (Female)
        "4V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
    }
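    # e.g. "1F Hello there" -> voice en-GB-SoniaNeural (pitch raised below),
    # with "Hello there" sent to TTS.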
    detect = 0
    # Iterate through the voice map; if a matching prefix is found, switch to
    # that voice and strip the prefix from the text.
    for prefix, voice_short in voice_map.items():
        if processed_text.startswith(prefix):
            current_voice_short = voice_short
            if prefix in ["1F", "3F", "1V", "3V"]:
                current_pitch = 25
            elif prefix in ["1O", "4V"]:
                current_pitch = -20
                current_rate = -10
            detect = 1
            processed_text = processed_text[len(prefix):].strip()
            break
    # Look for an optional pitch-offset token, e.g. "XYZ-45" -> group 1: "XYZ",
    # group 2: "-45".
    match = re.search(r'([A-Za-z]+)([-]?\d*)', processed_text)
    if match:
        prefix_pitch = match.group(1)
        number_str = match.group(2)
        if number_str:  # The number part may be empty
            try:
                number = int(number_str)
                print(f"Prefix: {prefix_pitch}, Number: {number}")  # Debug
            except ValueError as e:
                print(f"Error converting number string to int: {e}")
                number = 0  # Fall back to no pitch offset
        else:
            number = 0  # Default when no number is present
            print(f"Prefix: {prefix_pitch}, No number found.")
        if prefix_pitch in voice_map:
            current_pitch += number
            processed_text = re.sub(r'([A-Za-z]+)([-]?\d*)', '', processed_text, count=1).strip()
        elif detect:
            processed_text = processed_text.lstrip('-0123456789').strip()  # Remove leftover pitch digits after a stripped prefix
    if processed_text:
        rate_str = f"{current_rate:+d}%"
        pitch_str = f"{current_pitch:+d}Hz"
        print(f"Sending to Edge: '{processed_text}'")  # Debug
        try:
            communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                audio_path = tmp_file.name
            await communicate.save(audio_path)
            if os.path.exists(audio_path):
                audio = AudioSegment.from_mp3(audio_path)

                # Trim leading and trailing silence
                def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
                    trim_ms = 0
                    assert chunk_size > 0  # avoid an infinite loop
                    while sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
                        trim_ms += chunk_size
                    return trim_ms

                start_trim = detect_leading_silence(audio)
                end_trim = detect_leading_silence(audio.reverse())
                trimmed_audio = audio[start_trim:len(audio) - end_trim]
                trimmed_audio.export(audio_path, format="mp3")  # Overwrite with trimmed version
                return audio_path
        except Exception as e:
            print(f"Edge TTS error processing '{processed_text}': {e}")
            return None
    return None
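# Minimal usage sketch (inside a running event loop; voice label illustrative):
#
#   path = await generate_audio_with_voice_prefix(
#       '1F Hello there', 'en-US-AndrewMultilingualNeural - en-US (Male)', 0, 0)
#   # -> temp .mp3 spoken by en-GB-SoniaNeural, silence-trimmed; None on failure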
async def process_transcript_line(line, next_line_start_time, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor):
    """Processes a single transcript line with an HH:MM:SS,milliseconds timestamp."""
    match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
    if match:
        start_h, start_m, start_s, start_ms, text_parts = match.groups()
        start_time_ms = (
            int(start_h) * 3600000 +
            int(start_m) * 60000 +
            int(start_s) * 1000 +
            int(start_ms)
        )
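        # e.g. "00:01:02,500" -> 1*60000 + 2*1000 + 500 = 62500 ms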
        audio_segments = []
        # re.split() drops the quote characters themselves, so every non-empty
        # part is synthesized; quoted runs select a voice via their prefix.
        split_parts = re.split(r'[“”"]', text_parts)
        for part in split_parts:
            if part.strip():
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor)
                if audio_path:
                    audio_segments.append(audio_path)
        if audio_segments:
            combined_audio = AudioSegment.empty()
            for segment_path in audio_segments:
                try:
                    segment = AudioSegment.from_mp3(segment_path)
                    combined_audio += segment
                    os.remove(segment_path)  # Clean up individual segment files
                except Exception as e:
                    print(f"Error loading or combining audio segment {segment_path}: {e}")
                    return None, None, None
            combined_audio_path = f"combined_audio_{start_time_ms}.mp3"
            try:
                combined_audio.export(combined_audio_path, format="mp3")
                return start_time_ms, [combined_audio_path], overall_duration_ms
            except Exception as e:
                print(f"Error exporting combined audio: {e}")
                return None, None, None
        return start_time_ms, [], overall_duration_ms  # No audio generated for this line
    return None, None, None
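# Minimal usage sketch: the duration budget (5500 ms here) comes from the next
# line's timestamp; default_voice is an illustrative dropdown label.
#
#   start_ms, paths, budget = await process_transcript_line(
#       '00:00:00,000 "1F Hello." and goodbye.', 5500, default_voice, 0, 0, 5500, 1.0)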
async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor):
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")
    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0
    for i, line in enumerate(lines):
        next_line_start_time = None
        if i < len(lines) - 1:
            next_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', lines[i + 1])
            if next_line_match:
                nh, nm, ns, nms = next_line_match.groups()
                next_line_start_time = (
                    int(nh) * 3600000 +
                    int(nm) * 60000 +
                    int(ns) * 1000 +
                    int(nms)
                )
        current_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
        if current_line_match:
            sh, sm, ss, sms, text_content = current_line_match.groups()
            start_time_ms = (
                int(sh) * 3600000 +
                int(sm) * 60000 +
                int(ss) * 1000 +
                int(sms)
            )
            overall_duration_ms = None
            if next_line_start_time is not None:
                overall_duration_ms = next_line_start_time - start_time_ms
            start_time, audio_paths, duration = await process_transcript_line(line, next_line_start_time, voice, rate, pitch, overall_duration_ms, speed_adjustment_factor)
            if start_time is not None and audio_paths:
                combined_line_audio = AudioSegment.empty()
                total_generated_duration_ms = 0
                for path in audio_paths:
                    if path:
                        try:
                            audio = AudioSegment.from_mp3(path)
                            combined_line_audio += audio
                            total_generated_duration_ms += len(audio)
                            os.remove(path)
                        except FileNotFoundError:
                            print(f"Warning: Audio file not found: {path}")
                if combined_line_audio and overall_duration_ms is not None and overall_duration_ms > 0 and total_generated_duration_ms > overall_duration_ms:
                    speed_factor = (total_generated_duration_ms / overall_duration_ms) * speed_adjustment_factor
                    # pydub's speedup() is only meaningful for factors above 1.0
                    if speed_factor > 1.0:
                        combined_line_audio = combined_line_audio.speedup(playback_speed=speed_factor)
                    # Apply the low-pass filter AFTER speed adjustment
                    cutoff_freq = 7000.0  # Adjust as needed
                    combined_line_audio = apply_low_pass_filter(combined_line_audio, cutoff_freq)
                if combined_line_audio:
                    timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
                    max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))
            elif audio_paths:
                for path in audio_paths:
                    if path:
                        try:
                            os.remove(path)
                        except FileNotFoundError:
                            pass  # Clean up even when the line had no timestamp
    if not timed_audio_segments:
        return None, "No processable audio segments found."
    # Segment start times and max_end_time_ms are already in milliseconds
    final_audio = AudioSegment.silent(duration=max_end_time_ms + 500, frame_rate=24000)
    for segment in timed_audio_segments:
        start_position_ms = segment['start']
        audio_to_overlay = segment['audio']
        if start_position_ms + len(audio_to_overlay) > len(final_audio):
            padding_needed = (start_position_ms + len(audio_to_overlay)) - len(final_audio)
            final_audio += AudioSegment.silent(duration=padding_needed + 100, frame_rate=final_audio.frame_rate)
        try:
            final_audio = final_audio.overlay(audio_to_overlay, position=start_position_ms)
        except Exception as e:
            print(f"Error during overlay: {e}")
            print(f" - Start position (ms): {start_position_ms}")
            print(f" - Length of audio to overlay (ms): {len(audio_to_overlay)}")
            print(f" - Length of final_audio (ms): {len(final_audio)}")
            # If overlays fail consistently, consider truncating audio_to_overlay
            # or skipping the segment.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        combined_audio_path = tmp_file.name
    final_audio.export(combined_audio_path, format="mp3")
    return combined_audio_path, None
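# Minimal usage sketch (transcript and voice label are illustrative):
#
#   path, warning = asyncio.run(transcript_to_speech(
#       '00:00:00,000 "Hello." more text.\n00:00:03,000 "1F Hi back."',
#       "en-US-AndrewMultilingualNeural - en-US (Male)", 0, 0, 1.0))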
def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor):
    audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor))
    return audio, warning
async def create_demo():
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
    description = """
Process timestamped text (HH:MM:SS,milliseconds) with voice changes inside quotes.
The duration for each line is determined by the timestamp of the following line, and
the speed of the ENTIRE generated audio for a line is adjusted to fit that duration.
If there is no subsequent timestamp, the speed adjustment is skipped. The intensity
of the adjustment is controlled by the "Speed Adjustment Factor" slider.

Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "AnotherVoicePrefix More Text"`

Example:
```
00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
00:00:05,500 "1C Yes," said the child, "it is fun!"
```

Voice prefixes:
```
1M = en-AU-WilliamNeural            - en-AU (Male)
1F = en-GB-SoniaNeural              - en-GB (Female)
2M = en-GB-RyanNeural               - en-GB (Male)
2F = en-US-JennyNeural              - en-US (Female)
3M = en-US-BrianMultilingualNeural  - en-US (Male)
3F = en-HK-YanNeural                - en-HK (Female)
4M = en-GB-ThomasNeural             - en-GB (Male)
4F = en-US-EmmaNeural               - en-US (Female)
1O = en-GB-RyanNeural               - en-GB (Male)    # Old Man
1C = en-GB-MaisieNeural             - en-GB (Female)  # Child
1V = vi-VN-HoaiMyNeural             - vi-VN (Female)  # Vietnamese (Female)
2V = vi-VN-NamMinhNeural            - vi-VN (Male)    # Vietnamese (Male)
3V = vi-VN-HoaiMyNeural             - vi-VN (Female)  # Vietnamese (Female)
4V = vi-VN-NamMinhNeural            - vi-VN (Male)    # Vietnamese (Male)
```
"""
    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(label="Timestamped Text with Voice Changes and Duration", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1),
            gr.Slider(minimum=0.5, maximum=1.5, value=1.0, step=0.05, label="Speed Adjustment Factor")
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False)
        ],
        title="TTS with Line-Wide Duration Adjustment and In-Quote Voice Switching",
        description=description,
        analytics_enabled=False,
        allow_flagging="never"
    )
    return demo
if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()