|
import os |
|
import tempfile |
|
import uuid |
|
from fastapi import FastAPI, UploadFile, File, Form, HTTPException |
|
from fastapi.responses import FileResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from moviepy.editor import VideoFileClip, AudioFileClip |
|
import google.generativeai as genai |
|
import requests |
|
from dotenv import load_dotenv |
|
from pathlib import Path |
|
|
|
|
|
# Load variables from a local .env file into the process environment before
# the os.getenv() calls below read them.
load_dotenv()

app = FastAPI()

# Working directories, created on import if they do not exist yet.
UPLOAD_DIR = "uploads"
DOWNLOAD_DIR = "downloads"
Path(UPLOAD_DIR).mkdir(exist_ok=True)
Path(DOWNLOAD_DIR).mkdir(exist_ok=True)

# Serve the download directory statically. The mount reuses DOWNLOAD_DIR so the
# served directory cannot drift from the one the constant points at.
app.mount("/downloads", StaticFiles(directory=DOWNLOAD_DIR), name="downloads")

# External service configuration, read from the environment.
# Either value is None when the corresponding variable is unset.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")
genai.configure(api_key=GEMINI_API_KEY)

# Maps the public "voice" form value to the voice name sent to the TTS API.
VOICE_CHOICES = {
    "male": "Charon",
    "female": "Zephyr"
}

# Instruction text sent to Gemini together with the uploaded video.
GEMINI_PROMPT = """
You are an expert AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. Combine all dialogue into one continuous script.
2. NO timestamps or speaker labels.
3. Add performance cues (e.g., [laugh], [sigh]) and directions (e.g., "Say happily:").
"""
|
|
|
# Size of each chunk copied from the upload to disk (1 MiB), so a large video
# is never held in memory in one piece.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _remove_if_exists(path: str) -> None:
    """Delete *path*, tolerating the case where it was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


@app.post("/process")
async def process_video(
    file: UploadFile = File(...),
    voice: str = Form("male"),
    cheerful: bool = Form(False)
):
    """Dub an uploaded video and return where to download the result.

    The upload is saved under UPLOAD_DIR, a script is produced by
    ``generate_script``, audio is synthesized by ``generate_audio``, and
    ``create_dubbed_video`` writes the final file under DOWNLOAD_DIR.

    Args:
        file: The uploaded video.
        voice: Voice key forwarded to ``generate_audio``.
        cheerful: Flag forwarded to ``generate_audio``.

    Returns:
        A dict with ``"video_url"`` (path under ``/downloads``) and
        ``"script"`` (the generated script text).

    Raises:
        HTTPException: 500 with the underlying error message if any step fails.
    """
    # The multipart filename is optional; fall back to an empty name rather
    # than letting Path(None) raise TypeError. With no extension we default to
    # ".mp4" so the output file still has a container extension.
    file_ext = Path(file.filename or "").suffix or ".mp4"
    file_name = f"{uuid.uuid4()}{file_ext}"
    file_path = os.path.join(UPLOAD_DIR, file_name)
    audio_path = os.path.join(UPLOAD_DIR, f"audio_{uuid.uuid4()}.wav")
    output_name = f"dubbed_{file_name}"
    output_path = os.path.join(DOWNLOAD_DIR, output_name)

    try:
        # Copy the upload to disk in chunks instead of one giant read().
        with open(file_path, "wb") as buffer:
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                buffer.write(chunk)

        script = await generate_script(file_path)
        await generate_audio(script, voice, cheerful, audio_path)
        await create_dubbed_video(file_path, audio_path, output_path)

        return {
            "video_url": f"/downloads/{output_name}",
            "script": script
        }

    except Exception as e:
        # Do not leave a partially written video in the public directory.
        _remove_if_exists(output_path)
        raise HTTPException(status_code=500, detail=str(e)) from e

    finally:
        # Intermediate files are removed on success and on failure alike.
        _remove_if_exists(file_path)
        _remove_if_exists(audio_path)
|
|
|
# Polling cadence and upper bound while Gemini processes an uploaded video.
_GEMINI_POLL_INTERVAL_SECONDS = 5
_GEMINI_PROCESSING_TIMEOUT_SECONDS = 600


def _generate_script_blocking(video_path: str) -> str:
    """Upload the video to Gemini, wait for processing, and return the script.

    Synchronous helper; meant to be run off the event loop.
    """
    import time

    video_file = genai.upload_file(video_path, mime_type="video/mp4")
    try:
        # Wait for processing to finish, pausing between polls so the API is
        # not hammered, and give up after a fixed deadline instead of spinning
        # forever.
        deadline = time.monotonic() + _GEMINI_PROCESSING_TIMEOUT_SECONDS
        while video_file.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                raise Exception("Gemini processing timed out")
            time.sleep(_GEMINI_POLL_INTERVAL_SECONDS)
            video_file = genai.get_file(video_file.name)

        if video_file.state.name != "ACTIVE":
            raise Exception("Gemini processing failed")

        model = genai.GenerativeModel("models/gemini-1.5-pro-latest")
        response = model.generate_content([GEMINI_PROMPT, video_file])
    finally:
        # Remove the uploaded file whether or not generation succeeded.
        genai.delete_file(video_file.name)

    if hasattr(response, 'text'):
        # Collapse the response onto a single line.
        script = " ".join(response.text.strip().splitlines())
        if script:
            return script
    raise Exception("No script generated")


async def generate_script(video_path: str) -> str:
    """Generate a single-line dubbing script for the video at *video_path*.

    The video is uploaded to Gemini and GEMINI_PROMPT is sent with it; the
    response text is returned with its lines joined by single spaces. The
    blocking SDK calls run in an executor thread so the event loop stays
    responsive.

    Args:
        video_path: Path of the video file on local disk.

    Returns:
        The generated script as one line of text.

    Raises:
        Exception: With a ``"Script generation failed: ..."`` message if the
            upload, processing, or generation fails, times out, or yields no
            text.
    """
    import asyncio

    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, _generate_script_blocking, video_path
        )
    except Exception as e:
        raise Exception(f"Script generation failed: {str(e)}") from e
|
|
|
async def generate_audio(text: str, voice: str, cheerful: bool, output_path: str):
    """Synthesize *text* through the TTS API and write the audio to disk.

    Args:
        text: Script to be spoken.
        voice: Key into VOICE_CHOICES; unknown keys fall back to "Charon".
        cheerful: Sent to the TTS API unchanged as the ``"cheerful"`` field.
        output_path: File that receives the raw response body.

    Raises:
        Exception: With an ``"Audio generation failed: ..."`` message if the
            TTS URL is not configured, the request fails, the API answers
            with a non-200 status, or the response body is empty.
    """
    import asyncio
    from functools import partial

    try:
        if not TTS_API_URL:
            # Fail with a clear message instead of an obscure requests error.
            raise Exception("TTS_API_URL is not configured")

        voice_name = VOICE_CHOICES.get(voice, "Charon")
        payload = {
            "text": text,
            "voice_name": voice_name,
            "cheerful": cheerful
        }

        # requests is synchronous and may take up to the full timeout; run it
        # in an executor thread so the event loop is not blocked meanwhile.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            partial(requests.post, TTS_API_URL, json=payload, timeout=300),
        )
        if response.status_code != 200:
            raise Exception(f"TTS API error: {response.text}")
        if not response.content:
            raise Exception("TTS API returned no audio")

        with open(output_path, "wb") as f:
            f.write(response.content)
    except Exception as e:
        raise Exception(f"Audio generation failed: {str(e)}") from e
|
|
|
def _write_dubbed_video_blocking(video_path: str, audio_path: str, output_path: str) -> None:
    """Replace the video's audio track and encode the result; blocking."""
    video = VideoFileClip(video_path)
    try:
        audio = AudioFileClip(audio_path)
        try:
            # Trim the new audio so it never runs past the end of the video.
            track = audio
            if audio.duration > video.duration:
                track = audio.subclip(0, video.duration)

            # set_audio returns a new clip, so `video` and `audio` keep
            # referring to the source clips and can be closed below.
            dubbed = video.set_audio(track)
            dubbed.write_videofile(
                output_path,
                codec="libx264",
                audio_codec="aac",
                threads=4,
                preset="fast"
            )
        finally:
            audio.close()
    finally:
        # Close the clips even when encoding fails.
        video.close()


async def create_dubbed_video(video_path: str, audio_path: str, output_path: str):
    """Write a copy of the video with its audio replaced by *audio_path*.

    Audio longer than the video is cut to the video's duration. The output is
    encoded with libx264 video and AAC audio. The encode runs in an executor
    thread so the event loop is not blocked.

    Args:
        video_path: Source video file.
        audio_path: Replacement audio file.
        output_path: Destination of the encoded video.

    Raises:
        Exception: With a ``"Video processing failed: ..."`` message if a clip
            cannot be opened or encoding fails.
    """
    import asyncio

    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, _write_dubbed_video_blocking, video_path, audio_path, output_path
        )
    except Exception as e:
        raise Exception(f"Video processing failed: {str(e)}") from e
|
|
|
# NOTE(review): a StaticFiles app is mounted on the same "/downloads" prefix
# earlier in this file; confirm which of the two actually serves these
# requests.
@app.get("/downloads/{file_name}")
async def download_file(file_name: str):
    """Return the file named *file_name* from DOWNLOAD_DIR.

    Args:
        file_name: Bare file name; values containing path separators or
            naming the current/parent directory are rejected.

    Returns:
        A FileResponse for the requested file.

    Raises:
        HTTPException: 404 if the name is not a bare file name or no regular
            file with that name exists in DOWNLOAD_DIR.
    """
    # Only a bare file name may be joined onto DOWNLOAD_DIR; anything that
    # could step outside it ("..", "/" or "\" separators) is answered exactly
    # like a missing file.
    is_bare_name = (
        file_name not in ("", ".", "..")
        and "\\" not in file_name
        and os.path.basename(file_name) == file_name
    )
    if not is_bare_name:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(DOWNLOAD_DIR, file_name)
    # isfile (not exists) so a directory is never handed to FileResponse.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)