|
import os |
|
import time |
|
import tempfile |
|
import uuid |
|
import google.generativeai as genai |
|
import requests |
|
from flask import Flask, request, render_template, send_from_directory, url_for, flash |
|
from moviepy.video.io.VideoFileClip import VideoFileClip |
|
from moviepy.audio.io.AudioFileClip import AudioFileClip |
|
from moviepy.audio.AudioClip import concatenate_audioclips |
|
from werkzeug.utils import secure_filename |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load variables from a local .env file into the process environment.
load_dotenv()

app = Flask(__name__)

# Required external service settings, read from the environment.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")
# Default per-chunk budget handed to split_text_for_tts().
TTS_MAX_TOKENS = 30000

# Fail fast at import time when a required setting is missing.
if not GEMINI_API_KEY:
    raise ValueError("SECURITY ERROR: GEMINI_API_KEY not found in .env file!")
if not TTS_API_URL:
    raise ValueError("CONFIGURATION ERROR: TTS_API_URL not found in .env file!")

genai.configure(api_key=GEMINI_API_KEY)

# Working directories. They are made absolute (anchored at the current working
# directory, where they have always been created) because Flask's
# send_from_directory() resolves *relative* directories against app.root_path,
# which is not necessarily the working directory.
UPLOAD_FOLDER = os.path.abspath('uploads')
DOWNLOAD_FOLDER = os.path.abspath('downloads')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
# Reject request bodies larger than 100 MiB.
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
# Prefer a stable key from the environment so signed session data (used by
# flash()) stays valid across restarts and worker processes; fall back to a
# random per-process key when none is configured.
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24)
|
|
|
|
|
# Maps the label shown to the user onto the voice name sent to the TTS service.
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr",
}

# Instruction text sent to Gemini together with the uploaded video.
GEMINI_PROMPT = """
You are an expert AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue from all speakers into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.

**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்.
"""
|
|
|
|
|
|
|
def generate_tamil_script(video_file_path, processing_timeout=900):
    """Generate a single-line Tamil script for a video using Gemini.

    The video is uploaded to Gemini, polled until it leaves the PROCESSING
    state, and passed to the model together with GEMINI_PROMPT. The uploaded
    remote file is deleted afterwards, whether or not generation succeeded.

    Args:
        video_file_path: Path of the local video file to upload.
        processing_timeout: Maximum number of seconds to wait for Gemini to
            finish processing the upload.

    Returns:
        The generated script with all line breaks replaced by spaces.

    Raises:
        TimeoutError: If the upload is still PROCESSING after
            ``processing_timeout`` seconds.
        RuntimeError: If Gemini ends in a non-ACTIVE state or returns no
            usable text.
    """
    # Function-scope import keeps this block self-contained.
    import mimetypes

    # Use the real video type when it can be inferred from the file name;
    # fall back to the previous hard-coded value otherwise.
    guessed_type, _ = mimetypes.guess_type(video_file_path)
    if guessed_type and guessed_type.startswith("video/"):
        mime_type = guessed_type
    else:
        mime_type = "video/mp4"

    print("Uploading video to Gemini for transcription...")
    video_file = genai.upload_file(video_file_path, mime_type=mime_type)

    try:
        # Bound the wait so a stuck upload cannot block the request forever.
        deadline = time.monotonic() + processing_timeout
        while video_file.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Gemini file processing did not finish within "
                    f"{processing_timeout} seconds."
                )
            time.sleep(5)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise RuntimeError(f"Gemini file processing failed: {video_file.state.name}")

        print("Generating script...")
        model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Always release the remote copy. Cleanup is best-effort so that a
        # failed delete neither hides the real error nor discards a good result.
        try:
            genai.delete_file(video_file.name)
        except Exception as cleanup_error:
            print(f"Could not delete uploaded Gemini file: {cleanup_error}")

    # NOTE(review): reading ``.text`` appears to raise ValueError when the
    # response has no usable candidate (e.g. blocked output) -- confirm against
    # the SDK docs. Either way it is reported as "no valid script".
    try:
        raw_text = response.text
    except (AttributeError, ValueError) as err:
        raise RuntimeError("No valid script was generated by Gemini.") from err

    script = " ".join(raw_text.strip().splitlines()) if raw_text else ""
    if not script:
        raise RuntimeError("No valid script was generated by Gemini.")
    return script
|
|
|
def split_text_for_tts(text, max_tokens=None):
    """Split text into whitespace-delimited chunks for the TTS service.

    Words are packed greedily. Each word is charged ``len(word) + 1`` (the
    word plus one separator) against ``max_tokens``, so the budget is
    effectively a character count and is slightly conservative.

    Args:
        text: The text to split. Runs of whitespace are collapsed.
        max_tokens: Budget per chunk. Defaults to the module-level
            TTS_MAX_TOKENS, looked up at call time.

    Returns:
        A list of non-empty chunk strings; empty when ``text`` contains no
        words. A single word that exceeds the budget on its own is emitted
        unchanged as its own chunk.
    """
    if max_tokens is None:
        max_tokens = TTS_MAX_TOKENS

    chunks = []
    current_chunk = []
    current_length = 0

    for word in text.split():
        word_length = len(word) + 1
        if current_length + word_length > max_tokens:
            # Only flush a non-empty buffer: an oversized first word must not
            # produce an empty chunk ahead of it.
            if current_chunk:
                chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_length = word_length
        else:
            current_chunk.append(word)
            current_length += word_length

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
|
|
|
def generate_audio_with_retry(text_chunk, voice_name, is_cheerful, max_retries=3, retry_delay=2):
    """Request speech audio for one text chunk, retrying on failure.

    Posts ``text``, ``voice_name`` and ``cheerful`` as JSON to TTS_API_URL.
    After a failed attempt it waits ``retry_delay * attempt_number`` seconds
    before trying again.

    Args:
        text_chunk: Text to synthesize.
        voice_name: Voice identifier forwarded to the TTS service.
        is_cheerful: Value forwarded as the ``cheerful`` flag.
        max_retries: Total number of attempts; must be at least 1.
        retry_delay: Base delay in seconds between attempts.

    Returns:
        The raw response body (audio bytes) of the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        RuntimeError: If every attempt fails.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    payload = {
        "text": text_chunk,
        "voice_name": voice_name,
        "cheerful": is_cheerful,
    }

    last_error = None
    for attempt in range(max_retries):
        try:
            response = requests.post(TTS_API_URL, json=payload, timeout=300)
            response.raise_for_status()
            if response.status_code == 200 and response.content:
                return response.content
            # A non-200 success status or an empty body carries no usable
            # audio; treat it as a failed attempt instead of returning None.
            last_error = (
                f"unexpected response (status {response.status_code}, "
                f"{len(response.content)} bytes)"
            )
        except requests.exceptions.RequestException as e:
            last_error = e

        print(f"TTS API attempt {attempt + 1} failed: {str(last_error)}")
        if attempt < max_retries - 1:
            time.sleep(retry_delay * (attempt + 1))

    # Chain the underlying request error when there is one.
    cause = last_error if isinstance(last_error, BaseException) else None
    raise RuntimeError(
        f"TTS API failed after {max_retries} attempts: {str(last_error)}"
    ) from cause
|
|
|
def generate_long_audio(script_text, voice_name, is_cheerful, output_path):
    """Synthesize a long script in chunks and write one combined audio file.

    The script is split with split_text_for_tts(), each chunk is synthesized
    with generate_audio_with_retry(), and the resulting clips are concatenated
    and written to ``output_path``.

    Args:
        script_text: Full script to narrate.
        voice_name: Voice identifier forwarded to the TTS service.
        is_cheerful: Tone flag forwarded to the TTS service.
        output_path: Destination path of the combined audio file.

    Raises:
        ValueError: If the script contains no text to synthesize.
    """
    print("Processing long audio generation...")
    text_chunks = split_text_for_tts(script_text)
    if not text_chunks:
        raise ValueError("Cannot generate audio from an empty script.")

    audio_clips = []
    # A private temporary directory per call keeps concurrent requests from
    # overwriting each other's chunk files; it is removed on exit.
    with tempfile.TemporaryDirectory(prefix="tts_chunks_") as chunk_dir:
        try:
            for i, chunk in enumerate(text_chunks):
                print(f"Processing chunk {i+1}/{len(text_chunks)}")
                chunk_audio = generate_audio_with_retry(chunk, voice_name, is_cheerful)

                chunk_path = os.path.join(chunk_dir, f"chunk_{i}.wav")
                with open(chunk_path, "wb") as f:
                    f.write(chunk_audio)

                audio_clips.append(AudioFileClip(chunk_path))

            print("Combining audio chunks...")
            final_audio = concatenate_audioclips(audio_clips)
            final_audio.write_audiofile(output_path)
        finally:
            # Close the clips before the directory is deleted so no chunk
            # file is still open when it is removed.
            for clip in audio_clips:
                clip.close()
|
|
|
def replace_video_audio(video_path, new_audio_path, output_path):
    """Write a copy of a video whose audio track is replaced by another file.

    Args:
        video_path: Path of the source video.
        new_audio_path: Path of the audio file to attach to the video.
        output_path: Path of the video file to write (libx264 video, aac audio).
    """
    print("Replacing video audio...")
    source_video = None
    dubbed_track = None

    try:
        source_video = VideoFileClip(video_path)
        dubbed_track = AudioFileClip(new_audio_path)
        source_video.audio = dubbed_track

        encode_options = {
            "codec": "libx264",
            "audio_codec": "aac",
            "logger": 'bar',
        }
        source_video.write_videofile(output_path, **encode_options)
    finally:
        # Release whichever clips were opened: audio first, then video.
        for opened_clip in (dubbed_track, source_video):
            if opened_clip:
                opened_clip.close()
|
|
|
|
|
|
|
@app.route('/', methods=['GET'])
def index():
    """Show the upload form together with the available voice choices."""
    page = render_template('index.html', voices=VOICE_CHOICES)
    return page
|
|
|
@app.route('/process', methods=['POST'])
def process_video():
    """Handle a video upload and produce a Tamil-dubbed copy.

    Steps: save the upload, generate a Tamil script with Gemini, synthesize
    the narration, and write a new video carrying that narration into the
    download folder. The page is re-rendered with either the result or a
    flashed error message. The saved upload and the intermediate audio file
    are removed in all cases.
    """
    input_video_path = None
    temp_audio_path = None

    try:
        file = request.files.get('video')
        if file is None or not file.filename:
            flash("Please upload a video file.", "error")
            return render_template('index.html', voices=VOICE_CHOICES)

        # secure_filename() can reduce a name to an empty string (for example
        # when it has no ASCII characters), so fall back to a generic name.
        filename = secure_filename(file.filename) or "video.mp4"
        # A per-request id keeps simultaneous uploads of the same file name
        # from overwriting each other's input and output files.
        request_id = uuid.uuid4().hex
        input_video_path = os.path.join(
            app.config['UPLOAD_FOLDER'], f"{request_id}_{filename}"
        )
        file.save(input_video_path)

        voice_choice = request.form.get('voice', 'Charon')
        is_cheerful = request.form.get('tone') == 'on'

        script = generate_tamil_script(input_video_path)

        # Only the unique path is needed; the file itself is written later.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            temp_audio_path = temp_audio.name

        generate_long_audio(script, voice_choice, is_cheerful, temp_audio_path)

        # The output is encoded with libx264/aac, so always use an .mp4
        # container regardless of the upload's extension.
        base_name = os.path.splitext(filename)[0]
        final_video_name = f"dubbed_{request_id}_{base_name}.mp4"
        final_video_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_video_name)
        replace_video_audio(input_video_path, temp_audio_path, final_video_path)

        flash("Video processing complete!", "success")
        return render_template(
            'index.html',
            voices=VOICE_CHOICES,
            result_video=url_for('serve_video', filename=final_video_name),
            script=script,
        )

    except Exception as e:
        # Top-level boundary for the request: report the failure on the page.
        print(f"Processing error: {str(e)}")
        flash(f"An error occurred: {str(e)}", "error")
        return render_template('index.html', voices=VOICE_CHOICES)

    finally:
        # Best-effort cleanup; a failed delete must not replace the response.
        for leftover_path in (input_video_path, temp_audio_path):
            if leftover_path and os.path.exists(leftover_path):
                try:
                    os.remove(leftover_path)
                except OSError as cleanup_error:
                    print(f"Could not remove temporary file {leftover_path}: {cleanup_error}")
|
|
|
@app.route('/downloads/<filename>')
def serve_video(filename):
    """Serve a processed video from the download folder.

    Args:
        filename: Name of the file inside the download folder.
    """
    # send_from_directory() resolves a relative directory against the
    # application's root path, while the files are written relative to the
    # working directory; passing an absolute path makes both agree.
    download_dir = os.path.abspath(app.config['DOWNLOAD_FOLDER'])
    return send_from_directory(download_dir, filename)
|
|
|
|
|
if __name__ == '__main__':
    # Start Flask's built-in server on all interfaces, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**server_options)