File size: 6,648 Bytes
d24a2f3 8a409a5 d24a2f3 8a409a5 5c69bfc 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 d24a2f3 8a409a5 4ae7788 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
import os
import tempfile
import time

import google.generativeai as genai
import requests
from flask import Flask, request, render_template, send_from_directory, url_for
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from pydub import AudioSegment
from werkzeug.utils import secure_filename
# --- 1. INITIALIZE FLASK APP AND LOAD SECRETS ---
app = Flask(__name__)

# Both secrets are read from the process environment; a missing or empty
# value aborts start-up immediately with a ValueError.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
TTS_API_URL = os.environ.get("TTS_API_URL")

if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY secret not found! Please set it as an environment variable.")
if not TTS_API_URL:
    raise ValueError("TTS_API_URL secret not found! Please set it as an environment variable.")

# Hand the key to the Gemini client library.
genai.configure(api_key=GEMINI_API_KEY)

# Working directories: incoming uploads and finished (dubbed) videos.
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

# Expose both locations through the Flask config so the routes can read them.
app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    DOWNLOAD_FOLDER=DOWNLOAD_FOLDER,
)
# --- 2. DEFINE VOICE CHOICES AND GEMINI PROMPT ---
# Maps the label submitted in the form's `voice_choice` field (key) to the
# voice name that is sent to the TTS API as `voice_name` (value).
VOICE_CHOICES = {
"Male (Charon)": "Charon",
"Female (Zephyr)": "Zephyr"
}
# Instruction text passed to Gemini together with the uploaded video in
# generate_tamil_script(). It asks for one continuous Tamil script with
# English style prompts and performance tags and no timestamps/speaker labels.
# NOTE: this string is sent verbatim at runtime; edit its wording with care.
GEMINI_PROMPT = """
You are an AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.
**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue from all speakers into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text.
**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்.
"""
# --- 3. HELPER FUNCTIONS (CORE LOGIC) ---
def generate_tamil_script(video_file_path):
    """Generate a single, continuous Tamil script from the video.

    The video is uploaded to Gemini, polled every 10 seconds until it leaves
    the ``PROCESSING`` state, and then passed to the model together with
    ``GEMINI_PROMPT``. The uploaded remote file is always deleted afterwards,
    whether or not generation succeeded.

    Args:
        video_file_path: Path of the local video file to upload.

    Returns:
        The generated script with surrounding whitespace stripped and all
        line breaks replaced by single spaces.

    Raises:
        Exception: If the uploaded file does not reach the ``ACTIVE`` state,
            or if the response contains no usable text.
    """
    print("Uploading file to Gemini for transcription...")
    video_file = genai.upload_file(video_file_path, mime_type="video/mp4")
    try:
        print("Waiting for file processing...")
        while video_file.state.name == "PROCESSING":
            time.sleep(10)
            video_file = genai.get_file(video_file.name)
        if video_file.state.name != "ACTIVE":
            raise Exception(f"File {video_file.name} failed to process")

        print("Generating single narrator script...")
        model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Remove the remote copy even when processing or generation failed,
        # so failed requests do not leave uploaded files behind.
        genai.delete_file(video_file.name)
        print("Deleted file from Gemini.")

    try:
        # The `.text` accessor raises ValueError when the response carries no
        # valid text part (e.g. a blocked response); treat that as "no script".
        script_text = response.text
    except ValueError as err:
        raise Exception("No valid script was generated by Gemini.") from err

    if script_text:
        return " ".join(script_text.strip().splitlines())
    raise Exception("No valid script was generated by Gemini.")
def generate_single_audio_track(dialogue_text, voice_name, is_cheerful, output_path, timeout=(10, 600)):
    """Generate one continuous audio track for the entire script.

    Posts the script to the TTS service at ``TTS_API_URL`` and writes the raw
    response body to ``output_path``.

    Args:
        dialogue_text: The script text to synthesize.
        voice_name: Voice identifier sent to the TTS API as ``voice_name``.
        is_cheerful: Flag sent to the TTS API as ``cheerful``.
        output_path: Destination file for the returned audio bytes.
        timeout: ``requests`` timeout, either a number of seconds or a
            ``(connect, read)`` tuple. Without one, a stalled TTS endpoint
            would block the calling request indefinitely.

    Returns:
        True when the audio was written successfully.

    Raises:
        Exception: If the TTS API responds with a status other than 200.
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    print(f"Generating audio with voice '{voice_name}' | Cheerful: {is_cheerful}")
    payload = {"text": dialogue_text, "voice_name": voice_name, "cheerful": is_cheerful}
    response = requests.post(TTS_API_URL, json=payload, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Error from TTS API: {response.status_code} - {response.text}")

    with open(output_path, "wb") as f:
        f.write(response.content)
    print(f"Audio track saved successfully to {output_path}")
    return True
def replace_video_audio(video_path, new_audio_path, output_path):
    """Replace the audio of a video with a new audio file.

    Args:
        video_path: Path of the source video.
        new_audio_path: Path of the audio file to use as the new soundtrack.
        output_path: Path the resulting video is written to (H.264 video,
            AAC audio).

    Any exception raised while opening the clips or encoding the output
    propagates to the caller; the clips that were opened are closed first.
    """
    print("Replacing video audio...")
    video_clip = None
    audio_clip = None
    final_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = AudioFileClip(new_audio_path)
        final_clip = video_clip.set_audio(audio_clip)
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    finally:
        # Close whatever was opened, even when encoding fails, so file
        # handles held by the clips are not leaked.
        for clip in (video_clip, audio_clip, final_clip):
            if clip is not None:
                clip.close()
    print(f"Final video saved to {output_path}")
# --- 4. FLASK ROUTES ---
@app.route('/', methods=['GET'])
def index():
    """Render and return the ``index.html`` template (the main upload page)."""
    page = render_template('index.html')
    return page
@app.route('/process', methods=['POST'])
def process_video():
    """Handle the video upload and dubbing process.

    Expects a multipart form with a ``video`` file, a ``voice_choice`` field
    holding one of the ``VOICE_CHOICES`` keys, and an optional ``cheerful``
    field (its mere presence enables the flag).

    Returns:
        The rendered ``index.html`` with the dubbed video URL and the script
        on success; a plain-text message with status 400 for invalid input;
        a plain-text message with status 500 if any processing step fails.
    """
    if 'video' not in request.files:
        return "No video file part", 400
    file = request.files['video']
    if not file.filename:
        return "No selected file", 400

    # secure_filename() can strip a name down to nothing (e.g. "../");
    # reject that instead of trying to save onto the upload directory itself.
    filename = secure_filename(file.filename)
    if not filename:
        return "Invalid file name", 400

    # Validate the form before touching the disk so a bad request is
    # answered with 400 and leaves no orphaned upload behind.
    voice_name = VOICE_CHOICES.get(request.form.get('voice_choice'))
    if voice_name is None:
        return "Invalid voice choice", 400
    is_cheerful = 'cheerful' in request.form

    upload_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(upload_path)

    temp_audio_path = None
    try:
        # Generate the script
        script = generate_tamil_script(upload_path)

        # Generate the audio track. mkstemp() creates the file atomically,
        # unlike the deprecated, race-prone mktemp(); only the path is
        # needed here, so the descriptor is closed right away.
        fd, temp_audio_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        generate_single_audio_track(script, voice_name, is_cheerful, temp_audio_path)

        # Create the final video
        final_video_name = f"dubbed_{filename}"
        final_video_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_video_name)
        replace_video_audio(upload_path, temp_audio_path, final_video_path)

        # Render the same page but now with the results
        return render_template('index.html',
                               result_video=url_for('serve_video', filename=final_video_name),
                               script=script)
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"An error occurred during processing: {e}", 500
    finally:
        # Clean up the temporary audio file on success and on failure alike.
        if temp_audio_path is not None and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
@app.route('/downloads/<filename>')
def serve_video(filename):
    """Serve the final dubbed video from the downloads directory.

    Args:
        filename: Name of the file inside the downloads directory, taken
            from the URL.

    Returns:
        The file response produced by ``send_from_directory``.
    """
    # Flask resolves a relative directory against the application root path,
    # whereas the dubbed video is written relative to the current working
    # directory. Resolving to an absolute path here makes both agree even
    # when the server is started from a different directory.
    download_dir = os.path.abspath(app.config['DOWNLOAD_FOLDER'])
    return send_from_directory(download_dir, filename)
if __name__ == '__main__':
    # Use host='0.0.0.0' to make it accessible on your local network
    # (the server binds to all interfaces, on port 7860).
    app.run(host="0.0.0.0", port=7860)