import os
import base64
from io import BytesIO

import numpy as np
import torch
from PIL import Image, ImageOps

import comfy
import folder_paths


class LoadImagesFromDirBatch:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "directory": ("STRING", {"default": ""}),
            },
            "optional": {
                "image_load_cap": ("INT", {"default": 0, "min": 0, "step": 1}),
                "start_index": ("INT", {"default": 0, "min": 0, "step": 1}),
            }
        }

    RETURN_TYPES = ("IMAGE", "MASK", "INT")
    FUNCTION = "load_images"

    CATEGORY = "image"

    def load_images(self, directory: str, image_load_cap: int = 0, start_index: int = 0):
        if not os.path.isdir(directory):
            raise FileNotFoundError(f"Directory '{directory}' cannot be found.")
        dir_files = os.listdir(directory)
        if len(dir_files) == 0:
            raise FileNotFoundError(f"No files in directory '{directory}'.")

        # Filter files by extension
        valid_extensions = ['.jpg', '.jpeg', '.png', '.webp']
        dir_files = [f for f in dir_files if any(f.lower().endswith(ext) for ext in valid_extensions)]

        dir_files = sorted(dir_files)
        dir_files = [os.path.join(directory, x) for x in dir_files]

        # start at start_index
        dir_files = dir_files[start_index:]

        images = []
        masks = []

        limit_images = image_load_cap > 0
        image_count = 0

        for image_path in dir_files:
            # Skip subdirectories
            if os.path.isdir(image_path):
                continue
            if limit_images and image_count >= image_load_cap:
                break
            i = Image.open(image_path)
            i = ImageOps.exif_transpose(i)
            image = i.convert("RGB")
            image = np.array(image).astype(np.float32) / 255.0
            image = torch.from_numpy(image)[None,]
            if 'A' in i.getbands():
                # ComfyUI masks are inverted alpha: 1.0 marks fully transparent pixels
                mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
                mask = 1. - torch.from_numpy(mask)
            else:
                mask = torch.zeros((64, 64), dtype=torch.float32, device="cpu")
            images.append(image)
            masks.append(mask)
            image_count += 1

        if len(images) == 0:
            raise FileNotFoundError(f"No images could be loaded from directory '{directory}'.")

        if len(images) == 1:
            return (images[0], masks[0].unsqueeze(0), 1)

        # Resize every image to the first image's resolution and concatenate
        # them into a single IMAGE batch.
        image1 = images[0]
        for image2 in images[1:]:
            if image1.shape[1:] != image2.shape[1:]:
                image2 = comfy.utils.common_upscale(image2.movedim(-1, 1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1, -1)
            image1 = torch.cat((image1, image2), dim=0)

        # RETURN_TYPES declares a MASK output, so batch the masks as well.
        # NOTE: resizing mismatched masks to the first mask's size is an
        # assumption made here so a single MASK tensor can be returned.
        mask1 = masks[0].unsqueeze(0)
        for mask2 in masks[1:]:
            mask2 = mask2.unsqueeze(0)
            if mask1.shape[1:] != mask2.shape[1:]:
                mask2 = comfy.utils.common_upscale(mask2.unsqueeze(1), mask1.shape[2], mask1.shape[1], "bilinear", "center").squeeze(1)
            mask1 = torch.cat((mask1, mask2), dim=0)

        return (image1, mask1, len(images))


class LoadImagesFromDirList:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "directory": ("STRING", {"default": ""}),
            },
            "optional": {
                "image_load_cap": ("INT", {"default": 0, "min": 0, "step": 1}),
                "start_index": ("INT", {"default": 0, "min": 0, "step": 1}),
            }
        }

    RETURN_TYPES = ("IMAGE", "MASK")
    OUTPUT_IS_LIST = (True, True)
    FUNCTION = "load_images"

    CATEGORY = "image"

    def load_images(self, directory: str, image_load_cap: int = 0, start_index: int = 0):
        if not os.path.isdir(directory):
            raise FileNotFoundError(f"Directory '{directory}' cannot be found.")
        dir_files = os.listdir(directory)
        if len(dir_files) == 0:
            raise FileNotFoundError(f"No files in directory '{directory}'.")

        # Filter files by extension
        valid_extensions = ['.jpg', '.jpeg', '.png', '.webp']
        dir_files = [f for f in dir_files if any(f.lower().endswith(ext) for ext in valid_extensions)]

        dir_files = sorted(dir_files)
        dir_files = [os.path.join(directory, x) for x in dir_files]

        # start at start_index
        dir_files = dir_files[start_index:]

        images = []
        masks = []

        limit_images = image_load_cap > 0
        image_count = 0

        for image_path in dir_files:
            # Skip subdirectories
            if os.path.isdir(image_path):
                continue
            if limit_images and image_count >= image_load_cap:
                break
            i = Image.open(image_path)
            i = ImageOps.exif_transpose(i)
            image = i.convert("RGB")
            image = np.array(image).astype(np.float32) / 255.0
            image = torch.from_numpy(image)[None,]
            if 'A' in i.getbands():
                mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
                mask = 1. - torch.from_numpy(mask)
            else:
                mask = torch.zeros((64, 64), dtype=torch.float32, device="cpu")
            images.append(image)
            masks.append(mask)
            image_count += 1

        # Returned as lists (OUTPUT_IS_LIST), one entry per loaded image.
        return images, masks


class LoadImageInspire:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
        return {
            "required": {
                "image": (sorted(files) + ["#DATA"], {"image_upload": True}),
                "image_data": ("STRING", {"multiline": False}),
            }
        }

    CATEGORY = "InspirePack/image"

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "load_image"

    def load_image(self, image, image_data):
        # image_data is a data URI ("data:image/...;base64,...."); strip the
        # header and decode the base64 payload.
        image_data = base64.b64decode(image_data.split(",")[1])
        i = Image.open(BytesIO(image_data))
        i = ImageOps.exif_transpose(i)
        image = i.convert("RGB")
        image = np.array(image).astype(np.float32) / 255.0
        image = torch.from_numpy(image)[None,]
        if 'A' in i.getbands():
            mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
            mask = 1. - torch.from_numpy(mask)
        else:
            mask = torch.zeros((64, 64), dtype=torch.float32, device="cpu")
        return (image, mask.unsqueeze(0))


class ChangeImageBatchSize:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "image": ("IMAGE",),
                "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "step": 1}),
                "mode": (["simple"],)
            }
        }

    CATEGORY = "InspirePack/image"

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "load_image"

    def load_image(self, image, batch_size, mode):
        if mode == "simple":
            if len(image) < batch_size:
                # Pad the batch by repeating the last frame
                last_frame = image[-1].unsqueeze(0).expand(batch_size - len(image), -1, -1, -1)
                image = torch.concat((image, last_frame), dim=0)
            else:
                # Truncate the batch to the requested size
                image = image[:batch_size, :, :, :]
            return (image,)
        else:
            print(f"[WARN] ChangeImageBatchSize: Unknown mode `{mode}` - ignored")
            return (image,)


NODE_CLASS_MAPPINGS = {
    "LoadImagesFromDir //Inspire": LoadImagesFromDirBatch,
    "LoadImageListFromDir //Inspire": LoadImagesFromDirList,
    "LoadImage //Inspire": LoadImageInspire,
    "ChangeImageBatchSize //Inspire": ChangeImageBatchSize,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "LoadImagesFromDir //Inspire": "Load Image Batch From Dir (Inspire)",
    "LoadImageListFromDir //Inspire": "Load Image List From Dir (Inspire)",
    "ChangeImageBatchSize //Inspire": "Change Image Batch Size (Inspire)",
}
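

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the node definitions above). It assumes
# this module is imported inside a running ComfyUI environment, since `comfy`
# and `folder_paths` must resolve; "./example_inputs" is a hypothetical path.
if __name__ == "__main__":
    # IMAGE tensors in ComfyUI are (batch, height, width, channels) float32 in [0, 1].
    dummy_batch = torch.rand((2, 64, 64, 3), dtype=torch.float32)

    # Pad the 2-frame batch to 5 frames by repeating the last frame.
    (padded,) = ChangeImageBatchSize().load_image(dummy_batch, batch_size=5, mode="simple")
    print(padded.shape)  # torch.Size([5, 64, 64, 3])

    # Load up to 4 images from a hypothetical folder as one IMAGE/MASK batch.
    images, masks, count = LoadImagesFromDirBatch().load_images("./example_inputs", image_load_cap=4)
    print(images.shape, masks.shape, count)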