"""Flask web app that dubs an uploaded video into Tamil.

Pipeline for one request:
  1. Save the uploaded video.
  2. Ask Gemini to produce a single Tamil narration script for it.
  3. Send the script (in chunks) to an HTTP TTS service and stitch the audio.
  4. Replace the video's audio track with the generated narration.
"""

import os
import tempfile
import time
import uuid

import google.generativeai as genai
import requests
from dotenv import load_dotenv
from flask import Flask, request, render_template, send_from_directory, url_for, flash
from moviepy.audio.AudioClip import concatenate_audioclips
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from werkzeug.utils import secure_filename

# --- 1. INITIALIZE FLASK APP AND LOAD SECRETS ---
load_dotenv()
app = Flask(__name__)

# Load secrets from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")
TTS_MAX_TOKENS = 30000  # Conservative limit below 32k token threshold

# Validate required configurations
if not GEMINI_API_KEY:
    raise ValueError("SECURITY ERROR: GEMINI_API_KEY not found in .env file!")
if not TTS_API_URL:
    raise ValueError("CONFIGURATION ERROR: TTS_API_URL not found in .env file!")

# Configure Gemini AI
genai.configure(api_key=GEMINI_API_KEY)

# Configure directories
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100 MB upload limit
# NOTE: a fresh random key is generated on every process start, so anything
# stored in the session (including flash messages) does not survive a restart.
app.secret_key = os.urandom(24)  # Secure key for flash messages

# --- 2. APPLICATION CONFIGURATION ---
# Maps the label shown to the user to the voice name sent to the TTS service.
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr"
}

GEMINI_PROMPT = """
You are an expert AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue from all speakers into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.

**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்.
"""


# --- 3. CORE APPLICATION FUNCTIONS ---
def _remove_file_quietly(path):
    """Delete *path* if it exists; log (never raise) when deletion fails.

    Used from ``finally`` blocks so that a cleanup problem cannot replace the
    exception or return value that is already in flight.
    """
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Warning: could not remove temporary file {path}: {e}")


def generate_tamil_script(video_file_path, mime_type="video/mp4",
                          processing_timeout=600, poll_interval=5):
    """Generate a Tamil script from the video using Gemini AI.

    Args:
        video_file_path: Path of the local video file to upload.
        mime_type: MIME type reported to Gemini for the upload.
        processing_timeout: Maximum number of seconds to wait for Gemini to
            finish processing the uploaded file.
        poll_interval: Seconds to sleep between processing-state checks.

    Returns:
        The generated script with all line breaks replaced by single spaces.

    Raises:
        Exception: If file processing times out or ends in a non-ACTIVE state,
            or if the model response contains no usable text.
    """
    print("Uploading video to Gemini for transcription...")
    video_file = genai.upload_file(video_file_path, mime_type=mime_type)

    try:
        # Wait for file processing, but never wait forever.
        deadline = time.monotonic() + processing_timeout
        while video_file.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                raise Exception(
                    f"Gemini file processing timed out after {processing_timeout} seconds."
                )
            time.sleep(poll_interval)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise Exception(f"Gemini file processing failed: {video_file.state.name}")

        print("Generating script...")
        model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Always release the remote upload, even when polling or generation
        # failed; a cleanup failure must not hide the real outcome.
        try:
            genai.delete_file(video_file.name)
        except Exception as cleanup_error:
            print(f"Warning: could not delete Gemini file {video_file.name}: {cleanup_error}")

    # The ``text`` accessor raises ValueError when the response carries no
    # valid text part (e.g. a blocked response); treat that as "no script".
    try:
        script_text = response.text
    except (AttributeError, ValueError):
        script_text = None

    if script_text:
        return " ".join(script_text.strip().splitlines())
    raise Exception("No valid script was generated by Gemini.")


def split_text_for_tts(text, max_tokens=TTS_MAX_TOKENS):
    """Split text into whitespace-delimited chunks for the TTS service.

    The size of a chunk is measured in characters (each word plus one
    separating space), which is used here as the budget named ``max_tokens``.
    A single word longer than the budget is emitted as its own chunk.

    Args:
        text: The full script.
        max_tokens: Maximum accumulated size of one chunk.

    Returns:
        A list of non-empty chunk strings; empty if *text* has no words.
    """
    chunks = []
    current_chunk = []
    current_length = 0

    for word in text.split():
        word_length = len(word) + 1  # +1 for space
        if current_length + word_length > max_tokens:
            # Only flush when there is something to flush; otherwise an
            # oversized first word would produce an empty "" chunk.
            if current_chunk:
                chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_length = word_length
        else:
            current_chunk.append(word)
            current_length += word_length

    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks


def generate_audio_with_retry(text_chunk, voice_name, is_cheerful, max_retries=3, retry_delay=2):
    """Request audio for one text chunk from the TTS API, retrying on failure.

    Args:
        text_chunk: Text to synthesize.
        voice_name: Voice identifier forwarded to the TTS service.
        is_cheerful: Flag forwarded as the ``cheerful`` field of the payload.
        max_retries: Total number of attempts.
        retry_delay: Base delay in seconds; the wait doubles after each
            failed attempt (retry_delay, 2*retry_delay, 4*retry_delay, ...).

    Returns:
        The raw response body (audio bytes) of the first HTTP 200 response.

    Raises:
        Exception: If no attempt produced an HTTP 200 response.
    """
    payload = {
        "text": text_chunk,
        "voice_name": voice_name,
        "cheerful": is_cheerful
    }
    last_error = None

    for attempt in range(max_retries):
        try:
            response = requests.post(TTS_API_URL, json=payload, timeout=300)
            response.raise_for_status()
            if response.status_code == 200:
                return response.content
            # A non-200 success (e.g. 204) carries no usable audio; route it
            # through the normal retry path instead of falling through.
            raise requests.exceptions.HTTPError(
                f"Unexpected status code {response.status_code}", response=response
            )
        except requests.exceptions.RequestException as e:
            last_error = e
            print(f"TTS API attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (2 ** attempt))  # Exponential backoff

    # Reached when every attempt failed, or when max_retries <= 0.
    raise Exception(f"TTS API failed after {max_retries} attempts: {str(last_error)}") from last_error


def generate_long_audio(script_text, voice_name, is_cheerful, output_path):
    """Generate narration for a long script by chunking and concatenating.

    Each chunk is synthesized separately, stored in its own uniquely named
    temporary file, loaded as an audio clip, and finally all clips are
    concatenated and written to *output_path*.

    Args:
        script_text: Full script to narrate.
        voice_name: Voice identifier forwarded to the TTS service.
        is_cheerful: Tone flag forwarded to the TTS service.
        output_path: Destination path of the combined audio file.

    Raises:
        Exception: If the script yields no text chunks, or if any TTS request
            ultimately fails.
    """
    print("Processing long audio generation...")
    text_chunks = split_text_for_tts(script_text)
    if not text_chunks:
        raise Exception("No text available for audio generation.")

    audio_clips = []
    temp_files = []
    final_audio = None

    try:
        for i, chunk in enumerate(text_chunks):
            print(f"Processing chunk {i+1}/{len(text_chunks)}")
            chunk_audio = generate_audio_with_retry(chunk, voice_name, is_cheerful)

            # Save chunk to a unique temporary file so concurrent requests
            # cannot overwrite each other's chunks.
            fd, temp_file = tempfile.mkstemp(prefix=f"temp_chunk_{i}_", suffix=".wav")
            temp_files.append(temp_file)  # register first so cleanup always sees it
            with os.fdopen(fd, "wb") as f:
                f.write(chunk_audio)

            # Load audio clip
            audio_clips.append(AudioFileClip(temp_file))

        # Combine all audio clips
        print("Combining audio chunks...")
        final_audio = concatenate_audioclips(audio_clips)
        final_audio.write_audiofile(output_path)
    finally:
        # Close readers before deleting the files they have open.
        if final_audio is not None:
            final_audio.close()
        for clip in audio_clips:
            clip.close()
        for temp_file in temp_files:
            _remove_file_quietly(temp_file)


def replace_video_audio(video_path, new_audio_path, output_path):
    """Write a copy of the video whose audio track is the given audio file.

    Args:
        video_path: Path of the source video.
        new_audio_path: Path of the audio file to use as the new track.
        output_path: Destination path; encoded with libx264 video and AAC audio.
    """
    print("Replacing video audio...")
    video_clip = None
    audio_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = AudioFileClip(new_audio_path)
        video_clip.audio = audio_clip
        video_clip.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            logger='bar'
        )
    finally:
        if audio_clip is not None:
            audio_clip.close()
        if video_clip is not None:
            video_clip.close()


# --- 4. FLASK ROUTES ---
@app.route('/', methods=['GET'])
def index():
    """Render the main upload page."""
    return render_template('index.html', voices=VOICE_CHOICES)


@app.route('/process', methods=['POST'])
def process_video():
    """Handle video upload and processing.

    Reads the ``video`` file plus the ``voice`` and ``tone`` form fields,
    runs the dubbing pipeline, and re-renders the index page with either the
    result (video URL and script) or a flashed error message. The uploaded
    video and the intermediate audio file are removed afterwards in all cases.
    """
    input_video_path = None
    temp_audio_path = None

    try:
        # Validate file upload
        uploaded = request.files.get('video')
        if uploaded is None or uploaded.filename == '':
            flash("Please upload a video file.", "error")
            return render_template('index.html', voices=VOICE_CHOICES)

        filename = secure_filename(uploaded.filename)
        if not filename:
            # secure_filename() can strip a name down to nothing.
            flash("The uploaded file name is not valid.", "error")
            return render_template('index.html', voices=VOICE_CHOICES)

        # Save uploaded file under a per-request unique name so simultaneous
        # uploads of identically named files do not clobber each other.
        job_id = uuid.uuid4().hex
        input_video_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
        uploaded.save(input_video_path)

        # Get processing options
        voice_choice = request.form.get('voice', 'Charon')
        is_cheerful = request.form.get('tone') == 'on'

        # Generate script
        script = generate_tamil_script(input_video_path)

        # Create temporary audio file (only the unique path is needed here)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            temp_audio_path = temp_audio.name

        # Generate audio with retry and chunking
        generate_long_audio(script, voice_choice, is_cheerful, temp_audio_path)

        # Create dubbed video. The output is always an .mp4 because the
        # encoder settings are fixed to H.264 video with AAC audio.
        stem = os.path.splitext(filename)[0]
        final_video_name = f"dubbed_{job_id}_{stem}.mp4"
        final_video_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_video_name)
        replace_video_audio(input_video_path, temp_audio_path, final_video_path)

        flash("Video processing complete!", "success")
        return render_template(
            'index.html',
            voices=VOICE_CHOICES,
            result_video=url_for('serve_video', filename=final_video_name),
            script=script
        )

    except Exception as e:
        print(f"Processing error: {str(e)}")
        flash(f"An error occurred: {str(e)}", "error")
        return render_template('index.html', voices=VOICE_CHOICES)

    finally:
        # Clean up temporary files
        _remove_file_quietly(input_video_path)
        _remove_file_quietly(temp_audio_path)


@app.route('/downloads/<filename>')
def serve_video(filename):
    """Serve the processed video file named *filename* from the download folder."""
    return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)


# --- 5. APPLICATION ENTRY POINT ---
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)