"""
Text-to-speech audio generation for translated subtitles.
"""

import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path

import pysrt
from gtts import gTTS
from tqdm import tqdm

from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS
from src.audio.extractor import create_silent_audio
from src.utils.logger import get_logger

logger = get_logger(__name__)

# Languages for which gTTS is asked to speak slowly.
_SLOW_SPEECH_LANGS = frozenset({"hi", "ja", "zh-CN", "ar"})

# Maximum number of characters kept when falling back to shortened text.
_MAX_FALLBACK_CHARS = 100


def _subrip_seconds(stamp):
    """Convert a pysrt time stamp (hours/minutes/seconds/milliseconds) to seconds."""
    return (
        stamp.hours * 3600
        + stamp.minutes * 60
        + stamp.seconds
        + stamp.milliseconds / 1000
    )


def _silent_fallback(target_lang, video_duration):
    """Write a silent WAV of ``video_duration`` seconds and return its path."""
    silent_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
    create_silent_audio(video_duration, silent_audio)
    return silent_audio


def _synthesize_once(text, lang, slow, audio_file):
    """Run one gTTS synthesis into ``audio_file``; return its size in bytes.

    Raises whatever gTTS raises, or RuntimeError if the saved file is
    missing or empty.
    """
    gTTS(text=text, lang=lang, slow=slow).save(str(audio_file))
    size = audio_file.stat().st_size if audio_file.exists() else 0
    if size <= 0:
        raise RuntimeError("Generated audio file is empty")
    return size


def _synthesize_chunk(text, target_lang, audio_file, index):
    """Synthesize one subtitle with retries; return True if audio was written.

    Makes up to MAX_RETRY_ATTEMPTS attempts with the full text. If all of
    them fail and the text is longer than _MAX_FALLBACK_CHARS, one final
    attempt is made with truncated text at slow speed. Failures are logged
    and never raised, so one bad subtitle cannot abort the whole run.
    """
    # Language code handed to gTTS; TTS_VOICES may remap the target language.
    lang = TTS_VOICES.get(target_lang, target_lang)
    slow = target_lang in _SLOW_SPEECH_LANGS

    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            size = _synthesize_once(text, lang, slow, audio_file)
        except Exception as e:
            logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {str(e)}")
            # gTTS raises ValueError for a language it does not support; if the
            # mapped code is the culprit, retry with the raw target language.
            if isinstance(e, ValueError) and lang != target_lang:
                logger.warning(
                    f"TTS language '{lang}' rejected, falling back to '{target_lang}'"
                )
                lang = target_lang
            # No point in waiting after the final attempt.
            if attempt < MAX_RETRY_ATTEMPTS:
                time.sleep(1)
        else:
            logger.info(f"Generated TTS file size for chunk {index}: {size} bytes")
            return True

    # Last resort: a truncated text is better than a silent gap.
    if len(text) > _MAX_FALLBACK_CHARS:
        logger.warning(f"Trying with shortened text for {target_lang}")
        shortened_text = text[:_MAX_FALLBACK_CHARS] + "..."
        try:
            _synthesize_once(shortened_text, lang, True, audio_file)
        except Exception as e:
            logger.warning(f"Shortened-text TTS failed for {target_lang}: {str(e)}")
        else:
            return True

    return False


def _build_mix_command(silence_file, chunks, video_duration, output_audio):
    """Build the FFmpeg command that places every chunk at its start time.

    ``chunks`` is a list of ``(start_seconds, audio_path)`` pairs. Input 0 is
    the silence track; chunk ``i`` is input ``i + 1``.
    """
    cmd = ['ffmpeg', '-y', '-i', str(silence_file)]
    for _, audio_file in chunks:
        cmd += ['-i', str(audio_file)]

    # Per chunk: boost volume, shift to the subtitle start, pad to full length.
    filter_chains = []
    for i, (start_time, _) in enumerate(chunks):
        delay_ms = int(start_time * 1000)
        filter_chains.append(
            f"[{i+1}:a]volume=12dB,adelay={delay_ms}|{delay_ms},"
            f"apad=whole_dur={video_duration}[a{i}]"
        )

    # NOTE(review): the silence track (input 0) is not referenced by the filter
    # graph; the output length comes from apad on each chunk.
    mix_inputs = ''.join(f"[a{i}]" for i in range(len(chunks)))
    filter_complex = (
        ";".join(filter_chains)
        + f";{mix_inputs}amix=inputs={len(chunks)}:duration=longest:normalize=0,volume=3dB[aout]"
    )

    cmd += [
        '-filter_complex', filter_complex,
        '-map', '[aout]',
        '-c:a', 'libmp3lame',
        '-b:a', '192k',
        str(output_audio),
    ]
    return cmd


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Each non-empty subtitle is synthesized with gTTS into its own MP3 chunk,
    then all chunks are mixed with FFmpeg into a single track, each delayed
    to its subtitle's start time.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: Path to the translated audio file (MP3). If no speech could be
        generated, mixing fails, or any other error occurs, the path of a
        silent WAV file of ``video_duration`` seconds is returned instead.

    Note:
        Generation failures are logged and converted into the silent-audio
        fallback rather than raised; only a failure while writing that
        fallback itself can propagate to the caller.
    """
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(srt_path, encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks; mkdtemp requires the
        # parent directory to exist already.
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=temp_root))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # Generate TTS for each subtitle
        chunks = []  # (start_seconds, audio_path) for every synthesized subtitle
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue

            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(text, target_lang, audio_file, i):
                chunks.append((_subrip_seconds(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        # Fallback if no audio generated
        if not chunks:
            logger.warning(f"No audio files generated for {target_lang}")
            return _silent_fallback(target_lang, video_duration)

        # Output configuration
        output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.mp3"
        silence_file = temp_dir / "silence.wav"
        create_silent_audio(video_duration, silence_file)

        # Validate input files before handing them to FFmpeg
        for f in [silence_file, *(path for _, path in chunks)]:
            if not f.exists():
                logger.error(f"Missing input file: {f}")
                return _silent_fallback(target_lang, video_duration)

        cmd = _build_mix_command(silence_file, chunks, video_duration, output_audio)
        logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")

        # Execute audio mixing
        process = subprocess.run(cmd, capture_output=True, text=True)
        if process.returncode != 0:
            logger.error(f"Audio mixing failed: {process.stderr}")
            return _silent_fallback(target_lang, video_duration)

        logger.info(f"Final audio file size: {output_audio.stat().st_size} bytes")
        return output_audio

    except Exception as e:
        logger.error(f"Audio generation failed: {str(e)}", exc_info=True)
        return _silent_fallback(target_lang, video_duration)

    finally:
        # Cleanup temporary files on every exit path, not just on success.
        if temp_dir is not None:
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Temp cleanup failed: {str(e)}")