Spaces:
Runtime error
Runtime error
| # General | |
| import os | |
| from os.path import join as opj | |
| import argparse | |
| import datetime | |
| from pathlib import Path | |
| import torch | |
| import gradio as gr | |
| import tempfile | |
| import yaml | |
| # from t2v_enhanced.model.video_ldm import VideoLDM | |
| from typing import List, Optional | |
| # from model.callbacks import SaveConfigCallback | |
| from PIL.Image import Image, fromarray | |
| # from einops import rearrange, repeat | |
| import sys | |
| from ... import MODEL_PATH | |
| sys.path.append("thirdparty") | |
| # from modelscope.pipelines import pipeline | |
| # from modelscope.outputs import OutputKeys | |
| import imageio | |
| import pathlib | |
| import numpy as np | |
| # Utilities | |
| from .inference_utils import * | |
| from .model_init import ( | |
| init_modelscope, | |
| init_animatediff, | |
| init_svd, | |
| init_sdxl, | |
| init_v2v_model, | |
| init_streamingt2v_model, | |
| ) | |
| from .model_func import * | |
| def pipeline(prompt, size, seconds, fps, seed): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument( | |
| "--prompt", | |
| type=str, | |
| default=prompt, | |
| help="The prompt to guide video generation.", | |
| ) | |
| parser.add_argument( | |
| "--image", type=str, default="", help="Path to image conditioning." | |
| ) | |
| # parser.add_argument('--video', type=str, default="", help="Path to video conditioning.") | |
| parser.add_argument( | |
| "--base_model", | |
| type=str, | |
| default="ModelscopeT2V", | |
| help="Base model to generate first chunk from", | |
| choices=["ModelscopeT2V", "AnimateDiff", "SVD"], | |
| ) | |
| parser.add_argument( | |
| "--num_frames", | |
| type=int, | |
| default=seconds * fps, | |
| help="The number of video frames to generate.", | |
| ) | |
| parser.add_argument( | |
| "--negative_prompt", | |
| type=str, | |
| default="", | |
| help="The prompt to guide what to not include in video generation.", | |
| ) | |
| parser.add_argument( | |
| "--negative_prompt_enhancer", | |
| type=str, | |
| default=None, | |
| help="The prompt to guide what to not include in video enhancement. " | |
| "By default is the same as --negative_prompt", | |
| ) | |
| parser.add_argument( | |
| "--num_steps", type=int, default=50, help="The number of denoising steps." | |
| ) | |
| parser.add_argument( | |
| "--image_guidance", type=float, default=9.0, help="The guidance scale." | |
| ) | |
| parser.add_argument( | |
| "--output_dir", | |
| type=str, | |
| default="results", | |
| help="Path where to save the generated videos.", | |
| ) | |
| parser.add_argument("--device", type=str, default="cpu") | |
| parser.add_argument("--seed", type=int, default=seed, help="Random seed") | |
| parser.add_argument( | |
| "--chunk", type=int, default=24, help="chunk_size for randomized blending" | |
| ) | |
| parser.add_argument( | |
| "--overlap", type=int, default=8, help="overlap_size for randomized blending" | |
| ) | |
| parser.add_argument( | |
| "--offload_models", | |
| action="store_true", | |
| help="Load/Offload models to gpu/cpu before and after inference", | |
| ) | |
| args = parser.parse_args() | |
| Path(args.output_dir).mkdir(parents=True, exist_ok=True) | |
| result_fol = Path(args.output_dir).absolute() | |
| device = args.device | |
| # -------------------------- | |
| # ----- Configurations ----- | |
| # -------------------------- | |
| ckpt_file_streaming_t2v = os.path.join(MODEL_PATH, "streamingtv2", "streaming_t2v.ckpt") | |
| cfg_v2v = { | |
| "downscale": 1, | |
| "upscale_size": size, | |
| "model_id": "damo/Video-to-Video", | |
| "pad": True, | |
| } | |
| # -------------------------- | |
| # ----- Initialization ----- | |
| # -------------------------- | |
| if args.base_model == "ModelscopeT2V": | |
| if args.offload_models: | |
| model = init_modelscope("cpu") | |
| else: | |
| model = init_modelscope(device) | |
| elif args.base_model == "AnimateDiff": | |
| if args.offload_models: | |
| model = init_animatediff("cpu") | |
| else: | |
| model = init_animatediff(device) | |
| elif args.base_model == "SVD": | |
| if args.offload_models: | |
| model = init_svd("cpu") | |
| sdxl_model = init_sdxl("cpu") | |
| else: | |
| model = init_svd(device) | |
| sdxl_model = init_sdxl(device) | |
| if args.offload_models: | |
| msxl_model = init_v2v_model(cfg_v2v, "cpu") | |
| else: | |
| msxl_model = init_v2v_model(cfg_v2v, device) | |
| stream_cli, stream_model = init_streamingt2v_model( | |
| ckpt_file_streaming_t2v, result_fol, "cuda" | |
| ) | |
| if args.offload_models: | |
| stream_model = st2v_to_device(stream_model, "cpu") | |
| inference_generator = torch.Generator(device="cuda") | |
| # ------------------ | |
| # ----- Inputs ----- | |
| # ------------------ | |
| now = datetime.datetime.now() | |
| name = ( | |
| args.prompt[:100].replace(" ", "_") | |
| + "_" | |
| + str(now.time()).replace(":", "_").replace(".", "_") | |
| ) | |
| inference_generator = torch.Generator(device="cuda") | |
| inference_generator.manual_seed(args.seed) | |
| if args.offload_models: | |
| model = model.to(device) | |
| if args.base_model == "ModelscopeT2V": | |
| short_video = ms_short_gen(args.prompt, model, inference_generator) | |
| elif args.base_model == "AnimateDiff": | |
| short_video = ad_short_gen(args.prompt, model, inference_generator) | |
| elif args.base_model == "SVD": | |
| if args.offload_models: | |
| sdxl_model = sdxl_model.to(device) | |
| short_video = svd_short_gen( | |
| args.image, args.prompt, model, sdxl_model, inference_generator | |
| ) | |
| if args.offload_models: | |
| sdxl_model = sdxl_model.to("cpu") | |
| if args.offload_models: | |
| model = model.to("cpu") | |
| n_autoreg_gen = (args.num_frames - 8) // 8 | |
| stream_long_gen( | |
| args.prompt, | |
| short_video, | |
| n_autoreg_gen, | |
| args.negative_prompt, | |
| args.seed, | |
| args.num_steps, | |
| args.image_guidance, | |
| name, | |
| stream_cli, | |
| stream_model, | |
| ) | |
| if args.offload_models: | |
| stream_model = st2v_to_device(stream_model, "cpu") | |
| args.negative_prompt_enhancer = ( | |
| args.negative_prompt_enhancer | |
| if args.negative_prompt_enhancer is not None | |
| else args.negative_prompt | |
| ) | |
| if args.offload_models: | |
| msxl_model = v2v_to_device(msxl_model, device) | |
| return video2video_randomized( | |
| args.prompt, | |
| opj(result_fol, name + ".mp4"), | |
| result_fol, | |
| cfg_v2v, | |
| msxl_model, | |
| chunk_size=args.chunk, | |
| overlap_size=args.overlap, | |
| negative_prompt=args.negative_prompt_enhancer, | |
| ) | |
| # if args.offload_models: | |
| # msxl_model = v2v_to_device(msxl_model, "cpu") | |