# Hugging Face Spaces status banner captured with this file: Sleeping
import contextlib
import json
import os
import random
import subprocess
import time

import google.generativeai as genai
import gradio as gr
import requests
# from elevenlabs.client import ElevenLabs  # Temporarily disabled
from tavily import TavilyClient
# --- 1. CONFIGURE API KEYS FROM HUGGING FACE SECRETS ---
# Every key read below is required. Fail at import time with a message that
# names the missing secret, instead of continuing and hitting a NameError on
# RUNWAY_API_KEY when the headers are built.
try:
    genai.configure(api_key=os.environ["GEMINI_API_KEY"])
    tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
    RUNWAY_API_KEY = os.environ["RUNWAY_API_KEY"]
except KeyError as e:
    raise ValueError(
        f"API Key Error: Please set the {e} secret in your Hugging Face Space settings."
    ) from e

# --- 2. DEFINE API ENDPOINTS AND HEADERS ---
RUNWAY_API_URL = "https://api.runwayml.com/v2"
# Sent with every Runway request (task creation and status polling).
RUNWAY_HEADERS = {
    "Authorization": f"Bearer {RUNWAY_API_KEY}",
    "Content-Type": "application/json",
}
# --- 3. THE CORE VIDEO GENERATION FUNCTION ---
# requests applies no timeout by default; without one a stalled connection
# would block the job forever.
_HTTP_TIMEOUT_SECONDS = 30
_RUNWAY_POLL_ATTEMPTS = 60
_RUNWAY_POLL_INTERVAL_SECONDS = 10
# Speaking rate used to size the silent placeholder audio track.
_NARRATION_WORDS_PER_SECOND = 2.5
# Floor for the placeholder audio so an empty narration never yields a
# zero-length track (the final mux uses -shortest).
_MIN_AUDIO_SECONDS = 1.0
_DOWNLOAD_CHUNK_BYTES = 64 * 1024


def _parse_script(raw_text):
    """Extract ``(narration, scene_prompts)`` from Gemini's raw text reply.

    Markdown code fences and any text outside the outermost ``{...}`` are
    discarded before parsing.

    Raises:
        gr.Error: if the reply is not JSON, lacks the expected keys, or the
            values are not a string and a non-empty list respectively.
    """
    cleaned_text = raw_text.strip().replace("```json", "").replace("```", "")
    first_brace = cleaned_text.find("{")
    last_brace = cleaned_text.rfind("}")
    if first_brace != -1 and last_brace > first_brace:
        cleaned_text = cleaned_text[first_brace:last_brace + 1]
    try:
        script_data = json.loads(cleaned_text)
        narration = script_data['narration_script']
        scene_prompts = script_data['scene_prompts']
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # TypeError covers valid JSON that is not an object (e.g. a list).
        raise gr.Error(
            f"Gemini did not return valid JSON. Error: {e}. Response was: {raw_text}"
        ) from e
    if not isinstance(narration, str) or not isinstance(scene_prompts, list) or not scene_prompts:
        raise gr.Error(
            f"Gemini returned JSON with an unexpected shape. Response was: {raw_text}"
        )
    return narration, scene_prompts


def _generate_runway_clip_url(scene_prompt, scene_number):
    """Start a Runway task for ``scene_prompt`` and poll until a video URL is ready.

    Raises:
        gr.Error: if the task cannot be started, reports failure, finishes
            without a video URL, or does not finish within the poll budget.
    """
    runway_payload = {
        "task_type": "gen2",
        "options": {
            "text_prompt": scene_prompt,
        },
    }
    post_response = requests.post(
        f"{RUNWAY_API_URL}/tasks",
        headers=RUNWAY_HEADERS,
        json=runway_payload,
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    # Accept any 2xx: a created task may be reported as 201/202 rather than 200.
    if not post_response.ok:
        raise gr.Error(
            f"Runway API Error (start job): {post_response.status_code} - {post_response.text}"
        )
    task_id = post_response.json().get("uuid")
    if not task_id:
        raise gr.Error(f"Runway API did not return a task UUID. Response: {post_response.json()}")

    for _ in range(_RUNWAY_POLL_ATTEMPTS):
        get_response = requests.get(
            f"{RUNWAY_API_URL}/tasks/{task_id}",
            headers=RUNWAY_HEADERS,
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        status_details = get_response.json()
        status = status_details.get("status")
        if status == "succeeded":
            # "outputs" may be present but null, so guard before the nested lookup.
            video_url = (status_details.get("outputs") or {}).get("video")
            if not video_url:
                raise gr.Error(
                    f"Runway job succeeded but returned no video URL. Details: {status_details}"
                )
            return video_url
        if status == "failed":
            raise gr.Error(f"Runway job failed. Details: {status_details.get('error_message')}")
        print(f"Scene {scene_number} status: {status}. Waiting {_RUNWAY_POLL_INTERVAL_SECONDS} seconds...")
        time.sleep(_RUNWAY_POLL_INTERVAL_SECONDS)
    raise gr.Error(f"Runway job timed out for scene {scene_number}.")


def _download_to_file(url, dest_path):
    """Stream ``url`` into ``dest_path``, raising on any HTTP error status."""
    with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT_SECONDS) as video_response:
        # Without this check an error page would be saved as the clip.
        video_response.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in video_response.iter_content(chunk_size=_DOWNLOAD_CHUNK_BYTES):
                if chunk:
                    f.write(chunk)


def generate_video_from_topic(topic_prompt, progress=gr.Progress(track_tqdm=True)):
    """Generate a short video about ``topic_prompt`` and return its file path.

    Pipeline: research the topic with Tavily (best effort), have Gemini write
    a narration script plus scene prompts, create a silent placeholder audio
    track sized to the narration, render one Runway clip per scene prompt,
    then concatenate the clips and mux in the audio with FFmpeg.

    Args:
        topic_prompt: Free-text topic entered by the user.
        progress: Gradio progress tracker used to report pipeline stages.

    Returns:
        Path of the final ``.mp4`` in the current working directory.

    Raises:
        gr.Error: if the topic is blank or any pipeline step fails.
            Intermediate files are removed in either case.
    """
    if not topic_prompt or not topic_prompt.strip():
        raise gr.Error("Please enter a video topic.")

    job_id = f"{int(time.time())}_{random.randint(1000, 9999)}"
    print(f"--- Starting New Job: {job_id} for topic: '{topic_prompt}' ---")
    # Every temp path is registered here *before* it is written so the
    # finally-block also removes partially written files.
    intermediate_files = []
    try:
        # STEP 1: RESEARCH (Tavily) - best effort, failure is not fatal
        progress(0.1, desc="🔍 Researching topic with Tavily...")
        facts = "No research data available."
        try:
            research_results = tavily_client.search(
                query=f"Key facts and interesting points about {topic_prompt}",
                search_depth="basic",
            )
            if research_results and 'results' in research_results:
                facts = "\n".join(res['content'] for res in research_results['results']) or facts
        except Exception as e:
            print(f"Tavily API failed: {e}. Proceeding without research.")

        # STEP 2: SCRIPT & SCENE PROMPTS (Gemini)
        progress(0.2, desc="✍️ Writing script with Gemini...")
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        # The research gathered above is passed to the model explicitly.
        prompt = f"""
        You are a creative director for viral short-form videos. Based on the topic '{topic_prompt}' and research, create a script.
        Your output MUST be a valid JSON object with "narration_script" (string) and "scene_prompts" (a list of 4 detailed, cinematic prompts).

        Research:
        {facts}
        """
        response = gemini_model.generate_content(prompt)
        narration, scene_prompts = _parse_script(response.text)

        # STEP 3: MOCK VOICE OVER (silent track sized to the narration)
        progress(0.3, desc="🎙️ MOCKING voiceover to save credits...")
        audio_path = f"audio_{job_id}.mp3"
        intermediate_files.append(audio_path)
        narration_duration = max(
            len(narration.split()) / _NARRATION_WORDS_PER_SECOND,
            _MIN_AUDIO_SECONDS,
        )
        subprocess.run([
            'ffmpeg', '-y', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=mono',
            '-t', str(narration_duration), '-q:a', '9', '-acodec', 'libmp3lame',
            audio_path,
        ], check=True)
        print(f"MOCK audio file saved (no credits used): {audio_path}")

        # STEP 4: VISUALS (Runway)
        video_clip_paths = []
        scene_count = len(scene_prompts)
        for scene_number, scene_prompt in enumerate(scene_prompts, start=1):
            # Spread the scenes over 0.4-0.9 regardless of how many there are.
            progress(
                0.4 + 0.5 * (scene_number - 1) / scene_count,
                desc=f"🎬 Generating video scene {scene_number}/{scene_count}...",
            )
            video_url = _generate_runway_clip_url(scene_prompt, scene_number)
            clip_path = f"scene_{scene_number}_{job_id}.mp4"
            intermediate_files.append(clip_path)
            _download_to_file(video_url, clip_path)
            video_clip_paths.append(clip_path)
            print(f"Video clip saved: {clip_path}")

        # STEP 5: STITCHING (FFmpeg)
        progress(0.9, desc="✂️ Assembling final video with FFmpeg...")
        file_list_path = f"file_list_{job_id}.txt"
        intermediate_files.append(file_list_path)
        with open(file_list_path, "w") as f:
            f.writelines(f"file '{clip}'\n" for clip in video_clip_paths)
        combined_video_path = f"combined_video_{job_id}.mp4"
        intermediate_files.append(combined_video_path)
        subprocess.run([
            'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', file_list_path,
            '-c', 'copy', combined_video_path,
        ], check=True)
        final_video_path = f"final_video_{job_id}.mp4"
        # -shortest trims the output to the shorter of the video and audio streams.
        subprocess.run([
            'ffmpeg', '-y', '-i', combined_video_path, '-i', audio_path,
            '-c:v', 'copy', '-c:a', 'aac', '-shortest', final_video_path,
        ], check=True)
        print(f"Final video created at: {final_video_path}")
        progress(1.0, desc="✅ Done!")
        return final_video_path
    except Exception as e:
        print(f"--- JOB {job_id} FAILED --- \nError: {e}")
        raise gr.Error(f"An error occurred: {e}") from e
    finally:
        # STEP 6: CLEANUP
        print("Cleaning up intermediate files...")
        for file_path in intermediate_files:
            # A failed removal must not replace the exception already in flight.
            with contextlib.suppress(OSError):
                os.remove(file_path)
                print(f"Removed: {file_path}")
# --- 4. CREATE AND LAUNCH THE GRADIO INTERFACE ---
# Layout: a topic textbox and a generate button on one row, the resulting
# video below, followed by a short list of example topics.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 My Personal AI Video Studio")
    gr.Markdown("Enter a topic to generate a short-form video. This private tool is used for fulfilling freelance orders.")
    with gr.Row():
        topic_input = gr.Textbox(label="Video Topic", placeholder="e.g., 'The history of coffee'", scale=3)
        generate_button = gr.Button("Generate Video", variant="primary", scale=1)
    with gr.Row():
        video_output = gr.Video(label="Generated Video")
    generate_button.click(fn=generate_video_from_topic, inputs=topic_input, outputs=video_output)
    gr.Markdown("--- \n ### Examples of Good Topics:\n - A product: 'The new waterproof Chrono-Watch X1'\n - A concept: 'The science of sleep'")

if __name__ == "__main__":
    # Enable the queue explicitly: the handler reports progress via
    # gr.Progress and can run for many minutes. Older Gradio releases need
    # this call; on releases where queuing is already the default it is harmless.
    demo.queue().launch()