import gradio as gr
import os
import json
import time
import random
import base64
import subprocess
from pathlib import Path

import requests
import google.generativeai as genai
from tavily import TavilyClient
from runwayml import RunwayML, TaskFailedError
from PIL import Image, ImageDraw, ImageFont
# =============================================================
# AI VIDEO STUDIO (Gen-4 Turbo image-to-video compliant rewrite)
# =============================================================
# Key changes:
# 1. Added *required* prompt_image for Gen-4 / gen4_turbo image_to_video tasks (was missing -> error).
# 2. Added UI input for an optional user keyframe image; if absent we auto-generate a placeholder.
# 3. Included prompt_text together with prompt_image for better guidance.
# 4. Added more robust polling / retry & explicit exception surfaces.
# 5. Added structured logging + deterministic temp directory per job.
# 6. Wrapped cleanup in finally; kept mock VO approach.
# 7. Added basic safety guardrails.
#
# Gen-4 requires an input image plus a text prompt (it cannot run from text
# alone). If you want pure text-to-video, switch to a Gen-3 Alpha text mode.
# See the Runway docs.
# =============================================================
# --- 1. CONFIGURE API KEYS ---
try:
    genai.configure(api_key=os.environ["GEMINI_API_KEY"])
    tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
    RUNWAY_API_KEY = os.environ["RUNWAY_API_KEY"]
    runway_client = RunwayML(api_key=RUNWAY_API_KEY)
except KeyError as e:
    raise ValueError(f"API Key Error: Please set the {e} secret in your environment.")
# --- 2. CONSTANTS / SETTINGS ---
GEN4_MODEL = "gen4_turbo"     # switch to "gen4" if you prefer (slower / potentially higher fidelity)
SCENE_COUNT = 4
SCENE_DURATION_SECONDS = 5    # Gen-4 supports 5 or 10 seconds
VIDEO_RATIO = "1280:720"      # 16:9
WORDS_PER_SEC = 2.5           # Used for mock narration length
MAX_POLL_SECONDS = 180        # Per scene
POLL_INTERVAL = 5
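
# With the defaults above, a run produces SCENE_COUNT * SCENE_DURATION_SECONDS
# = 4 * 5 = 20 seconds of footage, and each scene may poll for up to
# MAX_POLL_SECONDS, so a full job can take several minutes end to end.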
# --- 3. UTILITIES ---
def _log(msg: str):
    print(f"[AI-STUDIO] {msg}")
def create_placeholder_image(text: str, path: Path, size=(1280, 720)) -> Path:
    """Create a simple placeholder keyframe if the user supplies none.

    You can later replace this with a real text-to-image generation step."""
    img = Image.new("RGB", size, (10, 10, 10))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans-Bold.ttf", 60)
    except Exception:
        font = ImageFont.load_default()
    # Naive word wrap: break lines at ~28 characters.
    wrapped = []
    line = ""
    for word in text.split():
        test = f"{line} {word}".strip()
        if len(test) > 28:
            wrapped.append(line)
            line = word
        else:
            line = test
    if line:
        wrapped.append(line)
    y = size[1] // 2 - (len(wrapped) * 35) // 2
    for w in wrapped:
        # ImageDraw.textsize() was removed in Pillow 10; use textbbox() instead.
        left, top, right, bottom = draw.textbbox((0, 0), w, font=font)
        w_width, w_height = right - left, bottom - top
        draw.text(((size[0] - w_width) // 2, y), w, fill=(240, 240, 240), font=font)
        y += w_height + 10
    img.save(path)
    return path
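
# Illustrative usage: create_placeholder_image("The history of coffee",
# Path("keyframe.png")) writes a 1280x720 dark frame with the topic text
# centered and word-wrapped.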
def generate_mock_voiceover(narration: str, out_path: Path):
    """Render a silent MP3 whose length approximates the narration (mock TTS)."""
    # Guard against empty narration, which would make ffmpeg fail on '-t 0'.
    duration = max(len(narration.split()) / WORDS_PER_SEC, 1.0)
    subprocess.run([
        'ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=mono',
        '-t', str(duration), '-q:a', '9', '-acodec', 'libmp3lame', str(out_path), '-y'
    ], check=True)
    return duration
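
# Runway's image_to_video endpoint expects prompt_image to be an HTTPS URL or
# a data URI, not a local file path. This small helper base64-encodes a local
# image into a data URI so locally generated keyframes can be submitted directly.
def image_to_data_uri(path: Path) -> str:
    suffix = path.suffix.lower().lstrip(".") or "png"
    mime = "image/jpeg" if suffix in ("jpg", "jpeg") else f"image/{suffix}"
    encoded = base64.b64encode(path.read_bytes()).decode("ascii")
    return f"data:{mime};base64,{encoded}"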
def poll_runway_task(task_id: str, max_seconds=MAX_POLL_SECONDS, interval=POLL_INTERVAL):
    """Poll a Runway task by id until it succeeds, fails, or times out."""
    start = time.time()
    while True:
        # Re-fetch the task each cycle; create() responses have no refresh method.
        task = runway_client.tasks.retrieve(task_id)
        if task.status == 'SUCCEEDED':
            return task
        if task.status == 'FAILED':
            raise TaskFailedError(task_details=task)
        if time.time() - start > max_seconds:
            raise TimeoutError(f"Runway task timed out after {max_seconds}s (status={task.status})")
        time.sleep(interval)
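
# Note: in-flight Runway tasks also report intermediate statuses such as
# PENDING, THROTTLED, and RUNNING; the loop above simply keeps waiting
# through those until SUCCEEDED, FAILED, or the timeout.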
# --- 4. CORE PIPELINE ---
def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress(track_tqdm=True)):
    job_id = f"{int(time.time())}_{random.randint(1000, 9999)}"
    _log(f"Starting job {job_id} :: topic='{topic_prompt}'")
    # Working directory for this job
    workdir = Path(f"job_{job_id}")
    workdir.mkdir(exist_ok=True)
    intermediates = []
    try:
        # STEP 1: Research
        progress(0.05, desc="🔍 Researching topic ...")
        facts = "No research data available."
        try:
            research_results = tavily_client.search(
                query=f"Key facts and interesting points about {topic_prompt}",
                search_depth="basic"
            )
            if research_results and 'results' in research_results:
                facts = "\n".join([res['content'] for res in research_results['results']])
        except Exception as e:
            _log(f"Tavily failed: {e}")
        # STEP 2: Script
        progress(0.15, desc="✍️ Writing script ...")
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        script_prompt = f"""
        You are a creative director for viral short-form videos.
        Topic: {topic_prompt}
        Research (may contain noise):\n{facts}\n\n
        Produce JSON with keys:
        narration_script: overall narration (concise, energetic, ~85-110 words across the {SCENE_COUNT} scenes). Maintain coherence.
        scene_prompts: list of {SCENE_COUNT} *visual* prompts. Each should be cinematic, 1-2 sentences, include style / camera / lighting cues, and keep characters consistent.
        Return ONLY JSON.
        """
        response = gemini_model.generate_content(script_prompt)
        try:
            # Strip the markdown code fences the model often wraps around JSON.
            cleaned = response.text.strip().replace("```json", "").replace("```", "")
            data = json.loads(cleaned)
            narration = data['narration_script']
            scene_prompts = data['scene_prompts']
            if len(scene_prompts) != SCENE_COUNT:
                raise ValueError(f"Expected {SCENE_COUNT} scene prompts, got {len(scene_prompts)}")
        except Exception as e:
            raise gr.Error(f"Gemini JSON parse error: {e}. Raw: {response.text[:400]}")
        # STEP 3: Mock VO
        progress(0.25, desc="🎙️ Generating mock VO ...")
        audio_path = workdir / f"narration_{job_id}.mp3"
        generate_mock_voiceover(narration, audio_path)
        intermediates.append(audio_path)
        # STEP 4: Prepare keyframe image (required for Gen-4 image_to_video)
        progress(0.30, desc="🖼️ Preparing keyframe image ...")
        if keyframe_image is not None:
            keyframe_path = Path(keyframe_image)
        else:
            keyframe_path = workdir / "auto_keyframe.png"
            create_placeholder_image(topic_prompt, keyframe_path)
        intermediates.append(keyframe_path)
        # The API needs a URL or data URI, so encode the local keyframe once
        # and reuse it for every scene.
        keyframe_data_uri = image_to_data_uri(keyframe_path)
        # STEP 5: Generate scenes
        clip_paths = []
        for idx, scene_prompt in enumerate(scene_prompts, start=1):
            base_progress = 0.30 + (idx * 0.12)
            progress(min(base_progress, 0.85), desc=f"🎬 Scene {idx}/{len(scene_prompts)} ...")
            _log(f"Submitting scene {idx}: {scene_prompt[:90]}...")
            try:
                task = runway_client.image_to_video.create(
                    model=GEN4_MODEL,
                    prompt_image=keyframe_data_uri,  # required: HTTPS URL or data URI
                    prompt_text=scene_prompt,
                    duration=SCENE_DURATION_SECONDS,
                    ratio=VIDEO_RATIO,
                )
                task = poll_runway_task(task.id)
                video_url = task.output[0]
            except TaskFailedError as e:
                raise gr.Error(f"Runway failed scene {idx}: {getattr(e, 'task_details', 'No details')}")
            # Download the finished clip. Use requests rather than reaching
            # into the SDK's private session.
            clip_path = workdir / f"scene_{idx}.mp4"
            r = requests.get(video_url, stream=True, timeout=120)
            r.raise_for_status()
            with open(clip_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            clip_paths.append(clip_path)
            intermediates.append(clip_path)
            _log(f"Downloaded scene {idx} -> {clip_path}")
        # STEP 6: Concatenate video
        progress(0.90, desc="✂️ Concatenating scenes ...")
        list_file = workdir / "clips.txt"
        with open(list_file, 'w') as lf:
            for p in clip_paths:
                # Use absolute paths: the concat demuxer resolves relative
                # entries against the list file's directory, not the CWD.
                lf.write(f"file '{p.resolve()}'\n")
        intermediates.append(list_file)
        concat_path = workdir / f"concat_{job_id}.mp4"
        subprocess.run([
            'ffmpeg', '-f', 'concat', '-safe', '0', '-i', str(list_file), '-c', 'copy', str(concat_path), '-y'
        ], check=True)
        intermediates.append(concat_path)
        # STEP 7: Mux audio
        final_path = workdir / f"final_{job_id}.mp4"
        progress(0.95, desc="🔊 Merging audio ...")
        subprocess.run([
            'ffmpeg', '-i', str(concat_path), '-i', str(audio_path), '-c:v', 'copy', '-c:a', 'aac', '-shortest', str(final_path), '-y'
        ], check=True)
        progress(1.0, desc="✅ Done")
        _log(f"FINAL VIDEO: {final_path}")
        return str(final_path)
    except Exception as e:
        _log(f"JOB {job_id} FAILED: {e}")
        raise gr.Error(f"An error occurred: {e}")
    finally:
        # Keep workdir for debugging; add cleanup here to remove the entire
        # directory instead (see the sketch below).
        pass
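        # Illustrative cleanup, if you would rather not keep intermediates:
        #   import shutil
        #   shutil.rmtree(workdir, ignore_errors=True)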
# --- 5. GRADIO UI ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 My Personal AI Video Studio (Gen-4 Turbo)")
    gr.Markdown("Enter a topic and (optionally) upload a keyframe image. Without an image, a simple placeholder is generated.")
    with gr.Row():
        topic_input = gr.Textbox(label="Video Topic", placeholder="e.g., 'The history of coffee'", scale=3)
        image_input = gr.Image(label="Keyframe Image (optional)", type="filepath")
    with gr.Row():
        generate_button = gr.Button("Generate Video", variant="primary")
    with gr.Row():
        video_output = gr.Video(label="Generated Video")
    generate_button.click(
        fn=generate_video_from_topic,
        inputs=[topic_input, image_input],
        outputs=video_output
    )
    gr.Markdown("---\n### Tips\n- Supply a consistent character/style image for more coherent scenes.\n- For pure *text-only* generation, switch to a Gen-3 Alpha text-to-video flow (not implemented here).\n- Replace the placeholder keyframe logic with a real T2I model for higher quality.")
if __name__ == "__main__":
    demo.launch()
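
# To run locally (assuming this file is saved as app.py): set GEMINI_API_KEY,
# TAVILY_API_KEY, and RUNWAY_API_KEY in the environment, make sure ffmpeg is
# on PATH, then run `python app.py`.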