## fix overlap, remove silence, leave a tiny bit of silence
## Simplified
## Permanent voice change implemented
import asyncio
import os
import re
import tempfile
from pathlib import Path

import spaces
import edge_tts
import gradio as gr
from pydub import AudioSegment
from pydub.silence import detect_nonsilent

# Leading "HH:MM:SS,mmm" timestamp of a transcript line, followed by its text.
_TIMESTAMP_LINE = re.compile(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)')

# Voice prefix -> (edge-tts voice short name, pitch offset in Hz, rate offset in %).
_VOICE_PRESETS = {
    "1F": ("en-GB-SoniaNeural", 25, 0),
    "2F": ("en-US-JennyNeural", 0, 0),
    "3F": ("en-HK-YanNeural", 0, 0),
    "4F": ("en-US-EmmaNeural", 0, 0),
    "1M": ("en-AU-WilliamNeural", 0, 0),
    "2M": ("en-GB-RyanNeural", 0, 0),
    "3M": ("en-US-BrianMultilingualNeural", 0, 0),
    "4M": ("en-GB-ThomasNeural", 0, 0),
    "1O": ("en-GB-RyanNeural", -20, -10),  # Old man
    "1C": ("en-GB-MaisieNeural", 0, 0),  # Child
    "1V": ("vi-VN-HoaiMyNeural", 0, 0),
    "2V": ("vi-VN-NamMinhNeural", 0, 0),
    "3V": ("de-DE-SeraphinaMultilingualNeural", 25, 0),
    "4V": ("ko-KR-HyunsuMultilingualNeural", -20, 0),
}

# A voice directive is a prefix, an optional "P" flag and an optional signed
# pitch modifier (e.g. "1F", "4VP", "4V-10", "4V+5").  The look-arounds keep
# ordinary words and numbers such as "21Ms" or "1Color" from being consumed.
_VOICE_DIRECTIVE = re.compile(
    r"(?<!\w)(1F|2F|3F|4F|1M|2M|3M|4M|1O|1C|1V|2V|3V|4V)(P?)([+-]?\d+)?(?!\w)"
)

_TTS_ATTEMPTS = 3
_TTS_RETRY_DELAY_S = 0.5
_FALLBACK_SILENCE_MS = 500

_DESCRIPTION = """
Process timestamped text (HH:MM:SS,milliseconds) with voice changes within quotes.

Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "1F Different Voice"`

Example:
```
00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
00:00:05,000 "1C Yes," said the child, "it is fun!"
```
***************************************************************************************************
1M = en-AU-WilliamNeural - en-AU (Male)
1F = en-GB-SoniaNeural - en-GB (Female)
2M = en-GB-RyanNeural - en-GB (Male)
2F = en-US-JennyNeural - en-US (Female)
3M = en-US-BrianMultilingualNeural - en-US (Male)
3F = en-HK-YanNeural - en-HK (Female)
4M = en-GB-ThomasNeural - en-GB (Male)
4F = en-US-EmmaNeural - en-US (Female)
1O = en-GB-RyanNeural - en-GB (Male) # Old Man
1C = en-GB-MaisieNeural - en-GB (Female) # Child
1V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female)
2V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male)
3V = de-DE-SeraphinaMultilingualNeural - de-DE (Female) # Multilingual (Female)
4V = ko-KR-HyunsuMultilingualNeural - ko-KR (Male) # Multilingual (Male)
****************************************************************************************************
"""


def _new_temp_mp3():
    """Create an empty ``.mp3`` temp file and return its path.

    The file is created atomically (unlike ``tempfile.mktemp``, which only
    returns a name that another process could claim first).  The caller owns
    the file and is responsible for deleting it.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as handle:
        return handle.name


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _export_to_temp_mp3(audio):
    """Export *audio* as MP3 into a fresh temp file and return the path."""
    path = _new_temp_mp3()
    try:
        # export() hands back an open file object; close it so the
        # descriptor is not leaked.
        audio.export(path, format="mp3").close()
    except Exception:
        _remove_quietly(path)
        raise
    return path


def _parse_timestamp_ms(line):
    """Return ``(start_ms, text)`` for a timestamped line, else ``None``."""
    match = _TIMESTAMP_LINE.match(line)
    if match is None:
        return None
    hours, minutes, seconds, milliseconds, text = match.groups()
    start_ms = (
        int(hours) * 3600000
        + int(minutes) * 60000
        + int(seconds) * 1000
        + int(milliseconds)
    )
    return start_ms, text


def _apply_voice_directives(text, voice, rate, pitch):
    """Remove voice directives from *text* and resolve the voice settings.

    Args:
        text: Segment text, possibly containing directives such as ``1F``,
            ``4VP`` or ``4V-10``.
        voice: Voice short name used when the text holds no directive.
        rate: Base speech-rate adjustment in percent.
        pitch: Base pitch adjustment in Hz.

    Returns:
        ``(spoken_text, voice, rate, pitch)``.  ``spoken_text`` has every
        directive removed so that none of them is read aloud.  When several
        directives occur, the last one decides the voice; its preset and its
        explicit pitch modifier are applied on top of the base values.
    """
    base_rate = int(rate)
    base_pitch = int(pitch)
    resolved_voice, resolved_rate, resolved_pitch = voice, base_rate, base_pitch

    kept = []
    cursor = 0
    for match in _VOICE_DIRECTIVE.finditer(text):
        kept.append(text[cursor:match.start()])
        cursor = match.end()
        # NOTE(review): the "P" (permanent) flag is recognised and stripped,
        # but it cannot outlive this call because every segment is started
        # from the caller's default voice.  Carrying it across segments
        # would need caller-held state.
        prefix, _permanent_flag, pitch_modifier = match.groups()
        resolved_voice, preset_pitch, preset_rate = _VOICE_PRESETS[prefix]
        resolved_pitch = base_pitch + preset_pitch
        if pitch_modifier:
            resolved_pitch += int(pitch_modifier)
        resolved_rate = base_rate + preset_rate
    kept.append(text[cursor:])

    return "".join(kept).strip(), resolved_voice, resolved_rate, resolved_pitch


def _append_and_remove(combined, paths):
    """Append each MP3 in *paths* to *combined*, deleting files as consumed."""
    for path in paths:
        try:
            combined += AudioSegment.from_mp3(path)
            os.remove(path)
        except FileNotFoundError:
            print(f"Warning: Audio file not found: {path}")
    return combined


def strip_silence(audio: AudioSegment, silence_thresh=-40, min_silence_len=100, silence_padding_ms=100):
    """Trim leading/trailing silence from *audio*, keeping a small padding.

    Args:
        audio: Segment to trim.
        silence_thresh: Level (dBFS) below which audio counts as silence.
        min_silence_len: Minimum silence length in ms for detection.
        silence_padding_ms: Silence in ms kept on each side of the speech.

    Returns:
        The trimmed segment, or ``silence_padding_ms`` of silence when no
        non-silent region is detected.
    """
    nonsilent = detect_nonsilent(audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh)

    # If no speech is detected, return a small silent audio (not totally empty)
    if not nonsilent:
        return AudioSegment.silent(duration=silence_padding_ms)

    # Widen the speech window by the padding, clamped to the audio bounds.
    start_trim = max(0, nonsilent[0][0] - silence_padding_ms)
    end_trim = min(len(audio), nonsilent[-1][1] + silence_padding_ms)
    return audio[start_trim:end_trim]


def get_silence(duration_ms=1000):
    """Write *duration_ms* of mono 24 kHz silence to a temp MP3; return its path."""
    silent_audio = AudioSegment.silent(
        duration=duration_ms,
        frame_rate=24000,  # 24kHz sampling rate
    )
    silent_audio = silent_audio.set_channels(1)  # Mono
    silent_audio = silent_audio.set_sample_width(4)  # 32-bit (4 bytes per sample)

    # Reserve the path first so the export does not write into a file that
    # is still held open by this process.
    silence_path = _new_temp_mp3()
    silent_audio.export(
        silence_path,
        format="mp3",
        bitrate="48k",
        parameters=[
            "-ac", "1",  # Mono
            "-ar", "24000",  # Sample rate
            "-sample_fmt", "s32",  # 32-bit samples
            "-codec:a", "libmp3lame",  # MP3 codec
        ],
    ).close()
    return silence_path


# Get all available voices
async def get_voices():
    """Return a mapping of display label -> edge-tts voice short name."""
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}


async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch):
    """Synthesize one text segment, honouring any voice directives in it.

    Args:
        text_segment: Text to speak; may contain directives such as ``1F``,
            ``4VP`` or ``4V-10`` which select a voice and adjust the pitch.
        default_voice: Dropdown label (``"ShortName - Locale (Gender)"``) of
            the voice used when the segment has no directive.
        rate: Base speech-rate adjustment in percent.
        pitch: Base pitch adjustment in Hz.

    Returns:
        Path of a temp MP3 holding the silence-trimmed speech; a short
        silent MP3 if every synthesis attempt failed; ``None`` if nothing is
        left to speak once directives are removed.  The caller owns the file.
    """
    default_short = default_voice.split(" - ")[0] if default_voice else ""
    spoken_text, voice, seg_rate, seg_pitch = _apply_voice_directives(
        text_segment.strip(), default_short, rate, pitch
    )
    if not spoken_text:
        return None

    rate_str = f"{seg_rate:+d}%"
    pitch_str = f"{seg_pitch:+d}Hz"

    for attempt in range(_TTS_ATTEMPTS):
        raw_path = _new_temp_mp3()
        try:
            communicate = edge_tts.Communicate(spoken_text, voice, rate=rate_str, pitch=pitch_str)
            await communicate.save(raw_path)
            audio = AudioSegment.from_mp3(raw_path)
            audio = strip_silence(audio, silence_thresh=-40, min_silence_len=100)
            return _export_to_temp_mp3(audio)
        except Exception as exc:
            print(f"Warning: TTS attempt {attempt + 1} failed: {exc}")
            if attempt == _TTS_ATTEMPTS - 1:
                # Final failure: return a short silence so the timeline
                # still gets a placeholder for this segment.
                return _export_to_temp_mp3(AudioSegment.silent(duration=_FALLBACK_SILENCE_MS))
            await asyncio.sleep(_TTS_RETRY_DELAY_S)  # Retry after brief pause
        finally:
            # The untrimmed download is only an intermediate artefact.
            _remove_quietly(raw_path)
    return None


async def process_transcript_line(line, default_voice, rate, pitch):
    """Synthesize every text segment of one ``HH:MM:SS,mmm`` transcript line.

    The text is split on double quotes; each non-blank piece (quoted or not)
    is synthesized separately, in order.

    Returns:
        ``(start_time_ms, audio_paths)`` for a timestamped line, where
        ``audio_paths`` may be empty; ``(None, None)`` when the line does not
        start with a timestamp.
    """
    parsed = _parse_timestamp_ms(line)
    if parsed is None:
        return None, None
    start_time_ms, text_parts = parsed

    audio_segments = []
    for part in re.split(r'(")', text_parts):  # Split by quote marks
        if part == '"' or not part.strip():
            continue
        audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch)
        if audio_path:
            audio_segments.append(audio_path)
    return start_time_ms, audio_segments


async def transcript_to_speech(transcript_text, voice, rate, pitch):
    """Render a timestamped transcript into a single MP3 on a shared timeline.

    Each timestamped line is synthesized and placed at its start time.  When
    a line's audio runs past the next line's timestamp, the following lines
    are appended directly after it (instead of being overlaid at their own
    timestamps) until the combined audio fits before the next timestamp.

    Returns:
        ``(mp3_path, None)`` on success, otherwise ``(None, message)``.
    """
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")

    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0

    i = 0
    while i < len(lines):
        start_time, audio_paths = await process_transcript_line(lines[i], voice, rate, pitch)
        if start_time is None or not audio_paths:
            i += 1
            continue

        combined_line_audio = _append_and_remove(AudioSegment.empty(), audio_paths)

        # j is the index of the last line folded into combined_line_audio.
        j = i
        while j + 1 < len(lines):
            upcoming = _parse_timestamp_ms(lines[j + 1])
            if upcoming is None:
                break  # No deadline to overrun; the outer loop skips that line.
            if len(combined_line_audio) <= upcoming[0] - start_time:
                break  # Fits before the next timestamp.
            j += 1
            next_start_time, next_audio_paths = await process_transcript_line(lines[j], voice, rate, pitch)
            if next_start_time is None or not next_audio_paths:
                break
            combined_line_audio = _append_and_remove(combined_line_audio, next_audio_paths)

        timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
        max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))
        i = j + 1

    if not timed_audio_segments:
        return None, "No processable audio segments found."

    # Lay every segment onto a silent base long enough to hold the last one.
    final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
    for segment in timed_audio_segments:
        final_audio = final_audio.overlay(segment['audio'], position=segment['start'])

    return _export_to_temp_mp3(final_audio), None


@spaces.GPU
def tts_interface(transcript, voice, rate, pitch):
    """Synchronous Gradio callback wrapping :func:`transcript_to_speech`."""
    audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch))
    return audio, warning


async def create_demo():
    """Build the Gradio interface, populating the voice list from edge-tts."""
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"

    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(
                label="Timestamped Text with Voice Changes",
                lines=10,
                placeholder='00:00:00,000 "Text" more text "1F Different Voice"',
            ),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1),
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False),
        ],
        title="TTS with HH:MM:SS,milliseconds and In-Quote Voice Switching",
        description=_DESCRIPTION,
        analytics_enabled=False,
        # The documented values are "never" / "auto" / "manual"; a bare
        # boolean is only accepted through a deprecation path.
        allow_flagging="never",
    )
    return demo


if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()