"""Flask service that dubs an uploaded video into Tamil.

Pipeline for each upload (run in a background thread):

1. Gemini watches the video and produces a single Tamil script.
2. The script is posted to an external TTS API, which returns audio bytes.
3. moviepy replaces the video's audio track with the generated audio.

Clients poll ``/status/<task_id>`` for progress and the final download URL.
"""

import mimetypes
import os
import tempfile
import threading
import time
import uuid
from contextlib import suppress
from datetime import datetime, timedelta

import google.generativeai as genai
import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    flash,
    jsonify,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from werkzeug.utils import secure_filename

# Initialize Flask app and load secrets
load_dotenv()
app = Flask(__name__)

# Configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

if not GEMINI_API_KEY or not TTS_API_URL:
    raise ValueError("Missing required environment variables")

genai.configure(api_key=GEMINI_API_KEY)

# File storage setup
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB
app.secret_key = os.urandom(24)

# Stages whose durations feed the ETA estimate, in pipeline order.
PIPELINE_STAGES = ('upload', 'transcription', 'tts', 'dubbing')

# Polling cadence and upper bound while Gemini ingests an uploaded video.
GEMINI_POLL_INTERVAL_SECONDS = 5
GEMINI_PROCESSING_TIMEOUT_SECONDS = 15 * 60

# Processing status tracking: task_id -> status dict (see
# process_video_background for the keys), and the most recent duration
# observed for each stage across all tasks.
processing_status = {}
processing_times = {
    'upload': 0,
    'transcription': 0,
    'tts': 0,
    'dubbing': 0
}

# Voice options: display label -> voice name sent to the TTS API.
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr"
}

# NOTE: the prompt text is kept byte-for-byte as it appears in the source;
# only the Python literal form was changed so it fits the line length.
GEMINI_PROMPT = (
    " You are an expert AI scriptwriter. Your task is to watch the provided "
    "video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS "
    "block of modern, colloquial Tamil. **CRITICAL INSTRUCTIONS:** "
    "1. Combine all dialogue into one continuous script. "
    "2. NO timestamps or speaker labels. "
    "3. Add performance directions (e.g., `Say happily:`, `[laugh]`) "
    "directly in the text. \n"
)


def track_processing_time(task_id, stage, duration):
    """Record how long ``stage`` took, globally and for ``task_id``.

    Args:
        task_id: Key into ``processing_status``; unknown IDs only update the
            global ``processing_times`` table.
        stage: Stage name (one of the keys of ``processing_times``).
        duration: Elapsed seconds for the stage.
    """
    processing_times[stage] = duration
    status = processing_status.get(task_id)
    if status is not None:
        status['timings'][stage] = duration


def estimate_remaining_time(task_id):
    """Estimate the remaining processing time for a task.

    The estimate is the mean duration of the stages finished so far
    multiplied by the number of stages still outstanding.

    Returns:
        A number of seconds once at least one stage has finished, otherwise
        a placeholder string ("Calculating..." for an unknown task,
        "Starting soon..." when no stage has completed yet).
    """
    status = processing_status.get(task_id)
    if status is None:
        return "Calculating..."

    finished = [d for d in status['timings'].values() if d is not None]
    if not finished:
        return "Starting soon..."

    average = sum(finished) / len(finished)
    remaining_stages = len(PIPELINE_STAGES) - len(finished)
    return remaining_stages * average


def process_video_background(task_id, video_path, voice, cheerful):
    """Run the transcription -> TTS -> dubbing pipeline for one upload.

    Progress is published through ``processing_status[task_id]``, which ends
    with ``status`` set to ``'complete'`` (with ``result_path`` and
    ``script``) or ``'error'`` (with the error text in ``message``).

    Args:
        task_id: Identifier of the task; also used in the output file name.
        video_path: Path of the uploaded video. The file is deleted when
            processing ends, whether it succeeded or failed.
        voice: Voice name forwarded to the TTS API.
        cheerful: Boolean flag forwarded to the TTS API.

    Raises:
        Exception: Any pipeline failure is recorded in the status entry and
            then re-raised.
    """
    audio_path = None
    try:
        start_time = time.time()

        # Keep the upload duration recorded by the request handler so the
        # upload stage counts as completed in the ETA estimate.
        previous = processing_status.get(task_id) or {}
        upload_duration = (previous.get('timings') or {}).get('upload')

        processing_status[task_id] = {
            'status': 'processing',
            'progress': 0,
            'message': 'Starting transcription',
            'timings': {
                'upload': upload_duration,
                'transcription': None,
                'tts': None,
                'dubbing': None
            },
            'start_time': start_time
        }
        status = processing_status[task_id]

        # Stage 1: Transcription
        status['message'] = 'Transcribing video content'
        script_start = time.time()
        script = generate_tamil_script(video_path)
        track_processing_time(
            task_id, 'transcription', time.time() - script_start
        )
        status['progress'] = 25

        # Stage 2: TTS Generation
        status['message'] = 'Generating audio narration'
        # delete=False: only the unique name is needed here; the file is
        # written by generate_audio_track and removed in ``finally``.
        with tempfile.NamedTemporaryFile(
            suffix=".wav", delete=False
        ) as temp_audio:
            audio_path = temp_audio.name

        tts_start = time.time()
        generate_audio_track(script, voice, cheerful, audio_path)
        track_processing_time(task_id, 'tts', time.time() - tts_start)
        status['progress'] = 50

        # Stage 3: Dubbing
        status['message'] = 'Creating dubbed video'
        final_filename = f"dubbed_{task_id}.mp4"
        final_path = os.path.join(
            app.config['DOWNLOAD_FOLDER'], final_filename
        )

        dubbing_start = time.time()
        replace_video_audio(video_path, audio_path, final_path)
        track_processing_time(task_id, 'dubbing', time.time() - dubbing_start)
        status['progress'] = 75

        # Finalize
        status.update({
            'status': 'complete',
            'progress': 100,
            'message': 'Processing complete',
            'result_path': final_path,
            'script': script,
            'end_time': time.time()
        })
    except Exception as e:
        processing_status.setdefault(task_id, {}).update({
            'status': 'error',
            'message': f'Error: {e}'
        })
        raise
    finally:
        # Cleanup runs on success and failure alike so intermediate files
        # never accumulate; a failed delete must not change the task outcome.
        for leftover in (audio_path, video_path):
            if leftover:
                with suppress(OSError):
                    os.unlink(leftover)


def generate_tamil_script(video_path):
    """Generate the Tamil script for a video using Gemini.

    Uploads the video, waits for Gemini to finish ingesting it, asks the
    model to follow ``GEMINI_PROMPT``, and returns the response collapsed
    onto a single line. The remote file is deleted afterwards, including
    when an error occurs.

    Raises:
        RuntimeError: If ingestion times out or ends in a non-ACTIVE state,
            or if the model returns no text.
    """
    # Use the MIME type implied by the file extension, falling back to the
    # previous hard-coded value when it is unknown or not a video type.
    guessed_type = mimetypes.guess_type(video_path)[0]
    if not guessed_type or not guessed_type.startswith("video/"):
        guessed_type = "video/mp4"

    video_file = genai.upload_file(video_path, mime_type=guessed_type)
    try:
        deadline = time.monotonic() + GEMINI_PROCESSING_TIMEOUT_SECONDS
        while video_file.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                raise RuntimeError("Gemini processing timed out")
            time.sleep(GEMINI_POLL_INTERVAL_SECONDS)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise RuntimeError(
                f"Gemini processing failed: {video_file.state.name}"
            )

        model = genai.GenerativeModel(
            model_name="models/gemini-1.5-pro-latest"
        )
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Best-effort removal of the remote copy; a failure here must not
        # mask the original error or fail an otherwise successful run.
        try:
            genai.delete_file(video_file.name)
        except Exception:
            app.logger.warning(
                "Could not delete Gemini file %s",
                video_file.name,
                exc_info=True,
            )

    text = getattr(response, 'text', None)
    if text:
        return " ".join(text.strip().splitlines())
    raise RuntimeError("No valid script generated")


def generate_audio_track(text, voice, cheerful, output_path):
    """Generate audio with the TTS API and write it to ``output_path``.

    Args:
        text: Script to synthesize.
        voice: Value sent as ``voice_name``.
        cheerful: Value sent as ``cheerful``.
        output_path: File that receives the raw response body.

    Raises:
        Exception: If the API answers with a status other than 200.
        requests.RequestException: On connection errors or timeout (300 s).
    """
    payload = {
        "text": text,
        "voice_name": voice,
        "cheerful": cheerful
    }

    response = requests.post(TTS_API_URL, json=payload, timeout=300)
    if response.status_code != 200:
        raise Exception(f"TTS API error: {response.status_code}")

    with open(output_path, "wb") as audio_file:
        audio_file.write(response.content)


def replace_video_audio(video_path, audio_path, output_path):
    """Write a copy of the video whose audio track is ``audio_path``.

    The output is encoded with libx264 video and AAC audio. Both clips are
    closed even if loading or encoding fails.
    """
    # Both names must exist before ``try`` so ``finally`` can test them even
    # when opening the first clip raises.
    video = audio = None
    try:
        video = VideoFileClip(video_path)
        audio = AudioFileClip(audio_path)
        video.audio = audio
        video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            logger=None,
            threads=4
        )
    finally:
        if video is not None:
            video.close()
        if audio is not None:
            audio.close()


@app.route('/')
def index():
    """Render the main page with the available voice choices."""
    return render_template('index.html', voices=VOICE_CHOICES)


@app.route('/upload', methods=['POST'])
def upload_video():
    """Handle a video upload and start background processing.

    Expects a multipart form with a ``video`` file and optional ``voice``
    (a label or value from ``VOICE_CHOICES``, default ``Charon``) and
    ``cheerful`` (``'true'`` to enable) fields.

    Returns:
        JSON ``{'task_id': ...}`` on success, or ``{'error': ...}`` with
        status 400 when the file is missing or the voice is unknown.
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    file = request.files['video']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    # Get processing options. The voice is forwarded to an external API, so
    # accept only known voices (either the display label or the raw name).
    requested_voice = request.form.get('voice', 'Charon')
    voice = VOICE_CHOICES.get(requested_voice, requested_voice)
    if voice not in VOICE_CHOICES.values():
        return jsonify({'error': 'Unknown voice'}), 400
    cheerful = request.form.get('cheerful', 'false') == 'true'

    # Generate unique task ID
    task_id = str(uuid.uuid4())
    filename = secure_filename(f"{task_id}_{file.filename}")
    video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    save_started = time.time()
    file.save(video_path)
    upload_duration = time.time() - save_started
    processing_times['upload'] = upload_duration

    # Start background processing
    processing_status[task_id] = {
        'status': 'uploaded',
        'progress': 0,
        'message': 'Starting processing',
        'timings': {
            'upload': upload_duration,
            'transcription': None,
            'tts': None,
            'dubbing': None
        },
        'start_time': time.time()
    }

    thread = threading.Thread(
        target=process_video_background,
        args=(task_id, video_path, voice, cheerful)
    )
    thread.start()

    return jsonify({'task_id': task_id})


@app.route('/status/<task_id>')
def get_status(task_id):
    """Report the processing status of a task as JSON.

    The response always carries ``status``, ``progress``, ``message`` and
    ``eta`` (``H:MM:SS`` string while processing and an estimate exists,
    otherwise ``null``). Completed tasks additionally carry ``result_url``
    and ``script``. Unknown task IDs yield a 404 with an ``error`` field.
    """
    status = processing_status.get(task_id)
    if status is None:
        return jsonify({'error': 'Invalid task ID'}), 404

    # Calculate ETA if processing
    eta = None
    if status['status'] == 'processing':
        remaining = estimate_remaining_time(task_id)
        # estimate_remaining_time returns a placeholder string until the
        # first stage has finished; only numeric estimates are formatted.
        if isinstance(remaining, (int, float)):
            eta = str(timedelta(seconds=int(remaining)))

    response = {
        'status': status['status'],
        'progress': status.get('progress', 0),
        'message': status.get('message', ''),
        'eta': eta
    }

    if status['status'] == 'complete':
        response['result_url'] = url_for(
            'download', filename=os.path.basename(status['result_path'])
        )
        response['script'] = status.get('script', '')

    return jsonify(response)
@app.route('/download/<filename>')
def download(filename):
    """Serve a processed video from the download folder.

    Args:
        filename: Name of a file inside ``app.config['DOWNLOAD_FOLDER']``,
            taken from the URL path. The default string converter does not
            match slashes, so only direct children of the folder can be
            requested.
    """
    return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)


if __name__ == '__main__':
    # Listen on all interfaces, port 7860.
    app.run(host="0.0.0.0", port=7860)