# NOTE(review): the following header is Hugging Face Spaces page residue from a
# web scrape, not source code — kept as a comment so the file parses:
# "Spaces: Sleeping Sleeping"
import gradio as gr | |
import os | |
from lumaai import AsyncLumaAI | |
import asyncio | |
import aiohttp | |
from transformers import pipeline | |
# Initialize the Korean-to-English translation pipeline (Helsinki-NLP opus-mt-ko-en).
# NOTE(review): this downloads/loads the model at import time — presumably intended
# for a long-lived Space process; confirm that eager loading is acceptable.
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ko-en")
async def get_camera_motions():
    """Fetch the camera motions supported by the Luma AI API.

    Returns:
        A list of motion names as strings; an empty list when the API call fails.

    Raises:
        gr.Error: When the LMGEN_KEY environment variable is not set.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY environment variable is not set.")

    client = AsyncLumaAI(auth_token=api_key)
    try:
        available = await client.generations.camera_motion.list()
        # The SDK may return non-string objects; normalize each entry to str.
        return [str(item) for item in available]
    except Exception as exc:
        # Best-effort: log and degrade to an empty dropdown rather than crash.
        print(f"Error fetching camera motions: {str(exc)}")
        return []
async def generate_video(client, prompt, loop=False, aspect_ratio="16:9", camera_motion=None, extend_id=None, reverse_extend_id=None, interpolate_ids=None, progress=gr.Progress()):
    """Create a Luma AI video generation, poll until it finishes, and download the file.

    Args:
        client: An ``AsyncLumaAI`` client instance.
        prompt: Text prompt describing the video; ``camera_motion`` is appended if given.
        loop: Whether the generated video should loop.
        aspect_ratio: Aspect-ratio label; only the leading token (e.g. "16:9") is used.
        camera_motion: Optional camera-motion keyword appended to the prompt.
        extend_id: Existing generation id to extend forward (keyframe ``frame0``).
        reverse_extend_id: Existing generation id to extend backward (keyframe ``frame1``).
        interpolate_ids: Pair of generation ids to interpolate between (``frame0``/``frame1``).
        progress: Gradio progress reporter.

    Returns:
        Tuple of (downloaded mp4 file path, generation id).

    Raises:
        Exception: If the generation fails or the video download returns a non-200 status.
    """
    # UI labels look like "16:9 (...)"; keep only the ratio token itself.
    aspect_ratio = aspect_ratio.split()[0]

    generation_params = {
        "prompt": prompt,
        "loop": loop,
        "aspect_ratio": aspect_ratio
    }

    if camera_motion:
        generation_params["prompt"] += f" {camera_motion}"

    # Extend / reverse-extend / interpolate are mutually exclusive keyframe modes,
    # checked in that priority order.
    if extend_id:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": extend_id}
        }
    elif reverse_extend_id:
        generation_params["keyframes"] = {
            "frame1": {"type": "generation", "id": reverse_extend_id}
        }
    elif interpolate_ids:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": interpolate_ids[0]},
            "frame1": {"type": "generation", "id": interpolate_ids[1]}
        }

    progress(0, desc="๋น๋์ค ์์ฑ ์์ ์ค...")
    generation = await client.generations.create(**generation_params)
    progress(0.1, desc="๋น๋์ค ์์ฑ ์์๋จ. ์๋ฃ ๋๊ธฐ ์ค...")

    # Poll every 5 seconds until the job completes or fails.
    # get_running_loop() is the supported way to reach the loop from a coroutine
    # (get_event_loop() outside a running loop is deprecated).
    start_time = asyncio.get_running_loop().time()
    while True:
        status = await client.generations.get(id=generation.id)
        if status.state == "completed":
            break
        elif status.state == "failed":
            raise Exception("๋น๋์ค ์์ฑ ์คํจ")
        # Progress ramps from 0.1 to 0.9, assuming ~60 seconds total runtime.
        elapsed_time = asyncio.get_running_loop().time() - start_time
        progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
        progress(progress_value, desc="๋น๋์ค ์์ฑ ์ค...")
        await asyncio.sleep(5)

    progress(0.9, desc="์์ฑ๋ ๋น๋์ค ๋ค์ด๋ก๋ ์ค...")

    # Download the finished video to a local mp4 file.
    video_url = status.assets.video
    file_name = f"luma_ai_generated_{generation.id}.mp4"
    async with aiohttp.ClientSession() as session:
        async with session.get(video_url) as resp:
            # BUGFIX: previously a non-200 response fell through silently and the
            # final `return file_name` raised UnboundLocalError. Fail explicitly.
            if resp.status != 200:
                raise Exception(f"Video download failed with HTTP status {resp.status}")
            with open(file_name, 'wb') as fd:
                async for chunk in resp.content.iter_chunked(1024):
                    fd.write(chunk)

    progress(1.0, desc="๋น๋์ค ์์ฑ ์๋ฃ!")
    return file_name, generation.id
async def translate_prompt(prompt):
    """Translate a (Korean) prompt to English using the module-level pipeline.

    Returns the translated text, or the original prompt unchanged when the
    translation pipeline raises for any reason.
    """
    try:
        result = translator(prompt, max_length=512)
        return result[0]['translation_text']
    except Exception as e:
        print(f"๋ฒ์ญ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
        # Best-effort: fall back to the untranslated prompt on failure.
        return prompt
async def text_to_video(prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2, progress=gr.Progress()):
    """Gradio handler: translate the prompt and run a text-to-video generation.

    Returns:
        (video_path, video_id, error_message) — on failure the first two are
        None and the third carries the error text.

    Raises:
        gr.Error: When the LMGEN_KEY environment variable is not set.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY ํ๊ฒฝ ๋ณ์๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.")

    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate the user's prompt before handing it to the API.
        translated_prompt = await translate_prompt(prompt)
        print(f"์๋ณธ ํ๋กฌํํธ: {prompt}")
        print(f"๋ฒ์ญ๋ ํ๋กฌํํธ: {translated_prompt}")

        # Strip the descriptive suffix off labels such as "16:9 (...)".
        ratio = aspect_ratio.split()[0]

        # Interpolation needs both endpoint ids; otherwise pass None through.
        pair = [interpolate_id1, interpolate_id2] if (interpolate_id1 and interpolate_id2) else None

        path, video_id = await generate_video(
            client, translated_prompt, loop, ratio, camera_motion,
            extend_id, reverse_extend_id, pair, progress
        )
        return path, video_id, ""
    except Exception as e:
        return None, None, f"์ค๋ฅ ๋ฐ์: {str(e)}"
async def image_to_video(prompt, image_url, loop, aspect_ratio, camera_motion, progress=gr.Progress()):
    """Gradio handler: generate a video from a source image URL plus a prompt.

    Args:
        prompt: Text prompt describing the desired video (translated before use).
        image_url: Web URL of the source image (becomes keyframe ``frame0``).
        loop: Whether the generated video should loop.
        aspect_ratio: Aspect-ratio label; only the leading token (e.g. "16:9") is used.
        camera_motion: Optional camera-motion keyword appended to the prompt.
        progress: Gradio progress reporter.

    Returns:
        (video_path, video_id, error_message) — on failure the first two are
        None and the third carries the error text.

    Raises:
        gr.Error: When the LMGEN_KEY environment variable is not set.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY ํ๊ฒฝ ๋ณ์๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.")

    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate the user's prompt before handing it to the API.
        translated_prompt = await translate_prompt(prompt)
        print(f"์๋ณธ ํ๋กฌํํธ: {prompt}")
        print(f"๋ฒ์ญ๋ ํ๋กฌํํธ: {translated_prompt}")

        # UI labels look like "16:9 (...)"; keep only the ratio token itself.
        aspect_ratio = aspect_ratio.split()[0]

        generation_params = {
            "prompt": translated_prompt + (f" {camera_motion}" if camera_motion else ""),
            "loop": loop,
            "aspect_ratio": aspect_ratio,
            "keyframes": {
                "frame0": {"type": "image", "url": image_url}
            }
        }

        progress(0, desc="์ด๋ฏธ์ง๋ก๋ถํฐ ๋น๋์ค ์์ฑ ์์ ์ค...")
        generation = await client.generations.create(**generation_params)
        progress(0.1, desc="๋น๋์ค ์์ฑ ์์๋จ. ์๋ฃ ๋๊ธฐ ์ค...")

        # Poll every 5 seconds until the job completes or fails.
        start_time = asyncio.get_running_loop().time()
        while True:
            status = await client.generations.get(id=generation.id)
            if status.state == "completed":
                break
            elif status.state == "failed":
                raise Exception("๋น๋์ค ์์ฑ ์คํจ")
            # Progress ramps from 0.1 to 0.9, assuming ~60 seconds total runtime.
            elapsed_time = asyncio.get_running_loop().time() - start_time
            progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
            progress(progress_value, desc="๋น๋์ค ์์ฑ ์ค...")
            await asyncio.sleep(5)

        progress(0.9, desc="์์ฑ๋ ๋น๋์ค ๋ค์ด๋ก๋ ์ค...")

        # Download the finished video to a local mp4 file.
        video_url = status.assets.video
        file_name = f"luma_ai_generated_{generation.id}.mp4"
        async with aiohttp.ClientSession() as session:
            async with session.get(video_url) as resp:
                # BUGFIX: previously a non-200 response fell through silently and
                # `return file_name` raised UnboundLocalError, surfacing as a
                # cryptic error string. Fail with a clear message instead.
                if resp.status != 200:
                    raise Exception(f"Video download failed with HTTP status {resp.status}")
                with open(file_name, 'wb') as fd:
                    async for chunk in resp.content.iter_chunked(1024):
                        fd.write(chunk)

        progress(1.0, desc="๋น๋์ค ์์ฑ ์๋ฃ!")
        return file_name, generation.id, ""
    except Exception as e:
        return None, None, f"์ค๋ฅ ๋ฐ์: {str(e)}"
# Hide the Gradio footer.
css = """
footer {
    visibility: hidden;
}
"""

with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css) as demo:
    # --- Text-to-video tab ---
    with gr.Tab("ํ ์คํธ๋ก ๋น๋์ค ๋ง๋ค๊ธฐ"):
        prompt = gr.Textbox(label="ํ๋กฌํํธ (๋น๋์ค์ ๋ํ ์ค๋ช ์ ์ ๋ ฅํ์ธ์)")
        generate_btn = gr.Button("๋น๋์ค ์์ฑ")
        video_output = gr.Video(label="์์ฑ๋ ๋น๋์ค")
        video_id_output = gr.Textbox(label="์์ฑ๋ ๋น๋์ค ID", visible=True)
        error_output = gr.Textbox(label="์ค๋ฅ ๋ฉ์์ง", visible=True)

        with gr.Accordion("๊ณ ๊ธ ์ต์ ", open=False):
            loop = gr.Checkbox(label="๋ฃจํ (๋น๋์ค๋ฅผ ๋ฐ๋ณต ์ฌ์ํ ์ง ์ค์ )", value=False)
            aspect_ratio = gr.Dropdown(
                label="ํ๋ฉด ๋น์จ",
                choices=["16:9 (์์ด๋์คํฌ๋ฆฐ)", "1:1 (์ ์ฌ๊ฐํ)", "9:16 (์ธ๋ก ์์)", "4:3 (ํ์ค)", "3:4 (์ธ๋ก ํ์ค)", "21:9 (์ธํธ๋ผ์์ด๋)", "9:21 (์ธ๋ก ์ธํธ๋ผ์์ด๋)"],
                value="16:9 (์์ด๋์คํฌ๋ฆฐ)"
            )
            camera_motion = gr.Dropdown(label="์นด๋ฉ๋ผ ๋ชจ์ (์นด๋ฉ๋ผ ์์ง์ ํจ๊ณผ ์ ํ)")
            extend_id = gr.Textbox(label="ํ์ฅํ ๋น๋์ค ID (๊ธฐ์กด ๋น๋์ค๋ฅผ ์ด์ด์ ์์ฑํ ๋ ์ ๋ ฅ)")
            reverse_extend_id = gr.Textbox(label="์ญ๋ฐฉํฅ ํ์ฅํ ๋น๋์ค ID (๊ธฐ์กด ๋น๋์ค์ ์๋ถ๋ถ์ ์์ฑํ ๋ ์ ๋ ฅ)")
            with gr.Row():
                interpolate_id1 = gr.Textbox(label="๋ณด๊ฐ ๋น๋์ค ID 1 (๋ ๋น๋์ค ์ฌ์ด๋ฅผ ๋ณด๊ฐํ ๋ ์ฒซ ๋ฒ์งธ ๋น๋์ค ID)")
                interpolate_id2 = gr.Textbox(label="๋ณด๊ฐ ๋น๋์ค ID 2 (๋ ๋น๋์ค ์ฌ์ด๋ฅผ ๋ณด๊ฐํ ๋ ๋ ๋ฒ์งธ ๋น๋์ค ID)")

        generate_btn.click(
            text_to_video,
            inputs=[prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2],
            outputs=[video_output, video_id_output, error_output]
        )

    # --- Image-to-video tab ---
    with gr.Tab("์ด๋ฏธ์ง๋ก ๋น๋์ค ๋ง๋ค๊ธฐ"):
        img_prompt = gr.Textbox(label="ํ๋กฌํํธ (์ด๋ฏธ์ง๋ฅผ ๋ฐํ์ผ๋ก ์์ฑํ ๋น๋์ค์ ๋ํ ์ค๋ช )")
        img_url = gr.Textbox(label="์ด๋ฏธ์ง URL (๋ณํํ๊ณ ์ ํ๋ ์ด๋ฏธ์ง์ ์น ์ฃผ์)")
        img_generate_btn = gr.Button("์ด๋ฏธ์ง๋ก ๋น๋์ค ์์ฑ")
        img_video_output = gr.Video(label="์์ฑ๋ ๋น๋์ค")
        img_video_id_output = gr.Textbox(label="์์ฑ๋ ๋น๋์ค ID", visible=True)
        img_error_output = gr.Textbox(label="์ค๋ฅ ๋ฉ์์ง", visible=True)

        with gr.Accordion("๊ณ ๊ธ ์ต์ ", open=False):
            img_loop = gr.Checkbox(label="๋ฃจํ (๋น๋์ค๋ฅผ ๋ฐ๋ณต ์ฌ์ํ ์ง ์ค์ )", value=False)
            img_aspect_ratio = gr.Dropdown(
                label="ํ๋ฉด ๋น์จ",
                choices=["16:9 (์์ด๋์คํฌ๋ฆฐ)", "1:1 (์ ์ฌ๊ฐํ)", "9:16 (์ธ๋ก ์์)", "4:3 (ํ์ค)", "3:4 (์ธ๋ก ํ์ค)", "21:9 (์ธํธ๋ผ์์ด๋)", "9:21 (์ธ๋ก ์ธํธ๋ผ์์ด๋)"],
                value="16:9 (์์ด๋์คํฌ๋ฆฐ)"
            )
            img_camera_motion = gr.Dropdown(label="์นด๋ฉ๋ผ ๋ชจ์ (์นด๋ฉ๋ผ ์์ง์ ํจ๊ณผ ์ ํ)")

        img_generate_btn.click(
            image_to_video,
            inputs=[img_prompt, img_url, img_loop, img_aspect_ratio, img_camera_motion],
            outputs=[img_video_output, img_video_id_output, img_error_output]
        )

    async def update_camera_motions():
        """Populate both camera-motion dropdowns from the API when the app loads."""
        try:
            motions = await get_camera_motions()
            return gr.update(choices=motions), gr.update(choices=motions)
        except Exception as e:
            print(f"์นด๋ฉ๋ผ ๋ชจ์ ์ ๋ฐ์ดํธ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
            return gr.update(choices=[]), gr.update(choices=[])

    # Fill the dropdowns on page load (registered inside the Blocks context).
    demo.load(update_camera_motions, outputs=[camera_motion, img_camera_motion])

demo.queue().launch(auth=("gini", "pick"))