Translate / app.py
Athspi's picture
Update app.py
c901468 verified
raw
history blame
9.12 kB
import ast
import json
import os
import tempfile
import time
import uuid

import google.generativeai as genai
import numpy as np
import requests
from dotenv import load_dotenv
from flask import Flask, request, render_template, send_from_directory, url_for, flash, jsonify
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.fx.all import resize, speedx
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageDraw, ImageFont
from werkzeug.utils import secure_filename
# --- 1. INITIALIZE FLASK APP AND LOAD SECRETS ---
# Merge variables from a local .env file (if any) into the process environment.
load_dotenv()
app = Flask(__name__)

# Load secrets from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

# Validate required configurations: fail at import time rather than on the
# first request, so a misconfigured deployment never starts serving.
if not GEMINI_API_KEY:
    raise ValueError("SECURITY ERROR: GEMINI_API_KEY not found in .env file!")
if not TTS_API_URL:
    raise ValueError("CONFIGURATION ERROR: TTS_API_URL not found in .env file!")

# Configure Gemini AI
genai.configure(api_key=GEMINI_API_KEY)

# Configure directories (created on startup if they do not exist yet)
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100 MB upload limit

# Key used to sign the session cookie that carries flash messages.
# A purely random key differs in every process, so cookies signed by one
# worker are rejected by another and are invalidated by every restart.
# Prefer a stable key from the environment; the random key remains as a
# fallback so single-process development setups keep working unchanged.
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24)
# --- 2. APPLICATION CONFIGURATION ---
# Label -> voice name. Passed to the index template as ``voices``.
VOICE_CHOICES = dict([
    ("Male (Charon)", "Charon"),
    ("Female (Zephyr)", "Zephyr"),
])

# Preset name -> options. apply_editing() reads "speed", "black_bars" and
# "aspect_ratio"; the remaining keys are carried here but not read by it.
EDITING_PRESETS = dict(
    fast_cuts=dict(
        speed=1.2,
        transition_duration=0.3,
        max_clip_duration=5,
    ),
    cinematic=dict(
        speed=0.95,
        transition_duration=1.0,
        black_bars=True,
    ),
    social_media=dict(
        speed=1.0,
        aspect_ratio=(9, 16),
        add_captions=True,
    ),
)

# Instruction text sent to Gemini together with the uploaded video.
GEMINI_PROMPT = """
You are an expert AI scriptwriter. Your task is to watch the provided video and:
1. Transcribe ALL spoken dialogue into modern, colloquial Tamil
2. Identify key moments for editing (action, emotion, important points)
3. Suggest timestamps for cuts/transitions
**OUTPUT FORMAT:**
{
"script": "Combined Tamil dialogue with performance cues",
"editing_notes": [
{"timestamp": 12.5, "type": "cut", "reason": "action moment"},
{"timestamp": 24.3, "type": "slow_mo", "reason": "emotional highlight"}
]
}
"""
# --- 3. CORE APPLICATION FUNCTIONS ---
def _parse_analysis(raw_text):
    """Turn Gemini's text reply into an analysis dict without executing it.

    Tries strict JSON first, then a Python literal (for replies that use
    single quotes). Anything that does not parse to a dict is treated as a
    plain script with no editing notes.
    """
    candidate = raw_text.strip()
    # Replies are often wrapped in a Markdown code fence (``` or ```json):
    # drop the opening fence line and everything from the closing fence on.
    if candidate.startswith("```"):
        _, _, candidate = candidate.partition("\n")
        candidate = candidate.rsplit("```", 1)[0]

    for parser in (json.loads, ast.literal_eval):
        try:
            parsed = parser(candidate)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            continue
        # The caller uses .get() on the result, so only a dict is acceptable.
        if isinstance(parsed, dict):
            return parsed
    return {"script": raw_text, "editing_notes": []}


def analyze_video(video_path):
    """Analyze video content and generate script with editing suggestions.

    Uploads the file at ``video_path`` to Gemini, polls until the upload has
    left the PROCESSING state, sends it to the model together with
    ``GEMINI_PROMPT`` and deletes the remote file afterwards.

    Returns:
        A dict parsed from the model reply. When the reply cannot be parsed
        into a dict, ``{"script": <reply text>, "editing_notes": []}``.

    Raises:
        Exception: if the upload does not reach the ACTIVE state or the
            model returns no text.
    """
    print("Analyzing video with Gemini...")
    video_file = genai.upload_file(video_path, mime_type="video/mp4")
    try:
        # Wait for file processing
        while video_file.state.name == "PROCESSING":
            time.sleep(5)
            video_file = genai.get_file(video_file.name)
        if video_file.state.name != "ACTIVE":
            raise Exception(f"Gemini file processing failed: {video_file.state.name}")
        model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Remove the remote copy on failure as well as on success.
        genai.delete_file(video_file.name)

    if hasattr(response, 'text') and response.text:
        # SECURITY: the reply is untrusted model output. It used to be passed
        # to eval(), which would execute any expression it contained.
        return _parse_analysis(response.text)
    raise Exception("No valid analysis was generated by Gemini.")
def generate_audio(script_text, voice_name, is_cheerful):
    """Request speech for ``script_text`` from the service at ``TTS_API_URL``.

    The text, voice name and cheerful flag are posted as a JSON body with a
    300 second timeout.

    Returns:
        The raw response body when the service answers with HTTP 200.

    Raises:
        Exception: for any other status code; the message carries the status
            code and the response text.
    """
    print(f"Generating audio (Voice: {voice_name}, Cheerful: {is_cheerful})")
    tts_response = requests.post(
        TTS_API_URL,
        json=dict(text=script_text, voice_name=voice_name, cheerful=is_cheerful),
        timeout=300,
    )
    # Guard clause: anything but 200 is reported as an error.
    if tts_response.status_code != 200:
        raise Exception(f"TTS API Error: {tts_response.status_code} - {tts_response.text}")
    return tts_response.content
def _add_black_bars(get_frame, t):
    """Frame filter for ``clip.fl``: stack a black bar above and below the frame.

    Each bar is half of 15% of the frame height, so the returned frame is
    taller than the input frame.
    """
    frame = get_frame(t)
    height, width = frame.shape[:2]
    bar_size = (height - int(height * 0.85)) // 2
    black_bar = np.zeros((bar_size, width, 3), dtype=np.uint8)
    return np.vstack([black_bar, frame, black_bar])


def _cut_points(editing_notes, speed, duration):
    """Return sorted ``(timestamp, type)`` pairs that fall strictly inside the clip.

    Timestamps are divided by ``speed`` because they refer to the source
    timeline while the cuts are made on the speed-adjusted clip. Notes that
    are not dicts, lack a numeric timestamp, or land outside
    ``(0, duration)`` are dropped instead of aborting the whole edit.
    """
    points = []
    for note in editing_notes or []:
        if not isinstance(note, dict):
            continue
        try:
            timestamp = float(note.get('timestamp')) / speed
        except (TypeError, ValueError):
            continue
        if 0 < timestamp < duration:
            points.append((timestamp, note.get('type')))
    # Sort on the timestamp only; the type may be None and is not orderable.
    points.sort(key=lambda point: point[0])
    return points


def _crop_to_aspect(clip, aspect_ratio):
    """Center-crop ``clip`` to ``aspect_ratio`` given as ``(width, height)`` units."""
    ratio_w, ratio_h = aspect_ratio
    width, height = clip.size
    if width * ratio_h > height * ratio_w:
        # Wider than the target: keep the full height and trim the sides.
        new_width, new_height = height * ratio_w // ratio_h, height
    else:
        # Taller than (or equal to) the target: keep the width, trim top/bottom.
        new_width, new_height = width, width * ratio_h // ratio_w
    # Keep both dimensions even so the H.264 output can use the widely
    # supported yuv420p pixel format.
    new_width -= new_width % 2
    new_height -= new_height % 2
    if new_width <= 0 or new_height <= 0:
        return clip
    left = (width - new_width) // 2
    top = (height - new_height) // 2
    return clip.fl_image(
        lambda frame: frame[top:top + new_height, left:left + new_width]
    )


def apply_editing(video_path, audio_data, editing_notes, preset_name):
    """Apply editing effects to video based on analysis and preset.

    Args:
        video_path: Path of the source video file.
        audio_data: Encoded audio bytes; written to a temporary ``.wav`` file
            and used as the soundtrack of the result.
        editing_notes: Iterable of dicts with ``timestamp`` (seconds in the
            source video) and ``type``. The segment ending at a note is
            slowed down for ``slow_mo`` and sped up for ``fast_cut``.
        preset_name: Key into ``EDITING_PRESETS``.

    Returns:
        Path of the rendered file inside the configured download folder.

    Raises:
        KeyError: if ``preset_name`` is not a known preset.
    """
    print(f"Applying {preset_name} editing preset...")
    preset = EDITING_PRESETS[preset_name]
    speed = preset.get('speed') or 1.0

    # Save audio to temp file (AudioFileClip needs a path, not bytes)
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
        temp_audio.write(audio_data)
        temp_audio_path = temp_audio.name

    video = None
    audio = None
    try:
        # Load video and audio
        video = VideoFileClip(video_path)
        audio = AudioFileClip(temp_audio_path)

        # Apply basic preset effects
        edited = video
        if preset.get('speed'):
            edited = edited.fx(speedx, speed)
        # Apply black bars for cinematic
        if preset.get('black_bars'):
            edited = edited.fl(_add_black_bars)

        # Apply editing notes: cut at every usable timestamp and apply the
        # note's effect to the segment that ends there.
        clips = []
        current_start = 0
        for timestamp, note_type in _cut_points(editing_notes, speed, edited.duration):
            if timestamp <= current_start:
                continue
            clip = edited.subclip(current_start, timestamp)
            if note_type == 'slow_mo':
                clip = clip.fx(speedx, 0.5)
            elif note_type == 'fast_cut':
                clip = clip.fx(speedx, 1.5)
            clips.append(clip)
            current_start = timestamp

        # Add remaining video
        if current_start < edited.duration:
            clips.append(edited.subclip(current_start))

        # Concatenate all clips
        final_video = concatenate_videoclips(clips)

        # Apply aspect ratio if specified. The ratio is a (width, height)
        # pair such as (9, 16), not a pixel size, so crop rather than resize.
        if preset.get('aspect_ratio'):
            final_video = _crop_to_aspect(final_video, preset['aspect_ratio'])
        final_video = final_video.set_audio(audio)

        # Generate output path
        output_path = os.path.join(
            app.config['DOWNLOAD_FOLDER'],
            f"edited_{os.path.basename(video_path)}",
        )
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=4,
            preset='fast'
        )
        return output_path
    finally:
        # Cleanup runs on failure too, so neither the readers nor the temp
        # file are left behind. Readers are closed before the file is removed.
        if video is not None:
            video.close()
        if audio is not None:
            audio.close()
        os.unlink(temp_audio_path)
# --- 4. FLASK ROUTES ---
@app.route('/', methods=['GET'])
def index():
    """Serve the upload page, giving the template the voice and preset options."""
    template_context = {
        'voices': VOICE_CHOICES,
        'presets': EDITING_PRESETS.keys(),
    }
    return render_template('index.html', **template_context)
@app.route('/process', methods=['POST'])
def process_video():
    """Handle video upload and processing.

    Saves the uploaded ``video`` file, runs analysis, speech generation and
    editing, and answers with JSON holding the download URL and the script.

    Responses:
        * No file supplied: the index page is rendered with a flash message.
        * Unknown ``preset`` form value: JSON error with HTTP 400.
        * Any failure during processing: JSON error with HTTP 500.

    The uploaded file is removed in every case once the request finishes.
    """
    input_video_path = None
    try:
        # Validate file upload
        file = request.files.get('video')
        if file is None or file.filename == '':
            flash("Please upload a video file.", "error")
            return render_template('index.html',
                                   voices=VOICE_CHOICES,
                                   presets=EDITING_PRESETS.keys())

        # Get processing options
        voice_choice = request.form.get('voice', 'Charon')
        is_cheerful = request.form.get('tone') == 'on'
        preset_name = request.form.get('preset', 'fast_cuts')
        # Reject a bad preset before doing any expensive work.
        if preset_name not in EDITING_PRESETS:
            return jsonify({
                'status': 'error',
                'message': f"Unknown editing preset: {preset_name}"
            }), 400

        # Save uploaded file under a unique name so that concurrent requests
        # using the same client filename cannot overwrite or delete each
        # other's upload and output.
        stem, extension = os.path.splitext(secure_filename(file.filename))
        # secure_filename() can return an empty or extension-less name (for
        # example when the original name is entirely non-ASCII). Fall back to
        # ".mp4" - assumed to match the video/mp4 type used for analysis.
        filename = f"{uuid.uuid4().hex}_{stem or 'video'}{extension or '.mp4'}"
        input_video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(input_video_path)

        # Analyze video
        analysis = analyze_video(input_video_path)
        script = analysis.get('script', '')
        editing_notes = analysis.get('editing_notes', [])

        # Generate audio
        audio_data = generate_audio(script, voice_choice, is_cheerful)

        # Apply editing and generate final video
        final_video_path = apply_editing(input_video_path, audio_data, editing_notes, preset_name)
        return jsonify({
            'status': 'success',
            'video_url': url_for('serve_video', filename=os.path.basename(final_video_path)),
            'script': script
        })
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
    finally:
        # Clean up uploaded file
        if input_video_path and os.path.exists(input_video_path):
            os.remove(input_video_path)
@app.route('/downloads/<filename>')
def serve_video(filename):
    """Return the file called ``filename`` from the configured download folder."""
    download_dir = app.config['DOWNLOAD_FOLDER']
    return send_from_directory(download_dir, filename)
# --- 5. APPLICATION ENTRY POINT ---
if __name__ == '__main__':
    # Started directly as a script: serve on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")