"""Flask web app that replaces a video's audio track with AI-generated narration."""

import os
import tempfile
import time
import uuid

import google.generativeai as genai
import requests
import yt_dlp
from dotenv import load_dotenv
from flask import Flask, request, render_template, send_from_directory, url_for, flash
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from werkzeug.utils import secure_filename

# --- 1. INITIALIZE FLASK APP AND LOAD SECRETS ---
# load_dotenv() must run before the os.getenv() calls below so that values
# defined in a local .env file are visible.
load_dotenv()

app = Flask(__name__)

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

# Fail fast at import time: nothing in this app works without both settings.
if not GEMINI_API_KEY:
    raise ValueError("SECURITY ERROR: GEMINI_API_KEY not found in .env file!")
if not TTS_API_URL:
    raise ValueError("CONFIGURATION ERROR: TTS_API_URL not found in .env file!")

genai.configure(api_key=GEMINI_API_KEY)

# Working directories: raw inputs go to UPLOAD_FOLDER, finished videos to
# DOWNLOAD_FOLDER. Both are created on startup if missing.
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # reject request bodies over 100 MiB

# A key generated per process invalidates flashed messages on every restart
# and breaks them when several workers serve the same client. Prefer a stable
# key from the environment; fall back to a random one for local development.
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24)

# --- 2. VOICE CHOICES & GEMINI PROMPT ---
# Maps the label shown in the form to the voice name sent to the TTS API.
VOICE_CHOICES = {"Male (Charon)": "Charon", "Female (Zephyr)": "Zephyr"}

# NOTE(review): this text looks like a placeholder left by an earlier edit;
# confirm the full prompt is restored before deploying.
GEMINI_PROMPT = """ You are an AI scriptwriter... (The full prompt remains the same as before) """
# --- 3. CORE LOGIC HELPER FUNCTIONS ---

def download_youtube_video(url, output_folder):
    """Download a YouTube video into ``output_folder`` using yt-dlp.

    Args:
        url: Video URL handed to yt-dlp.
        output_folder: Directory the file is written to.

    Returns:
        Path of the downloaded file (normally ``<uuid>.mp4``).

    Raises:
        FileNotFoundError: yt-dlp returned but no file with the generated
            name exists in ``output_folder``.
        Exception: Any error raised by yt-dlp is logged and re-raised so the
            caller can report it.
    """
    print(f"📥 Downloading video from YouTube URL: {url}")

    # A random stem avoids collisions between concurrent downloads.
    stem = str(uuid.uuid4())
    base_path = os.path.join(output_folder, stem)

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        # Keep %(ext)s in the template: without it a single-format download
        # is written with no extension at all, which later breaks tools that
        # infer the container from the file name.
        'outtmpl': f"{base_path}.%(ext)s",
        'merge_output_format': 'mp4',
        'quiet': True,
        'noplaylist': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        downloaded_file_path = f"{base_path}.mp4"
        if os.path.exists(downloaded_file_path):
            print(f"✅ Download complete! File saved to: {downloaded_file_path}")
            return downloaded_file_path

        # Fallback: the final 'best' format may not be mp4, so look for any
        # file carrying our unique stem. Sorted for a deterministic pick.
        for candidate in sorted(os.listdir(output_folder)):
            if candidate.startswith(stem):
                return os.path.join(output_folder, candidate)
        raise FileNotFoundError("Downloaded video file not found.")
    except Exception as e:
        print(f"❌ An error occurred during YouTube download: {e}")
        raise  # Re-raise so the main processing block can handle it


def generate_tamil_script(video_file_path, poll_interval=5, max_wait=600):
    """Upload a video to Gemini and return the generated narration script.

    The content and language of the script are determined by
    ``GEMINI_PROMPT``. The uploaded Gemini file is deleted afterwards whether
    or not generation succeeded.

    Args:
        video_file_path: Local path of the video to upload.
        poll_interval: Seconds to sleep between processing-state checks.
        max_wait: Maximum seconds to wait for Gemini to finish processing
            the upload before giving up.

    Returns:
        The script as a single line (line breaks replaced by spaces).

    Raises:
        TimeoutError: The upload was still processing after ``max_wait``.
        Exception: Gemini reported a non-ACTIVE final state, or the response
            contained no text.
    """
    print("Uploading file to Gemini for transcription...")
    # NOTE(review): the MIME type is fixed to video/mp4 even though callers
    # may pass other containers — confirm this is acceptable to the API.
    video_file = genai.upload_file(video_file_path, mime_type="video/mp4")

    try:
        print("Waiting for Gemini file processing...")
        deadline = time.monotonic() + max_wait
        while video_file.state.name == "PROCESSING":
            # Bound the wait so a stuck upload cannot hang the request forever.
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Gemini file processing did not finish within {max_wait} seconds."
                )
            time.sleep(poll_interval)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise Exception(f"Gemini file processing failed: {video_file.state.name}")

        print("Generating narrator script...")
        model = genai.GenerativeModel(model_name="models/gemini-2.5-flash")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Always remove the remote copy, including on failure. Cleanup is
        # best-effort so it can neither mask the real error nor discard a
        # successfully generated script.
        try:
            genai.delete_file(video_file.name)
        except Exception as cleanup_error:
            print(f"⚠️ Could not delete Gemini file {video_file.name}: {cleanup_error}")

    if hasattr(response, 'text') and response.text:
        return " ".join(response.text.strip().splitlines())
    raise Exception("No valid script was generated by Gemini.")


def generate_single_audio_track(dialogue_text, voice_name, is_cheerful, output_path):
    """Request speech audio from the TTS API and write it to ``output_path``.

    Args:
        dialogue_text: Text to synthesize.
        voice_name: Voice identifier sent to the API as ``voice_name``.
        is_cheerful: Sent to the API as the ``cheerful`` flag.
        output_path: File the raw response body is written to.

    Returns:
        True when the API answered with HTTP 200 and the file was written.

    Raises:
        Exception: The API answered with any status other than 200.
            Errors raised by ``requests.post`` itself (timeouts, connection
            failures) propagate unchanged.
    """
    print(f"Requesting audio from TTS API (Voice: {voice_name}, Cheerful: {is_cheerful})")
    payload = {"text": dialogue_text, "voice_name": voice_name, "cheerful": is_cheerful}
    response = requests.post(TTS_API_URL, json=payload, timeout=300)

    if response.status_code != 200:
        raise Exception(f"Error from TTS API: {response.status_code} - {response.text}")

    with open(output_path, "wb") as f:
        f.write(response.content)
    return True


def replace_video_audio(video_path, new_audio_path, output_path):
    """Write a copy of ``video_path`` whose audio track is ``new_audio_path``.

    Args:
        video_path: Source video file.
        new_audio_path: Audio file that becomes the new sound track.
        output_path: Destination file, encoded with libx264 video and AAC audio.
    """
    print("Replacing video audio using MoviePy...")
    video_clip = None
    audio_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = AudioFileClip(new_audio_path)
        # Swap the sound track by assigning the clip's audio attribute.
        video_clip.audio = audio_clip
        video_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", logger='bar')
    finally:
        # Close both clips even if opening or encoding failed part-way.
        if audio_clip is not None:
            audio_clip.close()
        if video_clip is not None:
            video_clip.close()
# --- 4. FLASK WEB ROUTES ---

@app.route('/', methods=['GET'])
def index():
    """Render the main upload page."""
    return render_template('index.html')


@app.route('/process', methods=['POST'])
def process_video():
    """Dub a video supplied as a YouTube URL or file upload and render the result.

    Form fields read: ``youtube_url`` (optional), ``video`` (file, optional),
    ``voice_choice`` (a key of ``VOICE_CHOICES``) and ``cheerful`` (presence
    flag). On success the page is rendered with the URL of the dubbed video
    and the generated script; on any failure a message is flashed and the
    upload page is rendered again. The input video and the intermediate
    audio file are removed in every case.
    """
    input_video_path = None
    temp_audio_path = None
    try:
        # Validate the cheap form fields first so a bad voice selection does
        # not cost a full download/upload before being rejected.
        voice_name = VOICE_CHOICES.get(request.form.get('voice_choice', ''))
        if voice_name is None:
            flash("Please select a valid voice.")
            return render_template('index.html')
        is_cheerful = 'cheerful' in request.form

        # Determine input source: YouTube URL or file upload.
        youtube_url = request.form.get('youtube_url', '').strip()
        uploaded = request.files.get('video')
        if youtube_url:
            input_video_path = download_youtube_video(youtube_url, app.config['UPLOAD_FOLDER'])
            # The downloader already produced a unique, safe file name.
            filename = os.path.basename(input_video_path)
        elif uploaded is not None and uploaded.filename != '':
            safe_name = secure_filename(uploaded.filename)
            # secure_filename() can strip a name down to nothing or to a bare
            # word without extension; the encoder needs an extension to pick
            # the output container.
            if not os.path.splitext(safe_name)[1]:
                safe_name = f"{safe_name or 'upload'}.mp4"
            # A unique prefix keeps concurrent uploads with the same name from
            # overwriting (and later deleting) each other's files.
            filename = f"{uuid.uuid4().hex}_{safe_name}"
            input_video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            uploaded.save(input_video_path)
        else:
            flash("Please either upload a video file or provide a YouTube URL.")
            return render_template('index.html')

        # Continue with the processing pipeline.
        script = generate_tamil_script(input_video_path)

        # Reserve a temp path, then close the handle before the TTS helper
        # reopens the file for writing (an open handle blocks that on Windows).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            temp_audio_path = temp_audio.name
        generate_single_audio_track(script, voice_name, is_cheerful, temp_audio_path)

        final_video_name = f"dubbed_{filename}"
        final_video_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_video_name)
        replace_video_audio(input_video_path, temp_audio_path, final_video_path)

        return render_template(
            'index.html',
            result_video=url_for('serve_video', filename=final_video_name),
            script=script,
        )
    except Exception as e:
        # Top-level boundary: report the failure to the user instead of a 500.
        print(f"An error occurred during processing: {e}")
        flash(f"An unexpected error occurred: {e}. Please check the console and try again.")
        return render_template('index.html')
    finally:
        # Clean up all temporary files.
        for leftover in (input_video_path, temp_audio_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)


# The <filename> URL variable is required: the view takes a ``filename``
# argument and process_video() builds links with url_for(..., filename=...).
@app.route('/downloads/<filename>')
def serve_video(filename):
    """Serve a finished dubbed video from the download folder."""
    return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)


# --- 5. APPLICATION ENTRY POINT ---
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)