Spaces:
Running
Running
""" | |
Text-to-speech audio generation for translated subtitles. | |
""" | |
import os | |
import time | |
import shutil | |
import tempfile | |
from pathlib import Path | |
from tqdm import tqdm | |
import subprocess | |
from gtts import gTTS | |
import pysrt | |
from src.utils.logger import get_logger | |
from src.audio.extractor import create_silent_audio | |
from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS | |
logger = get_logger(__name__) | |
def _subrip_time_to_ms(stamp):
    """Convert a pysrt timestamp to whole milliseconds using integer arithmetic."""
    return (
        stamp.hours * 3_600_000
        + stamp.minutes * 60_000
        + stamp.seconds * 1_000
        + stamp.milliseconds
    )


def _is_nonempty_file(path):
    """Return True when *path* exists and contains at least one byte."""
    return path.exists() and path.stat().st_size > 0


def _synthesize_chunk(text, target_lang, audio_file):
    """Render *text* to *audio_file* with gTTS; return True on success.

    The full text is attempted up to MAX_RETRY_ATTEMPTS times. If every
    attempt fails and the text is longer than 100 characters, one last
    attempt is made with the text truncated to 100 characters. No exception
    escapes this helper, so a single bad subtitle cannot abort the whole run.
    """
    # NOTE(review): the original code looked up TTS_VOICES.get(target_lang,
    # target_lang) but never used the result; gTTS is still called with
    # target_lang here. Confirm whether the mapped value was meant to be used.

    # For certain languages, slower speech might improve reliability.
    slow_option = target_lang in ("hi", "ja", "zh-CN", "ar")

    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            gTTS(text=text, lang=target_lang, slow=slow_option).save(str(audio_file))
        except Exception as e:
            failure = str(e)
        else:
            if _is_nonempty_file(audio_file):
                return True
            failure = "Generated audio file is empty"

        logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {failure}")
        if attempt < MAX_RETRY_ATTEMPTS:
            time.sleep(1)  # Wait before retrying (no point after the last try)

    # Last resort: long inputs sometimes fail, so try a truncated version once.
    if len(text) > 100:
        logger.warning(f"Trying with shortened text for {target_lang}")
        try:
            gTTS(text=text[:100] + "...", lang=target_lang, slow=True).save(str(audio_file))
        except Exception as e:
            logger.warning(f"Shortened TTS attempt failed for {target_lang}: {str(e)}")
        else:
            return _is_nonempty_file(audio_file)

    return False


def _build_mix_filter(delays_ms):
    """Build the ffmpeg ``-filter_complex`` string that overlays delayed chunks.

    Input 0 is the silent base track; inputs 1..N are the speech chunks, in
    the same order as *delays_ms*. Each chunk is shifted by its delay and
    given its own output label so that ``amix`` can reference every stream
    explicitly.
    """
    chains = []
    mix_inputs = ["[0:a]"]
    for index, delay_ms in enumerate(delays_ms, start=1):
        label = f"[d{index}]"
        chains.append(f"[{index}:a]adelay={delay_ms}|{delay_ms}{label}")
        mix_inputs.append(label)
    chains.append(
        f"{''.join(mix_inputs)}amix=inputs={len(mix_inputs)}"
        ":dropout_transition=0:normalize=0[aout]"
    )
    return ";".join(chains)


def _remove_temp_dir(temp_dir):
    """Best-effort removal of the temporary chunk directory."""
    try:
        shutil.rmtree(temp_dir)
        logger.debug(f"Cleaned up temporary directory: {temp_dir}")
    except Exception as e:
        logger.warning(f"Failed to clean up temp directory: {str(e)}")


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Each non-empty subtitle is synthesized with gTTS into its own MP3 chunk,
    then all chunks are overlaid at their subtitle start times on top of a
    silent base track of ``video_duration`` seconds using ffmpeg.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: Path to the translated audio file. If no speech could be
        generated, or mixing fails, this is a silent track of
        ``video_duration`` seconds instead.

    Raises:
        Exception: If audio generation fails and the silent fallback cannot
            be created either
    """
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(str(srt_path), encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks (parent may not exist yet)
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=str(temp_root)))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # Generate TTS for each subtitle, remembering where it must start
        chunks = []  # (start offset in ms, chunk file)
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue

            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(text, target_lang, audio_file):
                chunks.append((_subrip_time_to_ms(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"

        # Nothing synthesized: fall back to a silent track
        if not chunks:
            logger.warning(f"No audio files were generated for {target_lang}")
            create_silent_audio(video_duration, output_audio)
            return output_audio

        # Create a silent audio track as base (ffmpeg input 0)
        silence_file = temp_dir / "silence.wav"
        create_silent_audio(video_duration, silence_file)

        # Build the ffmpeg command; every argument must be a str
        cmd = ["ffmpeg", "-y", "-i", str(silence_file)]
        for _, audio_file in chunks:
            cmd.extend(["-i", str(audio_file)])
        cmd.extend([
            "-filter_complex", _build_mix_filter([delay for delay, _ in chunks]),
            "-map", "[aout]",
            str(output_audio),
        ])

        # Run the command
        logger.info(f"Combining {len(chunks)} audio segments")
        logger.debug(f"Running command: {' '.join(cmd)}")
        process = subprocess.run(cmd, capture_output=True, text=True)

        if process.returncode != 0:
            logger.error(f"Audio combination failed: {process.stderr}")
            # Create a fallback silent audio at the same output path
            create_silent_audio(video_duration, output_audio)
            return output_audio

        logger.info(f"Successfully created translated audio: {output_audio}")
        return output_audio
    except Exception as e:
        logger.error(f"Audio translation failed: {str(e)}", exc_info=True)
        # Create an emergency fallback silent audio
        try:
            silent_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
            create_silent_audio(video_duration, silent_audio)
            return silent_audio
        except Exception:
            raise Exception(f"Audio translation failed: {str(e)}") from e
    finally:
        # Clean up temporary files on every exit path
        if temp_dir is not None:
            _remove_temp_dir(temp_dir)