import json
import subprocess
import uuid
from pathlib import Path

import comfy.model_management as model_management
import comfy.utils
import folder_paths
import numpy as np
import torch
from PIL import Image

from ..log import log
from ..utils import PIL_FILTER_MAP, output_dir, session_id, tensor2np

def get_playlist_path(playlist_name: str, persistant_playlist=False):
    """Resolve the JSON file backing a playlist (per-session unless persistent)."""
    if persistant_playlist:
        return output_dir / "playlists" / f"{playlist_name}.json"
    return output_dir / "playlists" / session_id / f"{playlist_name}.json"
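
# Illustrative only (assumes output_dir points at ComfyUI's output folder):
#   get_playlist_path("demo", persistant_playlist=True) -> <output_dir>/playlists/demo.json
#   get_playlist_path("demo")                            -> <output_dir>/playlists/<session_id>/demo.json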

class MTB_ReadPlaylist:
    """Read a playlist."""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "enable": ("BOOLEAN", {"default": True}),
                "persistant_playlist": ("BOOLEAN", {"default": False}),
                "playlist_name": (
                    "STRING",
                    {"default": "playlist_{index:04d}"},
                ),
                "index": ("INT", {"default": 0, "min": 0}),
            }
        }

    RETURN_TYPES = ("PLAYLIST",)
    FUNCTION = "read_playlist"
    CATEGORY = "mtb/IO"
    EXPERIMENTAL = True

    def read_playlist(
        self,
        enable: bool,
        persistant_playlist: bool,
        playlist_name: str,
        index: int,
    ):
        playlist_name = playlist_name.format(index=index)
        playlist_path = get_playlist_path(playlist_name, persistant_playlist)

        if not enable:
            return (None,)

        if not playlist_path.exists():
            log.warning(f"Playlist {playlist_path} does not exist, skipping")
            return (None,)

        log.debug(f"Reading playlist {playlist_path}")
        return (json.loads(playlist_path.read_text(encoding="utf-8")),)

class MTB_AddToPlaylist:
    """Add a video to the playlist."""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "relative_paths": ("BOOLEAN", {"default": False}),
                "persistant_playlist": ("BOOLEAN", {"default": False}),
                "playlist_name": (
                    "STRING",
                    {"default": "playlist_{index:04d}"},
                ),
                "index": ("INT", {"default": 0, "min": 0}),
            }
        }

    RETURN_TYPES = ()
    OUTPUT_NODE = True
    FUNCTION = "add_to_playlist"
    CATEGORY = "mtb/IO"
    EXPERIMENTAL = True

    def add_to_playlist(
        self,
        relative_paths: bool,
        persistant_playlist: bool,
        playlist_name: str,
        index: int,
        **kwargs,
    ):
        playlist_name = playlist_name.format(index=index)
        playlist_path = get_playlist_path(playlist_name, persistant_playlist)
        if not playlist_path.parent.exists():
            playlist_path.parent.mkdir(parents=True, exist_ok=True)

        playlist = []
        if not playlist_path.exists():
            playlist_path.write_text("[]")
        else:
            playlist = json.loads(playlist_path.read_text())

        log.debug(f"Playlist {playlist_path} has {len(playlist)} items")

        for video in kwargs.values():
            if relative_paths:
                video = Path(video).relative_to(output_dir).as_posix()
            log.debug(f"Adding {video} to playlist")
            playlist.append(video)

        log.debug(f"Writing playlist {playlist_path}")
        playlist_path.write_text(json.dumps(playlist), encoding="utf-8")
        return ()
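
# The playlist written above is a flat JSON array of video paths, e.g.
# ["videos/clip_0001.mp4", "videos/clip_0002.mp4"] (illustrative names; entries are
# made relative to output_dir when relative_paths is enabled).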

class MTB_ExportWithFfmpeg:
    """Export with FFmpeg (Experimental).

    [DEPRECATED] Use VHS nodes instead.
    """

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "optional": {
                "images": ("IMAGE",),
                "playlist": ("PLAYLIST",),
            },
            "required": {
                "fps": ("FLOAT", {"default": 24, "min": 1}),
                "prefix": ("STRING", {"default": "export"}),
                "format": (
                    ["mov", "mp4", "mkv", "gif", "avi"],
                    {"default": "mov"},
                ),
                "codec": (
                    ["prores_ks", "libx264", "libx265", "gif"],
                    {"default": "prores_ks"},
                ),
            },
        }

    RETURN_TYPES = ("VIDEO",)
    OUTPUT_NODE = True
    FUNCTION = "export_prores"
    DEPRECATED = True
    CATEGORY = "mtb/IO"

    def export_prores(
        self,
        fps: float,
        prefix: str,
        format: str,
        codec: str,
        images: torch.Tensor | None = None,
        playlist: list[str] | None = None,
    ):
        file_ext = format
        file_id = f"{prefix}_{uuid.uuid4()}.{file_ext}"

        if playlist is not None and images is not None:
            log.info(f"Exporting to {output_dir / file_id}")

        if playlist is not None:
            if len(playlist) == 0:
                log.debug("Playlist is empty, skipping")
                return ("",)

            temp_playlist_path = (
                output_dir / f"temp_playlist_{uuid.uuid4()}.txt"
            )
            log.debug(
                f"Creating a temporary file listing the videos to concatenate at {temp_playlist_path}"
            )
            with open(temp_playlist_path, "w") as f:
                for video_path in playlist:
                    f.write(f"file '{video_path}'\n")

            out_path = (output_dir / file_id).as_posix()

            # Prepare the FFmpeg command for concatenating videos from the playlist
            command = [
                "ffmpeg",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                temp_playlist_path.as_posix(),
                "-c",
                "copy",
                "-y",
                out_path,
            ]

            log.debug(f"Executing {command}")
            subprocess.run(command)
            temp_playlist_path.unlink()
            return (out_path,)

        if (
            images is None or images.size(0) == 0
        ):  # the `is None` check is only there for the type checker
            return ("",)

        frames = tensor2np(images)
        log.debug(f"Frames type {type(frames[0])}")
        log.debug(f"Exporting {len(frames)} frames")

        height, width, channels = frames[0].shape
        has_alpha = channels == 4

        out_path = (output_dir / file_id).as_posix()
        if codec == "gif":
            command = [
                "ffmpeg",
                "-f",
                "image2pipe",
                "-vcodec",
                "png",
                "-r",
                str(fps),
                "-i",
                "-",
                "-vcodec",
                "gif",
                "-y",
                out_path,
            ]
            process = subprocess.Popen(command, stdin=subprocess.PIPE)
            for frame in frames:
                model_management.throw_exception_if_processing_interrupted()
                Image.fromarray(frame).save(process.stdin, "PNG")
            process.stdin.close()
            process.wait()
            return (out_path,)
        else:
            if has_alpha:
                if codec in ["prores_ks", "libx264", "libx265"]:
                    pix_fmt = (
                        "yuva444p" if codec == "prores_ks" else "yuva420p"
                    )
                    frames = [
                        frame.astype(np.uint16) * 257 for frame in frames
                    ]
                else:
                    log.warning(
                        f"Alpha channel not supported for codec {codec}. Alpha will be ignored."
                    )
                    frames = [
                        frame[:, :, :3].astype(np.uint16) * 257
                        for frame in frames
                    ]
                    pix_fmt = "rgb48le" if codec == "prores_ks" else "yuv420p"
            else:
                pix_fmt = "rgb48le" if codec == "prores_ks" else "yuv420p"
                frames = [frame.astype(np.uint16) * 257 for frame in frames]

            # Prepare the FFmpeg command
            command = [
                "ffmpeg",
                "-y",
                "-f",
                "rawvideo",
                "-vcodec",
                "rawvideo",
                "-s",
                f"{width}x{height}",
                "-pix_fmt",
                pix_fmt,
                "-r",
                str(fps),
                "-i",
                "-",
                "-c:v",
                codec,
            ]

            if codec == "prores_ks":
                command.extend(["-profile:v", "4444"])

            command.extend(
                [
                    "-r",
                    str(fps),
                    "-y",
                    out_path,
                ]
            )

            process = subprocess.Popen(command, stdin=subprocess.PIPE)
            pbar = comfy.utils.ProgressBar(len(frames))
            for frame in frames:
                process.stdin.write(frame.tobytes())
                pbar.update(1)
            process.stdin.close()
            process.wait()
            return (out_path,)
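
# For reference, the non-GIF branch above pipes raw frames into a command roughly
# equivalent to the following (resolution, fps and file names are illustrative):
#   ffmpeg -y -f rawvideo -vcodec rawvideo -s 1920x1080 -pix_fmt rgb48le -r 24 -i - \
#       -c:v prores_ks -profile:v 4444 -r 24 -y export_<uuid>.mov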

def prepare_animated_batch(
    batch: torch.Tensor,
    pingpong=False,
    resize_by=1.0,
    resample_filter: Image.Resampling | None = None,
    image_type=np.uint8,
) -> list[Image.Image]:
    images = tensor2np(batch)
    images = [frame.astype(image_type) for frame in images]

    height, width, _ = batch[0].shape

    if pingpong:
        reversed_frames = images[::-1]
        images.extend(reversed_frames)

    pil_images = [Image.fromarray(frame) for frame in images]

    # Resize frames if necessary
    if abs(resize_by - 1.0) > 1e-6:
        new_width = int(width * resize_by)
        new_height = int(height * resize_by)
        pil_images_resized = [
            frame.resize((new_width, new_height), resample=resample_filter)
            for frame in pil_images
        ]
        pil_images = pil_images_resized

    return pil_images
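
# Minimal standalone sketch (assumes `batch` is a float image batch shaped [N, H, W, C]):
#   frames = prepare_animated_batch(batch, pingpong=True, resize_by=0.5,
#                                   resample_filter=Image.Resampling.LANCZOS)
#   frames[0].save("preview.gif", save_all=True, append_images=frames[1:], loop=0)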

# todo: deprecate for apng
class MTB_SaveGif:
    """Save the images from the batch as a GIF.

    [DEPRECATED] Use VHS nodes instead.
    """

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image": ("IMAGE",),
                "fps": ("INT", {"default": 12, "min": 1, "max": 120}),
                "resize_by": ("FLOAT", {"default": 1.0, "min": 0.1}),
                "optimize": ("BOOLEAN", {"default": False}),
                "pingpong": ("BOOLEAN", {"default": False}),
                "resample_filter": (list(PIL_FILTER_MAP.keys()),),
                "use_ffmpeg": ("BOOLEAN", {"default": False}),
            },
        }

    RETURN_TYPES = ()
    OUTPUT_NODE = True
    CATEGORY = "mtb/IO"
    FUNCTION = "save_gif"
    DEPRECATED = True

    def save_gif(
        self,
        image,
        fps=12,
        resize_by=1.0,
        optimize=False,
        pingpong=False,
        resample_filter=None,
        use_ffmpeg=False,
    ):
        if image.size(0) == 0:
            return ("",)

        if resample_filter is not None:
            resample_filter = PIL_FILTER_MAP.get(resample_filter)

        pil_images = prepare_animated_batch(
            image,
            pingpong,
            resize_by,
            resample_filter,
        )

        ruuid = uuid.uuid4()
        ruuid = ruuid.hex[:10]
        out_path = f"{folder_paths.output_directory}/{ruuid}.gif"

        if use_ffmpeg:
            # Use FFmpeg to create the GIF from PIL images
            command = [
                "ffmpeg",
                "-f",
                "image2pipe",
                "-vcodec",
                "png",
                "-r",
                str(fps),
                "-i",
                "-",
                "-vcodec",
                "gif",
                "-y",
                out_path,
            ]
            process = subprocess.Popen(command, stdin=subprocess.PIPE)
            for image in pil_images:
                model_management.throw_exception_if_processing_interrupted()
                image.save(process.stdin, "PNG")
            process.stdin.close()
            process.wait()
        else:
            pil_images[0].save(
                out_path,
                save_all=True,
                append_images=pil_images[1:],
                optimize=optimize,
                duration=int(1000 / fps),
                loop=0,
            )

        results = [
            {"filename": f"{ruuid}.gif", "subfolder": "", "type": "output"}
        ]
        return {"ui": {"gif": results}}

__nodes__ = [
    MTB_SaveGif,
    MTB_ExportWithFfmpeg,
    MTB_AddToPlaylist,
    MTB_ReadPlaylist,
]