Spaces:
Running
Running
File size: 7,341 Bytes
952467c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
"""
Text-to-speech audio generation for translated subtitles.
"""
import os
import time
import shutil
import tempfile
from pathlib import Path
from tqdm import tqdm
import subprocess
from gtts import gTTS
import pysrt
from src.utils.logger import get_logger
from src.audio.extractor import create_silent_audio
from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS
logger = get_logger(__name__)
def _srt_time_to_seconds(stamp):
    """
    Convert a subtitle timestamp to seconds.

    Args:
        stamp: Object exposing ``hours``, ``minutes``, ``seconds`` and
            ``milliseconds`` attributes (the ``start``/``end`` of a
            subtitle item loaded by pysrt).

    Returns:
        float: The timestamp expressed in seconds.
    """
    return (stamp.hours * 3600 +
            stamp.minutes * 60 +
            stamp.seconds +
            stamp.milliseconds / 1000)


def _tts_to_file(text, lang, slow, audio_file):
    """
    Run a single gTTS synthesis and save it to ``audio_file``.

    Raises:
        Exception: If gTTS fails or the saved file is missing/empty.
    """
    tts = gTTS(text=text, lang=lang, slow=slow)
    tts.save(str(audio_file))
    if not (audio_file.exists() and audio_file.stat().st_size > 0):
        raise Exception("Generated audio file is empty")


def _synthesize_chunk(text, tts_lang, target_lang, audio_file):
    """
    Synthesize one subtitle to ``audio_file``, retrying on failure.

    Up to MAX_RETRY_ATTEMPTS attempts are made with the full text. If all of
    them fail and the text is longer than 100 characters, one last attempt is
    made with the text truncated to 100 characters (slow speech).

    Args:
        text (str): Subtitle text to speak.
        tts_lang (str): Language code to hand to gTTS (configured voice).
        target_lang (str): Requested target language code; used for log
            messages, the slow-speech decision and as a fallback code.
        audio_file (Path): Destination MP3 path.

    Returns:
        bool: True only if an attempt completed without error and produced a
        non-empty file. A partial file left by a failed attempt never counts.
    """
    # For certain languages, use slower speed which might improve reliability
    slow_option = target_lang in ["hi", "ja", "zh-CN", "ar"]
    lang = tts_lang

    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            _tts_to_file(text, lang, slow_option, audio_file)
            return True
        except Exception as e:
            logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {str(e)}")
            # gTTS documents ValueError for an unsupported language code; if
            # the configured voice code is the problem, retry with the raw
            # target language instead of repeating the same failure.
            if isinstance(e, ValueError) and lang != target_lang:
                logger.warning(f"Falling back from TTS language {lang} to {target_lang}")
                lang = target_lang
            if attempt < MAX_RETRY_ATTEMPTS:
                time.sleep(1)  # Wait before retrying (not after the last try)

    # Still failing after all retries: try once with shorter text
    if len(text) > 100:
        logger.warning(f"Trying with shortened text for {target_lang}")
        try:
            _tts_to_file(text[:100] + "...", lang, True, audio_file)
            return True
        except Exception as e:
            logger.warning(f"Shortened TTS attempt failed for {target_lang}: {str(e)}")

    return False


def _build_mix_filter(start_times):
    """
    Build the ffmpeg ``-filter_complex`` graph that overlays speech chunks.

    Input 0 is the silent base track; input ``k`` (1-based) is the k-th
    chunk. Each chunk is delayed to its subtitle start time and given its own
    output label, then every labelled stream is fed into a single ``amix``.

    Args:
        start_times (list[float]): Start time in seconds for each chunk, in
            the same order the chunks are passed to ffmpeg.

    Returns:
        str: Filter graph whose final output pad is labelled ``[aout]``.
    """
    chains = []
    mix_inputs = ["[0:a]"]
    for index, start_time in enumerate(start_times, start=1):
        # round() rather than int(): 1.001 * 1000 is 1000.999... in floats
        delay_ms = round(start_time * 1000)
        label = f"[d{index}]"
        chains.append(f"[{index}:a]adelay={delay_ms}|{delay_ms}{label}")
        mix_inputs.append(label)
    chains.append(
        f"{''.join(mix_inputs)}"
        f"amix=inputs={len(mix_inputs)}:dropout_transition=0:normalize=0[aout]"
    )
    return ";".join(chains)


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Each non-empty subtitle is synthesized with gTTS into its own MP3 chunk,
    then all chunks are mixed with ffmpeg onto a silent base track of
    ``video_duration`` seconds, each delayed to its subtitle start time.
    Whenever synthesis or mixing fails, a silent track is written instead so
    callers still receive a usable file.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: Path to the translated audio file
            (``OUTPUT_DIR / "translated_audio_<target_lang>.wav"``)

    Raises:
        Exception: If audio generation fails and the silent fallback cannot
            be created either
    """
    output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(str(srt_path), encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks (mkdtemp needs the
        # parent directory to exist already)
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=temp_root))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # Configured TTS language for this target; defaults to the code itself
        tts_lang = TTS_VOICES.get(target_lang, target_lang)

        # Generate TTS for each subtitle, remembering where each chunk starts
        chunks = []  # (start_seconds, audio_file)
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue

            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(text, tts_lang, target_lang, audio_file):
                chunks.append((_srt_time_to_seconds(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        # Check if we generated any audio files
        if not chunks:
            logger.warning(f"No audio files were generated for {target_lang}")
            # Create a silent audio file as fallback
            create_silent_audio(video_duration, output_audio)
            return output_audio

        # Create a silent audio track as base
        silence_file = temp_dir / "silence.wav"
        create_silent_audio(video_duration, silence_file)

        # Build the ffmpeg command: input 0 is the silence, then every chunk.
        # Every argument must be a str so the command can be logged and run.
        cmd = ['ffmpeg', '-y', '-i', str(silence_file)]
        for _, audio_file in chunks:
            cmd.extend(['-i', str(audio_file)])
        cmd.extend([
            '-filter_complex', _build_mix_filter([start for start, _ in chunks]),
            '-map', '[aout]',
            str(output_audio)
        ])

        # Run the command
        logger.info(f"Combining {len(chunks)} audio segments")
        logger.debug(f"Running command: {' '.join(cmd)}")
        process = subprocess.run(cmd, capture_output=True, text=True)

        if process.returncode != 0:
            logger.error(f"Audio combination failed: {process.stderr}")
            # Create a fallback silent audio
            create_silent_audio(video_duration, output_audio)
            logger.warning(f"Using silent fallback audio: {output_audio}")
            return output_audio

        logger.info(f"Successfully created translated audio: {output_audio}")
        return output_audio

    except Exception as e:
        logger.error(f"Audio translation failed: {str(e)}", exc_info=True)
        # Create an emergency fallback silent audio
        try:
            create_silent_audio(video_duration, output_audio)
            return output_audio
        except Exception:
            raise Exception(f"Audio translation failed: {str(e)}") from e

    finally:
        # Clean up temporary files on every exit path (success, early
        # return, or failure)
        if temp_dir is not None:
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temp directory: {str(e)}")
|