# Translate / app.py
# Athspi's picture
# Update app.py
# 358d8c6 verified
# raw
# history blame
# 12.4 kB
import io
import logging
import mimetypes
import os
import re
import tempfile
import threading
import time
import uuid
from pathlib import Path

import google.generativeai as genai
import requests
from dotenv import load_dotenv
from flask import Flask, request, render_template, send_from_directory, jsonify, url_for
from gtts import gTTS
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from werkzeug.utils import secure_filename
# Initialize Flask app
# load_dotenv() runs before the os.getenv() lookups below so values from a
# local .env file are visible to them.
load_dotenv()
app = Flask(__name__)
# Configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if GEMINI_API_KEY:
    # Hand the key to the SDK explicitly instead of relying on whichever
    # environment variable the installed SDK version happens to look up.
    genai.configure(api_key=GEMINI_API_KEY)
TTS_API_URL = os.getenv("TTS_API_URL", "")  # Optional; empty disables the "api" TTS provider
MAX_CONTENT_LENGTH = 500 * 1024 * 1024  # 500MB upload limit
MAX_TTS_RETRIES = 3  # Attempts per text chunk before TTS generation gives up
TTS_CHUNK_SIZE = 2000  # Characters per chunk
# File storage setup
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
# Create both working directories up front (no error if they already exist).
for _storage_dir in (UPLOAD_FOLDER, DOWNLOAD_FOLDER):
    Path(_storage_dir).mkdir(exist_ok=True)
app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    DOWNLOAD_FOLDER=DOWNLOAD_FOLDER,
    MAX_CONTENT_LENGTH=MAX_CONTENT_LENGTH,
)
# A fresh random key is generated every time the module is loaded.
app.secret_key = os.urandom(24)
# Processing status tracking: task_id -> dict with 'status', 'progress',
# 'message', 'start_time' and, once available, 'script' / 'result_path'.
processing_status = {}
# Language and voice options
# Display name offered to the user -> locale code passed to the TTS step.
LANGUAGE_MAPPING = {
    "Arabic (Egyptian)": "ar-EG",
    "English (US)": "en-US",
    "Hindi (India)": "hi-IN",
    "Tamil (India)": "ta-IN",
    "Telugu (India)": "te-IN",
}
# Display label -> lowercase voice identifier.
VOICE_TYPES = {label: label.lower() for label in ("Male", "Female")}
# Prompt templates keyed by TTS provider; each has one ``{language}``
# placeholder filled in with ``str.format`` before being sent to Gemini.
GEMINI_PROMPTS = {
    # Custom TTS API: performance directions are kept in the script.
    "api": (
        "\n"
        "You are an expert AI scriptwriter. Transcribe ALL spoken dialogue into a SINGLE,\n"
        "CONTINUOUS block of modern {language}. Include natural speech patterns and\n"
        "performance directions (e.g., [pause], [laugh]) where appropriate.\n"
    ),
    # gTTS: plain text only.
    "gtts": (
        "\n"
        "You are an expert AI scriptwriter. Transcribe ALL spoken dialogue into a SINGLE,\n"
        "CONTINUOUS block of modern {language}. Return ONLY the clean transcribed text.\n"
    ),
}
# Configure logging
# Root logger: INFO and above, formatted as "<timestamp> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
def _split_long_sentence(sentence, chunk_size):
    """Break one overlong sentence into pieces of at most ``chunk_size`` characters.

    Cuts at the last space that fits; falls back to a hard cut when a single
    word is longer than ``chunk_size``.
    """
    pieces = []
    while len(sentence) > chunk_size:
        cut = sentence.rfind(" ", 0, chunk_size + 1)
        if cut <= 0:
            cut = chunk_size  # no usable space: hard cut inside the word
        pieces.append(sentence[:cut].rstrip())
        sentence = sentence[cut:].lstrip()
    if sentence:
        pieces.append(sentence)
    return pieces


def split_text_into_chunks(text, chunk_size=None):
    """Split text into chunks respecting sentence boundaries.

    Sentences (delimited by ``.``, ``!`` or ``?`` followed by whitespace) are
    packed greedily, joined by single spaces, into chunks no longer than
    ``chunk_size`` characters. A sentence that is itself longer than
    ``chunk_size`` is broken at word boundaries.

    Args:
        text: Text to split. ``None`` or blank text yields an empty list.
        chunk_size: Maximum characters per chunk. ``None`` (the default)
            means the module-level ``TTS_CHUNK_SIZE``.

    Returns:
        A list of non-empty chunk strings in original order.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size is None:
        chunk_size = TTS_CHUNK_SIZE
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    text = (text or "").strip()
    if not text:
        return []

    chunks = []
    current = ""
    for sentence in re.split(r'(?<=[.!?])\s+', text):
        for piece in _split_long_sentence(sentence, chunk_size):
            candidate = f"{current} {piece}" if current else piece
            if len(candidate) <= chunk_size:
                current = candidate
            else:
                # `current` is non-empty here: a lone piece always fits, so
                # this branch is only reached when appending overflowed.
                chunks.append(current)
                current = piece
    if current:
        chunks.append(current)
    return chunks
def generate_tts_audio(text, language_code, voice_type, tts_provider):
    """Generate TTS audio using the selected provider, with retry logic.

    The text is split with ``split_text_into_chunks`` and each chunk is
    synthesised separately, with up to ``MAX_TTS_RETRIES`` attempts per chunk.

    Args:
        text: Script to synthesise.
        language_code: Locale code such as ``"en-US"``. gTTS receives only the
            part before the first ``-``.
        voice_type: Voice selector; forwarded to the custom TTS API only
            (the gTTS branch does not use it).
        tts_provider: ``"api"`` posts to ``TTS_API_URL``; any other value
            uses gTTS.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 holding the bytes of every
        chunk's audio concatenated in order (no re-encoding is performed).

    Raises:
        ValueError: If there is no non-blank text to synthesise.
        RuntimeError: If any chunk still fails after ``MAX_TTS_RETRIES``
            attempts, including when every attempt was rate limited.
    """
    # Blank chunks cannot be synthesised; drop them rather than burning retries.
    chunks = [chunk for chunk in split_text_into_chunks(text) if chunk.strip()]
    if not chunks:
        raise ValueError("No text available for TTS generation")

    audio_segments = []
    for chunk in chunks:
        last_error = None
        for attempt in range(MAX_TTS_RETRIES):
            is_last_attempt = attempt == MAX_TTS_RETRIES - 1
            try:
                if tts_provider == "api":
                    # Use custom TTS API
                    payload = {
                        "text": chunk,
                        "language": language_code,
                        "voice_type": voice_type
                    }
                    response = requests.post(TTS_API_URL, json=payload, timeout=300)
                    if response.status_code == 429:  # Rate limit
                        try:
                            retry_after = int(response.headers.get('Retry-After', 5))
                        except ValueError:
                            # Retry-After may also be an HTTP date; use the default wait.
                            retry_after = 5
                        last_error = RuntimeError("TTS API error: 429")
                        if not is_last_attempt:
                            logger.warning(f"TTS API rate limited. Retrying after {retry_after}s")
                            time.sleep(retry_after)
                        # On the last attempt this falls through to the
                        # loop's `else`, which raises instead of silently
                        # dropping the chunk.
                        continue
                    if response.status_code != 200:
                        raise RuntimeError(f"TTS API error: {response.status_code}")
                    audio_segments.append(io.BytesIO(response.content))
                else:
                    # Use gTTS
                    tts = gTTS(
                        text=chunk,
                        lang=language_code.split('-')[0],
                        slow=False
                    )
                    buffer = io.BytesIO()
                    tts.write_to_fp(buffer)
                    buffer.seek(0)
                    audio_segments.append(buffer)
                break
            except Exception as e:
                last_error = e
                logger.warning(f"TTS attempt {attempt + 1} failed: {str(e)}")
                if not is_last_attempt:
                    time.sleep(2 ** attempt)  # Exponential backoff
        else:
            # Every attempt failed or was rate limited.
            raise RuntimeError(
                f"Failed to generate TTS after {MAX_TTS_RETRIES} attempts"
            ) from last_error

    # Combine audio segments
    return io.BytesIO(b"".join(segment.getvalue() for segment in audio_segments))
def generate_transcription(video_path, prompt):
    """Generate a transcript of the video using Gemini, with retry logic.

    Each attempt uploads the video, waits for the upload to finish
    server-side processing, asks the model for the transcript, and always
    deletes the uploaded copy afterwards.

    Args:
        video_path: Path of the local video file to transcribe.
        prompt: Instruction text sent to the model together with the video.

    Returns:
        The stripped, non-empty transcript text.

    Raises:
        Exception: Re-raises the error of the third (final) failed attempt.
    """
    max_retries = 3
    processing_timeout = 600  # seconds to wait for the upload to become usable
    # Uploads may be mov/webm/avi as well as mp4, so derive the MIME type from
    # the file name; fall back to the previously hard-coded value.
    guessed_type = mimetypes.guess_type(video_path)[0]
    mime_type = guessed_type if guessed_type and guessed_type.startswith("video/") else "video/mp4"

    for attempt in range(max_retries):
        video_file = None
        try:
            video_file = genai.upload_file(video_path, mime_type=mime_type)

            # Uploaded videos are processed asynchronously; poll until the
            # file leaves the PROCESSING state before referencing it.
            deadline = time.monotonic() + processing_timeout
            while video_file.state.name == "PROCESSING":
                if time.monotonic() > deadline:
                    raise Exception("Timed out waiting for uploaded video to be processed")
                time.sleep(2)
                video_file = genai.get_file(video_file.name)
            if video_file.state.name == "FAILED":
                raise Exception("Uploaded video failed processing")

            # NOTE(review): model name kept as-is; confirm it is still
            # available and accepts video input.
            model = genai.GenerativeModel("models/gemini-pro-vision")
            response = model.generate_content([prompt, video_file])
            # `response.text` raises when the response carries no usable text;
            # that lands in the `except` below and triggers a retry.
            text = (response.text or "").strip()
            if text:
                return text
            raise Exception("No valid transcription generated")
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Transcription attempt {attempt + 1} failed: {str(e)}")
            time.sleep(5 * (attempt + 1))
        finally:
            # Delete the remote copy on success and on failure.
            if video_file is not None:
                try:
                    genai.delete_file(video_file.name)
                except Exception as cleanup_error:
                    logger.warning(f"Failed to delete uploaded file: {str(cleanup_error)}")
def dub_video(video_path, audio_buffer):
    """Dub a video with new audio and write the result to the download folder.

    Args:
        video_path: Path of the source video file.
        audio_buffer: Readable binary buffer holding the replacement audio;
            it is read from its current position.

    Returns:
        Path of the written file, ``dubbed_<hex>.mp4`` inside
        ``app.config['DOWNLOAD_FOLDER']``.

    Raises:
        Exception: Whatever the clip loading or encoding step raises; a
            partially written output file is removed before propagating.
    """
    source_clip = None
    dubbed_clip = None
    audio = None
    temp_audio_path = None
    output_path = None
    succeeded = False
    try:
        # Save audio buffer to a temp file in the system temp directory so the
        # working directory does not need to be writable.
        fd, temp_audio_path = tempfile.mkstemp(prefix="temp_audio_", suffix=".mp3")
        with os.fdopen(fd, 'wb') as f:
            f.write(audio_buffer.read())

        # Process video
        source_clip = VideoFileClip(video_path)
        audio = AudioFileClip(temp_audio_path)

        # Ensure audio is not longer than the video
        if audio.duration > source_clip.duration:
            audio = audio.subclip(0, source_clip.duration)

        # set_audio returns a new clip; the clip opened from disk is kept in
        # `source_clip` so it can be closed as well.
        dubbed_clip = source_clip.set_audio(audio)

        # Save output
        output_filename = f"dubbed_{uuid.uuid4().hex}.mp4"
        output_path = os.path.join(app.config['DOWNLOAD_FOLDER'], output_filename)
        dubbed_clip.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=4,
            verbose=False,
            preset='medium',
            ffmpeg_params=['-crf', '23', '-movflags', '+faststart']
        )
        succeeded = True
        return output_path
    finally:
        # Cleanup resources; a failing close() must not mask the real error.
        for clip in (dubbed_clip, source_clip, audio):
            if clip is None:
                continue
            try:
                clip.close()
            except Exception as close_error:
                logger.warning(f"Failed to close clip: {str(close_error)}")
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.unlink(temp_audio_path)
        # Do not leave a truncated video behind when encoding failed.
        if not succeeded and output_path and os.path.exists(output_path):
            os.unlink(output_path)
def process_video_background(task_id, video_path, language, voice_type, tts_provider):
    """Run transcription, TTS and dubbing for one upload and record progress.

    Progress and the outcome are published through
    ``processing_status[task_id]``. Errors are captured into that entry rather
    than raised. The uploaded video is removed when the function finishes,
    whether it succeeded or failed.

    Args:
        task_id: Key of this job in ``processing_status``.
        video_path: Path of the uploaded video; deleted on completion.
        language: Display name of the target language (a key of
            ``LANGUAGE_MAPPING``; unknown names fall back to ``"en-US"`` for TTS).
        voice_type: Voice selector forwarded to ``generate_tts_audio``.
        tts_provider: Key into ``GEMINI_PROMPTS``; also selects the TTS backend.
    """
    try:
        status = {
            'status': 'processing',
            'progress': 0,
            'message': 'Starting transcription',
            'start_time': time.time()
        }
        processing_status[task_id] = status

        # Stage 1: Transcription
        status['message'] = 'Transcribing video content'
        prompt = GEMINI_PROMPTS[tts_provider].format(language=language)
        script = generate_transcription(video_path, prompt)
        status['progress'] = 33
        status['script'] = script

        # Stage 2: Audio Generation
        status['message'] = 'Generating audio narration'
        language_code = LANGUAGE_MAPPING.get(language, "en-US")
        audio_buffer = generate_tts_audio(script, language_code, voice_type, tts_provider)
        status['progress'] = 66

        # Stage 3: Video Dubbing
        status['message'] = 'Creating dubbed video'
        output_path = dub_video(video_path, audio_buffer)

        # Publish the result BEFORE flipping the state: a status poll running
        # on another thread must never observe 'complete' without a
        # 'result_path'.
        status['result_path'] = output_path
        status['progress'] = 100
        status['status'] = 'complete'
    except Exception as e:
        failed = processing_status.setdefault(task_id, {})
        failed['status'] = 'error'
        failed['message'] = str(e)
        # logger.exception keeps the traceback, which str(e) alone loses.
        logger.exception(f"Processing failed: {str(e)}")
    finally:
        # Cleanup: the upload is no longer needed, whatever the outcome.
        try:
            os.unlink(video_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            logger.warning(f"Could not remove upload {video_path}: {str(cleanup_error)}")
@app.route('/')
def index():
    """Serve the landing page with the available language and voice choices."""
    template_context = {
        'languages': list(LANGUAGE_MAPPING),
        'voice_types': list(VOICE_TYPES),
        'default_language': "English (US)",
        # The custom TTS option is only offered when an endpoint is configured.
        'tts_api_available': bool(TTS_API_URL),
    }
    return render_template('index.html', **template_context)
@app.route('/upload', methods=['POST'])
def upload_video():
    """Handle a video upload and start background processing.

    Expects a multipart form with a ``video`` file and optional ``language``,
    ``voice_type`` and ``tts_provider`` fields. Every input is validated
    before anything is written to disk, so a rejected request leaves no
    orphan file behind.

    Returns:
        JSON ``{'task_id': ...}`` on success, or JSON ``{'error': ...}`` with
        status 400 (invalid request) or 500 (file could not be saved).
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400
    file = request.files['video']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    # Validate file extension
    allowed_extensions = {'mp4', 'mov', 'webm', 'avi'}
    extension = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
    if extension not in allowed_extensions:
        return jsonify({'error': 'Invalid file type'}), 400

    # Get processing options
    language = request.form.get('language', 'English (US)')
    voice_type = request.form.get('voice_type', 'Male')
    tts_provider = request.form.get('tts_provider', 'gtts')

    # Validate options before saving the upload. `language` is formatted into
    # the model prompt and `tts_provider` indexes GEMINI_PROMPTS, so only
    # known values are accepted.
    if language not in LANGUAGE_MAPPING:
        return jsonify({'error': 'Unsupported language'}), 400
    if tts_provider not in GEMINI_PROMPTS:
        return jsonify({'error': 'Invalid TTS provider'}), 400
    if tts_provider == "api" and not TTS_API_URL:
        return jsonify({'error': 'TTS API is not configured'}), 400

    # Save file with unique name
    task_id = uuid.uuid4().hex
    filename = secure_filename(f"{task_id}_{file.filename}")
    video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    try:
        file.save(video_path)
    except Exception as e:
        return jsonify({'error': f'Failed to save file: {str(e)}'}), 500

    # Register the task before the worker starts so an immediate status poll
    # already finds it.
    processing_status[task_id] = {
        'status': 'uploaded',
        'progress': 0,
        'message': 'Starting processing',
        'start_time': time.time()
    }
    thread = threading.Thread(
        target=process_video_background,
        args=(task_id, video_path, language, voice_type, tts_provider)
    )
    thread.start()
    return jsonify({'task_id': task_id})
@app.route('/status/<task_id>')
def get_status(task_id):
    """Report the processing status of a task as JSON.

    Returns:
        404 with ``{'error': 'Invalid task ID'}`` for unknown tasks.
        Otherwise ``status``, ``progress`` and ``message``, plus
        ``result_url`` and ``script`` once the task is complete, or
        ``error_details`` when it failed.
    """
    status = processing_status.get(task_id)
    if status is None:
        return jsonify({'error': 'Invalid task ID'}), 404

    response = {
        'status': status['status'],
        'progress': status.get('progress', 0),
        'message': status.get('message', ''),
    }
    if status['status'] == 'complete':
        result_path = status.get('result_path')
        if result_path:
            # url_for comes from flask; it must be imported at the top of
            # the file alongside the other flask names.
            response['result_url'] = url_for(
                'download',
                filename=os.path.basename(result_path)
            )
            response['script'] = status.get('script', '')
        else:
            # The worker thread has not published the result path yet; report
            # the task as still running so the client simply polls again.
            response['status'] = 'processing'
    elif status['status'] == 'error':
        response['error_details'] = status.get('message', 'Unknown error')
    return jsonify(response)
@app.route('/download/<filename>')
def download(filename):
    """Serve a processed video as an attachment, with basic name checks.

    Only names of the form ``dubbed_*.mp4`` are accepted.

    Returns:
        The file as an ``video/mp4`` attachment, or a plain-text error with
        status 400 (bad name), 404 (missing file) or 500 (unexpected failure).
    """
    try:
        # Security check
        if not (filename.startswith('dubbed_') and filename.endswith('.mp4')):
            return "Invalid file", 400

        # Resolve the folder to an absolute path once and use it for both the
        # existence check and the send call. A relative directory is resolved
        # by send_from_directory against the application root, which may
        # differ from the working directory the files were written under.
        download_dir = Path(app.config['DOWNLOAD_FOLDER']).resolve()
        if not (download_dir / filename).is_file():
            return "File not found", 404

        return send_from_directory(
            str(download_dir),
            filename,
            as_attachment=True,
            mimetype='video/mp4'
        )
    except Exception as e:
        logger.error(f"Download failed: {str(e)}")
        return "Download error", 500
if __name__ == '__main__':
    # Script entry point: refuse to start without a Gemini key, otherwise
    # serve on all interfaces at port 7860 with threaded request handling.
    if GEMINI_API_KEY:
        app.run(host="0.0.0.0", port=7860, threaded=True)
    else:
        raise ValueError("GEMINI_API_KEY is required in .env file")