"""Flask service that dubs an uploaded video into Tamil.

Pipeline per upload: Gemini transcribes the video into a Tamil script, a TTS
HTTP endpoint renders the script to audio, and MoviePy swaps the audio track
of the original video. Work runs in a background thread; clients poll
``/status/<task_id>`` and fetch the result from ``/download/<filename>``.
"""

import os
import time
import tempfile
import uuid
import google.generativeai as genai
import requests
from flask import Flask, request, render_template, send_from_directory, url_for, flash, jsonify
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.VideoClip import ImageClip
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import threading
from datetime import datetime, timedelta
import logging
import schedule
import time as t

# Initialize Flask app and load secrets
load_dotenv()
app = Flask(__name__)

# Configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

if not GEMINI_API_KEY or not TTS_API_URL:
    raise ValueError("Missing required environment variables")

genai.configure(api_key=GEMINI_API_KEY)

# File storage setup
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024  # 500MB
app.secret_key = os.urandom(24)

# Retention (seconds) applied by the cleanup job to each storage folder.
UPLOAD_MAX_AGE = 3600
DOWNLOAD_MAX_AGE = 86400

# Processing status tracking: task_id -> dict with 'status', 'progress',
# 'message', 'start_time', 'video_duration' and, once available, 'script'
# and 'result_path'.
processing_status = {}

# Voice options
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr"
}

# Gemini Prompt
GEMINI_PROMPT = """
You are an AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.
"""

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_video_duration(video_path):
    """Return the duration of the video at *video_path* in seconds.

    Returns 0 when the file cannot be opened or probed, so callers can treat
    the duration as best-effort metadata.
    """
    try:
        with VideoFileClip(video_path) as video:
            return video.duration
    except Exception as e:
        # Best-effort: an unreadable file is reported later by the pipeline.
        logger.warning("Could not read duration of %s: %s", video_path, e)
        return 0


def generate_tamil_script(video_path):
    """Upload the video to Gemini and return the generated Tamil script.

    The script is returned as a single line (line breaks collapsed to
    spaces). Up to three attempts are made with a linearly growing delay;
    the last failure is re-raised to the caller.

    Raises:
        TimeoutError: Gemini did not finish processing the upload in 300 s.
        Exception: any other failure from the final attempt.
    """
    max_retries = 3
    retry_delay = 10

    for attempt in range(max_retries):
        video_file = None
        try:
            video_file = genai.upload_file(video_path, mime_type="video/mp4")

            # Poll until Gemini has finished ingesting the upload.
            start_wait = time.time()
            while video_file.state.name == "PROCESSING":
                if time.time() - start_wait > 300:
                    raise TimeoutError("Gemini processing timed out")
                time.sleep(5)
                video_file = genai.get_file(video_file.name)

            if video_file.state.name != "ACTIVE":
                raise RuntimeError(f"Gemini processing failed: {video_file.state.name}")

            model = genai.GenerativeModel(model_name="models/gemini-2.5-flash")
            response = model.generate_content([GEMINI_PROMPT, video_file])

            if hasattr(response, 'text') and response.text:
                return " ".join(response.text.strip().splitlines())
            raise RuntimeError("No valid script generated")
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(f"Gemini error (attempt {attempt+1}/{max_retries}): {str(e)}")
                time.sleep(retry_delay * (attempt + 1))
            else:
                raise
        finally:
            # Remove the remote upload on success AND failure so failed
            # attempts do not leave files behind in the Gemini account.
            if video_file is not None:
                try:
                    genai.delete_file(video_file.name)
                except Exception as cleanup_error:
                    logger.warning("Could not delete Gemini file %s: %s",
                                   video_file.name, cleanup_error)


def generate_audio_track(text, voice, cheerful, output_path):
    """Render *text* to speech via the TTS endpoint and write it to *output_path*.

    Posts ``{"text", "voice_name", "cheerful"}`` as JSON to ``TTS_API_URL``
    and stores the raw response body. Up to three attempts are made with a
    linearly growing delay; the last failure is re-raised.
    """
    max_retries = 3
    retry_delay = 5

    for attempt in range(max_retries):
        try:
            payload = {"text": text, "voice_name": voice, "cheerful": cheerful}
            response = requests.post(TTS_API_URL, json=payload, timeout=300)
            if response.status_code != 200:
                raise RuntimeError(f"TTS API error: {response.status_code} - {response.text}")

            with open(output_path, "wb") as f:
                f.write(response.content)
            return
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(f"TTS error (attempt {attempt+1}/{max_retries}): {str(e)}")
                time.sleep(retry_delay * (attempt + 1))
            else:
                raise


def replace_video_audio(video_path, audio_path, output_path):
    """Replace video audio track, extending video if audio is longer.

    When the audio outlasts the video, the last frame is frozen for the
    remaining time. On failure any partially written output file is removed
    and the exception is re-raised.
    """
    video = None
    audio = None
    try:
        video = VideoFileClip(video_path)
        audio = AudioFileClip(audio_path)

        # If audio is longer, extend video with last frame
        if audio.duration > video.duration:
            freeze_duration = audio.duration - video.duration
            # Sample slightly before the end; clamp so clips shorter than
            # 40 ms do not request a negative timestamp.
            frame_time = max(video.duration - 0.04, 0)
            last_frame = video.to_ImageClip(t=frame_time)
            last_frame = last_frame.set_duration(freeze_duration)
            final_video = concatenate_videoclips([video, last_frame])
        else:
            final_video = video

        final_video = final_video.set_audio(audio)
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            logger=None,
            threads=4,
            preset='medium',
            ffmpeg_params=['-crf', '23', '-movflags', '+faststart']
        )
    except Exception as e:
        logger.error(f"Video processing error: {str(e)}")
        if os.path.exists(output_path):
            os.unlink(output_path)
        raise
    finally:
        if video:
            video.close()
        if audio:
            audio.close()


def process_video_background(task_id, video_path, voice, cheerful):
    """Run the full dubbing pipeline for one task and record its progress.

    Intended to run in a worker thread. Progress and the final outcome are
    written to ``processing_status[task_id]``; exceptions are logged and
    stored as an ``'error'`` status rather than propagated.
    """
    audio_path = None
    # Reuse the entry created by the upload handler when there is one, so the
    # video does not have to be probed for its duration a second time.
    status = processing_status.setdefault(task_id, {})
    try:
        if 'video_duration' not in status:
            status['video_duration'] = get_video_duration(video_path)
        status.update({
            'status': 'processing',
            'progress': 0,
            'message': 'Starting transcription',
            'start_time': time.time(),
        })

        script = generate_tamil_script(video_path)
        status.update({'progress': 25, 'script': script, 'message': 'Generating audio'})

        # Only the unique path is needed; the TTS step writes the content.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            audio_path = temp_audio.name

        generate_audio_track(script, voice, cheerful, audio_path)
        status.update({'progress': 50, 'message': 'Merging audio and video'})

        final_filename = f"dubbed_{task_id}.mp4"
        final_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_filename)
        replace_video_audio(video_path, audio_path, final_path)

        status.update({
            'progress': 100,
            'status': 'complete',
            'result_path': final_path,
            'message': 'Processing complete'
        })
    except Exception as e:
        logger.error(f"Task {task_id} failed: {str(e)}")
        status.update({
            'status': 'error',
            'message': str(e)
        })
    finally:
        # The temporary audio file is never needed after this function.
        if audio_path and os.path.exists(audio_path):
            os.unlink(audio_path)


@app.route('/')
def index():
    """Render the upload page with the available voices."""
    return render_template('index.html', voices=VOICE_CHOICES)


@app.route('/upload', methods=['POST'])
def upload_video():
    """Store the uploaded video and start background processing.

    Expects a multipart form with a ``video`` file and optional ``voice`` and
    ``cheerful`` fields. Responds with the new ``task_id`` and the detected
    video duration, or a 400 error when no file was supplied.
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    file = request.files['video']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    task_id = str(uuid.uuid4())
    # Prefixing with the task id keeps concurrent uploads from colliding.
    filename = secure_filename(f"{task_id}_{file.filename}")
    video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(video_path)

    voice = request.form.get('voice', 'Charon')
    cheerful = request.form.get('cheerful', 'false') == 'true'

    video_duration = get_video_duration(video_path)
    processing_status[task_id] = {
        'status': 'uploaded',
        'progress': 0,
        'message': 'Starting processing',
        'start_time': time.time(),
        'video_duration': video_duration
    }

    thread = threading.Thread(
        target=process_video_background,
        args=(task_id, video_path, voice, cheerful)
    )
    thread.start()

    return jsonify({'task_id': task_id, 'video_duration': video_duration})


@app.route('/status/<task_id>')
def get_status(task_id):
    """Return the processing status for *task_id* as JSON (404 if unknown)."""
    status = processing_status.get(task_id)
    if status is None:
        return jsonify({'error': 'Invalid task ID'}), 404

    response = {
        'status': status['status'],
        'progress': status.get('progress', 0),
        'message': status.get('message', '')
    }

    if status['status'] == 'complete':
        response['result_url'] = url_for(
            'download', filename=os.path.basename(status['result_path'])
        )
        response['script'] = status.get('script', '')

    return jsonify(response)


@app.route('/download/<path:filename>')
def download(filename):
    """Serve a finished video from the download folder."""
    return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)


def _purge_expired_files():
    """Delete regular files older than each storage folder's retention."""
    now = time.time()
    for folder, age_limit in [(UPLOAD_FOLDER, UPLOAD_MAX_AGE),
                              (DOWNLOAD_FOLDER, DOWNLOAD_MAX_AGE)]:
        for filename in os.listdir(folder):
            file_path = os.path.join(folder, filename)
            try:
                # Skip sub-directories: os.unlink would fail on them and
                # abort the whole cleanup run.
                if os.path.isfile(file_path) and os.path.getmtime(file_path) < now - age_limit:
                    os.unlink(file_path)
            except FileNotFoundError:
                # Removed by someone else between listing and deletion.
                continue


def _purge_finished_tasks():
    """Drop finished status entries whose result file has expired."""
    cutoff = time.time() - DOWNLOAD_MAX_AGE
    # Iterate over a snapshot: worker threads may add entries concurrently.
    for task_id, status in list(processing_status.items()):
        finished = status.get('status') in ('complete', 'error')
        if finished and status.get('start_time', 0) < cutoff:
            processing_status.pop(task_id, None)


def _run_cleanup():
    """Remove expired files and the status entries that referred to them."""
    _purge_expired_files()
    _purge_finished_tasks()


@app.route('/cleanup', methods=['POST'])
def cleanup():
    """Trigger a cleanup run and report the outcome as JSON."""
    try:
        _run_cleanup()
        return jsonify({'status': 'success'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500


def cleanup_job():
    """Scheduled cleanup; never raises so the scheduler thread keeps running."""
    try:
        _run_cleanup()
    except Exception:
        logger.exception("Scheduled cleanup failed")


schedule.every().hour.do(cleanup_job)


def scheduler_thread():
    """Run pending scheduled jobs forever, checking once per second."""
    while True:
        schedule.run_pending()
        t.sleep(1)


threading.Thread(target=scheduler_thread, daemon=True).start()

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860, threaded=True)