import copy
import os
from segment_anything import SamPredictor
import torch.nn.functional as F
from impact.utils import *
from collections import namedtuple
import numpy as np
from skimage.measure import label, regionprops
import nodes
import comfy_extras.nodes_upscale_model as model_upscale
from server import PromptServer
import comfy
import impact.wildcards as wildcards
import math
import cv2
SEG = namedtuple("SEG",
['cropped_image', 'cropped_mask', 'confidence', 'crop_region', 'bbox', 'label', 'control_net_wrapper'],
defaults=[None])
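# Hedged usage sketch (illustrative values, not from a real detector run):
# a SEG bundles one detected region; `defaults=[None]` applies to the last
# field only, so `control_net_wrapper` may be omitted.
#
#   seg = SEG(cropped_image=None,
#             cropped_mask=np.zeros((64, 64), dtype=np.float32),
#             confidence=0.92,
#             crop_region=(10, 20, 74, 84),   # x1, y1, x2, y2 in image space
#             bbox=(16, 26, 60, 70),
#             label='face')
#   assert seg.control_net_wrapper is None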
pb_id_cnt = 0
preview_bridge_image_id_map = {}
preview_bridge_image_name_map = {}
preview_bridge_cache = {}
def set_previewbridge_image(node_id, file, item):
global pb_id_cnt
    if (node_id, file) in preview_bridge_image_name_map:
        pb_id, _ = preview_bridge_image_name_map[node_id, file]
        if pb_id.startswith(f"${node_id}"):
            return pb_id
pb_id = f"${node_id}-{pb_id_cnt}"
preview_bridge_image_id_map[pb_id] = (file, item)
preview_bridge_image_name_map[node_id, file] = (pb_id, item)
pb_id_cnt += 1
return pb_id
def erosion_mask(mask, grow_mask_by):
if len(mask.shape) == 3:
mask = mask.squeeze(0)
w = mask.shape[1]
h = mask.shape[0]
device = comfy.model_management.get_torch_device()
mask = mask.clone().to(device)
    # F.interpolate expects size=(H, W); reshape to NCHW first
    mask2 = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(h, w),
                                            mode="bilinear").to(device)
if grow_mask_by == 0:
mask_erosion = mask2
else:
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)).to(device)
padding = math.ceil((grow_mask_by - 1) / 2)
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask2.round(), kernel_tensor, padding=padding), 0, 1)
    return mask_erosion[:, :, :h, :w].round().cpu()
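# Hedged usage sketch: despite the name, a positive `grow_mask_by` *grows*
# (dilates) the mask - the ones-kernel convolution plus clamp marks every
# pixel within kernel reach of the original region.
#
#   m = torch.zeros(32, 32)
#   m[14:18, 14:18] = 1.0
#   grown = erosion_mask(m, grow_mask_by=4)   # -> (1, 1, 32, 32), wider blob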
def ksampler_wrapper(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise,
refiner_ratio=None, refiner_model=None, refiner_clip=None, refiner_positive=None,
refiner_negative=None):
if refiner_ratio is None or refiner_model is None or refiner_clip is None or refiner_positive is None or refiner_negative is None:
refined_latent = \
nodes.KSampler().sample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
denoise)[0]
else:
advanced_steps = math.floor(steps / denoise)
start_at_step = advanced_steps - steps
end_at_step = start_at_step + math.floor(steps * (1.0 - refiner_ratio))
print(f"pre: {start_at_step} .. {end_at_step} / {advanced_steps}")
temp_latent = \
nodes.KSamplerAdvanced().sample(model, "enable", seed, advanced_steps, cfg, sampler_name, scheduler,
positive, negative, latent_image, start_at_step, end_at_step,
"enable")[0]
if 'noise_mask' in latent_image:
# noise_latent = \
# nodes.KSamplerAdvanced().sample(refiner_model, "enable", seed, advanced_steps, cfg, sampler_name,
# scheduler, refiner_positive, refiner_negative, latent_image, end_at_step,
# end_at_step, "enable")[0]
latent_compositor = nodes.NODE_CLASS_MAPPINGS['LatentCompositeMasked']()
temp_latent = \
latent_compositor.composite(latent_image, temp_latent, 0, 0, False, latent_image['noise_mask'])[0]
print(f"post: {end_at_step} .. {advanced_steps + 1} / {advanced_steps}")
refined_latent = \
nodes.KSamplerAdvanced().sample(refiner_model, "disable", seed, advanced_steps, cfg, sampler_name, scheduler,
refiner_positive, refiner_negative, temp_latent, end_at_step,
advanced_steps + 1,
"disable")[0]
return refined_latent
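# Hedged usage sketch (model/conditioning names below are placeholders assumed
# to come from the usual ComfyUI loader nodes):
#
#   latent = nodes.EmptyLatentImage().generate(512, 512, 1)[0]
#   refined = ksampler_wrapper(model, seed=0, steps=20, cfg=8.0,
#                              sampler_name='euler', scheduler='normal',
#                              positive=pos, negative=neg,
#                              latent_image=latent, denoise=1.0)
#   # with all refiner_* arguments left as None this is a plain nodes.KSampler
#   # pass; supplying them switches to the two-stage base -> refiner path above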
class REGIONAL_PROMPT:
def __init__(self, mask, sampler):
if len(mask.shape) == 3:
mask = mask.squeeze(0)
self.mask = mask
self.sampler = sampler
self.mask_erosion = None
self.erosion_factor = None
def get_mask_erosion(self, factor):
if self.mask_erosion is None or self.erosion_factor != factor:
self.mask_erosion = erosion_mask(self.mask, factor)
self.erosion_factor = factor
return self.mask_erosion
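# Hedged usage sketch: the grown mask is cached per factor, so repeated calls
# with the same factor reuse the cached tensor.
#
#   rp = REGIONAL_PROMPT(mask, sampler)   # `sampler` e.g. a KSamplerWrapper
#   m1 = rp.get_mask_erosion(8)
#   m2 = rp.get_mask_erosion(8)           # cache hit: same tensor as m1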
class NO_BBOX_DETECTOR:
pass
class NO_SEGM_DETECTOR:
pass
def create_segmasks(results):
bboxs = results[1]
segms = results[2]
confidence = results[3]
results = []
for i in range(len(segms)):
item = (bboxs[i], segms[i].astype(np.float32), confidence[i])
results.append(item)
return results
def gen_detection_hints_from_mask_area(x, y, mask, threshold, use_negative):
if len(mask.shape) == 3:
mask = mask.squeeze(0)
points = []
plabs = []
# minimum sampling step >= 3
y_step = max(3, int(mask.shape[0] / 20))
x_step = max(3, int(mask.shape[1] / 20))
for i in range(0, len(mask), y_step):
for j in range(0, len(mask[i]), x_step):
if mask[i][j] > threshold:
points.append((x + j, y + i))
plabs.append(1)
elif use_negative and mask[i][j] == 0:
points.append((x + j, y + i))
plabs.append(0)
return points, plabs
def gen_negative_hints(w, h, x1, y1, x2, y2):
npoints = []
nplabs = []
# minimum sampling step >= 3
    y_step = max(3, int(h / 20))
    x_step = max(3, int(w / 20))
    for i in range(10, h - 10, y_step):
        for j in range(10, w - 10, x_step):
            if not (x1 - 10 <= j <= x2 + 10 and y1 - 10 <= i <= y2 + 10):
npoints.append((j, i))
nplabs.append(0)
return npoints, nplabs
def enhance_detail(image, model, clip, vae, guide_size, guide_size_for_bbox, max_size, bbox, seed, steps, cfg,
sampler_name,
scheduler, positive, negative, denoise, noise_mask, force_inpaint, wildcard_opt=None,
detailer_hook=None,
refiner_ratio=None, refiner_model=None, refiner_clip=None, refiner_positive=None,
refiner_negative=None, control_net_wrapper=None, cycle=1):
if noise_mask is not None and len(noise_mask.shape) == 3:
noise_mask = noise_mask.squeeze(0)
if wildcard_opt is not None and wildcard_opt != "":
model, _, positive = wildcards.process_with_loras(wildcard_opt, model, clip)
h = image.shape[1]
w = image.shape[2]
bbox_h = bbox[3] - bbox[1]
bbox_w = bbox[2] - bbox[0]
# Skip processing if the detected bbox is already larger than the guide_size
if not force_inpaint and bbox_h >= guide_size and bbox_w >= guide_size:
print(f"Detailer: segment skip (enough big)")
return None, None
if guide_size_for_bbox: # == "bbox"
# Scale up based on the smaller dimension between width and height.
upscale = guide_size / min(bbox_w, bbox_h)
else:
# for cropped_size
upscale = guide_size / min(w, h)
new_w = int(w * upscale)
new_h = int(h * upscale)
# safeguard
if 'aitemplate_keep_loaded' in model.model_options:
max_size = min(4096, max_size)
if new_w > max_size or new_h > max_size:
upscale *= max_size / max(new_w, new_h)
new_w = int(w * upscale)
new_h = int(h * upscale)
if not force_inpaint:
if upscale <= 1.0:
print(f"Detailer: segment skip [determined upscale factor={upscale}]")
return None, None
if new_w == 0 or new_h == 0:
print(f"Detailer: segment skip [zero size={new_w, new_h}]")
return None, None
else:
if upscale <= 1.0 or new_w == 0 or new_h == 0:
print(f"Detailer: force inpaint")
upscale = 1.0
new_w = w
new_h = h
if detailer_hook is not None:
new_w, new_h = detailer_hook.touch_scaled_size(new_w, new_h)
print(f"Detailer: segment upscale for ({bbox_w, bbox_h}) | crop region {w, h} x {upscale} -> {new_w, new_h}")
# upscale
upscaled_image = scale_tensor(new_w, new_h, torch.from_numpy(image))
# ksampler
latent_image = to_latent_image(upscaled_image, vae)
upscaled_mask = None
if noise_mask is not None:
        # resize the mask tensor to the upscaled size using bilinear interpolation
noise_mask = torch.from_numpy(noise_mask)
upscaled_mask = torch.nn.functional.interpolate(noise_mask.unsqueeze(0).unsqueeze(0), size=(new_h, new_w),
mode='bilinear', align_corners=False)
# remove the extra dimensions added by unsqueeze
upscaled_mask = upscaled_mask.squeeze().squeeze()
latent_image['noise_mask'] = upscaled_mask
if detailer_hook is not None:
latent_image = detailer_hook.post_encode(latent_image)
cnet_pil = None
if control_net_wrapper is not None:
positive, cnet_pil = control_net_wrapper.apply(positive, upscaled_image, upscaled_mask)
refined_latent = latent_image
for i in range(0, cycle):
if detailer_hook is not None and hasattr(detailer_hook, 'cycle_latent'):
refined_latent = detailer_hook.cycle_latent(i, refined_latent)
refined_latent = ksampler_wrapper(model, seed+i, steps, cfg, sampler_name, scheduler, positive, negative,
refined_latent, denoise,
refiner_ratio, refiner_model, refiner_clip, refiner_positive, refiner_negative)
if detailer_hook is not None:
refined_latent = detailer_hook.pre_decode(refined_latent)
    # downscale in pixel space - downscaling in latent space causes bad quality
refined_image = vae.decode(refined_latent['samples'])
if detailer_hook is not None:
refined_image = detailer_hook.post_decode(refined_image)
# downscale
refined_image = scale_tensor_and_to_pil(w, h, refined_image)
    # don't convert back to latent - the latent round-trip degrades the image;
    # preserving the PIL image is much better
return refined_image, cnet_pil
def enhance_detail_for_animatediff(image_frames, model, clip, vae, guide_size, guide_size_for_bbox, max_size, bbox, seed, steps, cfg,
sampler_name,
scheduler, positive, negative, denoise, noise_mask, wildcard_opt=None,
detailer_hook=None,
refiner_ratio=None, refiner_model=None, refiner_clip=None, refiner_positive=None,
refiner_negative=None):
if noise_mask is not None and len(noise_mask.shape) == 3:
noise_mask = noise_mask.squeeze(0)
if wildcard_opt is not None and wildcard_opt != "":
model, _, positive = wildcards.process_with_loras(wildcard_opt, model, clip)
h = image_frames.shape[1]
w = image_frames.shape[2]
bbox_h = bbox[3] - bbox[1]
bbox_w = bbox[2] - bbox[0]
# Skip processing if the detected bbox is already larger than the guide_size
if guide_size_for_bbox: # == "bbox"
# Scale up based on the smaller dimension between width and height.
upscale = guide_size / min(bbox_w, bbox_h)
else:
# for cropped_size
upscale = guide_size / min(w, h)
new_w = int(w * upscale)
new_h = int(h * upscale)
# safeguard
if 'aitemplate_keep_loaded' in model.model_options:
max_size = min(4096, max_size)
if new_w > max_size or new_h > max_size:
upscale *= max_size / max(new_w, new_h)
new_w = int(w * upscale)
new_h = int(h * upscale)
if upscale <= 1.0 or new_w == 0 or new_h == 0:
print(f"Detailer: force inpaint")
upscale = 1.0
new_w = w
new_h = h
if detailer_hook is not None:
new_w, new_h = detailer_hook.touch_scaled_size(new_w, new_h)
print(f"Detailer: segment upscale for ({bbox_w, bbox_h}) | crop region {w, h} x {upscale} -> {new_w, new_h}")
    # resize the mask tensor to the upscaled size using bilinear interpolation
noise_mask = torch.from_numpy(noise_mask)
upscaled_mask = torch.nn.functional.interpolate(noise_mask.unsqueeze(0).unsqueeze(0), size=(new_h, new_w),
mode='bilinear', align_corners=False)
upscaled_mask = upscaled_mask.squeeze().squeeze()
latent_frames = None
for image in image_frames:
image = torch.from_numpy(image).unsqueeze(0)
# upscale
upscaled_image = scale_tensor(new_w, new_h, image)
# ksampler
samples = to_latent_image(upscaled_image, vae)['samples']
if latent_frames is None:
latent_frames = samples
else:
latent_frames = torch.concat((latent_frames, samples), dim=0)
upscaled_mask = upscaled_mask.expand(len(image_frames), -1, -1)
latent = {
'noise_mask': upscaled_mask,
'samples': latent_frames
}
if detailer_hook is not None:
latent = detailer_hook.post_encode(latent)
refined_latent = ksampler_wrapper(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
latent, denoise,
refiner_ratio, refiner_model, refiner_clip, refiner_positive, refiner_negative)
if detailer_hook is not None:
refined_latent = detailer_hook.pre_decode(refined_latent)
refined_image_frames = None
for refined_sample in refined_latent['samples']:
refined_sample = refined_sample.unsqueeze(0)
        # downscale in pixel space - downscaling in latent space causes bad quality
refined_image = vae.decode(refined_sample)
if refined_image_frames is None:
refined_image_frames = refined_image
else:
refined_image_frames = torch.concat((refined_image_frames, refined_image), dim=0)
if detailer_hook is not None:
refined_image_frames = detailer_hook.post_decode(refined_image_frames)
refined_image_frames = nodes.ImageScale().upscale(image=refined_image_frames, upscale_method='lanczos', width=w, height=h, crop='disabled')[0]
return refined_image_frames
def composite_to(dest_latent, crop_region, src_latent):
x1 = crop_region[0]
y1 = crop_region[1]
# composite to original latent
lc = nodes.LatentComposite()
orig_image = lc.composite(dest_latent, src_latent, x1, y1)
return orig_image[0]
def sam_predict(predictor, points, plabs, bbox, threshold):
point_coords = None if not points else np.array(points)
point_labels = None if not plabs else np.array(plabs)
box = np.array([bbox]) if bbox is not None else None
cur_masks, scores, _ = predictor.predict(point_coords=point_coords, point_labels=point_labels, box=box)
total_masks = []
selected = False
max_score = 0
for idx in range(len(scores)):
if scores[idx] > max_score:
max_score = scores[idx]
max_mask = cur_masks[idx]
if scores[idx] >= threshold:
selected = True
total_masks.append(cur_masks[idx])
else:
pass
if not selected:
total_masks.append(max_mask)
return total_masks
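# Hedged usage sketch (assumes predictor.set_image(...) was already called,
# as done in make_sam_mask below):
#
#   masks = sam_predict(predictor, points=[(cx, cy)], plabs=[1],
#                       bbox=(x1, y1, x2, y2), threshold=0.8)
#   # every candidate mask scoring >= 0.8 is kept; if none qualifies,
#   # the single best-scoring mask is returned as a fallback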
def make_sam_mask(sam_model, segs, image, detection_hint, dilation,
threshold, bbox_expansion, mask_hint_threshold, mask_hint_use_negative):
if sam_model.is_auto_mode:
device = comfy.model_management.get_torch_device()
sam_model.to(device=device)
try:
predictor = SamPredictor(sam_model)
image = np.clip(255. * image.cpu().numpy().squeeze(), 0, 255).astype(np.uint8)
predictor.set_image(image, "RGB")
total_masks = []
use_small_negative = mask_hint_use_negative == "Small"
# seg_shape = segs[0]
segs = segs[1]
if detection_hint == "mask-points":
points = []
plabs = []
for i in range(len(segs)):
bbox = segs[i].bbox
center = center_of_bbox(segs[i].bbox)
points.append(center)
                # a point from a small segment is treated as background, from a big one as foreground
if use_small_negative and bbox[2] - bbox[0] < 10:
plabs.append(0)
else:
plabs.append(1)
detected_masks = sam_predict(predictor, points, plabs, None, threshold)
total_masks += detected_masks
else:
for i in range(len(segs)):
bbox = segs[i].bbox
center = center_of_bbox(bbox)
x1 = max(bbox[0] - bbox_expansion, 0)
y1 = max(bbox[1] - bbox_expansion, 0)
x2 = min(bbox[2] + bbox_expansion, image.shape[1])
y2 = min(bbox[3] + bbox_expansion, image.shape[0])
dilated_bbox = [x1, y1, x2, y2]
points = []
plabs = []
if detection_hint == "center-1":
points.append(center)
plabs = [1] # 1 = foreground point, 0 = background point
elif detection_hint == "horizontal-2":
gap = (x2 - x1) / 3
points.append((x1 + gap, center[1]))
points.append((x1 + gap * 2, center[1]))
plabs = [1, 1]
elif detection_hint == "vertical-2":
gap = (y2 - y1) / 3
points.append((center[0], y1 + gap))
points.append((center[0], y1 + gap * 2))
plabs = [1, 1]
elif detection_hint == "rect-4":
x_gap = (x2 - x1) / 3
y_gap = (y2 - y1) / 3
points.append((x1 + x_gap, center[1]))
points.append((x1 + x_gap * 2, center[1]))
points.append((center[0], y1 + y_gap))
points.append((center[0], y1 + y_gap * 2))
plabs = [1, 1, 1, 1]
elif detection_hint == "diamond-4":
x_gap = (x2 - x1) / 3
y_gap = (y2 - y1) / 3
points.append((x1 + x_gap, y1 + y_gap))
points.append((x1 + x_gap * 2, y1 + y_gap))
points.append((x1 + x_gap, y1 + y_gap * 2))
points.append((x1 + x_gap * 2, y1 + y_gap * 2))
plabs = [1, 1, 1, 1]
elif detection_hint == "mask-point-bbox":
center = center_of_bbox(segs[i].bbox)
points.append(center)
plabs = [1]
elif detection_hint == "mask-area":
points, plabs = gen_detection_hints_from_mask_area(segs[i].crop_region[0], segs[i].crop_region[1],
segs[i].cropped_mask,
mask_hint_threshold, use_small_negative)
if mask_hint_use_negative == "Outter":
                    npoints, nplabs = gen_negative_hints(image.shape[1], image.shape[0],
segs[i].crop_region[0], segs[i].crop_region[1],
segs[i].crop_region[2], segs[i].crop_region[3])
points += npoints
plabs += nplabs
detected_masks = sam_predict(predictor, points, plabs, dilated_bbox, threshold)
total_masks += detected_masks
# merge every collected masks
mask = combine_masks2(total_masks)
finally:
if sam_model.is_auto_mode:
print(f"semd to {device}")
sam_model.to(device="cpu")
if mask is not None:
mask = mask.float()
mask = dilate_mask(mask.cpu().numpy(), dilation)
mask = torch.from_numpy(mask)
else:
mask = torch.zeros((8, 8), dtype=torch.float32, device="cpu") # empty mask
return mask
def generate_detection_hints(image, seg, center, detection_hint, dilated_bbox, mask_hint_threshold, use_small_negative,
mask_hint_use_negative):
[x1, y1, x2, y2] = dilated_bbox
points = []
plabs = []
if detection_hint == "center-1":
points.append(center)
plabs = [1] # 1 = foreground point, 0 = background point
elif detection_hint == "horizontal-2":
gap = (x2 - x1) / 3
points.append((x1 + gap, center[1]))
points.append((x1 + gap * 2, center[1]))
plabs = [1, 1]
elif detection_hint == "vertical-2":
gap = (y2 - y1) / 3
points.append((center[0], y1 + gap))
points.append((center[0], y1 + gap * 2))
plabs = [1, 1]
elif detection_hint == "rect-4":
x_gap = (x2 - x1) / 3
y_gap = (y2 - y1) / 3
points.append((x1 + x_gap, center[1]))
points.append((x1 + x_gap * 2, center[1]))
points.append((center[0], y1 + y_gap))
points.append((center[0], y1 + y_gap * 2))
plabs = [1, 1, 1, 1]
elif detection_hint == "diamond-4":
x_gap = (x2 - x1) / 3
y_gap = (y2 - y1) / 3
points.append((x1 + x_gap, y1 + y_gap))
points.append((x1 + x_gap * 2, y1 + y_gap))
points.append((x1 + x_gap, y1 + y_gap * 2))
points.append((x1 + x_gap * 2, y1 + y_gap * 2))
plabs = [1, 1, 1, 1]
elif detection_hint == "mask-point-bbox":
center = center_of_bbox(seg.bbox)
points.append(center)
plabs = [1]
elif detection_hint == "mask-area":
points, plabs = gen_detection_hints_from_mask_area(seg.crop_region[0], seg.crop_region[1],
seg.cropped_mask,
mask_hint_threshold, use_small_negative)
if mask_hint_use_negative == "Outter":
        npoints, nplabs = gen_negative_hints(image.shape[1], image.shape[0],
seg.crop_region[0], seg.crop_region[1],
seg.crop_region[2], seg.crop_region[3])
points += npoints
plabs += nplabs
return points, plabs
def convert_and_stack_masks(masks):
if len(masks) == 0:
return None
mask_tensors = []
for mask in masks:
mask_array = np.array(mask, dtype=np.uint8)
mask_tensor = torch.from_numpy(mask_array)
mask_tensors.append(mask_tensor)
stacked_masks = torch.stack(mask_tensors, dim=0)
stacked_masks = stacked_masks.unsqueeze(1)
return stacked_masks
def merge_and_stack_masks(stacked_masks, group_size):
if stacked_masks is None:
return None
num_masks = stacked_masks.size(0)
merged_masks = []
for i in range(0, num_masks, group_size):
subset_masks = stacked_masks[i:i + group_size]
merged_mask = torch.any(subset_masks, dim=0)
merged_masks.append(merged_mask)
if len(merged_masks) > 0:
merged_masks = torch.stack(merged_masks, dim=0)
return merged_masks
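# Worked shape example (illustrative): 7 masks with group_size=3 form the
# groups [0:3], [3:6], [6:7]; each group is OR-merged by torch.any.
#
#   stacked = torch.ones(7, 1, 8, 8, dtype=torch.bool)
#   merged = merge_and_stack_masks(stacked, group_size=3)
#   assert merged.shape == (3, 1, 8, 8)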
def segs_scale_match(segs, target_shape):
h = segs[0][0]
w = segs[0][1]
th = target_shape[1]
tw = target_shape[2]
if (h == th and w == tw) or h == 0 or w == 0:
return segs
rh = th / h
rw = tw / w
new_segs = []
for seg in segs[1]:
cropped_image = seg.cropped_image
cropped_mask = seg.cropped_mask
x1, y1, x2, y2 = seg.crop_region
bx1, by1, bx2, by2 = seg.bbox
        crop_region = int(x1 * rw), int(y1 * rh), int(x2 * rw), int(y2 * rh)
        bbox = int(bx1 * rw), int(by1 * rh), int(bx2 * rw), int(by2 * rh)
new_w = crop_region[2] - crop_region[0]
new_h = crop_region[3] - crop_region[1]
cropped_mask = torch.from_numpy(cropped_mask)
cropped_mask = torch.nn.functional.interpolate(cropped_mask.unsqueeze(0).unsqueeze(0), size=(new_h, new_w),
mode='bilinear', align_corners=False)
cropped_mask = cropped_mask.squeeze(0).squeeze(0).numpy()
if cropped_image is not None:
cropped_image = scale_tensor(new_w, new_h, torch.from_numpy(cropped_image))
cropped_image = cropped_image.numpy()
new_seg = SEG(cropped_image, cropped_mask, seg.confidence, crop_region, bbox, seg.label, seg.control_net_wrapper)
new_segs.append(new_seg)
return ((th, tw), new_segs)
# Python slicing: stacked_masks[2::3] starts at index 2 and takes every third tensor,
# i.e. it selects the last tensor of each group of three in stacked_masks.
def every_three_pick_last(stacked_masks):
selected_masks = stacked_masks[2::3]
return selected_masks
def make_sam_mask_segmented(sam_model, segs, image, detection_hint, dilation,
threshold, bbox_expansion, mask_hint_threshold, mask_hint_use_negative):
if sam_model.is_auto_mode:
device = comfy.model_management.get_torch_device()
sam_model.to(device=device)
try:
predictor = SamPredictor(sam_model)
image = np.clip(255. * image.cpu().numpy().squeeze(), 0, 255).astype(np.uint8)
predictor.set_image(image, "RGB")
total_masks = []
use_small_negative = mask_hint_use_negative == "Small"
# seg_shape = segs[0]
segs = segs[1]
if detection_hint == "mask-points":
points = []
plabs = []
for i in range(len(segs)):
bbox = segs[i].bbox
center = center_of_bbox(bbox)
points.append(center)
                # a point from a small segment is treated as background, from a big one as foreground
if use_small_negative and bbox[2] - bbox[0] < 10:
plabs.append(0)
else:
plabs.append(1)
detected_masks = sam_predict(predictor, points, plabs, None, threshold)
total_masks += detected_masks
else:
for i in range(len(segs)):
bbox = segs[i].bbox
center = center_of_bbox(bbox)
x1 = max(bbox[0] - bbox_expansion, 0)
y1 = max(bbox[1] - bbox_expansion, 0)
x2 = min(bbox[2] + bbox_expansion, image.shape[1])
y2 = min(bbox[3] + bbox_expansion, image.shape[0])
dilated_bbox = [x1, y1, x2, y2]
points, plabs = generate_detection_hints(image, segs[i], center, detection_hint, dilated_bbox,
mask_hint_threshold, use_small_negative,
mask_hint_use_negative)
detected_masks = sam_predict(predictor, points, plabs, dilated_bbox, threshold)
total_masks += detected_masks
# merge every collected masks
mask = combine_masks2(total_masks)
finally:
# Temporarily disabling the switch back to CPU after inference.
# Rationale: After multiple tests and comparisons, it's concluded that not only does it fail to conserve GPU memory,
# but it also introduces additional IO overhead from transferring the model between devices.
# if sam_model.is_auto_mode:
# sam_model.to(device=torch.device("cpu"))
pass
mask_working_device = torch.device("cpu")
if mask is not None:
mask = mask.float()
mask = dilate_mask(mask.cpu().numpy(), dilation)
mask = torch.from_numpy(mask)
mask = mask.to(device=mask_working_device)
else:
# Extracting batch, height and width
height, width, _ = image.shape
mask = torch.zeros(
(height, width), dtype=torch.float32, device=mask_working_device
) # empty mask
stacked_masks = convert_and_stack_masks(total_masks)
return (mask, merge_and_stack_masks(stacked_masks, group_size=3))
# return every_three_pick_last(stacked_masks)
def segs_bitwise_and_mask(segs, mask):
    # check for None before touching mask.shape
    if mask is None:
        print("[SegsBitwiseAndMask] Cannot operate: MASK is empty.")
        return ([],)
    if len(mask.shape) == 3:
        mask = mask.squeeze(0)
items = []
mask = (mask.cpu().numpy() * 255).astype(np.uint8)
for seg in segs[1]:
cropped_mask = (seg.cropped_mask * 255).astype(np.uint8)
crop_region = seg.crop_region
cropped_mask2 = mask[crop_region[1]:crop_region[3], crop_region[0]:crop_region[2]]
new_mask = np.bitwise_and(cropped_mask.astype(np.uint8), cropped_mask2)
new_mask = new_mask.astype(np.float32) / 255.0
item = SEG(seg.cropped_image, new_mask, seg.confidence, seg.crop_region, seg.bbox, seg.label, None)
items.append(item)
return segs[0], items
def apply_mask_to_each_seg(segs, masks):
if masks is None:
print("[SegsBitwiseAndMask] Cannot operate: MASK is empty.")
return (segs[0], [],)
items = []
masks = masks.squeeze(1)
for seg, mask in zip(segs[1], masks):
cropped_mask = (seg.cropped_mask * 255).astype(np.uint8)
crop_region = seg.crop_region
cropped_mask2 = (mask.cpu().numpy() * 255).astype(np.uint8)
cropped_mask2 = cropped_mask2[crop_region[1]:crop_region[3], crop_region[0]:crop_region[2]]
new_mask = np.bitwise_and(cropped_mask.astype(np.uint8), cropped_mask2)
new_mask = new_mask.astype(np.float32) / 255.0
item = SEG(seg.cropped_image, new_mask, seg.confidence, seg.crop_region, seg.bbox, seg.label, None)
items.append(item)
return segs[0], items
class ONNXDetector:
onnx_model = None
def __init__(self, onnx_model):
self.onnx_model = onnx_model
def detect(self, image, threshold, dilation, crop_factor, drop_size=1, detailer_hook=None):
drop_size = max(drop_size, 1)
try:
import impact.onnx as onnx
h = image.shape[1]
w = image.shape[2]
labels, scores, boxes = onnx.onnx_inference(image, self.onnx_model)
# collect feasible item
result = []
for i in range(len(labels)):
if scores[i] > threshold:
item_bbox = boxes[i]
x1, y1, x2, y2 = item_bbox
if x2 - x1 > drop_size and y2 - y1 > drop_size: # minimum dimension must be (2,2) to avoid squeeze issue
crop_region = make_crop_region(w, h, item_bbox, crop_factor)
if detailer_hook is not None:
                            crop_region = detailer_hook.post_crop_region(w, h, item_bbox, crop_region)
crop_x1, crop_y1, crop_x2, crop_y2, = crop_region
# prepare cropped mask
cropped_mask = np.zeros((crop_y2 - crop_y1, crop_x2 - crop_x1))
cropped_mask[y1 - crop_y1:y2 - crop_y1, x1 - crop_x1:x2 - crop_x1] = 1
cropped_mask = dilate_mask(cropped_mask, dilation)
# make items. just convert the integer label to a string
item = SEG(None, cropped_mask, scores[i], crop_region, item_bbox, str(labels[i]), None)
result.append(item)
shape = h, w
return shape, result
except Exception as e:
print(f"ONNXDetector: unable to execute.\n{e}")
def detect_combined(self, image, threshold, dilation):
return segs_to_combined_mask(self.detect(image, threshold, dilation, 1))
def setAux(self, x):
pass
def mask_to_segs(mask, combined, crop_factor, bbox_fill, drop_size=1, label='A', crop_min_size=None, detailer_hook=None, is_contour=True):
drop_size = max(drop_size, 1)
if mask is None:
print("[mask_to_segs] Cannot operate: MASK is empty.")
return ([],)
if isinstance(mask, np.ndarray):
pass # `mask` is already a NumPy array
else:
try:
mask = mask.numpy()
except AttributeError:
print("[mask_to_segs] Cannot operate: MASK is not a NumPy array or Tensor.")
return ([],)
if mask is None:
print("[mask_to_segs] Cannot operate: MASK is empty.")
return ([],)
result = []
if len(mask.shape) == 2:
mask = np.expand_dims(mask, axis=0)
for i in range(mask.shape[0]):
mask_i = mask[i]
if combined:
indices = np.nonzero(mask_i)
if len(indices[0]) > 0 and len(indices[1]) > 0:
bbox = (
np.min(indices[1]),
np.min(indices[0]),
np.max(indices[1]),
np.max(indices[0]),
)
crop_region = make_crop_region(
mask_i.shape[1], mask_i.shape[0], bbox, crop_factor
)
x1, y1, x2, y2 = crop_region
if detailer_hook is not None:
crop_region = detailer_hook.post_crop_region(mask_i.shape[1], mask_i.shape[0], bbox, crop_region)
if x2 - x1 > 0 and y2 - y1 > 0:
cropped_mask = mask_i[y1:y2, x1:x2]
if cropped_mask is not None:
item = SEG(None, cropped_mask, 1.0, crop_region, bbox, label, None)
result.append(item)
else:
mask_i_uint8 = (mask_i * 255.0).astype(np.uint8)
contours, _ = cv2.findContours(mask_i_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
separated_mask = np.zeros_like(mask_i_uint8)
cv2.drawContours(separated_mask, [contour], 0, 255, -1)
separated_mask = np.array(separated_mask / 255.0).astype(np.float32)
x, y, w, h = cv2.boundingRect(contour)
bbox = x, y, x + w, y + h
crop_region = make_crop_region(
mask_i.shape[1], mask_i.shape[0], bbox, crop_factor, crop_min_size
)
if detailer_hook is not None:
crop_region = detailer_hook.post_crop_region(mask_i.shape[1], mask_i.shape[0], bbox, crop_region)
if w > drop_size and h > drop_size:
if is_contour:
mask_src = separated_mask
else:
mask_src = mask_i
cropped_mask = np.array(
mask_src[
crop_region[1]: crop_region[3],
crop_region[0]: crop_region[2],
]
)
if bbox_fill:
cx1, cy1, _, _ = crop_region
bx1 = x - cx1
bx2 = x+w - cx1
by1 = y - cy1
by2 = y+h - cy1
cropped_mask[by1:by2, bx1:bx2] = 1.0
if cropped_mask is not None:
item = SEG(None, cropped_mask, 1.0, crop_region, bbox, label, None)
result.append(item)
if not result:
print(f"[mask_to_segs] Empty mask.")
print(f"# of Detected SEGS: {len(result)}")
# for r in result:
# print(f"\tbbox={r.bbox}, crop={r.crop_region}, label={r.label}")
# shape: (b,h,w) -> (h,w)
return (mask.shape[1], mask.shape[2]), result
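# Hedged usage sketch: a float mask in [0, 1] with two separate blobs yields
# one SEG per external contour when combined=False.
#
#   m = torch.zeros(128, 128)
#   m[10:30, 10:30] = 1.0
#   m[60:90, 60:90] = 1.0
#   shape, segs = mask_to_segs(m, combined=False, crop_factor=1.5,
#                              bbox_fill=False, drop_size=5)
#   # shape == (128, 128); len(segs) == 2, each with its own crop_region/bbox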
def mediapipe_facemesh_to_segs(image, crop_factor, bbox_fill, crop_min_size, drop_size, dilation, face, mouth, left_eyebrow, left_eye, left_pupil, right_eyebrow, right_eye, right_pupil):
parts = {
"face": np.array([0x0A, 0xC8, 0x0A]),
"mouth": np.array([0x0A, 0xB4, 0x0A]),
"left_eyebrow": np.array([0xB4, 0xDC, 0x0A]),
"left_eye": np.array([0xB4, 0xC8, 0x0A]),
"left_pupil": np.array([0xFA, 0xC8, 0x0A]),
"right_eyebrow": np.array([0x0A, 0xDC, 0xB4]),
"right_eye": np.array([0x0A, 0xC8, 0xB4]),
"right_pupil": np.array([0x0A, 0xC8, 0xFA]),
}
def create_segment(image, color):
image = (image * 255).to(torch.uint8)
image = image.squeeze(0).numpy()
mask = cv2.inRange(image, color, color)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
max_contour = max(contours, key=cv2.contourArea)
convex_hull = cv2.convexHull(max_contour)
convex_segment = np.zeros_like(image)
cv2.fillPoly(convex_segment, [convex_hull], (255, 255, 255))
convex_segment = np.expand_dims(convex_segment, axis=0).astype(np.float32) / 255.0
tensor = torch.from_numpy(convex_segment)
mask_tensor = torch.any(tensor != 0, dim=-1).float()
mask_tensor = mask_tensor.squeeze(0)
mask_tensor = torch.from_numpy(dilate_mask(mask_tensor.numpy(), dilation))
return mask_tensor.unsqueeze(0)
return None
segs = []
def create_seg(label):
mask = create_segment(image, parts[label])
if mask is not None:
seg = mask_to_segs(mask, False, crop_factor, bbox_fill, drop_size=drop_size, label=label, crop_min_size=crop_min_size)
if len(seg[1]) > 0:
segs.append(seg[1][0])
if face:
create_seg('face')
if mouth:
create_seg('mouth')
if left_eyebrow:
create_seg('left_eyebrow')
if left_eye:
create_seg('left_eye')
if left_pupil:
create_seg('left_pupil')
if right_eyebrow:
create_seg('right_eyebrow')
if right_eye:
create_seg('right_eye')
if right_pupil:
create_seg('right_pupil')
return (image.shape[1], image.shape[2]), segs
def segs_to_combined_mask(segs):
shape = segs[0]
h = shape[0]
w = shape[1]
mask = np.zeros((h, w), dtype=np.uint8)
for seg in segs[1]:
cropped_mask = seg.cropped_mask
crop_region = seg.crop_region
mask[crop_region[1]:crop_region[3], crop_region[0]:crop_region[2]] |= (cropped_mask * 255).astype(np.uint8)
return torch.from_numpy(mask.astype(np.float32) / 255.0)
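# Hedged usage sketch: pastes every cropped mask back onto a full-size canvas
# and ORs them together.
#
#   combined = segs_to_combined_mask(segs)   # (h, w) float tensor in [0, 1]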
def segs_to_masklist(segs):
shape = segs[0]
h = shape[0]
w = shape[1]
masks = []
for seg in segs[1]:
mask = np.zeros((h, w), dtype=np.uint8)
cropped_mask = seg.cropped_mask
crop_region = seg.crop_region
mask[crop_region[1]:crop_region[3], crop_region[0]:crop_region[2]] |= (cropped_mask * 255).astype(np.uint8)
mask = torch.from_numpy(mask.astype(np.float32) / 255.0)
masks.append(mask)
if len(masks) == 0:
empty_mask = torch.zeros((h, w), dtype=torch.float32, device="cpu")
masks = [empty_mask]
return masks
def vae_decode(vae, samples, use_tile, hook, tile_size=512):
if use_tile:
pixels = nodes.VAEDecodeTiled().decode(vae, samples, tile_size)[0]
else:
pixels = nodes.VAEDecode().decode(vae, samples)[0]
if hook is not None:
pixels = hook.post_decode(pixels)
return pixels
def vae_encode(vae, pixels, use_tile, hook, tile_size=512):
if use_tile:
samples = nodes.VAEEncodeTiled().encode(vae, pixels, tile_size)[0]
else:
samples = nodes.VAEEncode().encode(vae, pixels)[0]
if hook is not None:
samples = hook.post_encode(samples)
return samples
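# Hedged usage sketch of the decode/encode pair (hook optional; tiled mode
# trades speed for lower VRAM usage):
#
#   pixels = vae_decode(vae, latent, use_tile=True, hook=None, tile_size=512)
#   latent2 = vae_encode(vae, pixels, use_tile=True, hook=None, tile_size=512)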
class KSamplerWrapper:
params = None
def __init__(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise):
self.params = model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise
def sample(self, latent_image, hook=None):
model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
if hook is not None:
            model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise = \
hook.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
denoise)
return nodes.common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
denoise=denoise)[0]
class KSamplerAdvancedWrapper:
params = None
def __init__(self, model, cfg, sampler_name, scheduler, positive, negative):
self.params = model, cfg, sampler_name, scheduler, positive, negative
def sample_advanced(self, add_noise, seed, steps, latent_image, start_at_step, end_at_step,
return_with_leftover_noise, hook=None, recover_special_sampler=False):
model, cfg, sampler_name, scheduler, positive, negative = self.params
if hook is not None:
            model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image = \
hook.pre_ksample_advanced(model, add_noise, seed, steps, cfg, sampler_name, scheduler,
positive, negative, latent_image, start_at_step, end_at_step,
return_with_leftover_noise)
if recover_special_sampler and sampler_name in ['uni_pc', 'uni_pc_bh2', 'dpmpp_sde', 'dpmpp_sde_gpu', 'dpmpp_2m_sde', 'dpmpp_2m_sde_gpu', 'dpmpp_3m_sde', 'dpmpp_3m_sde_gpu']:
base_image = latent_image.copy()
else:
base_image = None
try:
latent_image = nodes.KSamplerAdvanced().sample(model, add_noise, seed, steps, cfg, sampler_name, scheduler,
positive, negative, latent_image, start_at_step, end_at_step,
return_with_leftover_noise)[0]
except ValueError as e:
if str(e) == 'sigma_min and sigma_max must not be 0':
print(f"\nWARN: sampling skipped - sigma_min and sigma_max are 0")
return latent_image
if recover_special_sampler and sampler_name in ['uni_pc', 'uni_pc_bh2', 'dpmpp_sde', 'dpmpp_sde_gpu', 'dpmpp_2m_sde', 'dpmpp_2m_sde_gpu', 'dpmpp_3m_sde', 'dpmpp_3m_sde_gpu']:
compensate = 0 if sampler_name in ['uni_pc', 'uni_pc_bh2'] else 2
sampler_name = 'dpmpp_fast' if sampler_name in ['uni_pc', 'uni_pc_bh2', 'dpmpp_sde', 'dpmpp_sde_gpu'] else 'dpmpp_2m'
latent_compositor = nodes.NODE_CLASS_MAPPINGS['LatentCompositeMasked']()
noise_mask = latent_image['noise_mask']
if len(noise_mask.shape) == 4:
noise_mask = noise_mask.squeeze(0).squeeze(0)
latent_image = \
latent_compositor.composite(base_image, latent_image, 0, 0, False, noise_mask)[0]
try:
latent_image = nodes.KSamplerAdvanced().sample(model, add_noise, seed, steps, cfg, sampler_name, scheduler,
positive, negative, latent_image, start_at_step-compensate, end_at_step,
return_with_leftover_noise)[0]
except ValueError as e:
if str(e) == 'sigma_min and sigma_max must not be 0':
print(f"\nWARN: sampling skipped - sigma_min and sigma_max are 0")
return latent_image
class PixelKSampleHook:
cur_step = 0
total_step = 0
def __init__(self):
pass
def set_steps(self, info):
self.cur_step, self.total_step = info
def post_decode(self, pixels):
return pixels
def post_upscale(self, pixels):
return pixels
def post_encode(self, samples):
return samples
def pre_decode(self, samples):
return samples
def pre_ksample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent,
denoise):
return model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent, denoise
def post_crop_region(self, w, h, item_bbox, crop_region):
return crop_region
def touch_scaled_size(self, w, h):
return w, h
class PixelKSampleHookCombine(PixelKSampleHook):
hook1 = None
hook2 = None
def __init__(self, hook1, hook2):
super().__init__()
self.hook1 = hook1
self.hook2 = hook2
def set_steps(self, info):
self.hook1.set_steps(info)
self.hook2.set_steps(info)
def post_decode(self, pixels):
return self.hook2.post_decode(self.hook1.post_decode(pixels))
def post_upscale(self, pixels):
return self.hook2.post_upscale(self.hook1.post_upscale(pixels))
def post_encode(self, samples):
return self.hook2.post_encode(self.hook1.post_encode(samples))
def pre_ksample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent,
denoise):
model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent, denoise = \
self.hook1.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
upscaled_latent, denoise)
return self.hook2.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
upscaled_latent, denoise)
class SimpleCfgScheduleHook(PixelKSampleHook):
target_cfg = 0
def __init__(self, target_cfg):
super().__init__()
self.target_cfg = target_cfg
def pre_ksample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent,
denoise):
progress = self.cur_step / self.total_step
gap = self.target_cfg - cfg
current_cfg = cfg + gap * progress
return model, seed, steps, current_cfg, sampler_name, scheduler, positive, negative, upscaled_latent, denoise
class SimpleDenoiseScheduleHook(PixelKSampleHook):
target_denoise = 0
def __init__(self, target_denoise):
super().__init__()
self.target_denoise = target_denoise
def pre_ksample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent,
denoise):
progress = self.cur_step / self.total_step
gap = self.target_denoise - denoise
current_denoise = denoise + gap * progress
return model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent, current_denoise
def latent_upscale_on_pixel_space_shape(samples, scale_method, w, h, vae, use_tile=False, tile_size=512,
save_temp_prefix=None, hook=None):
pixels = vae_decode(vae, samples, use_tile, hook, tile_size=tile_size)
if save_temp_prefix is not None:
nodes.PreviewImage().save_images(pixels, filename_prefix=save_temp_prefix)
pixels = nodes.ImageScale().upscale(pixels, scale_method, int(w), int(h), False)[0]
if hook is not None:
pixels = hook.post_upscale(pixels)
return vae_encode(vae, pixels, use_tile, hook, tile_size=tile_size)
def latent_upscale_on_pixel_space2(samples, scale_method, scale_factor, vae, use_tile=False, tile_size=512,
save_temp_prefix=None, hook=None):
pixels = vae_decode(vae, samples, use_tile, hook, tile_size=tile_size)
if save_temp_prefix is not None:
nodes.PreviewImage().save_images(pixels, filename_prefix=save_temp_prefix)
w = pixels.shape[2] * scale_factor
h = pixels.shape[1] * scale_factor
pixels = nodes.ImageScale().upscale(pixels, scale_method, int(w), int(h), False)[0]
if hook is not None:
pixels = hook.post_upscale(pixels)
return (vae_encode(vae, pixels, use_tile, hook, tile_size=tile_size), pixels)
def latent_upscale_on_pixel_space(samples, scale_method, scale_factor, vae, use_tile=False, tile_size=512,
save_temp_prefix=None, hook=None):
return latent_upscale_on_pixel_space2(samples, scale_method, scale_factor, vae, use_tile, tile_size, save_temp_prefix, hook)[0]
def latent_upscale_on_pixel_space_with_model_shape(samples, scale_method, upscale_model, new_w, new_h, vae,
use_tile=False, tile_size=512, save_temp_prefix=None, hook=None):
pixels = vae_decode(vae, samples, use_tile, hook, tile_size=tile_size)
if save_temp_prefix is not None:
nodes.PreviewImage().save_images(pixels, filename_prefix=save_temp_prefix)
w = pixels.shape[2]
# upscale by model upscaler
current_w = w
while current_w < new_w:
pixels = model_upscale.ImageUpscaleWithModel().upscale(upscale_model, pixels)[0]
current_w = pixels.shape[2]
if current_w == w:
print(f"[latent_upscale_on_pixel_space_with_model] x1 upscale model selected")
break
# downscale to target scale
pixels = nodes.ImageScale().upscale(pixels, scale_method, int(new_w), int(new_h), False)[0]
if hook is not None:
pixels = hook.post_upscale(pixels)
return vae_encode(vae, pixels, use_tile, hook, tile_size=tile_size)
def latent_upscale_on_pixel_space_with_model2(samples, scale_method, upscale_model, scale_factor, vae, use_tile=False,
tile_size=512, save_temp_prefix=None, hook=None):
pixels = vae_decode(vae, samples, use_tile, hook, tile_size=tile_size)
if save_temp_prefix is not None:
nodes.PreviewImage().save_images(pixels, filename_prefix=save_temp_prefix)
w = pixels.shape[2]
h = pixels.shape[1]
new_w = w * scale_factor
new_h = h * scale_factor
# upscale by model upscaler
current_w = w
while current_w < new_w:
pixels = model_upscale.ImageUpscaleWithModel().upscale(upscale_model, pixels)[0]
current_w = pixels.shape[2]
if current_w == w:
print(f"[latent_upscale_on_pixel_space_with_model] x1 upscale model selected")
break
# downscale to target scale
pixels = nodes.ImageScale().upscale(pixels, scale_method, int(new_w), int(new_h), False)[0]
if hook is not None:
pixels = hook.post_upscale(pixels)
return (vae_encode(vae, pixels, use_tile, hook, tile_size=tile_size), pixels)
def latent_upscale_on_pixel_space_with_model(samples, scale_method, upscale_model, scale_factor, vae, use_tile=False,
tile_size=512, save_temp_prefix=None, hook=None):
return latent_upscale_on_pixel_space_with_model2(samples, scale_method, upscale_model, scale_factor, vae, use_tile, tile_size, save_temp_prefix, hook)[0]
class TwoSamplersForMaskUpscaler:
params = None
upscale_model = None
hook_base = None
hook_mask = None
hook_full = None
use_tiled_vae = False
is_tiled = False
tile_size = 512
def __init__(self, scale_method, sample_schedule, use_tiled_vae, base_sampler, mask_sampler, mask, vae,
full_sampler_opt=None, upscale_model_opt=None, hook_base_opt=None, hook_mask_opt=None,
hook_full_opt=None,
tile_size=512):
if len(mask.shape) == 3:
mask = mask.squeeze(0)
mask = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
self.params = scale_method, sample_schedule, use_tiled_vae, base_sampler, mask_sampler, mask, vae
self.upscale_model = upscale_model_opt
self.full_sampler = full_sampler_opt
self.hook_base = hook_base_opt
self.hook_mask = hook_mask_opt
self.hook_full = hook_full_opt
self.use_tiled_vae = use_tiled_vae
self.tile_size = tile_size
def upscale(self, step_info, samples, upscale_factor, save_temp_prefix=None):
scale_method, sample_schedule, use_tiled_vae, base_sampler, mask_sampler, mask, vae = self.params
if len(mask.shape) == 3:
mask = mask.squeeze(0)
self.prepare_hook(step_info)
# upscale latent
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space(samples, scale_method, upscale_factor, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
hook=self.hook_base, tile_size=self.tile_size)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model(samples, scale_method, self.upscale_model,
upscale_factor, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
                                                                       hook=self.hook_base, tile_size=self.tile_size)
return self.do_samples(step_info, base_sampler, mask_sampler, sample_schedule, mask, upscaled_latent)
def prepare_hook(self, step_info):
if self.hook_base is not None:
self.hook_base.set_steps(step_info)
if self.hook_mask is not None:
self.hook_mask.set_steps(step_info)
if self.hook_full is not None:
self.hook_full.set_steps(step_info)
def upscale_shape(self, step_info, samples, w, h, save_temp_prefix=None):
scale_method, sample_schedule, use_tiled_vae, base_sampler, mask_sampler, mask, vae = self.params
if len(mask.shape) == 3:
mask = mask.squeeze(0)
self.prepare_hook(step_info)
# upscale latent
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space_shape(samples, scale_method, w, h, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
hook=self.hook_base,
tile_size=self.tile_size)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model_shape(samples, scale_method, self.upscale_model,
w, h, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
                                                                             hook=self.hook_base,
tile_size=self.tile_size)
return self.do_samples(step_info, base_sampler, mask_sampler, sample_schedule, mask, upscaled_latent)
def is_full_sample_time(self, step_info, sample_schedule):
cur_step, total_step = step_info
# make start from 1 instead of zero
cur_step += 1
total_step += 1
if sample_schedule == "none":
return False
elif sample_schedule == "interleave1":
return cur_step % 2 == 0
elif sample_schedule == "interleave2":
return cur_step % 3 == 0
elif sample_schedule == "interleave3":
return cur_step % 4 == 0
elif sample_schedule == "last1":
return cur_step == total_step
elif sample_schedule == "last2":
return cur_step >= total_step - 1
elif sample_schedule == "interleave1+last1":
return cur_step % 2 == 0 or cur_step >= total_step - 1
elif sample_schedule == "interleave2+last1":
return cur_step % 2 == 0 or cur_step >= total_step - 1
elif sample_schedule == "interleave3+last1":
return cur_step % 2 == 0 or cur_step >= total_step - 1
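    # Worked example: with total_step=4, an original cur_step of 0..3 is
    # shifted to 1..4 above; "interleave2" fires where the shifted step is
    # divisible by 3, i.e. only at original cur_step == 2.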
def do_samples(self, step_info, base_sampler, mask_sampler, sample_schedule, mask, upscaled_latent):
if len(mask.shape) == 3:
mask = mask.squeeze(0)
if self.is_full_sample_time(step_info, sample_schedule):
print(f"step_info={step_info} / full time")
upscaled_latent = base_sampler.sample(upscaled_latent, self.hook_base)
sampler = self.full_sampler if self.full_sampler is not None else base_sampler
return sampler.sample(upscaled_latent, self.hook_full)
else:
print(f"step_info={step_info} / non-full time")
# upscale mask
upscaled_mask = F.interpolate(mask, size=(
upscaled_latent['samples'].shape[2], upscaled_latent['samples'].shape[3]),
mode='bilinear', align_corners=True)
upscaled_mask = upscaled_mask[:, :, :upscaled_latent['samples'].shape[2],
:upscaled_latent['samples'].shape[3]]
# base sampler
upscaled_inv_mask = torch.where(upscaled_mask != 1.0, torch.tensor(1.0), torch.tensor(0.0))
upscaled_latent['noise_mask'] = upscaled_inv_mask
upscaled_latent = base_sampler.sample(upscaled_latent, self.hook_base)
# mask sampler
upscaled_latent['noise_mask'] = upscaled_mask
upscaled_latent = mask_sampler.sample(upscaled_latent, self.hook_mask)
# remove mask
del upscaled_latent['noise_mask']
return upscaled_latent
class PixelKSampleUpscaler:
params = None
upscale_model = None
hook = None
use_tiled_vae = False
is_tiled = False
tile_size = 512
def __init__(self, scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise,
use_tiled_vae, upscale_model_opt=None, hook_opt=None, tile_size=512):
self.params = scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise
self.upscale_model = upscale_model_opt
self.hook = hook_opt
self.use_tiled_vae = use_tiled_vae
self.tile_size = tile_size
def upscale(self, step_info, samples, upscale_factor, save_temp_prefix=None):
scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
if self.hook is not None:
self.hook.set_steps(step_info)
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space(samples, scale_method, upscale_factor, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix, hook=self.hook)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model(samples, scale_method, self.upscale_model,
upscale_factor, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
hook=self.hook,
tile_size=self.tile_size)
if self.hook is not None:
model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent, denoise = \
self.hook.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
upscaled_latent, denoise)
refined_latent = nodes.KSampler().sample(model, seed, steps, cfg, sampler_name, scheduler,
positive, negative, upscaled_latent, denoise)[0]
return refined_latent
def upscale_shape(self, step_info, samples, w, h, save_temp_prefix=None):
scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
if self.hook is not None:
self.hook.set_steps(step_info)
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space_shape(samples, scale_method, w, h, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix, hook=self.hook,
tile_size=self.tile_size)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model_shape(samples, scale_method, self.upscale_model,
w, h, vae,
use_tile=self.use_tiled_vae,
save_temp_prefix=save_temp_prefix,
hook=self.hook,
tile_size=self.tile_size)
if self.hook is not None:
model, seed, steps, cfg, sampler_name, scheduler, positive, negative, upscaled_latent, denoise = \
self.hook.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
upscaled_latent, denoise)
refined_latent = nodes.KSampler().sample(model, seed, steps, cfg, sampler_name, scheduler,
positive, negative, upscaled_latent, denoise)[0]
return refined_latent
class ControlNetWrapper:
def __init__(self, control_net, strength, preprocessor):
self.control_net = control_net
self.strength = strength
self.preprocessor = preprocessor
self.image = None
def apply(self, conditioning, image, mask=None):
if self.preprocessor is not None:
image = self.preprocessor.apply(image, mask)
return nodes.ControlNetApply().apply_controlnet(conditioning, self.control_net, image, self.strength)[0], image
class CoreMLHook(PixelKSampleHook):
def __init__(self, mode):
super().__init__()
resolution = mode.split('x')
self.w = int(resolution[0])
self.h = int(resolution[1])
self.override_bbox_by_segm = False
def pre_decode(self, samples):
new_samples = copy.deepcopy(samples)
new_samples['samples'] = samples['samples'][0].unsqueeze(0)
return new_samples
def post_encode(self, samples):
new_samples = copy.deepcopy(samples)
new_samples['samples'] = samples['samples'].repeat(2, 1, 1, 1)
return new_samples
def post_crop_region(self, w, h, item_bbox, crop_region):
x1, y1, x2, y2 = crop_region
bx1, by1, bx2, by2 = item_bbox
crop_w = x2-x1
crop_h = y2-y1
crop_ratio = crop_w/crop_h
target_ratio = self.w/self.h
if crop_ratio < target_ratio:
# shrink height
top_gap = by1 - y1
bottom_gap = y2 - by2
            # distribute the height reduction in proportion to the top gap
            gap_ratio = top_gap / max(top_gap + bottom_gap, 1)
target_height = 1/target_ratio*crop_w
delta_height = crop_h - target_height
new_y1 = int(y1 + delta_height*gap_ratio)
new_y2 = int(new_y1 + target_height)
crop_region = x1, new_y1, x2, new_y2
elif crop_ratio > target_ratio:
# shrink width
left_gap = bx1 - x1
right_gap = x2 - bx2
            # distribute the width reduction in proportion to the left gap
            gap_ratio = left_gap / max(left_gap + right_gap, 1)
target_width = target_ratio*crop_h
delta_width = crop_w - target_width
new_x1 = int(x1 + delta_width*gap_ratio)
new_x2 = int(new_x1 + target_width)
crop_region = new_x1, y1, new_x2, y2
return crop_region
def touch_scaled_size(self, w, h):
return self.w, self.h
# REQUIREMENTS: BlenderNeko/ComfyUI Noise
class InjectNoiseHook(PixelKSampleHook):
def __init__(self, source, seed, start_strength, end_strength):
super().__init__()
self.source = source
self.seed = seed
self.start_strength = start_strength
self.end_strength = end_strength
def post_encode(self, samples, seed_idx=0):
# gen noise
size = samples['samples'].shape
seed = self.cur_step + self.seed + seed_idx
if "BNK_NoisyLatentImage" in nodes.NODE_CLASS_MAPPINGS and "BNK_InjectNoise" in nodes.NODE_CLASS_MAPPINGS:
NoisyLatentImage = nodes.NODE_CLASS_MAPPINGS["BNK_NoisyLatentImage"]
InjectNoise = nodes.NODE_CLASS_MAPPINGS["BNK_InjectNoise"]
else:
raise Exception("'BNK_NoisyLatentImage', 'BNK_InjectNoise' nodes are not installed.")
noise = NoisyLatentImage().create_noisy_latents(self.source, seed, size[3] * 8, size[2] * 8, size[0])[0]
# inj noise
mask = None
if 'noise_mask' in samples:
mask = samples['noise_mask']
strength = self.start_strength + (self.end_strength - self.start_strength) * self.cur_step / self.total_step
samples = InjectNoise().inject_noise(samples, strength, noise, mask)[0]
if mask is not None:
samples['noise_mask'] = mask
return samples
def cycle_latent(self, i, latent):
if i == 0:
return latent
else:
return self.post_encode(latent, i)
# REQUIREMENTS: BlenderNeko/ComfyUI_TiledKSampler
class TiledKSamplerWrapper:
params = None
def __init__(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise,
tile_width, tile_height, tiling_strategy):
self.params = model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise, tile_width, tile_height, tiling_strategy
def sample(self, latent_image, hook=None):
if "BNK_TiledKSampler" in nodes.NODE_CLASS_MAPPINGS:
TiledKSampler = nodes.NODE_CLASS_MAPPINGS['BNK_TiledKSampler']
else:
raise Exception("'BNK_TiledKSampler' node isn't installed.")
model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise, tile_width, tile_height, tiling_strategy = self.params
if hook is not None:
            model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise = \
hook.pre_ksample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
denoise)
return \
TiledKSampler().sample(model, seed, tile_width, tile_height, tiling_strategy, steps, cfg, sampler_name,
scheduler,
positive, negative, latent_image, denoise)[0]
class PixelTiledKSampleUpscaler:
params = None
upscale_model = None
tile_params = None
hook = None
is_tiled = True
tile_size = 512
def __init__(self, scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative,
denoise,
tile_width, tile_height, tiling_strategy,
upscale_model_opt=None, hook_opt=None, tile_size=512):
self.params = scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise
self.tile_params = tile_width, tile_height, tiling_strategy
self.upscale_model = upscale_model_opt
self.hook = hook_opt
self.tile_size = tile_size
def tiled_ksample(self, latent):
if "BNK_TiledKSampler" in nodes.NODE_CLASS_MAPPINGS:
TiledKSampler = nodes.NODE_CLASS_MAPPINGS['BNK_TiledKSampler']
else:
raise Exception("'BNK_TiledKSampler' node isn't installed.")
scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
tile_width, tile_height, tiling_strategy = self.tile_params
return \
TiledKSampler().sample(model, seed, tile_width, tile_height, tiling_strategy, steps, cfg, sampler_name,
scheduler,
positive, negative, latent, denoise)[0]
def upscale(self, step_info, samples, upscale_factor, save_temp_prefix=None):
scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
if self.hook is not None:
self.hook.set_steps(step_info)
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space(samples, scale_method, upscale_factor, vae,
use_tile=True, save_temp_prefix=save_temp_prefix,
hook=self.hook,
tile_size=self.tile_size)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model(samples, scale_method, self.upscale_model,
upscale_factor, vae,
use_tile=True,
save_temp_prefix=save_temp_prefix,
hook=self.hook,
tile_size=self.tile_size)
refined_latent = self.tiled_ksample(upscaled_latent)
return refined_latent
def upscale_shape(self, step_info, samples, w, h, save_temp_prefix=None):
scale_method, model, vae, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise = self.params
if self.hook is not None:
self.hook.set_steps(step_info)
if self.upscale_model is None:
upscaled_latent = latent_upscale_on_pixel_space_shape(samples, scale_method, w, h, vae,
use_tile=True, save_temp_prefix=save_temp_prefix,
hook=self.hook, tile_size=self.tile_size)
else:
upscaled_latent = latent_upscale_on_pixel_space_with_model_shape(samples, scale_method,
self.upscale_model, w, h, vae,
use_tile=True,
save_temp_prefix=save_temp_prefix,
hook=self.hook,
tile_size=self.tile_size)
refined_latent = self.tiled_ksample(upscaled_latent)
return refined_latent
# REQUIREMENTS: biegert/ComfyUI-CLIPSeg
class BBoxDetectorBasedOnCLIPSeg:
prompt = None
blur = None
threshold = None
dilation_factor = None
aux = None
def __init__(self, prompt, blur, threshold, dilation_factor):
self.prompt = prompt
self.blur = blur
self.threshold = threshold
self.dilation_factor = dilation_factor
def detect(self, image, bbox_threshold, bbox_dilation, bbox_crop_factor, drop_size=1, detailer_hook=None):
mask = self.detect_combined(image, bbox_threshold, bbox_dilation)
if len(mask.shape) == 3:
mask = mask.squeeze(0)
segs = mask_to_segs(mask, False, bbox_crop_factor, True, drop_size, detailer_hook=detailer_hook)
return segs
def detect_combined(self, image, bbox_threshold, bbox_dilation):
if "CLIPSeg" in nodes.NODE_CLASS_MAPPINGS:
CLIPSeg = nodes.NODE_CLASS_MAPPINGS['CLIPSeg']
else:
raise Exception("'CLIPSeg' node isn't installed.")
if self.threshold is None:
threshold = bbox_threshold
else:
threshold = self.threshold
if self.dilation_factor is None:
dilation_factor = bbox_dilation
else:
dilation_factor = self.dilation_factor
prompt = self.aux if self.prompt == '' and self.aux is not None else self.prompt
mask, _, _ = CLIPSeg().segment_image(image, prompt, self.blur, threshold, dilation_factor)
mask = to_binary_mask(mask)
return mask
def setAux(self, x):
self.aux = x
def update_node_status(node, text, progress=None):
if PromptServer.instance.client_id is None:
return
PromptServer.instance.send_sync("impact/update_status", {
"node": node,
"progress": progress,
"text": text
}, PromptServer.instance.client_id)
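# Hedged usage sketch (node id and text are placeholders):
#
#   update_node_status('12', 'detailing 3/8', progress=3 / 8)
#   # silently does nothing when no client is connected (client_id is None)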
from comfy.cli_args import args, LatentPreviewMethod
import folder_paths
from latent_preview import TAESD, TAESDPreviewerImpl, Latent2RGBPreviewer
try:
import comfy.latent_formats as latent_formats
def get_previewer(device, latent_format=latent_formats.SD15(), force=False, method=None):
previewer = None
if method is None:
method = args.preview_method
if method != LatentPreviewMethod.NoPreviews or force:
# TODO previewer methods
taesd_decoder_path = folder_paths.get_full_path("vae_approx", latent_format.taesd_decoder_name)
if method == LatentPreviewMethod.Auto:
method = LatentPreviewMethod.Latent2RGB
if taesd_decoder_path:
method = LatentPreviewMethod.TAESD
if method == LatentPreviewMethod.TAESD:
if taesd_decoder_path:
taesd = TAESD(None, taesd_decoder_path).to(device)
previewer = TAESDPreviewerImpl(taesd)
else:
print("Warning: TAESD previews enabled, but could not find models/vae_approx/{}".format(
latent_format.taesd_decoder_name))
if previewer is None:
previewer = Latent2RGBPreviewer(latent_format.latent_rgb_factors)
return previewer
except Exception:
print(f"#########################################################################")
print(f"[ERROR] ComfyUI-Impact-Pack: Please update ComfyUI to the latest version.")
print(f"#########################################################################")