Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -5,6 +5,7 @@ import time
|
|
5 |
import random
|
6 |
import subprocess
|
7 |
from pathlib import Path
|
|
|
8 |
|
9 |
import google.generativeai as genai
|
10 |
from tavily import TavilyClient
|
@@ -12,18 +13,18 @@ from runwayml import RunwayML, TaskFailedError
|
|
12 |
from PIL import Image, ImageDraw, ImageFont
|
13 |
|
14 |
# =============================================================
|
15 |
-
# AI VIDEO STUDIO (Gen-4 Turbo Image→Video
|
16 |
# =============================================================
|
17 |
-
#
|
18 |
-
#
|
19 |
-
#
|
20 |
-
#
|
21 |
-
#
|
22 |
-
#
|
23 |
-
#
|
24 |
-
#
|
25 |
-
#
|
26 |
-
#
|
27 |
# =============================================================
|
28 |
|
29 |
# --- 1. CONFIGURE API KEYS ---
|
@@ -36,13 +37,15 @@ except KeyError as e:
|
|
36 |
raise ValueError(f"API Key Error: Please set the {e} secret in your environment.")
|
37 |
|
38 |
# --- 2. CONSTANTS / SETTINGS ---
|
39 |
-
GEN4_MODEL = "gen4_turbo" # adjust to "gen4"
|
40 |
SCENE_COUNT = 4
|
41 |
-
SCENE_DURATION_SECONDS = 5
|
42 |
-
VIDEO_RATIO = "1280:720"
|
43 |
-
WORDS_PER_SEC = 2.5
|
44 |
-
MAX_POLL_SECONDS = 180
|
45 |
POLL_INTERVAL = 5
|
|
|
|
|
46 |
|
47 |
# --- 3. UTILITIES ---
|
48 |
def _log(msg: str):
|
@@ -50,20 +53,22 @@ def _log(msg: str):
|
|
50 |
|
51 |
|
52 |
def create_placeholder_image(text: str, path: Path, size=(1280, 720)) -> Path:
|
53 |
-
"""Create a simple placeholder keyframe if user supplies none.
|
54 |
-
You can later replace this with a real text-to-image generation step."""
|
55 |
img = Image.new("RGB", size, (10, 10, 10))
|
56 |
draw = ImageDraw.Draw(img)
|
57 |
try:
|
58 |
font = ImageFont.truetype("DejaVuSans-Bold.ttf", 60)
|
59 |
except Exception:
|
60 |
font = ImageFont.load_default()
|
61 |
-
|
|
|
|
|
62 |
line = ""
|
63 |
-
for word in
|
64 |
test = f"{line} {word}".strip()
|
65 |
-
if len(test) > 28:
|
66 |
-
|
|
|
67 |
line = word
|
68 |
else:
|
69 |
line = test
|
@@ -100,12 +105,82 @@ def poll_runway_task(task_obj, max_seconds=MAX_POLL_SECONDS, interval=POLL_INTER
|
|
100 |
raise TimeoutError(f"Runway task timed out after {max_seconds}s (status={status})")
|
101 |
time.sleep(interval)
|
102 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
103 |
# --- 4. CORE PIPELINE ---
|
|
|
104 |
def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress(track_tqdm=True)):
|
105 |
job_id = f"{int(time.time())}_{random.randint(1000, 9999)}"
|
106 |
_log(f"Starting job {job_id} :: topic='{topic_prompt}'")
|
107 |
|
108 |
-
# Working directory for this job
|
109 |
workdir = Path(f"job_{job_id}")
|
110 |
workdir.mkdir(exist_ok=True)
|
111 |
|
@@ -121,32 +196,15 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
121 |
search_depth="basic"
|
122 |
)
|
123 |
if research_results and 'results' in research_results:
|
124 |
-
facts = "
|
|
|
125 |
except Exception as e:
|
126 |
_log(f"Tavily failed: {e}")
|
127 |
|
128 |
# STEP 2: Script
|
129 |
progress(0.15, desc="✍️ Writing script ...")
|
130 |
-
|
131 |
-
|
132 |
-
You are a creative director for viral short-form videos.
|
133 |
-
Topic: {topic_prompt}
|
134 |
-
Research (may contain noise):\n{facts}\n\n
|
135 |
-
Produce JSON with keys:
|
136 |
-
narration_script: overall narration (concise, energetic, ~85-110 words per 5 scenes). Maintain coherence.
|
137 |
-
scene_prompts: list of {SCENE_COUNT} *visual* prompts. Each should be cinematic, 1-2 sentences, include style / camera / lighting cues and keep characters consistent.
|
138 |
-
Return ONLY JSON.
|
139 |
-
"""
|
140 |
-
response = gemini_model.generate_content(script_prompt)
|
141 |
-
try:
|
142 |
-
cleaned = response.text.strip().replace("```json", "").replace("```", "")
|
143 |
-
data = json.loads(cleaned)
|
144 |
-
narration = data['narration_script']
|
145 |
-
scene_prompts = data['scene_prompts']
|
146 |
-
if len(scene_prompts) != SCENE_COUNT:
|
147 |
-
raise ValueError(f"Expected {SCENE_COUNT} scene prompts, got {len(scene_prompts)}")
|
148 |
-
except Exception as e:
|
149 |
-
raise gr.Error(f"Gemini JSON parse error: {e}. Raw: {response.text[:400]}")
|
150 |
|
151 |
# STEP 3: Mock VO
|
152 |
progress(0.25, desc="🎙️ Generating mock VO ...")
|
@@ -154,7 +212,7 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
154 |
generate_mock_voiceover(narration, audio_path)
|
155 |
intermediates.append(audio_path)
|
156 |
|
157 |
-
# STEP 4:
|
158 |
progress(0.30, desc="🖼️ Preparing keyframe image ...")
|
159 |
if keyframe_image is not None:
|
160 |
keyframe_path = Path(keyframe_image)
|
@@ -164,15 +222,15 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
164 |
intermediates.append(keyframe_path)
|
165 |
|
166 |
# STEP 5: Generate scenes
|
167 |
-
clip_paths = []
|
168 |
for idx, scene_prompt in enumerate(scene_prompts, start=1):
|
169 |
base_progress = 0.30 + (idx * 0.12)
|
170 |
progress(min(base_progress, 0.85), desc=f"🎬 Scene {idx}/{len(scene_prompts)} ...")
|
171 |
-
_log(f"Submitting scene {idx}: {scene_prompt[:
|
172 |
try:
|
173 |
task = runway_client.image_to_video.create(
|
174 |
model=GEN4_MODEL,
|
175 |
-
prompt_image=str(keyframe_path), # required
|
176 |
prompt_text=scene_prompt,
|
177 |
duration=SCENE_DURATION_SECONDS,
|
178 |
ratio=VIDEO_RATIO,
|
@@ -182,12 +240,12 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
182 |
except TaskFailedError as e:
|
183 |
raise gr.Error(f"Runway failed scene {idx}: {getattr(e, 'task_details', 'No details')}")
|
184 |
|
185 |
-
# Download clip
|
186 |
clip_path = workdir / f"scene_{idx}.mp4"
|
187 |
r = runway_client._session.get(video_url, stream=True)
|
188 |
with open(clip_path, 'wb') as f:
|
189 |
for chunk in r.iter_content(chunk_size=8192):
|
190 |
-
if chunk:
|
|
|
191 |
clip_paths.append(clip_path)
|
192 |
intermediates.append(clip_path)
|
193 |
_log(f"Downloaded scene {idx} -> {clip_path}")
|
@@ -197,7 +255,8 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
197 |
list_file = workdir / "clips.txt"
|
198 |
with open(list_file, 'w') as lf:
|
199 |
for p in clip_paths:
|
200 |
-
lf.write(f"file '{p}'
|
|
|
201 |
intermediates.append(list_file)
|
202 |
|
203 |
concat_path = workdir / f"concat_{job_id}.mp4"
|
@@ -221,7 +280,7 @@ def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress
|
|
221 |
_log(f"JOB {job_id} FAILED: {e}")
|
222 |
raise gr.Error(f"An error occurred: {e}")
|
223 |
finally:
|
224 |
-
# Keep workdir for debugging;
|
225 |
pass
|
226 |
|
227 |
# --- 5. GRADIO UI ---
|
@@ -243,7 +302,12 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
243 |
outputs=video_output
|
244 |
)
|
245 |
|
246 |
-
gr.Markdown("
|
|
|
|
|
|
|
|
|
|
|
247 |
|
248 |
if __name__ == "__main__":
|
249 |
-
demo.launch()
|
|
|
5 |
import random
|
6 |
import subprocess
|
7 |
from pathlib import Path
|
8 |
+
from typing import List, Any
|
9 |
|
10 |
import google.generativeai as genai
|
11 |
from tavily import TavilyClient
|
|
|
13 |
from PIL import Image, ImageDraw, ImageFont
|
14 |
|
15 |
# =============================================================
|
16 |
+
# AI VIDEO STUDIO (Gen-4 Turbo Image→Video) – Robust Version
|
17 |
# =============================================================
|
18 |
+
# Improvements in this revision:
|
19 |
+
# - Normalizes narration if model returns list (was causing list.split() AttributeError).
|
20 |
+
# - Defensive checks & type coercion for scene_prompts.
|
21 |
+
# - Safer JSON extraction (optionally attempts a JSON substring if extra text present).
|
22 |
+
# - Fixed accidental newline handling for 'facts'.
|
23 |
+
# - Added explicit JSON enforcement hint to Gemini.
|
24 |
+
# - Added helper to truncate overly long narration.
|
25 |
+
# - Added more granular progress steps & logging.
|
26 |
+
# - Added retry for Gemini (transient failures) and for Runway polling.
|
27 |
+
# - Added validate_scene_prompts() to guarantee list[str] length == SCENE_COUNT.
|
28 |
# =============================================================
|
29 |
|
30 |
# --- 1. CONFIGURE API KEYS ---
|
|
|
37 |
raise ValueError(f"API Key Error: Please set the {e} secret in your environment.")
|
38 |
|
39 |
# --- 2. CONSTANTS / SETTINGS ---
|
40 |
+
GEN4_MODEL = "gen4_turbo" # adjust to "gen4" for non‑turbo
|
41 |
SCENE_COUNT = 4
|
42 |
+
SCENE_DURATION_SECONDS = 5 # 5 or 10 supported
|
43 |
+
VIDEO_RATIO = "1280:720" # 16:9
|
44 |
+
WORDS_PER_SEC = 2.5
|
45 |
+
MAX_POLL_SECONDS = 180 # per scene
|
46 |
POLL_INTERVAL = 5
|
47 |
+
GEMINI_MAX_RETRIES = 2
|
48 |
+
MAX_NARRATION_WORDS = 520 # safeguard length
|
49 |
|
50 |
# --- 3. UTILITIES ---
|
51 |
def _log(msg: str):
|
|
|
53 |
|
54 |
|
55 |
def create_placeholder_image(text: str, path: Path, size=(1280, 720)) -> Path:
|
56 |
+
"""Create a simple placeholder keyframe if user supplies none."""
|
|
|
57 |
img = Image.new("RGB", size, (10, 10, 10))
|
58 |
draw = ImageDraw.Draw(img)
|
59 |
try:
|
60 |
font = ImageFont.truetype("DejaVuSans-Bold.ttf", 60)
|
61 |
except Exception:
|
62 |
font = ImageFont.load_default()
|
63 |
+
# naive wrap
|
64 |
+
words = text.split()
|
65 |
+
wrapped: List[str] = []
|
66 |
line = ""
|
67 |
+
for word in words:
|
68 |
test = f"{line} {word}".strip()
|
69 |
+
if len(test) > 28:
|
70 |
+
if line:
|
71 |
+
wrapped.append(line)
|
72 |
line = word
|
73 |
else:
|
74 |
line = test
|
|
|
105 |
raise TimeoutError(f"Runway task timed out after {max_seconds}s (status={status})")
|
106 |
time.sleep(interval)
|
107 |
|
108 |
+
|
109 |
+
def extract_json_block(text: str) -> str:
    """Isolate the JSON object embedded in a possibly noisy model response.

    The text is scanned left to right; at each ``{`` a JSON document is
    decoded in place, and the first one that parses is returned verbatim.
    This keeps prose braces around the payload (for example a stray ``}`` in
    trailing commentary) from corrupting the extracted candidate.

    Args:
        text: Raw response text, already stripped of markdown fences.

    Returns:
        The substring holding the first parseable JSON object. If none
        parses, the span from the first ``{`` to the last ``}`` is returned,
        and if no such span exists the input is returned unchanged, so the
        caller's ``json.loads`` reports the real parse error.
    """
    import json  # local import keeps this helper self-contained

    decoder = json.JSONDecoder()
    start = text.find('{')
    while start != -1:
        try:
            # raw_decode stops at the end of the document instead of
            # requiring the whole string to be JSON.
            _, end = decoder.raw_decode(text, start)
        except ValueError:
            start = text.find('{', start + 1)
        else:
            return text[start:end]

    # Nothing parsed: fall back to the widest brace-delimited span.
    first = text.find('{')
    last = text.rfind('}')
    if first != -1 and last > first:
        return text[first:last + 1]
    return text
|
117 |
+
|
118 |
+
|
119 |
+
def coerce_narration(narr: Any, max_words=None) -> str:
    """Normalize the model's narration field into a single bounded string.

    Args:
        narr: Value of the ``narration_script`` field. May be a string, a
            list of fragments, ``None`` (JSON ``null``) or any other value.
        max_words: Maximum number of whitespace-separated words to keep.
            Defaults to the module-level ``MAX_NARRATION_WORDS``.

    Returns:
        The narration with surrounding whitespace stripped. Lists are joined
        with single spaces, other non-string values are passed through
        ``str()``, and ``None`` (alone or as a list item) contributes nothing
        rather than the literal word "None". Text longer than ``max_words``
        is cut to its first ``max_words`` words.
    """
    if narr is None:
        return ""
    if max_words is None:
        max_words = MAX_NARRATION_WORDS

    if isinstance(narr, list):
        # Skip null fragments so they are not rendered as "None".
        narr = ' '.join(str(part) for part in narr if part is not None)
    elif not isinstance(narr, str):
        narr = str(narr)

    words = narr.split()
    if len(words) > max_words:
        narr = ' '.join(words[:max_words])
    return narr.strip()
|
128 |
+
|
129 |
+
|
130 |
+
def validate_scene_prompts(sp: Any, count=None) -> List[str]:
    """Coerce the model's scene prompts into exactly ``count`` clean strings.

    Args:
        sp: Value of the ``scene_prompts`` field. A non-list value is treated
            as a single prompt; lists nested one level deep are flattened.
        count: Number of prompts to return. Defaults to the module-level
            ``SCENE_COUNT``.

    Returns:
        A list of ``count`` stripped prompt strings. Extra prompts are
        dropped; a short list is padded by repeating its last prompt.

    Raises:
        ValueError: If no usable prompt remains after dropping ``None`` and
            blank entries. Previously this surfaced as an ``IndexError``
            from the padding step.
    """
    if count is None:
        count = SCENE_COUNT
    if not isinstance(sp, list):
        sp = [sp]

    flat: List[str] = []
    for item in sp:
        nested = item if isinstance(item, list) else [item]
        for entry in nested:
            if entry is None:
                continue  # JSON null carries no prompt text
            cleaned = str(entry).strip()
            if cleaned:
                flat.append(cleaned)

    if not flat:
        raise ValueError("No usable scene prompts were returned by the model")

    # Trim or pad to the requested number of scenes.
    if len(flat) < count:
        flat.extend([flat[-1]] * (count - len(flat)))
    return flat[:count]
|
145 |
+
|
146 |
+
|
147 |
+
def call_gemini_script(topic: str, facts: str) -> tuple[str, List[str]]:
    """Ask Gemini for a narration script and per-scene visual prompts.

    The model is prompted for strict JSON. Its reply is stripped of markdown
    fences, reduced to the embedded JSON object, parsed, and normalized via
    ``coerce_narration`` and ``validate_scene_prompts``. Any failure in that
    chain (request, parsing or validation) triggers a retry, up to
    ``GEMINI_MAX_RETRIES`` attempts in total.

    Args:
        topic: The user's topic, interpolated into the prompt.
        facts: Research text interpolated into the prompt; may be noisy.

    Returns:
        A ``(narration, scene_prompts)`` tuple.

    Raises:
        ValueError: If every attempt fails. The last underlying error is
            included in the message and chained as ``__cause__``.
    """
    gemini_model = genai.GenerativeModel('gemini-1.5-flash')
    script_prompt = f"""
You are a creative director for viral short-form educational videos.
Topic: {topic}
Research (may contain noise):
{facts}


STRICT JSON OUTPUT ONLY. Do not add commentary or markdown fences.
Schema: {{"narration_script": string, "scene_prompts": list[{SCENE_COUNT}]}}
narration_script rules: energetic, cohesive, <= {MAX_NARRATION_WORDS} words total, no scene numbers.
scene_prompts: exactly {SCENE_COUNT} cinematic visual descriptions (1-2 sentences each) including style, camera, lighting.
Return JSON ONLY.
"""
    last_error = None
    for attempt in range(GEMINI_MAX_RETRIES):
        try:
            response = gemini_model.generate_content(script_prompt)
            raw = response.text.strip()
            # The model may still wrap its answer in fences despite the prompt.
            raw = raw.replace('```json', '').replace('```', '').strip()
            raw = extract_json_block(raw)
            data = json.loads(raw)
            narration = coerce_narration(data.get('narration_script', ''))
            scene_prompts = validate_scene_prompts(data.get('scene_prompts', []))
            return narration, scene_prompts
        except Exception as e:
            # Deliberately broad: request, parse and validation errors are
            # all treated as retryable here.
            last_error = e
            # Back off only when another attempt will follow; sleeping
            # after the final failure just delays the error.
            if attempt < GEMINI_MAX_RETRIES - 1:
                time.sleep(1 + attempt)
    raise ValueError(
        f"Gemini JSON parse failed after {GEMINI_MAX_RETRIES} attempts: {last_error}"
    ) from last_error
|
177 |
+
|
178 |
# --- 4. CORE PIPELINE ---
|
179 |
+
|
180 |
def generate_video_from_topic(topic_prompt, keyframe_image, progress=gr.Progress(track_tqdm=True)):
|
181 |
job_id = f"{int(time.time())}_{random.randint(1000, 9999)}"
|
182 |
_log(f"Starting job {job_id} :: topic='{topic_prompt}'")
|
183 |
|
|
|
184 |
workdir = Path(f"job_{job_id}")
|
185 |
workdir.mkdir(exist_ok=True)
|
186 |
|
|
|
196 |
search_depth="basic"
|
197 |
)
|
198 |
if research_results and 'results' in research_results:
|
199 |
+
facts = "
|
200 |
+
".join(res.get('content', '') for res in research_results['results'])
|
201 |
except Exception as e:
|
202 |
_log(f"Tavily failed: {e}")
|
203 |
|
204 |
# STEP 2: Script
|
205 |
progress(0.15, desc="✍️ Writing script ...")
|
206 |
+
narration, scene_prompts = call_gemini_script(topic_prompt, facts)
|
207 |
+
_log(f"Narration words: {len(narration.split())}; scenes: {len(scene_prompts)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
208 |
|
209 |
# STEP 3: Mock VO
|
210 |
progress(0.25, desc="🎙️ Generating mock VO ...")
|
|
|
212 |
generate_mock_voiceover(narration, audio_path)
|
213 |
intermediates.append(audio_path)
|
214 |
|
215 |
+
# STEP 4: Keyframe image (required for Gen-4 image_to_video)
|
216 |
progress(0.30, desc="🖼️ Preparing keyframe image ...")
|
217 |
if keyframe_image is not None:
|
218 |
keyframe_path = Path(keyframe_image)
|
|
|
222 |
intermediates.append(keyframe_path)
|
223 |
|
224 |
# STEP 5: Generate scenes
|
225 |
+
clip_paths: List[Path] = []
|
226 |
for idx, scene_prompt in enumerate(scene_prompts, start=1):
|
227 |
base_progress = 0.30 + (idx * 0.12)
|
228 |
progress(min(base_progress, 0.85), desc=f"🎬 Scene {idx}/{len(scene_prompts)} ...")
|
229 |
+
_log(f"Submitting scene {idx}: {scene_prompt[:100]} ...")
|
230 |
try:
|
231 |
task = runway_client.image_to_video.create(
|
232 |
model=GEN4_MODEL,
|
233 |
+
prompt_image=str(keyframe_path), # required
|
234 |
prompt_text=scene_prompt,
|
235 |
duration=SCENE_DURATION_SECONDS,
|
236 |
ratio=VIDEO_RATIO,
|
|
|
240 |
except TaskFailedError as e:
|
241 |
raise gr.Error(f"Runway failed scene {idx}: {getattr(e, 'task_details', 'No details')}")
|
242 |
|
|
|
243 |
clip_path = workdir / f"scene_{idx}.mp4"
|
244 |
r = runway_client._session.get(video_url, stream=True)
|
245 |
with open(clip_path, 'wb') as f:
|
246 |
for chunk in r.iter_content(chunk_size=8192):
|
247 |
+
if chunk:
|
248 |
+
f.write(chunk)
|
249 |
clip_paths.append(clip_path)
|
250 |
intermediates.append(clip_path)
|
251 |
_log(f"Downloaded scene {idx} -> {clip_path}")
|
|
|
255 |
list_file = workdir / "clips.txt"
|
256 |
with open(list_file, 'w') as lf:
|
257 |
for p in clip_paths:
|
258 |
+
lf.write(f"file '{p}'
|
259 |
+
")
|
260 |
intermediates.append(list_file)
|
261 |
|
262 |
concat_path = workdir / f"concat_{job_id}.mp4"
|
|
|
280 |
_log(f"JOB {job_id} FAILED: {e}")
|
281 |
raise gr.Error(f"An error occurred: {e}")
|
282 |
finally:
|
283 |
+
# Keep workdir for debugging; remove manually when satisfied.
|
284 |
pass
|
285 |
|
286 |
# --- 5. GRADIO UI ---
|
|
|
302 |
outputs=video_output
|
303 |
)
|
304 |
|
305 |
+
gr.Markdown("---
|
306 |
+
### Tips
|
307 |
+
- Supply a consistent character/style image for more coherent scenes.
|
308 |
+
- Gen-4 requires an input image + (optional) text prompt; pure text alone is not supported in this flow.
|
309 |
+
- For pure text-to-video consider a Gen-3 text model.
|
310 |
+
- Replace placeholder keyframe logic with a real T2I model for higher quality.")
|
311 |
|
312 |
if __name__ == "__main__":
|
313 |
+
demo.launch()
|