import json
import mimetypes
import os
import tempfile
import time
import uuid

import google.generativeai as genai
import numpy as np
import requests
from dotenv import load_dotenv
from flask import Flask, request, render_template, send_from_directory, url_for, flash, jsonify
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.fx.all import crop, resize, speedx
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageDraw, ImageFont
from werkzeug.utils import secure_filename

# --- 1. INITIALIZE FLASK APP AND LOAD SECRETS ---
load_dotenv()
app = Flask(__name__)

# Load secrets from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

# Validate required configurations
if not GEMINI_API_KEY:
    raise ValueError("SECURITY ERROR: GEMINI_API_KEY not found in .env file!")
if not TTS_API_URL:
    raise ValueError("CONFIGURATION ERROR: TTS_API_URL not found in .env file!")

# Configure Gemini AI
genai.configure(api_key=GEMINI_API_KEY)

# Configure directories
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100 MB upload limit
# Prefer a stable key from the environment so sessions/flash messages survive
# restarts and are shared between worker processes; fall back to a random key.
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24)

# --- 2. APPLICATION CONFIGURATION ---
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr",
}

# NOTE: apply_editing() only reads "speed", "black_bars" and "aspect_ratio";
# the remaining keys are carried as configuration but are not acted upon there.
EDITING_PRESETS = {
    "fast_cuts": {
        "speed": 1.2,
        "transition_duration": 0.3,
        "max_clip_duration": 5,
    },
    "cinematic": {
        "speed": 0.95,
        "transition_duration": 1.0,
        "black_bars": True,
    },
    "social_media": {
        "speed": 1.0,
        "aspect_ratio": (9, 16),
        "add_captions": True,
    },
}

GEMINI_PROMPT = """
You are an expert AI scriptwriter. Your task is to watch the provided video and:
1. Transcribe ALL spoken dialogue into modern, colloquial Tamil
2. Identify key moments for editing (action, emotion, important points)
3. Suggest timestamps for cuts/transitions

**OUTPUT FORMAT:**
{
    "script": "Combined Tamil dialogue with performance cues",
    "editing_notes": [
        {"timestamp": 12.5, "type": "cut", "reason": "action moment"},
        {"timestamp": 24.3, "type": "slow_mo", "reason": "emotional highlight"}
    ]
}
"""

# Upper bound (seconds) on how long we poll Gemini for an uploaded file to
# leave the PROCESSING state, and the delay between polls.
GEMINI_PROCESSING_TIMEOUT = 600
GEMINI_POLL_INTERVAL = 5

# Fraction of the frame height left visible between the cinematic black bars.
LETTERBOX_VISIBLE_FRACTION = 0.85


# --- 3. CORE APPLICATION FUNCTIONS ---
def _parse_analysis(text):
    """Turn the model's reply into ``{"script": str, "editing_notes": list}``.

    The reply is parsed as JSON (never evaluated as code). A surrounding
    Markdown code fence is removed first. If the reply is not a JSON object,
    the raw text is used as the script and no editing notes are returned.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (e.g. "```json") and the closing fence.
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rsplit("```", 1)[0]

    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        parsed = None

    if not isinstance(parsed, dict):
        return {"script": text, "editing_notes": []}

    notes = parsed.get("editing_notes")
    return {
        "script": str(parsed.get("script") or ""),
        "editing_notes": notes if isinstance(notes, list) else [],
    }


def analyze_video(video_path):
    """Analyze video content and generate script with editing suggestions.

    Uploads the file to Gemini, waits for it to finish processing, asks the
    model for a script plus editing notes, and always deletes the uploaded
    file afterwards.

    Args:
        video_path: Path of the local video file to analyze.

    Returns:
        A dict with the keys ``"script"`` (str) and ``"editing_notes"`` (list).

    Raises:
        TimeoutError: The uploaded file stayed in PROCESSING for too long.
        RuntimeError: Gemini failed to process the file or returned no text.
    """
    print("Analyzing video with Gemini...")

    guessed_type = mimetypes.guess_type(video_path)[0]
    if not guessed_type or not guessed_type.startswith("video/"):
        guessed_type = "video/mp4"
    video_file = genai.upload_file(video_path, mime_type=guessed_type)

    try:
        # Wait for file processing, but never forever.
        deadline = time.monotonic() + GEMINI_PROCESSING_TIMEOUT
        while video_file.state.name == "PROCESSING":
            if time.monotonic() > deadline:
                raise TimeoutError("Gemini file processing timed out.")
            time.sleep(GEMINI_POLL_INTERVAL)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise RuntimeError(f"Gemini file processing failed: {video_file.state.name}")

        model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Best-effort cleanup: a failed delete must not mask the real error.
        try:
            genai.delete_file(video_file.name)
        except Exception as cleanup_error:
            print(f"Could not delete Gemini file: {cleanup_error}")

    try:
        text = response.text
    except (AttributeError, ValueError):
        # No usable text part in the response (e.g. it was blocked).
        text = None

    if not text:
        raise RuntimeError("No valid analysis was generated by Gemini.")
    return _parse_analysis(text)


def generate_audio(script_text, voice_name, is_cheerful):
    """Generate audio from script using TTS API.

    Args:
        script_text: Text to synthesize.
        voice_name: Voice identifier forwarded to the TTS service.
        is_cheerful: Whether to request the cheerful tone.

    Returns:
        The raw response body of the TTS service (audio bytes).

    Raises:
        ValueError: ``script_text`` is empty.
        RuntimeError: The TTS service answered with a non-200 status.
    """
    if not script_text or not script_text.strip():
        raise ValueError("Cannot generate audio from an empty script.")

    print(f"Generating audio (Voice: {voice_name}, Cheerful: {is_cheerful})")
    payload = {
        "text": script_text,
        "voice_name": voice_name,
        "cheerful": is_cheerful,
    }
    response = requests.post(TTS_API_URL, json=payload, timeout=300)
    if response.status_code == 200:
        return response.content
    raise RuntimeError(f"TTS API Error: {response.status_code} - {response.text}")


def _add_black_bars(get_frame, t):
    """Black out the top and bottom rows of the frame at time ``t``.

    The frame keeps its size; only the rows covered by the bars are zeroed.
    """
    frame = get_frame(t).copy()  # do not write into the reader's buffer
    height = frame.shape[0]
    visible_height = int(height * LETTERBOX_VISIBLE_FRACTION)
    bar_size = (height - visible_height) // 2
    if bar_size > 0:  # frame[-0:] would select the whole frame
        frame[:bar_size] = 0
        frame[-bar_size:] = 0
    return frame


def _normalize_cuts(editing_notes, speed, duration):
    """Return sorted ``(timestamp, type)`` pairs that fall inside the clip.

    Timestamps are divided by ``speed`` because the notes refer to the
    original timeline while the cuts are applied after the speed change.
    Entries that are malformed or outside ``(0, duration)`` are dropped.
    """
    cuts = []
    for note in editing_notes or []:
        if not isinstance(note, dict):
            continue
        try:
            timestamp = float(note.get("timestamp")) / speed
        except (TypeError, ValueError):
            continue
        if 0 < timestamp < duration:
            cuts.append((timestamp, note.get("type")))
    cuts.sort(key=lambda cut: cut[0])
    return cuts


def _centered_crop_size(width, height, aspect_ratio):
    """Return the largest even ``(width, height)`` with the given ratio that fits."""
    ratio_w, ratio_h = aspect_ratio
    crop_w = min(width, height * ratio_w // ratio_h)
    crop_h = min(height, width * ratio_h // ratio_w)
    # libx264 with the default pixel format needs even dimensions.
    crop_w = max(2, crop_w - crop_w % 2)
    crop_h = max(2, crop_h - crop_h % 2)
    return crop_w, crop_h


def apply_editing(video_path, audio_data, editing_notes, preset_name):
    """Apply editing effects to video based on analysis and preset.

    Args:
        video_path: Path of the uploaded source video.
        audio_data: Audio bytes that become the soundtrack of the result.
        editing_notes: List of dicts with ``"timestamp"`` and ``"type"`` keys;
            the segment ending at each timestamp gets the note's effect.
        preset_name: Key into ``EDITING_PRESETS``.

    Returns:
        Path of the rendered file inside the download folder.

    Raises:
        ValueError: ``preset_name`` is unknown or the video has no content.
    """
    print(f"Applying {preset_name} editing preset...")
    if preset_name not in EDITING_PRESETS:
        raise ValueError(f"Unknown editing preset: {preset_name}")
    preset = EDITING_PRESETS[preset_name]

    # Save audio to temp file
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
        temp_audio.write(audio_data)
        temp_audio_path = temp_audio.name

    source = None
    audio = None
    try:
        # Load video and audio. Keep a handle on the source clip: closing an
        # fx-derived clip does not release the underlying file reader.
        source = VideoFileClip(video_path)
        audio = AudioFileClip(temp_audio_path)
        video = source

        # Apply basic preset effects
        speed = preset.get('speed') or 1.0
        if speed != 1.0:
            video = video.fx(speedx, speed)

        # Apply black bars for cinematic
        if preset.get('black_bars'):
            video = video.fl(_add_black_bars)

        # Apply editing notes
        clips = []
        current_start = 0
        for timestamp, note_type in _normalize_cuts(editing_notes, speed, video.duration):
            if current_start >= timestamp:
                continue
            clip = video.subclip(current_start, timestamp)

            # Apply effect based on note type
            if note_type == 'slow_mo':
                clip = clip.fx(speedx, 0.5)
            elif note_type == 'fast_cut':
                clip = clip.fx(speedx, 1.5)

            clips.append(clip)
            current_start = timestamp

        # Add remaining video
        if current_start < video.duration:
            clips.append(video.subclip(current_start))
        if not clips:
            raise ValueError("The video has no content to edit.")

        # Concatenate all clips
        final_video = concatenate_videoclips(clips)

        # Apply aspect ratio if specified (centered crop, not a rescale)
        if preset.get('aspect_ratio'):
            frame_w, frame_h = final_video.size
            crop_w, crop_h = _centered_crop_size(frame_w, frame_h, preset['aspect_ratio'])
            final_video = final_video.fx(
                crop,
                x_center=frame_w / 2,
                y_center=frame_h / 2,
                width=crop_w,
                height=crop_h,
            )

        final_video = final_video.set_audio(audio)

        # Generate output path
        output_path = os.path.join(
            app.config['DOWNLOAD_FOLDER'],
            f"edited_{os.path.basename(video_path)}",
        )
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=4,
            preset='fast',
        )
        return output_path
    finally:
        # Cleanup, also when rendering fails
        if source is not None:
            source.close()
        if audio is not None:
            audio.close()
        os.unlink(temp_audio_path)
# --- 4. FLASK ROUTES ---
@app.route('/', methods=['GET'])
def index():
    """Render the main upload page."""
    return render_template('index.html', voices=VOICE_CHOICES, presets=EDITING_PRESETS.keys())


@app.route('/process', methods=['POST'])
def process_video():
    """Handle video upload and processing.

    Reads the uploaded ``video`` file and the ``voice``, ``tone`` and
    ``preset`` form fields, runs analysis, speech generation and editing, and
    answers with JSON holding the download URL and the generated script.
    A missing upload re-renders the upload page with a flash message; an
    unknown preset yields a 400 JSON error; any processing failure yields a
    500 JSON error. The uploaded file is removed in every case.
    """
    input_video_path = None
    try:
        # Validate file upload
        upload = request.files.get('video')
        filename = secure_filename(upload.filename) if upload and upload.filename else ''
        if not filename:
            # Also covers names that sanitize to an empty string.
            flash("Please upload a video file.", "error")
            return render_template('index.html', voices=VOICE_CHOICES, presets=EDITING_PRESETS.keys())

        # Get processing options
        voice_choice = request.form.get('voice', 'Charon')
        is_cheerful = request.form.get('tone') == 'on'
        preset_name = request.form.get('preset', 'fast_cuts')
        if preset_name not in EDITING_PRESETS:
            return jsonify({
                'status': 'error',
                'message': f"Unknown editing preset: {preset_name}"
            }), 400

        # Save uploaded file under a unique name so that concurrent requests
        # with the same client filename cannot overwrite or delete each other.
        filename = f"{uuid.uuid4().hex}_{filename}"
        input_video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        upload.save(input_video_path)

        # Analyze video
        analysis = analyze_video(input_video_path)
        script = analysis.get('script', '')
        editing_notes = analysis.get('editing_notes', [])

        # Generate audio
        audio_data = generate_audio(script, voice_choice, is_cheerful)

        # Apply editing and generate final video
        final_video_path = apply_editing(input_video_path, audio_data, editing_notes, preset_name)

        return jsonify({
            'status': 'success',
            'video_url': url_for('serve_video', filename=os.path.basename(final_video_path)),
            'script': script
        })
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
    finally:
        # Clean up uploaded file
        if input_video_path and os.path.exists(input_video_path):
            os.remove(input_video_path)


# The <filename> variable is required: serve_video() takes it as an argument
# and url_for('serve_video', filename=...) fills it in.
@app.route('/downloads/<filename>')
def serve_video(filename):
    """Serve the processed video file."""
    return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)


# --- 5. APPLICATION ENTRY POINT ---
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)