|
import io
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from pathlib import Path

import google.generativeai as genai
import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from gtts import gTTS
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from werkzeug.utils import secure_filename
|
|
|
|
|
# Load variables from a local .env file into the process environment so the
# os.getenv() lookups further down can see them.
load_dotenv()

# The Flask application object; routes below are registered on it.
app = Flask(__name__)
|
|
|
|
|
# --- Runtime configuration --------------------------------------------------
# Credentials / endpoints are read from the environment (optionally via .env).
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Endpoint of the external TTS service; an empty string means "not configured".
TTS_API_URL = os.getenv("TTS_API_URL", "")

# Largest request body accepted by Flask, in bytes (500 MiB).
MAX_CONTENT_LENGTH = 500 * 1024 * 1024
# Attempts made per text chunk before TTS generation gives up.
MAX_TTS_RETRIES = 3
# Upper bound, in characters, used when splitting a script for TTS.
TTS_CHUNK_SIZE = 2000

# Hand the key to the Gemini SDK. Nothing else in this file registers it, so
# without this call the SDK never sees GEMINI_API_KEY. Guarded so that merely
# importing the module without a key does not fail.
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
|
|
|
|
|
# Working directories. Both are relative paths, so they are created under the
# process's current working directory.
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

# Publish the settings on the Flask app so request handlers can read them.
app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    DOWNLOAD_FOLDER=DOWNLOAD_FOLDER,
    MAX_CONTENT_LENGTH=MAX_CONTENT_LENGTH,
)

# A fresh random key is generated on every start of the process.
app.secret_key = os.urandom(24)
|
|
|
|
|
# In-memory job registry: task id -> dict describing that job
# ('status', 'progress', 'message', 'start_time', later 'script' and
# 'result_path'). Created by upload_video, mutated by the background worker
# thread, read by get_status. Nothing in this file removes entries or guards
# access with a lock.
processing_status = {}
|
|
|
|
|
# Display name shown to the user -> locale code handed to the TTS step.
# The keys are also what the index page offers as language choices.
LANGUAGE_MAPPING = {
    "Arabic (Egyptian)": "ar-EG",
    "English (US)": "en-US",
    "Hindi (India)": "hi-IN",
    "Tamil (India)": "ta-IN",
    "Telugu (India)": "te-IN"
}
|
|
|
# Voice choices offered on the index page (the keys).
# NOTE(review): the values are not read anywhere in the visible code; the
# worker forwards the submitted form value as-is — confirm whether the
# lower-case values were meant to be sent to the TTS API instead.
VOICE_TYPES = {
    "Male": "male",
    "Female": "female"
}
|
|
|
# Transcription prompts, keyed by TTS provider name ("api" or "gtts").
# The worker calls .format(language=...) on the selected prompt, so any
# literal brace added here must be doubled. Only the "gtts" prompt has a
# {language} placeholder; the "api" prompt always asks for Tamil.
GEMINI_PROMPTS = {
    "api": """
You are an AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.

**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்
""",
    "gtts": """
You are an expert AI scriptwriter. Transcribe ALL spoken dialogue into a SINGLE,
CONTINUOUS block of modern {language}. Return ONLY the clean transcribed text.
"""
}
|
|
|
|
|
# Configure the root logger once at import time: INFO and above, with a
# "timestamp - level - message" line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
def _split_long_sentence(sentence, limit):
    """Break one over-long sentence into pieces of at most ``limit`` characters.

    Pieces are cut at whitespace where possible; a single token longer than
    ``limit`` is hard-cut because it offers no better break point.
    """
    pieces = []
    current = ""
    for word in sentence.split():
        while len(word) > limit:
            # Flush what we have so the oversized token starts a fresh piece.
            if current:
                pieces.append(current)
                current = ""
            pieces.append(word[:limit])
            word = word[limit:]
        if not word:
            continue
        candidate = f"{current} {word}" if current else word
        if len(candidate) <= limit:
            current = candidate
        else:
            pieces.append(current)
            current = word
    if current:
        pieces.append(current)
    return pieces


def split_text_into_chunks(text, chunk_size=TTS_CHUNK_SIZE):
    """Split text into chunks, respecting sentence boundaries where possible.

    Sentences (delimited by ``.``, ``!`` or ``?`` followed by whitespace) are
    packed greedily into chunks. A sentence that is itself longer than
    ``chunk_size`` is broken at word boundaries so no chunk exceeds the limit.

    Args:
        text: The script to split. ``None``, empty or whitespace-only input
            yields an empty list.
        chunk_size: Maximum length of a chunk, in characters.

    Returns:
        A list of non-empty, stripped chunks, each at most ``chunk_size``
        characters long.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    if not text or not text.strip():
        return []

    chunks = []
    current_chunk = ""

    for sentence in re.split(r'(?<=[.!?])\s+', text):
        if len(sentence) > chunk_size:
            pieces = _split_long_sentence(sentence, chunk_size)
        else:
            pieces = [sentence]

        for piece in pieces:
            if not piece:
                continue
            # Only flush when there is something to flush; flushing an empty
            # buffer would emit an empty chunk, which TTS backends reject.
            if current_chunk and len(current_chunk) + len(piece) >= chunk_size:
                chunks.append(current_chunk.strip())
                current_chunk = ""
            current_chunk += piece + " "

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks
|
|
|
def _retry_after_seconds(response, default=5):
    """Return the wait, in seconds, requested by a response's Retry-After header."""
    try:
        return max(0, int(response.headers.get('Retry-After', default)))
    except (TypeError, ValueError):
        # The header may carry an HTTP-date instead of a number of seconds.
        return default


def generate_tts_audio(text, language_code, voice_type, tts_provider):
    """Generate TTS audio using the selected provider, with retry logic.

    The text is split into chunks; each chunk is synthesized with up to
    MAX_TTS_RETRIES attempts and the resulting byte streams are concatenated.

    Args:
        text: Script to narrate.
        language_code: Locale code such as ``"ta-IN"``. The external API
            receives it unchanged; gTTS receives the part before the ``-``.
        voice_type: Voice selector forwarded to the external API (unused by
            the gTTS branch).
        tts_provider: ``"api"`` posts to TTS_API_URL; any other value uses gTTS.

    Returns:
        An ``io.BytesIO`` positioned at 0 holding all segments back to back.
        NOTE(review): byte-wise concatenation assumes the provider returns a
        stream format that tolerates it (e.g. MP3) — confirm for the API.

    Raises:
        Exception: If there is no text to speak, or if any chunk could not be
            synthesized within MAX_TTS_RETRIES attempts (the last underlying
            error is chained as the cause).
    """
    chunks = [chunk for chunk in split_text_into_chunks(text) if chunk]
    if not chunks:
        raise Exception("No text available for TTS generation")

    audio_segments = []

    for chunk in chunks:
        last_error = None
        for attempt in range(MAX_TTS_RETRIES):
            is_last_attempt = attempt == MAX_TTS_RETRIES - 1
            try:
                if tts_provider == "api":
                    payload = {
                        "text": chunk,
                        "language": language_code,
                        "voice_type": voice_type
                    }
                    response = requests.post(TTS_API_URL, json=payload, timeout=300)

                    if response.status_code == 429:
                        retry_after = _retry_after_seconds(response)
                        last_error = Exception("TTS API error: 429")
                        logger.warning(f"TTS API rate limited. Retrying after {retry_after}s")
                        if not is_last_attempt:
                            time.sleep(retry_after)
                        continue
                    if response.status_code != 200:
                        raise Exception(f"TTS API error: {response.status_code}")

                    audio_segments.append(io.BytesIO(response.content))
                else:
                    tts = gTTS(
                        text=chunk,
                        lang=language_code.split('-')[0],
                        slow=False
                    )
                    buffer = io.BytesIO()
                    tts.write_to_fp(buffer)
                    buffer.seek(0)
                    audio_segments.append(buffer)
                break

            except Exception as e:
                last_error = e
                logger.warning(f"TTS attempt {attempt + 1} failed: {str(e)}")
                if not is_last_attempt:
                    # Exponential back-off between attempts: 1s, 2s, ...
                    time.sleep(2 ** attempt)
        else:
            # Loop ended without a break: every attempt failed (including the
            # case where the final attempt was rate limited). Never drop a
            # chunk silently — the narration would have a hole in it.
            raise Exception(
                f"Failed to generate TTS after {MAX_TTS_RETRIES} attempts"
            ) from last_error

    return io.BytesIO(b"".join(segment.getvalue() for segment in audio_segments))
|
|
|
def _wait_for_gemini_file(uploaded_file, poll_interval=2, timeout=300):
    """Poll an uploaded Gemini file until it has left the PROCESSING state."""
    deadline = time.monotonic() + timeout
    while uploaded_file.state.name == "PROCESSING":
        if time.monotonic() > deadline:
            raise Exception("Timed out waiting for uploaded video to be processed")
        time.sleep(poll_interval)
        uploaded_file = genai.get_file(uploaded_file.name)
    if uploaded_file.state.name == "FAILED":
        raise Exception("Uploaded video could not be processed")
    return uploaded_file


def generate_transcription(video_path, prompt):
    """Generate a transcript of a video using Gemini, with retry logic.

    Each attempt uploads the video, waits for the upload to finish
    processing, asks the model for the transcript and then deletes the remote
    file again (also when the attempt fails).

    Args:
        video_path: Path of the local video file to upload.
        prompt: Instruction text sent to the model together with the video.

    Returns:
        The model's response text, stripped of surrounding whitespace.

    Raises:
        Exception: Whatever the final attempt raised, after 3 attempts.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            video_file = genai.upload_file(video_path, mime_type="video/mp4")
            try:
                # Uploaded videos are processed asynchronously; using the
                # file before that finishes makes generate_content fail.
                video_file = _wait_for_gemini_file(video_file)
                # NOTE(review): confirm this model id accepts uploaded video.
                model = genai.GenerativeModel("models/gemini-pro-vision")
                response = model.generate_content([prompt, video_file])
            finally:
                # Best-effort cleanup on success AND failure, so retries do
                # not pile up remote files; a cleanup error must not discard
                # an otherwise good transcript.
                try:
                    genai.delete_file(video_file.name)
                except Exception as cleanup_error:
                    logger.warning(f"Could not delete uploaded file: {str(cleanup_error)}")

            if hasattr(response, 'text'):
                return response.text.strip()
            raise Exception("No valid transcription generated")

        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Transcription attempt {attempt + 1} failed: {str(e)}")
            # Linear back-off: 5s, 10s, ...
            time.sleep(5 * (attempt + 1))
|
|
|
def dub_video(video_path, audio_buffer):
    """Dub a video with new audio and write the result to the download folder.

    Args:
        video_path: Path of the source video.
        audio_buffer: Readable binary file-like object holding the narration;
            it is read from its current position to the end.

    Returns:
        Path of the written ``dubbed_<hex>.mp4`` file inside
        ``app.config['DOWNLOAD_FOLDER']``.

    Raises:
        Exception: Anything raised while reading the clips or encoding. In
            that case no partial output file is left behind.
    """
    video = None
    audio = None
    temp_audio_path = None
    output_path = None
    finished = False

    try:
        # AudioFileClip needs a real file, so spill the buffer to the system
        # temp directory (not the working directory). delete=False because
        # the file must outlive this `with` block; it is removed in `finally`.
        with tempfile.NamedTemporaryFile(
            prefix="temp_audio_", suffix=".mp3", delete=False
        ) as temp_audio:
            temp_audio_path = temp_audio.name
            temp_audio.write(audio_buffer.read())

        video = VideoFileClip(video_path)
        audio = AudioFileClip(temp_audio_path)

        # Narration longer than the picture is cut to the video's length.
        if audio.duration > video.duration:
            audio = audio.subclip(0, video.duration)

        video = video.set_audio(audio)

        output_filename = f"dubbed_{uuid.uuid4().hex}.mp4"
        output_path = os.path.join(app.config['DOWNLOAD_FOLDER'], output_filename)

        video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=4,
            verbose=False,
            preset='medium',
            ffmpeg_params=['-crf', '23', '-movflags', '+faststart']
        )

        finished = True
        return output_path

    finally:
        if video is not None:
            video.close()
        if audio is not None:
            audio.close()
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.unlink(temp_audio_path)
        # A failed encode can leave a truncated file that would otherwise
        # sit in the download folder forever.
        if not finished and output_path and os.path.exists(output_path):
            os.unlink(output_path)
|
|
|
def process_video_background(task_id, video_path, language, voice_type, tts_provider):
    """Run the transcribe -> narrate -> dub pipeline for one uploaded video.

    Intended to run in a worker thread. Progress and the outcome are
    published in ``processing_status[task_id]``; no exception escapes. The
    uploaded source video is deleted when the job ends, whatever the outcome.

    Args:
        task_id: Key of this job in ``processing_status``.
        video_path: Path of the uploaded video.
        language: Display name of the target language (a LANGUAGE_MAPPING
            key). Unknown names fall back to "English (US)".
        voice_type: Voice selector forwarded to the TTS step.
            NOTE(review): forwarded as received, not mapped through
            VOICE_TYPES — confirm which form the TTS API expects.
        tts_provider: Key into GEMINI_PROMPTS ("api" or "gtts").
    """
    job = {
        'status': 'processing',
        'progress': 0,
        'message': 'Starting transcription',
        'start_time': time.time()
    }
    processing_status[task_id] = job

    try:
        if tts_provider not in GEMINI_PROMPTS:
            raise ValueError(f"Unsupported TTS provider: {tts_provider}")
        if language not in LANGUAGE_MAPPING:
            # Keep the prompt and the TTS locale in agreement instead of
            # transcribing into one language and speaking in another.
            logger.warning(f"Unknown language {language!r}; using English (US)")
            language = "English (US)"

        job['message'] = 'Transcribing video content'
        prompt = GEMINI_PROMPTS[tts_provider].format(language=language)
        script = generate_transcription(video_path, prompt)
        job['progress'] = 33
        job['script'] = script

        job['message'] = 'Generating audio narration'
        language_code = LANGUAGE_MAPPING.get(language, "en-US")
        audio_buffer = generate_tts_audio(script, language_code, voice_type, tts_provider)
        job['progress'] = 66

        job['message'] = 'Creating dubbed video'
        output_path = dub_video(video_path, audio_buffer)
        job['progress'] = 100
        # Publish the result BEFORE flipping the status: a poller that sees
        # 'complete' must always find 'result_path' as well.
        job['result_path'] = output_path
        job['status'] = 'complete'

    except Exception as e:
        # Same ordering rule: message first, status last.
        job['message'] = str(e)
        job['status'] = 'error'
        logger.exception(f"Processing failed: {str(e)}")

    finally:
        try:
            if os.path.exists(video_path):
                os.unlink(video_path)
        except OSError as cleanup_error:
            logger.warning(f"Could not remove uploaded video: {str(cleanup_error)}")
|
|
|
@app.route('/')
def index():
    """Serve the landing page with the selectable languages and voices."""
    template_context = {
        'languages': list(LANGUAGE_MAPPING),
        'voice_types': list(VOICE_TYPES),
        'default_language': "English (US)",
        # The external TTS option is only offered when an endpoint is set.
        'tts_api_available': bool(TTS_API_URL),
    }
    return render_template('index.html', **template_context)
|
|
|
@app.route('/upload', methods=['POST'])
def upload_video():
    """Accept a video upload and start background processing.

    Expects a multipart form with a ``video`` file and optional ``language``,
    ``voice_type`` and ``tts_provider`` fields. All validation happens before
    the file is written, so a rejected request leaves nothing on disk.

    Returns:
        JSON ``{'task_id': ...}`` on success; JSON ``{'error': ...}`` with
        status 400 for invalid input or 500 if the file cannot be saved.
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    file = request.files['video']
    if not file.filename:
        return jsonify({'error': 'No file selected'}), 400

    allowed_extensions = {'mp4', 'mov', 'webm', 'avi'}
    _, dot, extension = file.filename.rpartition('.')
    if not dot or extension.lower() not in allowed_extensions:
        return jsonify({'error': 'Invalid file type'}), 400

    language = request.form.get('language', 'English (US)')
    voice_type = request.form.get('voice_type', 'Male')
    tts_provider = request.form.get('tts_provider', 'gtts')

    # An unknown provider can never succeed (there is no prompt for it), so
    # reject it now instead of failing later inside the worker thread.
    if tts_provider not in GEMINI_PROMPTS:
        return jsonify({'error': 'Invalid TTS provider'}), 400
    if tts_provider == "api" and not TTS_API_URL:
        return jsonify({'error': 'TTS API is not configured'}), 400

    task_id = uuid.uuid4().hex
    # The task id prefix keeps names unique and non-empty even when
    # secure_filename strips most of the client-supplied name.
    filename = secure_filename(f"{task_id}_{file.filename}")
    video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    try:
        file.save(video_path)
    except Exception as e:
        # Do not leave a partially written upload behind.
        if os.path.exists(video_path):
            try:
                os.unlink(video_path)
            except OSError:
                logger.warning(f"Could not remove partial upload: {video_path}")
        return jsonify({'error': f'Failed to save file: {str(e)}'}), 500

    processing_status[task_id] = {
        'status': 'uploaded',
        'progress': 0,
        'message': 'Starting processing',
        'start_time': time.time()
    }

    thread = threading.Thread(
        target=process_video_background,
        args=(task_id, video_path, language, voice_type, tts_provider)
    )
    thread.start()

    return jsonify({'task_id': task_id})
|
|
|
@app.route('/status/<task_id>')
def get_status(task_id):
    """Report the processing status of a task as JSON.

    Returns:
        404 with ``{'error': ...}`` for an unknown task id. Otherwise a JSON
        object with ``status``, ``progress`` and ``message``; a completed job
        additionally carries ``result_url`` and ``script``, a failed one
        ``error_details``.
    """
    entry = processing_status.get(task_id)
    if entry is None:
        return jsonify({'error': 'Invalid task ID'}), 404

    # Work on a snapshot: the worker thread keeps mutating the live dict.
    status = dict(entry)
    state = status['status']
    result_path = status.get('result_path')

    # If the job is flagged complete but the result path has not been
    # published yet, report it as still processing so the client polls again
    # instead of receiving a 500.
    if state == 'complete' and not result_path:
        state = 'processing'

    response = {
        'status': state,
        'progress': status.get('progress', 0),
        'message': status.get('message', ''),
    }

    if state == 'complete':
        response['result_url'] = url_for(
            'download',
            filename=os.path.basename(result_path)
        )
        response['script'] = status.get('script', '')
    elif state == 'error':
        response['error_details'] = status.get('message', 'Unknown error')

    return jsonify(response)
|
|
|
@app.route('/download/<filename>')
def download(filename):
    """Serve a processed video as an attachment, with basic safety checks.

    Only names of the form ``dubbed_*.mp4`` are served.

    Returns:
        The file response, or a plain-text message with status 400 (name not
        allowed), 404 (no such file) or 500 (unexpected failure).
    """
    try:
        if not (filename.startswith('dubbed_') and filename.endswith('.mp4')):
            return "Invalid file", 400

        # Resolve against the current working directory — the same base the
        # writer (dub_video) uses. send_from_directory would otherwise
        # interpret a relative directory against the application root path,
        # which can be a different location.
        download_dir = Path(app.config['DOWNLOAD_FOLDER']).resolve()
        if not (download_dir / filename).is_file():
            return "File not found", 404

        return send_from_directory(
            str(download_dir),
            filename,
            as_attachment=True,
            mimetype='video/mp4'
        )
    except Exception as e:
        logger.error(f"Download failed: {str(e)}")
        return "Download error", 500
|
|
|
if __name__ == '__main__':
    # Refuse to start without a Gemini key: transcription depends on it.
    if not GEMINI_API_KEY:
        raise ValueError("GEMINI_API_KEY is required in .env file")
    # Listen on all interfaces on port 7860 using the built-in server in
    # threaded mode.
    app.run(host="0.0.0.0", port=7860, threaded=True)