Spaces:
Sleeping
Sleeping
File size: 8,974 Bytes
06a6e91 eacc26a 6123d43 06a6e91 b1fb753 b8aa7c5 4eada0f b8aa7c5 b1fb753 6a9e2b5 a361f34 06a6e91 eacc26a 06a6e91 eacc26a 836daad 2cde650 eacc26a b8aa7c5 88d63cf eacc26a 88d63cf 836daad eacc26a 2cde650 eacc26a a6b51c5 eacc26a 836daad eacc26a b8aa7c5 eacc26a 88d63cf 836daad eacc26a 57e4050 b8aa7c5 4eada0f b8aa7c5 4eada0f b8aa7c5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
import gradio as gr
import os
from lumaai import AsyncLumaAI
import asyncio
import aiohttp
async def get_camera_motions():
    """Fetch the camera motions offered by the Luma API as strings.

    The API token is read from the ``LMGEN_KEY`` environment variable.

    Returns:
        A list containing ``str(motion)`` for every entry the API returns,
        or an empty list if the request (or the conversion) raises.

    Raises:
        gr.Error: If ``LMGEN_KEY`` is unset or empty.
    """
    token = os.getenv("LMGEN_KEY")
    if not token:
        raise gr.Error("LMGEN_KEY environment variable is not set.")

    luma = AsyncLumaAI(auth_token=token)
    try:
        available = await luma.generations.camera_motion.list()
        # Stringify every entry so the values can be used as dropdown choices.
        names = []
        for entry in available:
            names.append(str(entry))
        return names
    except Exception as err:
        # Best effort: report the failure on stdout and fall back to no choices.
        print(f"Error fetching camera motions: {err!s}")
        return []
async def generate_video(
    client,
    prompt,
    loop=False,
    aspect_ratio="16:9",
    camera_motion=None,
    extend_id=None,
    reverse_extend_id=None,
    interpolate_ids=None,
    progress=gr.Progress(),
):
    """Create a generation, wait for it to finish and download the video.

    Args:
        client: Async Luma client used for ``generations.create`` / ``get``.
        prompt: Text prompt for the generation.
        loop: Value forwarded as the ``loop`` parameter.
        aspect_ratio: Value forwarded as the ``aspect_ratio`` parameter.
        camera_motion: Optional text appended to the prompt after a space.
        extend_id: Generation id used as ``frame0`` keyframe. Takes precedence
            over ``reverse_extend_id`` and ``interpolate_ids``.
        reverse_extend_id: Generation id used as ``frame1`` keyframe. Only
            used when ``extend_id`` is falsy.
        interpolate_ids: Two generation ids used as ``frame0`` and ``frame1``.
            Only used when both ids above are falsy.
        progress: Gradio progress callback.

    Returns:
        The relative path of the downloaded ``.mp4`` file.

    Raises:
        Exception: If the generation reports the ``"failed"`` state.
        RuntimeError: If the video download does not answer with HTTP 200.
    """
    generation_params = {
        "prompt": f"{prompt} {camera_motion}" if camera_motion else prompt,
        "loop": loop,
        "aspect_ratio": aspect_ratio,
    }

    # Only one keyframe mode is applied, in this order of precedence.
    if extend_id:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": extend_id},
        }
    elif reverse_extend_id:
        generation_params["keyframes"] = {
            "frame1": {"type": "generation", "id": reverse_extend_id},
        }
    elif interpolate_ids:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": interpolate_ids[0]},
            "frame1": {"type": "generation", "id": interpolate_ids[1]},
        }

    progress(0, desc="Initiating video generation...")
    generation = await client.generations.create(**generation_params)
    progress(0.1, desc="Video generation started. Waiting for completion...")

    # Poll for completion. ``loop`` is a parameter here, so the event loop
    # gets its own name.
    event_loop = asyncio.get_running_loop()
    start_time = event_loop.time()
    while True:
        status = await client.generations.get(id=generation.id)
        if status.state == "completed":
            break
        if status.state == "failed":
            raise Exception("Video generation failed")

        # The bar is only an estimate: it assumes roughly 60 seconds in total
        # and is capped at 0.9 until the download has finished.
        elapsed_time = event_loop.time() - start_time
        progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
        progress(progress_value, desc="Generating video...")
        await asyncio.sleep(5)

    progress(0.9, desc="Downloading generated video...")

    # Download the video
    video_url = status.assets.video
    file_name = f"luma_ai_generated_{generation.id}.mp4"
    async with aiohttp.ClientSession() as session:
        async with session.get(video_url) as resp:
            # Fail loudly on a bad response; previously a non-200 status left
            # ``file_name`` unbound and the function crashed on ``return``.
            if resp.status != 200:
                raise RuntimeError(
                    f"Failed to download video (HTTP {resp.status})"
                )
            with open(file_name, 'wb') as fd:
                while True:
                    chunk = await resp.content.read(1024)
                    if not chunk:
                        break
                    fd.write(chunk)

    progress(1.0, desc="Video generation complete!")
    return file_name
async def text_to_video(
    prompt,
    loop,
    aspect_ratio,
    camera_motion,
    extend_id,
    reverse_extend_id,
    interpolate_id1,
    interpolate_id2,
    progress=gr.Progress(),
):
    """Gradio handler for the "Text to Video" tab.

    Builds a Luma client from the ``LMGEN_KEY`` environment variable and
    delegates to ``generate_video``.

    Returns:
        ``(video_path, "")`` on success, or ``(None, message)`` when anything
        inside the generation raises.

    Raises:
        gr.Error: If ``LMGEN_KEY`` is unset or empty.
    """
    token = os.getenv("LMGEN_KEY")
    if not token:
        raise gr.Error("LMGEN_KEY environment variable is not set.")

    luma = AsyncLumaAI(auth_token=token)
    try:
        # Interpolation needs both ids; a single id is ignored.
        if interpolate_id1 and interpolate_id2:
            keyframe_ids = [interpolate_id1, interpolate_id2]
        else:
            keyframe_ids = None

        rendered = await generate_video(
            luma,
            prompt,
            loop,
            aspect_ratio,
            camera_motion,
            extend_id,
            reverse_extend_id,
            keyframe_ids,
            progress,
        )
    except Exception as err:
        # Errors are shown in the error textbox rather than raised.
        return None, f"An error occurred: {err!s}"
    return rendered, ""
async def image_to_video(prompt, image_url, loop, aspect_ratio, camera_motion, progress=gr.Progress()):
    """Gradio handler for the "Image to Video" tab.

    Creates a generation whose ``frame0`` keyframe is the image at
    ``image_url``, polls until it finishes and downloads the resulting video.

    Args:
        prompt: Text prompt for the generation.
        image_url: URL of the image used as the first keyframe.
        loop: Value forwarded as the ``loop`` parameter.
        aspect_ratio: Value forwarded as the ``aspect_ratio`` parameter.
        camera_motion: Optional text appended to the prompt after a space.
        progress: Gradio progress callback.

    Returns:
        ``(file_name, "")`` on success, or ``(None, message)`` when the
        generation or the download fails.

    Raises:
        gr.Error: If ``LMGEN_KEY`` is unset or empty.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY environment variable is not set.")

    client = AsyncLumaAI(auth_token=api_key)
    try:
        generation_params = {
            "prompt": prompt + (f" {camera_motion}" if camera_motion else ""),
            "loop": loop,
            "aspect_ratio": aspect_ratio,
            "keyframes": {
                "frame0": {
                    "type": "image",
                    "url": image_url,
                },
            },
        }

        progress(0, desc="Initiating video generation from image...")
        generation = await client.generations.create(**generation_params)
        progress(0.1, desc="Video generation started. Waiting for completion...")

        # Poll for completion. ``loop`` is a parameter here, so the event
        # loop gets its own name.
        event_loop = asyncio.get_running_loop()
        start_time = event_loop.time()
        while True:
            status = await client.generations.get(id=generation.id)
            if status.state == "completed":
                break
            if status.state == "failed":
                raise Exception("Video generation failed")

            # The bar is only an estimate: it assumes roughly 60 seconds in
            # total and is capped at 0.9 until the download has finished.
            elapsed_time = event_loop.time() - start_time
            progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
            progress(progress_value, desc="Generating video...")
            await asyncio.sleep(5)

        progress(0.9, desc="Downloading generated video...")

        # Download the video
        video_url = status.assets.video
        file_name = f"luma_ai_generated_{generation.id}.mp4"
        async with aiohttp.ClientSession() as session:
            async with session.get(video_url) as resp:
                # Report a bad response explicitly; previously a non-200
                # status left ``file_name`` unbound and the user only saw an
                # unrelated "local variable" error message.
                if resp.status != 200:
                    raise RuntimeError(
                        f"Failed to download video (HTTP {resp.status})"
                    )
                with open(file_name, 'wb') as fd:
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        fd.write(chunk)

        progress(1.0, desc="Video generation complete!")
        return file_name, ""
    except Exception as e:
        # Errors are shown in the error textbox rather than raised.
        return None, f"An error occurred: {str(e)}"
# Gradio UI: one tab per generation mode, each with its own outputs and
# advanced options.
with gr.Blocks() as demo:
    gr.Markdown("# Luma AI Text-to-Video Demo")

    with gr.Tab("Text to Video"):
        prompt = gr.Textbox(label="Prompt")
        generate_btn = gr.Button("Generate Video")
        video_output = gr.Video(label="Generated Video")
        error_output = gr.Textbox(label="Error Messages", visible=True)

        with gr.Accordion("Advanced Options", open=False):
            loop = gr.Checkbox(label="Loop", value=False)
            aspect_ratio = gr.Dropdown(label="Aspect Ratio", choices=["16:9", "1:1", "9:16", "4:3", "3:4"], value="16:9")
            # Choices start empty and are filled in on page load (see below).
            camera_motion = gr.Dropdown(label="Camera Motion", choices=[])
            extend_id = gr.Textbox(label="Extend Video ID (optional)")
            reverse_extend_id = gr.Textbox(label="Reverse Extend Video ID (optional)")
            with gr.Row():
                interpolate_id1 = gr.Textbox(label="Interpolate Video ID 1 (optional)")
                interpolate_id2 = gr.Textbox(label="Interpolate Video ID 2 (optional)")

        generate_btn.click(
            text_to_video,
            inputs=[prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2],
            outputs=[video_output, error_output],
        )

    with gr.Tab("Image to Video"):
        img_prompt = gr.Textbox(label="Prompt")
        img_url = gr.Textbox(label="Image URL")
        img_generate_btn = gr.Button("Generate Video from Image")
        img_video_output = gr.Video(label="Generated Video")
        img_error_output = gr.Textbox(label="Error Messages", visible=True)

        with gr.Accordion("Advanced Options", open=False):
            img_loop = gr.Checkbox(label="Loop", value=False)
            img_aspect_ratio = gr.Dropdown(label="Aspect Ratio", choices=["16:9", "1:1", "9:16", "4:3", "3:4"], value="16:9")
            # Choices start empty and are filled in on page load (see below).
            img_camera_motion = gr.Dropdown(label="Camera Motion", choices=[])

        img_generate_btn.click(
            image_to_video,
            inputs=[img_prompt, img_url, img_loop, img_aspect_ratio, img_camera_motion],
            outputs=[img_video_output, img_error_output],
        )

    async def update_camera_motions():
        """Return dropdown updates holding the camera motions from the API.

        Falls back to empty choice lists when fetching the motions raises.
        """
        try:
            motions = await get_camera_motions()
            return {camera_motion: gr.update(choices=motions), img_camera_motion: gr.update(choices=motions)}
        except Exception as e:
            print(f"Error updating camera motions: {str(e)}")
            return {camera_motion: gr.update(choices=[]), img_camera_motion: gr.update(choices=[])}

    # A handler that returns a dict keyed by components must still declare
    # those components as outputs; without this the dropdowns are never
    # populated.
    demo.load(update_camera_motions, outputs=[camera_motion, img_camera_motion])

demo.queue().launch(debug=True)