|
|
import numpy as np |
|
|
import torch |
|
|
from tqdm.auto import trange |
|
|
from typing_extensions import override |
|
|
|
|
|
import comfy.model_patcher |
|
|
import comfy.samplers |
|
|
import comfy.utils |
|
|
from comfy.k_diffusion.sampling import to_d |
|
|
from comfy_api.latest import ComfyExtension, io |
|
|
|
|
|
|
|
|
@torch.no_grad() |
|
|
def sample_lcm_upscale(model, x, sigmas, extra_args=None, callback=None, disable=None, total_upscale=2.0, upscale_method="bislerp", upscale_steps=None): |
|
|
extra_args = {} if extra_args is None else extra_args |
|
|
|
|
|
if upscale_steps is None: |
|
|
upscale_steps = max(len(sigmas) // 2 + 1, 2) |
|
|
else: |
|
|
upscale_steps += 1 |
|
|
upscale_steps = min(upscale_steps, len(sigmas) + 1) |
|
|
|
|
|
upscales = np.linspace(1.0, total_upscale, upscale_steps)[1:] |
|
|
|
|
|
orig_shape = x.size() |
|
|
s_in = x.new_ones([x.shape[0]]) |
|
|
for i in trange(len(sigmas) - 1, disable=disable): |
|
|
denoised = model(x, sigmas[i] * s_in, **extra_args) |
|
|
if callback is not None: |
|
|
callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised}) |
|
|
|
|
|
x = denoised |
|
|
if i < len(upscales): |
|
|
x = comfy.utils.common_upscale(x, round(orig_shape[-1] * upscales[i]), round(orig_shape[-2] * upscales[i]), upscale_method, "disabled") |
|
|
|
|
|
if sigmas[i + 1] > 0: |
|
|
x += sigmas[i + 1] * torch.randn_like(x) |
|
|
return x |
|
|
|
|
|
|
|
|
class SamplerLCMUpscale(io.ComfyNode): |
|
|
UPSCALE_METHODS = ["bislerp", "nearest-exact", "bilinear", "area", "bicubic"] |
|
|
|
|
|
@classmethod |
|
|
def define_schema(cls) -> io.Schema: |
|
|
return io.Schema( |
|
|
node_id="SamplerLCMUpscale", |
|
|
category="sampling/custom_sampling/samplers", |
|
|
inputs=[ |
|
|
io.Float.Input("scale_ratio", default=1.0, min=0.1, max=20.0, step=0.01), |
|
|
io.Int.Input("scale_steps", default=-1, min=-1, max=1000, step=1), |
|
|
io.Combo.Input("upscale_method", options=cls.UPSCALE_METHODS), |
|
|
], |
|
|
outputs=[io.Sampler.Output()], |
|
|
) |
|
|
|
|
|
@classmethod |
|
|
def execute(cls, scale_ratio, scale_steps, upscale_method) -> io.NodeOutput: |
|
|
if scale_steps < 0: |
|
|
scale_steps = None |
|
|
sampler = comfy.samplers.KSAMPLER(sample_lcm_upscale, extra_options={"total_upscale": scale_ratio, "upscale_steps": scale_steps, "upscale_method": upscale_method}) |
|
|
return io.NodeOutput(sampler) |
|
|
|
|
|
|
|
|
@torch.no_grad() |
|
|
def sample_euler_pp(model, x, sigmas, extra_args=None, callback=None, disable=None): |
|
|
extra_args = {} if extra_args is None else extra_args |
|
|
|
|
|
temp = [0] |
|
|
def post_cfg_function(args): |
|
|
temp[0] = args["uncond_denoised"] |
|
|
return args["denoised"] |
|
|
|
|
|
model_options = extra_args.get("model_options", {}).copy() |
|
|
extra_args["model_options"] = comfy.model_patcher.set_model_options_post_cfg_function(model_options, post_cfg_function, disable_cfg1_optimization=True) |
|
|
|
|
|
s_in = x.new_ones([x.shape[0]]) |
|
|
for i in trange(len(sigmas) - 1, disable=disable): |
|
|
sigma_hat = sigmas[i] |
|
|
denoised = model(x, sigma_hat * s_in, **extra_args) |
|
|
d = to_d(x - denoised + temp[0], sigmas[i], denoised) |
|
|
if callback is not None: |
|
|
callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised}) |
|
|
dt = sigmas[i + 1] - sigma_hat |
|
|
x = x + d * dt |
|
|
return x |
|
|
|
|
|
|
|
|
class SamplerEulerCFGpp(io.ComfyNode): |
|
|
@classmethod |
|
|
def define_schema(cls) -> io.Schema: |
|
|
return io.Schema( |
|
|
node_id="SamplerEulerCFGpp", |
|
|
display_name="SamplerEulerCFG++", |
|
|
category="_for_testing", |
|
|
inputs=[ |
|
|
io.Combo.Input("version", options=["regular", "alternative"]), |
|
|
], |
|
|
outputs=[io.Sampler.Output()], |
|
|
is_experimental=True, |
|
|
) |
|
|
|
|
|
@classmethod |
|
|
def execute(cls, version) -> io.NodeOutput: |
|
|
if version == "alternative": |
|
|
sampler = comfy.samplers.KSAMPLER(sample_euler_pp) |
|
|
else: |
|
|
sampler = comfy.samplers.ksampler("euler_cfg_pp") |
|
|
return io.NodeOutput(sampler) |
|
|
|
|
|
|
|
|
class AdvancedSamplersExtension(ComfyExtension): |
|
|
@override |
|
|
async def get_node_list(self) -> list[type[io.ComfyNode]]: |
|
|
return [ |
|
|
SamplerLCMUpscale, |
|
|
SamplerEulerCFGpp, |
|
|
] |
|
|
|
|
|
async def comfy_entrypoint() -> AdvancedSamplersExtension: |
|
|
return AdvancedSamplersExtension() |
|
|
|