# lumapi2 / app.py — Hugging Face Space by seawolf2357 (commit 297ade1, 10.8 kB)
import gradio as gr
import os
from lumaai import AsyncLumaAI
import asyncio
import aiohttp
from transformers import pipeline
# λ²ˆμ—­ νŒŒμ΄ν”„λΌμΈ μ΄ˆκΈ°ν™”
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ko-en")
async def get_camera_motions():
    """Fetch the list of supported camera motions from the Luma AI API.

    Returns:
        A list of camera-motion names as strings. On any API failure the
        error is printed and an empty list is returned instead of raising.

    Raises:
        gr.Error: if the LMGEN_KEY environment variable is not set.
    """
    token = os.getenv("LMGEN_KEY")
    if not token:
        raise gr.Error("LMGEN_KEY environment variable is not set.")
    luma_client = AsyncLumaAI(auth_token=token)
    try:
        available = await luma_client.generations.camera_motion.list()
        # The API returns motion objects; expose them as plain strings.
        return [str(motion) for motion in available]
    except Exception as exc:
        print(f"Error fetching camera motions: {str(exc)}")
        return []
async def generate_video(client, prompt, loop=False, aspect_ratio="16:9",
                         camera_motion=None, extend_id=None,
                         reverse_extend_id=None, interpolate_ids=None,
                         progress=gr.Progress()):
    """Create a video generation, poll until it finishes, and download it.

    Args:
        client: An AsyncLumaAI client instance.
        prompt: Text prompt for the generation (camera_motion is appended).
        loop: Whether the generated video should loop.
        aspect_ratio: Target ratio; UI labels such as "16:9 (와이드스크린)"
            are accepted and normalized to the bare "16:9" form.
        camera_motion: Optional camera-motion string appended to the prompt.
        extend_id: Generation id to extend forward (keyframe frame0).
        reverse_extend_id: Generation id to extend backward (keyframe frame1).
        interpolate_ids: Two generation ids to interpolate between.
        progress: Gradio progress tracker.

    Returns:
        Path of the downloaded .mp4 file.

    Raises:
        Exception: if the generation fails or the download is rejected.
    """
    # BUG FIX: the UI dropdown values carry a Korean description suffix
    # ("16:9 (와이드스크린)"); the API accepts only the bare ratio, so keep
    # everything before the first space. Idempotent for clean values.
    aspect_ratio = aspect_ratio.split(" ")[0]
    generation_params = {
        "prompt": prompt,
        "loop": loop,
        "aspect_ratio": aspect_ratio
    }
    if camera_motion:
        generation_params["prompt"] += f" {camera_motion}"
    # Keyframe modes are mutually exclusive: extend > reverse-extend > interpolate.
    if extend_id:
        generation_params["keyframes"] = {
            "frame0": {
                "type": "generation",
                "id": extend_id
            }
        }
    elif reverse_extend_id:
        generation_params["keyframes"] = {
            "frame1": {
                "type": "generation",
                "id": reverse_extend_id
            }
        }
    elif interpolate_ids:
        generation_params["keyframes"] = {
            "frame0": {
                "type": "generation",
                "id": interpolate_ids[0]
            },
            "frame1": {
                "type": "generation",
                "id": interpolate_ids[1]
            }
        }
    progress(0, desc="비디오 생성 시작 중...")
    generation = await client.generations.create(**generation_params)
    progress(0.1, desc="비디오 생성 시작됨. 완료 대기 중...")
    # Poll the API every 5 s until the generation completes or fails.
    start_time = asyncio.get_event_loop().time()
    while True:
        status = await client.generations.get(id=generation.id)
        if status.state == "completed":
            break
        elif status.state == "failed":
            raise Exception("비디오 생성 실패")
        # Map elapsed time onto the 0.1–0.9 progress range,
        # assuming a nominal 60-second generation.
        elapsed_time = asyncio.get_event_loop().time() - start_time
        progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
        progress(progress_value, desc="비디오 생성 중...")
        await asyncio.sleep(5)
    progress(0.9, desc="생성된 비디오 다운로드 중...")
    # Stream the finished video to a local file in 1 KiB chunks.
    video_url = status.assets.video
    file_name = f"luma_ai_generated_{generation.id}.mp4"
    async with aiohttp.ClientSession() as session:
        async with session.get(video_url) as resp:
            # BUG FIX: a non-200 response previously fell through silently
            # and crashed later with UnboundLocalError on file_name;
            # fail loudly with a meaningful error instead.
            if resp.status != 200:
                raise Exception(f"비디오 다운로드 실패 (HTTP {resp.status})")
            with open(file_name, 'wb') as fd:
                while True:
                    chunk = await resp.content.read(1024)
                    if not chunk:
                        break
                    fd.write(chunk)
    progress(1.0, desc="비디오 생성 완료!")
    return file_name
async def translate_prompt(prompt):
    """Translate a (Korean) prompt to English with the module-level pipeline.

    Falls back to returning the original prompt unchanged if translation
    fails for any reason, so generation can proceed regardless.
    """
    try:
        outputs = translator(prompt, max_length=512)
        return outputs[0]['translation_text']
    except Exception as err:
        print(f"번역 중 오류 발생: {str(err)}")
        return prompt  # on failure, hand back the untranslated prompt
async def text_to_video(prompt, loop, aspect_ratio, camera_motion, extend_id,
                        reverse_extend_id, interpolate_id1, interpolate_id2,
                        progress=gr.Progress()):
    """Gradio handler: translate the prompt and generate a video from text.

    Returns:
        (video_path, "") on success, or (None, error_message) on failure.

    Raises:
        gr.Error: if the LMGEN_KEY environment variable is not set.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY 환경 변수가 설정되지 않았습니다.")
    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate the Korean prompt to English before calling the API.
        translated_prompt = await translate_prompt(prompt)
        print(f"원본 프롬프트: {prompt}")
        print(f"번역된 프롬프트: {translated_prompt}")
        # BUG FIX: the dropdown value includes a Korean description
        # ("16:9 (와이드스크린)") but the API expects the bare ratio.
        # Splitting is idempotent for already-clean values.
        aspect_ratio = aspect_ratio.split(" ")[0]
        # Interpolation needs both endpoint ids; otherwise skip it.
        interpolate_ids = None
        if interpolate_id1 and interpolate_id2:
            interpolate_ids = [interpolate_id1, interpolate_id2]
        video_path = await generate_video(
            client, translated_prompt, loop, aspect_ratio, camera_motion,
            extend_id, reverse_extend_id, interpolate_ids, progress
        )
        return video_path, ""
    except Exception as e:
        # Surface the failure in the UI's error textbox instead of raising.
        return None, f"오류 발생: {str(e)}"
async def image_to_video(prompt, image_url, loop, aspect_ratio, camera_motion,
                         progress=gr.Progress()):
    """Gradio handler: generate a video seeded from an image URL.

    Args:
        prompt: Korean text prompt (translated to English before the call).
        image_url: Web URL of the source image (used as keyframe frame0).
        loop: Whether the generated video should loop.
        aspect_ratio: Target ratio; UI labels such as "16:9 (와이드스크린)"
            are accepted and normalized to the bare "16:9" form.
        camera_motion: Optional camera-motion string appended to the prompt.
        progress: Gradio progress tracker.

    Returns:
        (video_path, "") on success, or (None, error_message) on failure.

    Raises:
        gr.Error: if the LMGEN_KEY environment variable is not set.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY 환경 변수가 설정되지 않았습니다.")
    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate the Korean prompt to English before calling the API.
        translated_prompt = await translate_prompt(prompt)
        print(f"원본 프롬프트: {prompt}")
        print(f"번역된 프롬프트: {translated_prompt}")
        # BUG FIX: strip the Korean description suffix from the dropdown
        # value — the API accepts only the bare ratio (idempotent split).
        generation_params = {
            "prompt": translated_prompt + (f" {camera_motion}" if camera_motion else ""),
            "loop": loop,
            "aspect_ratio": aspect_ratio.split(" ")[0],
            "keyframes": {
                "frame0": {
                    "type": "image",
                    "url": image_url
                }
            }
        }
        progress(0, desc="이미지로부터 비디오 생성 시작 중...")
        generation = await client.generations.create(**generation_params)
        progress(0.1, desc="비디오 생성 시작됨. 완료 대기 중...")
        # Poll the API every 5 s until the generation completes or fails.
        start_time = asyncio.get_event_loop().time()
        while True:
            status = await client.generations.get(id=generation.id)
            if status.state == "completed":
                break
            elif status.state == "failed":
                raise Exception("비디오 생성 실패")
            # Map elapsed time onto 0.1–0.9, assuming ~60 s total.
            elapsed_time = asyncio.get_event_loop().time() - start_time
            progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
            progress(progress_value, desc="비디오 생성 중...")
            await asyncio.sleep(5)
        progress(0.9, desc="생성된 비디오 다운로드 중...")
        # Stream the finished video to a local file in 1 KiB chunks.
        video_url = status.assets.video
        file_name = f"luma_ai_generated_{generation.id}.mp4"
        async with aiohttp.ClientSession() as session:
            async with session.get(video_url) as resp:
                # BUG FIX: a non-200 response previously left file_name
                # unbound and crashed with UnboundLocalError at return;
                # raise a meaningful error instead.
                if resp.status != 200:
                    raise Exception(f"비디오 다운로드 실패 (HTTP {resp.status})")
                with open(file_name, 'wb') as fd:
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        fd.write(chunk)
        progress(1.0, desc="비디오 생성 완료!")
        return file_name, ""
    except Exception as e:
        # Surface the failure in the UI's error textbox instead of raising.
        return None, f"오류 발생: {str(e)}"
# Build the Gradio UI: two tabs (text->video, image->video), each wired to
# its async handler above. Indentation reconstructed; all user-facing Korean
# strings are unchanged.
with gr.Blocks() as demo:
    gr.Markdown("# Luma AI 텍스트-비디오 생성 데모")

    # --- Tab 1: generate a video from a text prompt ---
    with gr.Tab("텍스트로 비디오 만들기"):
        prompt = gr.Textbox(label="프롬프트 (비디오에 대한 설명을 입력하세요)")
        generate_btn = gr.Button("비디오 생성")
        video_output = gr.Video(label="생성된 비디오")
        error_output = gr.Textbox(label="오류 메시지", visible=True)

        with gr.Accordion("고급 옵션", open=False):
            loop = gr.Checkbox(label="루프 (비디오를 반복 재생할지 설정)", value=False)
            # NOTE(review): dropdown values carry a Korean description suffix
            # (e.g. "16:9 (와이드스크린)") while the API expects the bare
            # ratio — confirm the handler normalizes this before the call.
            aspect_ratio = gr.Dropdown(
                label="화면 비율",
                choices=["16:9 (와이드스크린)", "1:1 (정사각형)", "9:16 (세로 영상)", "4:3 (표준)", "3:4 (세로 표준)"],
                value="16:9 (와이드스크린)"
            )
            # Choices are populated asynchronously by update_camera_motions().
            camera_motion = gr.Dropdown(label="카메라 모션 (카메라 움직임 효과 선택)")
            extend_id = gr.Textbox(label="확장할 비디오 ID (기존 비디오를 이어서 생성할 때 입력)")
            reverse_extend_id = gr.Textbox(label="역방향 확장할 비디오 ID (기존 비디오의 앞부분을 생성할 때 입력)")
            with gr.Row():
                interpolate_id1 = gr.Textbox(label="보간 비디오 ID 1 (두 비디오 사이를 보간할 때 첫 번째 비디오 ID)")
                interpolate_id2 = gr.Textbox(label="보간 비디오 ID 2 (두 비디오 사이를 보간할 때 두 번째 비디오 ID)")

        generate_btn.click(
            text_to_video,
            inputs=[prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2],
            outputs=[video_output, error_output]
        )

    # --- Tab 2: generate a video seeded from an image URL ---
    with gr.Tab("이미지로 비디오 만들기"):
        img_prompt = gr.Textbox(label="프롬프트 (이미지를 바탕으로 생성할 비디오에 대한 설명)")
        img_url = gr.Textbox(label="이미지 URL (변환하고자 하는 이미지의 웹 주소)")
        img_generate_btn = gr.Button("이미지로 비디오 생성")
        img_video_output = gr.Video(label="생성된 비디오")
        img_error_output = gr.Textbox(label="오류 메시지", visible=True)

        with gr.Accordion("고급 옵션", open=False):
            img_loop = gr.Checkbox(label="루프 (비디오를 반복 재생할지 설정)", value=False)
            img_aspect_ratio = gr.Dropdown(
                label="화면 비율",
                choices=["16:9 (와이드스크린)", "1:1 (정사각형)", "9:16 (세로 영상)", "4:3 (표준)", "3:4 (세로 표준)"],
                value="16:9 (와이드스크린)"
            )
            img_camera_motion = gr.Dropdown(label="카메라 모션 (카메라 움직임 효과 선택)")

        img_generate_btn.click(
            image_to_video,
            inputs=[img_prompt, img_url, img_loop, img_aspect_ratio, img_camera_motion],
            outputs=[img_video_output, img_error_output]
        )

    # Populate both camera-motion dropdowns when the page loads; on failure
    # the dropdowns are left empty rather than breaking the UI.
    async def update_camera_motions():
        try:
            motions = await get_camera_motions()
            return gr.update(choices=motions), gr.update(choices=motions)
        except Exception as e:
            print(f"카메라 모션 업데이트 중 오류 발생: {str(e)}")
            return gr.update(choices=[]), gr.update(choices=[])

    demo.load(update_camera_motions, outputs=[camera_motion, img_camera_motion])

# Enable request queuing (needed for long-running async handlers) and launch.
demo.queue().launch(debug=True)