# Hugging Face Space status banner: "Running on Zero" (page residue, not part of the program).
# PyTorch 2.8 (temporary hack): upgrade torch (pre-release, pinned below 2.9)
# and `spaces` from the nightly cu126 index before they are imported below.
# The exit status of the command is not checked.
import os

os.system(
    'pip install --upgrade --pre '
    '--extra-index-url https://download.pytorch.org/whl/nightly/cu126 '
    '"torch<2.9" spaces'
)
import logging | |
# Actual demo code | |
import spaces | |
import torch | |
from diffusers import WanPipeline, AutoencoderKLWan | |
from diffusers.models.transformers.transformer_wan import WanTransformer3DModel | |
from diffusers.utils.export_utils import export_to_video | |
import gradio as gr | |
import tempfile | |
import numpy as np | |
from PIL import Image | |
import random | |
import gc | |
from optimization import optimize_pipeline_ | |
import ffmpeg | |
import tempfile | |
import os | |
# Hub repository the pipeline is loaded from.
MODEL_ID = "Wan-AI/Wan2.2-T2V-A14B-Diffusers"

# Fixed output resolution passed to the pipeline (pixels).
LANDSCAPE_WIDTH = 832
LANDSCAPE_HEIGHT = 480

# Upper bound of the seed slider: the largest signed 32-bit integer.
MAX_SEED = np.iinfo(np.int32).max

# Frame rate used when exporting, and the frame-count range requests are
# clamped to.
FIXED_FPS = 16
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 81

# Duration slider bounds in seconds, derived from the frame limits and
# rounded to one decimal place.
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)
# The VAE is loaded in float32; the transformers and the rest of the pipeline
# use bfloat16.
vae = AutoencoderKLWan.from_pretrained(
    "Wan-AI/Wan2.2-T2V-A14B-Diffusers",
    subfolder="vae",
    torch_dtype=torch.float32,
)


def _load_bf16_transformer(subfolder):
    """Load one transformer subfolder of the BF16 repo in bfloat16 onto CUDA."""
    return WanTransformer3DModel.from_pretrained(
        'linoyts/Wan2.2-T2V-A14B-Diffusers-BF16',
        subfolder=subfolder,
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    )


# Assemble the pipeline from MODEL_ID, overriding both transformers and the
# VAE with the instances loaded above, then move everything to the GPU.
# Keyword arguments are evaluated in order, so `transformer` is loaded before
# `transformer_2`.
pipe = WanPipeline.from_pretrained(
    MODEL_ID,
    transformer=_load_bf16_transformer('transformer'),
    transformer_2=_load_bf16_transformer('transformer_2'),
    vae=vae,
    torch_dtype=torch.bfloat16,
).to('cuda')
# Run three rounds of garbage collection and CUDA cache release after model
# loading, before the optimization step below.
# NOTE(review): the source lost its indentation; all three calls are assumed
# to belong to the loop body — confirm against the original file.
for _ in range(3):
    gc.collect()
    torch.cuda.synchronize()
    torch.cuda.empty_cache()

# Optimize the pipeline in place using a placeholder prompt at the fixed
# resolution and the maximum frame count.
# NOTE(review): `optimize_pipeline_` comes from the local `optimization`
# module; what it does exactly is not visible here.
optimize_pipeline_(
    pipe,
    prompt='prompt',
    height=LANDSCAPE_HEIGHT,
    width=LANDSCAPE_WIDTH,
    num_frames=MAX_FRAMES_MODEL,
)
# Prompt pre-filled in the UI textbox.
default_prompt_t2v = "Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage."

# Default negative prompt (Chinese). Rough English gloss: garish colors,
# overexposed, static, blurry details, subtitles, style, artwork, painting,
# picture, still, overall gray, worst quality, low quality, JPEG compression
# artifacts, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn
# faces, deformed, disfigured, malformed limbs, fused fingers, motionless
# frame, cluttered background, three legs, crowded background, walking
# backwards.
default_negative_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"
from huggingface_hub import HfApi, upload_file | |
import os | |
import uuid | |
import os | |
import uuid | |
import logging | |
from datetime import datetime | |
from huggingface_hub import HfApi, upload_file | |
import tempfile | |
import random | |
import logging | |
from datetime import datetime | |
import uuid | |
import numpy as np | |
import torch | |
import ffmpeg | |
# Target Hub repository for uploads; override with the HF_UPLOAD_REPO
# environment variable.
HF_MODEL = os.getenv("HF_UPLOAD_REPO", "rahul7star/VideoExplain")
def upload_to_hf(video_path: str, summary_text: str):
    """Upload a video and its text summary to a new folder in the HF_MODEL repo.

    The destination folder is ``<YYYY-MM-DD>/WANT2V-EXP-upload_<8 hex chars>``.
    The video keeps its local base name; the summary is stored as
    ``summary.txt`` next to it.

    Args:
        video_path: Local path of the video file to upload.
        summary_text: Text stored (UTF-8 encoded) as ``summary.txt``.

    Returns:
        The repo-relative folder path both files were uploaded to.

    Raises:
        Whatever ``huggingface_hub.upload_file`` raises on failure.
    """
    token = os.environ.get("HUGGINGFACE_HUB_TOKEN")

    # Create date-based folder
    today_str = datetime.now().strftime("%Y-%m-%d")
    unique_subfolder = f"WANT2V-EXP-upload_{uuid.uuid4().hex[:8]}"
    hf_folder = f"{today_str}/{unique_subfolder}"
    logging.info(f"Uploading to HF folder: {hf_folder}")

    # Upload video
    video_filename = os.path.basename(video_path)
    video_hf_path = f"{hf_folder}/{video_filename}"
    upload_file(
        path_or_fileobj=video_path,
        path_in_repo=video_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=token,
    )
    logging.info(f"✅ Uploaded video: {video_hf_path}")

    # Upload summary straight from memory. A shared "summary.txt" in the temp
    # directory could be overwritten by a concurrent request between the write
    # and the upload, and was never cleaned up.
    summary_hf_path = f"{hf_folder}/summary.txt"
    upload_file(
        path_or_fileobj=summary_text.encode("utf-8"),
        path_in_repo=summary_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=token,
    )
    logging.info(f"✅ Uploaded summary: {summary_hf_path}")
    return hf_folder
import subprocess | |
import tempfile | |
import logging | |
import shutil | |
import os | |
from huggingface_hub import HfApi, upload_file | |
from datetime import datetime | |
import uuid | |
# Target Hub repository for uploads (same expression as the earlier
# assignment of HF_MODEL in this file; re-evaluated here).
HF_MODEL = os.getenv("HF_UPLOAD_REPO", "rahul7star/VideoExplain")
def upscale_and_upload_4k(input_video_path: str, summary_text: str) -> str:
    """
    Upscale a video to 4K and upload it to Hugging Face Hub without replacing the original file.

    The upscaled copy is written to a temporary file with the ``ffmpeg``
    executable (3840x2160, lanczos scaling, libx264, CRF 18, preset "slow"),
    uploaded under ``<YYYY-MM-DD>/Upload-4K-<8 hex chars>/`` using the
    original file's base name, and the temporary file is always removed
    afterwards, including when ffmpeg or an upload fails.

    Args:
        input_video_path (str): Path to the original video.
        summary_text (str): Text summary to upload alongside the video.

    Returns:
        str: Hugging Face folder path where the video and summary were uploaded.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    logging.info(f"Upscaling video to 4K for upload: {input_video_path}")
    token = os.environ.get("HUGGINGFACE_HUB_TOKEN")

    # Reserve a unique path for the upscaled video; ffmpeg overwrites the
    # empty placeholder because of "-y".
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_upscaled:
        upscaled_path = tmp_upscaled.name

    try:
        # FFmpeg upscale command
        cmd = [
            "ffmpeg",
            "-i", input_video_path,
            "-vf", "scale=3840:2160:flags=lanczos",
            "-c:v", "libx264",
            "-crf", "18",
            "-preset", "slow",
            "-y",
            upscaled_path,
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # errors="replace": ffmpeg output is not guaranteed to be UTF-8,
            # and a decode failure here would hide the real error.
            logging.error(f"FFmpeg failed:\n{e.stderr.decode(errors='replace')}")
            raise
        logging.info(f"✅ Upscaled video created at: {upscaled_path}")

        # Create a date-based folder on HF
        today_str = datetime.now().strftime("%Y-%m-%d")
        unique_subfolder = f"Upload-4K-{uuid.uuid4().hex[:8]}"
        hf_folder = f"{today_str}/{unique_subfolder}"

        # Upload video
        video_filename = os.path.basename(input_video_path)
        video_hf_path = f"{hf_folder}/{video_filename}"
        upload_file(
            path_or_fileobj=upscaled_path,
            path_in_repo=video_hf_path,
            repo_id=HF_MODEL,
            repo_type="model",
            token=token,
        )
        logging.info(f"✅ Uploaded 4K video to HF: {video_hf_path}")

        # Upload summary.txt directly from memory: no second temporary file
        # to create, close and delete.
        summary_hf_path = f"{hf_folder}/summary.txt"
        upload_file(
            path_or_fileobj=summary_text.encode("utf-8"),
            path_in_repo=summary_hf_path,
            repo_id=HF_MODEL,
            repo_type="model",
            token=token,
        )
        logging.info(f"✅ Uploaded summary to HF: {summary_hf_path}")
        return hf_folder
    finally:
        # Cleanup the temporary file on every exit path. A failure to delete
        # is logged rather than raised so it cannot mask the original error.
        try:
            os.remove(upscaled_path)
        except OSError:
            logging.warning(f"Could not remove temporary file: {upscaled_path}")
def save_video_ffmpeg(frames: list, video_path: str, fps: int = FIXED_FPS):
    """Encode a sequence of frames to an H.264 video by piping raw RGB to ffmpeg.

    Frame size is taken from the first frame. Each frame is cast to uint8 and
    written as raw ``rgb24`` bytes; the output is libx264, yuv420p, CRF 18,
    preset "slow", overwriting ``video_path`` if it exists.

    Args:
        frames: Non-empty sequence of arrays exposing ``.shape`` as
            (height, width, channels), ``.astype`` and ``.tobytes``.
            NOTE(review): rgb24 implies 3 channels and values already in the
            0-255 range — confirm against callers.
        video_path: Destination path of the encoded video.
        fps: Frame rate of the input stream.

    Raises:
        ValueError: If ``frames`` is empty.
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    # len() rather than truthiness so a NumPy array of frames is accepted too.
    if len(frames) == 0:
        raise ValueError("save_video_ffmpeg requires at least one frame")

    h, w, _ = frames[0].shape
    process = (
        ffmpeg
        .input(
            'pipe:', format='rawvideo', pix_fmt='rgb24',
            s=f'{w}x{h}', framerate=fps
        )
        .output(
            video_path,
            pix_fmt='yuv420p',
            vcodec='libx264',
            crf=18,
            preset='slow'
        )
        .overwrite_output()
        .run_async(pipe_stdin=True)
    )
    try:
        for frame in frames:
            process.stdin.write(frame.astype(np.uint8).tobytes())
    finally:
        # Always close the pipe and reap the process, even if a write failed.
        try:
            process.stdin.close()
        except BrokenPipeError:
            # ffmpeg already exited; the failure surfaces through the write
            # error or the exit-code check below.
            pass
        returncode = process.wait()

    # Only report success when ffmpeg actually succeeded.
    if returncode != 0:
        raise RuntimeError(
            f"ffmpeg exited with code {returncode} while writing {video_path}"
        )
    logging.info(f"✅ Video saved to {video_path}")
def upload_to_hf0(video_path, summary_text):
    """Upload a video and its text summary to a new folder in the HF_MODEL repo.

    Older variant of ``upload_to_hf`` with the same folder layout,
    ``<YYYY-MM-DD>/WANT2V-EXP-upload_<8 hex chars>``, and different log
    messages.

    Args:
        video_path: Local path of the video file to upload.
        summary_text: Text stored (UTF-8 encoded) as ``summary.txt``.

    Returns:
        The repo-relative folder path both files were uploaded to.

    Raises:
        Whatever ``huggingface_hub.upload_file`` raises on failure.
    """
    token = os.environ.get("HUGGINGFACE_HUB_TOKEN")

    # Create a date-based folder (YYYY-MM-DD) with a unique subfolder for
    # this upload.
    today_str = datetime.now().strftime("%Y-%m-%d")
    unique_subfolder = f"WANT2V-EXP-upload_{uuid.uuid4().hex[:8]}"
    hf_folder = f"{today_str}/{unique_subfolder}"
    logging.info(f"Uploading files to HF folder: {hf_folder} in repo {HF_MODEL}")

    # Upload video
    video_filename = os.path.basename(video_path)
    video_hf_path = f"{hf_folder}/{video_filename}"
    upload_file(
        path_or_fileobj=video_path,
        path_in_repo=video_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=token,
    )
    logging.info(f"✅ Uploaded video to HF: {video_hf_path}")

    # Upload summary.txt from memory. The previous hard-coded
    # "/tmp/summary.txt" was shared by all requests (concurrent uploads could
    # clobber each other), was never removed, and assumed a /tmp directory.
    summary_hf_path = f"{hf_folder}/summary.txt"
    upload_file(
        path_or_fileobj=summary_text.encode("utf-8"),
        path_in_repo=summary_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=token,
    )
    logging.info(f"✅ Uploaded summary to HF: {summary_hf_path}")
    return hf_folder
def get_duration(
    prompt,
    negative_prompt,
    duration_seconds,
    guidance_scale,
    guidance_scale_2,
    steps,
    seed,
    randomize_seed,
    progress,
):
    """Return a budget of 15 units per inference step.

    The parameter list mirrors ``generate_video``; only ``steps`` is used.
    NOTE(review): presumably a time budget in seconds meant as a dynamic GPU
    duration callback, but nothing in this file references it — confirm.
    """
    per_step_budget = 15
    return steps * per_step_budget
def generate_video(
    prompt,
    negative_prompt=default_negative_prompt,
    duration_seconds=MAX_DURATION,
    guidance_scale=1,
    guidance_scale_2=3,
    steps=4,
    seed=42,
    randomize_seed=False,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video from a text prompt and return its path and the seed used.

    The requested duration is converted to a frame count at FIXED_FPS and
    clamped to [MIN_FRAMES_MODEL, MAX_FRAMES_MODEL]. The frames are exported
    to an MP4 in the system temp directory, then a 4K upscaled copy is
    uploaded to the Hub on a best-effort basis.

    Args:
        prompt: Text description of the video to generate.
        negative_prompt: Text describing what to avoid.
        duration_seconds: Requested clip length in seconds.
        guidance_scale: Guidance scale passed to the pipeline as
            ``guidance_scale``.
        guidance_scale_2: Guidance scale passed as ``guidance_scale_2``.
        steps: Number of inference steps.
        seed: Seed used when ``randomize_seed`` is false.
        randomize_seed: If true, draw a random seed in [0, MAX_SEED].
        progress: Gradio progress tracker; not used directly in the body.

    Returns:
        Tuple ``(video_path, current_seed)``.
    """
    # NOTE(review): `spaces` is imported and `get_duration` is defined, but
    # this function carries no GPU decorator in the visible source — confirm
    # whether `@spaces.GPU(duration=get_duration)` was intended.
    print("Prompt:", prompt)

    # int(...) so the pipeline receives a plain Python int, not a NumPy scalar.
    num_frames = int(
        np.clip(
            int(round(duration_seconds * FIXED_FPS)),
            MIN_FRAMES_MODEL,
            MAX_FRAMES_MODEL,
        )
    )
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)

    # Generate frames
    output_frames_list = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=LANDSCAPE_HEIGHT,
        width=LANDSCAPE_WIDTH,
        num_frames=num_frames,
        guidance_scale=float(guidance_scale),
        guidance_scale_2=float(guidance_scale_2),
        num_inference_steps=int(steps),
        generator=torch.Generator(device="cuda").manual_seed(current_seed),
    ).frames[0]

    # A random suffix keeps two requests with the same seed (e.g. the default
    # 42) from overwriting each other's output file.
    video_path = os.path.join(
        tempfile.gettempdir(),
        f"video_{current_seed}_{uuid.uuid4().hex[:8]}.mp4",
    )

    # Export frames to video (this is the video shown in Gradio)
    export_to_video(output_frames_list, video_path, fps=FIXED_FPS)

    # The upload is a side effect: a failure (missing token, ffmpeg error,
    # network) is logged but must not discard the video just generated.
    try:
        upscale_and_upload_4k(video_path, prompt)
    except Exception:
        logging.exception("4K upscale/upload failed; returning the generated video anyway")

    return video_path, current_seed
# Gradio UI: inputs in the left column, generated video in the right column.
# NOTE(review): the source lost its indentation; the nesting below follows the
# usual Blocks > Row > Column layout — confirm against the original file.
with gr.Blocks() as demo:
    gr.Markdown("# Fast 4 steps Wan 2.2 T2V (14B) with Lightning LoRA")
    gr.Markdown("run Wan 2.2 in just 4-8 steps, with [Wan 2.2 Lightning LoRA](https://huggingface.co/Kijai/WanVideo_comfy/tree/main/Wan22-Lightning), fp8 quantization & AoT compilation - compatible with 🧨 diffusers and ZeroGPU⚡️")

    with gr.Row():
        with gr.Column():
            prompt_input = gr.Textbox(label="Prompt", value=default_prompt_t2v)
            duration_seconds_input = gr.Slider(
                minimum=MIN_DURATION,
                maximum=MAX_DURATION,
                step=0.1,
                value=MAX_DURATION,
                label="Duration (seconds)",
                info=f"Clamped to model's {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} frames at {FIXED_FPS}fps.",
            )

            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt_input = gr.Textbox(
                    label="Negative Prompt",
                    value=default_negative_prompt,
                    lines=3,
                )
                seed_input = gr.Slider(
                    label="Seed",
                    minimum=0,
                    maximum=MAX_SEED,
                    step=1,
                    value=42,
                    interactive=True,
                )
                randomize_seed_checkbox = gr.Checkbox(
                    label="Randomize seed",
                    value=True,
                    interactive=True,
                )
                steps_slider = gr.Slider(
                    minimum=1,
                    maximum=30,
                    step=1,
                    value=4,
                    label="Inference Steps",
                )
                guidance_scale_input = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.5,
                    value=1,
                    label="Guidance Scale - high noise stage",
                )
                guidance_scale_2_input = gr.Slider(
                    minimum=0.0,
                    maximum=10.0,
                    step=0.5,
                    value=3,
                    label="Guidance Scale 2 - low noise stage",
                )

            generate_button = gr.Button("Generate Video", variant="primary")

        with gr.Column():
            video_output = gr.Video(
                label="Generated Video",
                autoplay=True,
                interactive=False,
            )

    # Order matches the positional parameters of generate_video.
    ui_inputs = [
        prompt_input,
        negative_prompt_input,
        duration_seconds_input,
        guidance_scale_input,
        guidance_scale_2_input,
        steps_slider,
        seed_input,
        randomize_seed_checkbox,
    ]
    generate_button.click(
        fn=generate_video,
        inputs=ui_inputs,
        outputs=[video_output, seed_input],
    )

    # Example prompts supply only the prompt; generate_video's defaults cover
    # the remaining arguments.
    gr.Examples(
        examples=[
            [
                "POV selfie video, white cat with sunglasses standing on surfboard, relaxed smile, tropical beach behind (clear water, green hills, blue sky with clouds). Surfboard tips, cat falls into ocean, camera plunges underwater with bubbles and sunlight beams. Brief underwater view of cat’s face, then cat resurfaces, still filming selfie, playful summer vacation mood.",
            ],
            [
                "Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage.",
            ],
            [
                "A cinematic shot of a boat sailing on a calm sea at sunset.",
            ],
            [
                "Drone footage flying over a futuristic city with flying cars.",
            ],
        ],
        inputs=[prompt_input],
        outputs=[video_output, seed_input],
        fn=generate_video,
        cache_examples="lazy",
    )
if __name__ == "__main__":
    # Enable the request queue, then start the app with the MCP server on.
    queued_demo = demo.queue()
    queued_demo.launch(mcp_server=True)