Spaces:
Configuration error
Configuration error
from typing import Iterator, List, Tuple, Dict, Any, Union, Optional | |
from _decimal import Context, getcontext | |
from decimal import Decimal | |
from .libs.utils import AlwaysEqualProxy, ByPassTypeTuple, cleanGPUUsedForce, compare_revision | |
from .libs.cache import remove_cache | |
import numpy as np | |
import re | |
import json | |
import torch | |
import comfy.utils | |
# Number of value inputs the index-switch nodes declare by default.
DEFAULT_FLOW_NUM = 2
# Number of value slots exposed by the while/for loop nodes.
MAX_FLOW_NUM = 10
# Extra input options that mark an input as lazy; empty unless compare_revision(2543) is truthy.
lazy_options = {} if not compare_revision(2543) else {"lazy": True}
def validate_list_args(args: Dict[str, List[Any]]) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Check that every list argument has either one element or the same length as the others.

    :param args: Mapping of argument name to list value; a ``self`` entry is ignored.
    :return: Tuple (status, mismatched_key_1, mismatched_key_2); both keys are None when valid.
    """
    # A single argument cannot disagree with anything.
    if len(args) == 1:
        return True, None, None

    reference_len = None
    reference_name = None
    for name, values in args.items():
        # ``self`` shows up when the caller passes locals() from inside a method.
        if name == 'self':
            continue
        size = len(values)
        if size == 1:
            # Single-element lists are compatible with any other length.
            continue
        if reference_len is None:
            reference_len, reference_name = size, name
        elif size != reference_len:
            return False, name, reference_name
    return True, None, None
def error_if_mismatched_list_args(args: Dict[str, List[Any]]) -> None:
    """
    Raise when the list arguments fail validate_list_args.

    :param args: Mapping of argument name to list value.
    :raises ValueError: When two list arguments have incompatible lengths.
    """
    is_valid, failed_key1, failed_key2 = validate_list_args(args)
    if is_valid:
        return
    # Both keys are set whenever validation fails; the asserts narrow the Optional types.
    assert failed_key1 is not None
    assert failed_key2 is not None
    raise ValueError(
        f"Mismatched list inputs received. {failed_key1}({len(args[failed_key1])}) !== {failed_key2}({len(args[failed_key2])})"
    )
def zip_with_fill(*lists: Union[List[Any], None]) -> Iterator[Tuple[Any, ...]]:
    """
    Zip lists together, repeating the single element of any one-element list.

    If a list is None, None is used for that position in every tuple.
    (Not intended for use with lists of different lengths.)

    :param lists: Lists to zip; each may be None.
    :return: Iterator of tuples of length len(lists); empty when no list has elements.
    """
    # default=0 keeps a call with no lists (or only None lists) from raising in max().
    max_len = max((len(lst) for lst in lists if lst is not None), default=0)
    for i in range(max_len):
        yield tuple(
            None if lst is None else (lst[0] if len(lst) == 1 else lst[i])
            for lst in lists
        )
# ---------------------------------------------------------------Types: start----------------------------------------------------------------------#
# String
class String:
    """Node that outputs the STRING value it is given."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required STRING input named ``value``."""
        return {
            "required": {"value": ("STRING", {"default": ""})},
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("string",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
# Integer
class Int:
    """Node that outputs the INT value it is given."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required INT input named ``value``."""
        return {
            "required": {"value": ("INT", {"default": 0, "min": -999999, "max": 999999,})},
        }

    RETURN_TYPES = ("INT",)
    RETURN_NAMES = ("int",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
# Integer range
class RangeInt:
    """Node that builds one or more integer ranges from list inputs."""

    def __init__(self) -> None:
        pass

    @classmethod
    def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
        """Declare the range mode, bounds, step, step count and end mode inputs."""
        return {
            "required": {
                "range_mode": (["step", "num_steps"], {"default": "step"}),
                "start": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "stop": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "step": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
            },
        }

    RETURN_TYPES = ("INT", "INT")
    RETURN_NAMES = ("range", "range_sizes")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    FUNCTION = "build_range"
    CATEGORY = "EasyUse/Logic/Type"

    def build_range(
        self, range_mode, start, stop, step, num_steps, end_mode
    ) -> Tuple[List[int], List[int]]:
        """
        Build a flat list of integers from each combination of list inputs.

        :return: Tuple (all values concatenated, length of each individual range).
        :raises ValueError: When list inputs have mismatched lengths, or when
            ``step`` is 0 in ``step`` mode.
        """
        # Must stay the first statement: locals() has to contain only self and the arguments.
        error_if_mismatched_list_args(locals())

        ranges = []
        range_sizes = []
        for e_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode in zip_with_fill(
            range_mode, start, stop, step, num_steps, end_mode
        ):
            if e_mode == 'step':
                if e_step == 0:
                    raise ValueError("step must not be 0 when range_mode is 'step'")
                if e_end_mode == "Inclusive":
                    # Extend the bound in the direction of travel so `stop` itself is reachable.
                    e_stop += 1 if e_step > 0 else -1
                vals = list(range(e_start, e_stop, e_step))
            elif e_mode == 'num_steps':
                direction = 1 if e_stop > e_start else -1
                if e_end_mode == "Exclusive":
                    e_stop -= direction
                vals = np.rint(np.linspace(e_start, e_stop, e_num_steps)).astype(int).tolist()
            else:
                # Unknown modes contribute nothing.
                continue
            ranges.extend(vals)
            range_sizes.append(len(vals))
        return ranges, range_sizes
# Float
class Float:
    """Node that outputs the FLOAT value it is given."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required FLOAT input named ``value``."""
        return {
            "required": {"value": ("FLOAT", {"default": 0, "step": 0.01, "min": -999999, "max": 999999,})},
        }

    RETURN_TYPES = ("FLOAT",)
    RETURN_NAMES = ("float",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
# Float range
class RangeFloat:
    """Node that builds one or more float ranges from list inputs using Decimal arithmetic."""

    def __init__(self) -> None:
        pass

    @classmethod
    def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
        """Declare the range mode, bounds, step, step count and end mode inputs."""
        return {
            "required": {
                "range_mode": (["step", "num_steps"], {"default": "step"}),
                "start": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "stop": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "step": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
            },
        }

    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("range", "range_sizes")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    FUNCTION = "build_range"
    CATEGORY = "EasyUse/Logic/Type"

    @staticmethod
    def _decimal_range(
        range_mode: str, start: Decimal, stop: Decimal, step: Decimal, num_steps: int, inclusive: bool
    ) -> Iterator[float]:
        """
        Yield the floats of a single range.

        In ``step`` mode values advance from ``start`` by ``step``; when
        ``inclusive`` is set the bound is widened by one ``step``. In
        ``num_steps`` mode ``num_steps`` evenly spaced values from ``start`` to
        ``stop`` are produced and ``inclusive`` is not consulted.

        :raises ValueError: When ``step`` is 0 in ``step`` mode (raised on first iteration).
        """
        if range_mode == 'step':
            if step == 0:
                # A zero step never reaches the bound and would loop forever.
                raise ValueError("step must not be 0 when range_mode is 'step'")
            limit = stop + step if inclusive else stop
            direction = 1 if step > 0 else -1
            current = start
            while (current - limit) * direction < 0:
                yield float(current)
                current += step
        elif range_mode == 'num_steps':
            if num_steps == 1:
                # A single sample is just the start; avoids dividing by num_steps - 1 == 0.
                yield float(start)
                return
            span = stop - start
            # Derive each value from its index so rounding cannot accumulate
            # and push the final value past `stop`.
            for i in range(num_steps):
                yield float(start + span * i / (num_steps - 1))

    def build_range(
        self,
        range_mode,
        start,
        stop,
        step,
        num_steps,
        end_mode,
    ) -> Tuple[List[float], List[int]]:
        """
        Build a flat list of floats from each combination of list inputs.

        :return: Tuple (all values concatenated, length of each individual range).
        :raises ValueError: When list inputs have mismatched lengths, or when
            ``step`` is 0 in ``step`` mode.
        """
        # Must stay the first statement: locals() has to contain only self and the arguments.
        error_if_mismatched_list_args(locals())

        # Note: this changes the precision of the current thread's decimal context.
        getcontext().prec = 12
        start = [Decimal(s) for s in start]
        stop = [Decimal(s) for s in stop]
        step = [Decimal(s) for s in step]

        ranges = []
        range_sizes = []
        for e_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode in zip_with_fill(
            range_mode, start, stop, step, num_steps, end_mode
        ):
            vals = list(
                self._decimal_range(e_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode == 'Inclusive')
            )
            ranges.extend(vals)
            range_sizes.append(len(vals))
        return ranges, range_sizes
# Boolean
class Boolean:
    """Node that outputs the BOOLEAN value it is given."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required BOOLEAN input named ``value``."""
        return {
            "required": {"value": ("BOOLEAN", {"default": False})},
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
# ---------------------------------------------------------------Switches: start----------------------------------------------------------------------#
class imageSwitch:
    """Node that selects one of two images based on a boolean."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two IMAGE inputs and the selecting BOOLEAN."""
        return {
            "required": {
                "image_a": ("IMAGE",),
                "image_b": ("IMAGE",),
                "boolean": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "image_switch"
    CATEGORY = "EasyUse/Logic/Switch"

    def image_switch(self, image_a, image_b, boolean):
        """Return ``image_a`` when ``boolean`` is truthy, otherwise ``image_b``."""
        return (image_a if boolean else image_b,)
class textSwitch:
    """Node that selects one of two optional text inputs by a 1-based index."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selector INT and the two optional STRING inputs."""
        return {
            "required": {
                "input": ("INT", {"default": 1, "min": 1, "max": 2}),
            },
            "optional": {
                "text1": ("STRING", {"forceInput": True}),
                "text2": ("STRING", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("STRING",)
    CATEGORY = "EasyUse/Logic/Switch"
    FUNCTION = "switch"

    def switch(self, input, text1=None, text2=None,):
        """Return ``text1`` when ``input`` equals 1, otherwise ``text2``."""
        return (text1 if input == 1 else text2,)
# ---------------------------------------------------------------Index Switch----------------------------------------------------------------------# | |
class anythingIndexSwitch:
    """Node that forwards the ``value<index>`` input of any type."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the index INT plus DEFAULT_FLOW_NUM optional ``value<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["value%d" % i] = (AlwaysEqualProxy("*"),lazy_options)
        return inputs

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("value",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return the selected input's name in a list when its value is missing or None."""
        key = "value%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError when it was not supplied."""
        key = "value%d" % index
        return (kwargs[key],)
class imageIndexSwitch:
    """Node that forwards the ``image<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the index INT plus DEFAULT_FLOW_NUM optional ``image<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["image%d" % i] = ("IMAGE",lazy_options)
        return inputs

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("image",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return the selected input's name in a list when its value is missing or None."""
        key = "image%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError when it was not supplied."""
        key = "image%d" % index
        return (kwargs[key],)
class textIndexSwitch:
    """Node that forwards the ``text<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the index INT plus DEFAULT_FLOW_NUM optional ``text<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["text%d" % i] = ("STRING",{**lazy_options,"forceInput":True})
        return inputs

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return the selected input's name in a list when its value is missing or None."""
        key = "text%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError when it was not supplied."""
        key = "text%d" % index
        return (kwargs[key],)
class conditioningIndexSwitch:
    """Node that forwards the ``cond<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the index INT plus DEFAULT_FLOW_NUM optional ``cond<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["cond%d" % i] = ("CONDITIONING",lazy_options)
        return inputs

    RETURN_TYPES = ("CONDITIONING",)
    RETURN_NAMES = ("conditioning",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return the selected input's name in a list when its value is missing or None."""
        key = "cond%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError when it was not supplied."""
        key = "cond%d" % index
        return (kwargs[key],)
# ---------------------------------------------------------------Math----------------------------------------------------------------------# | |
class mathIntOperation:
    """Node that applies a binary arithmetic operation to two integers."""

    # Maps each operation name offered by INPUT_TYPES to its implementation.
    _OPERATIONS = {
        "add": lambda a, b: a + b,
        "subtract": lambda a, b: a - b,
        "multiply": lambda a, b: a * b,
        "divide": lambda a, b: a // b,  # integer (floor) division
        "modulo": lambda a, b: a % b,
        "power": lambda a, b: a ** b,
    }

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two INT operands and the operation selector."""
        return {
            "required": {
                "a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],),
            },
        }

    RETURN_TYPES = ("INT",)
    FUNCTION = "int_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def int_math_operation(self, a, b, operation):
        """
        Apply ``operation`` to ``a`` and ``b``.

        :return: One-element tuple with the result, or None for an unknown operation.
        :raises ZeroDivisionError: For ``divide`` or ``modulo`` when ``b`` is 0.
        """
        handler = self._OPERATIONS.get(operation)
        if handler is None:
            return None
        return (handler(a, b),)
class mathFloatOperation:
    """Node that compares two floats and outputs a boolean."""

    # Maps each comparison symbol offered by INPUT_TYPES to its implementation.
    _OPERATIONS = {
        "==": lambda a, b: a == b,
        "!=": lambda a, b: a != b,
        "<": lambda a, b: a < b,
        ">": lambda a, b: a > b,
        "<=": lambda a, b: a <= b,
        ">=": lambda a, b: a >= b,
    }

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two FLOAT operands and the comparison selector."""
        return {
            "required": {
                "a": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
                "b": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
                "operation": (["==", "!=", "<", ">", "<=", ">="],),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    FUNCTION = "float_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def float_math_operation(self, a, b, operation):
        """
        Compare ``a`` with ``b`` using ``operation``.

        :return: One-element tuple with the boolean result, or None for an unknown operation.
        """
        handler = self._OPERATIONS.get(operation)
        if handler is None:
            return None
        return (handler(a, b),)
class mathStringOperation:
    """Node that compares two strings and outputs a boolean."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two STRING operands, the operation selector and the case flag."""
        return {
            "required": {
                "a": ("STRING", {"multiline": False}),
                "b": ("STRING", {"multiline": False}),
                "operation": (["a == b", "a != b", "a IN b", "a MATCH REGEX(b)", "a BEGINSWITH b", "a ENDSWITH b"],),
                "case_sensitive": ("BOOLEAN", {"default": True}),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    FUNCTION = "string_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def string_math_operation(self, a, b, operation, case_sensitive):
        """
        Evaluate ``operation`` on ``a`` and ``b``.

        When ``case_sensitive`` is false both strings are lower-cased first
        (this also lower-cases the pattern for the regex operation).

        :return: One-element tuple with the boolean result, or None for an unknown operation.
        """
        if not case_sensitive:
            a = a.lower()
            b = b.lower()

        if operation == "a == b":
            return (a == b,)
        if operation == "a != b":
            return (a != b,)
        if operation == "a IN b":
            return (a in b,)
        if operation == "a MATCH REGEX(b)":
            try:
                return (re.match(b, a) is not None,)
            except Exception:
                # Best effort: an invalid pattern counts as "no match". Unlike a bare
                # except, this lets KeyboardInterrupt/SystemExit propagate.
                return (False,)
        if operation == "a BEGINSWITH b":
            return (a.startswith(b),)
        if operation == "a ENDSWITH b":
            return (a.endswith(b),)
        return None
# ---------------------------------------------------------------Flow----------------------------------------------------------------------# | |
# Graph-expansion helpers are optional: when the comfy_execution package is not
# importable both names are bound to None, so later references fail with a
# clear "NoneType" error instead of a NameError.
try:
    from comfy_execution.graph_utils import GraphBuilder, is_link
except ImportError:
    GraphBuilder = None
    is_link = None
class whileLoopStart:
    """Opening node of a while loop; forwards its initial values to the loop body."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the condition BOOLEAN plus MAX_FLOW_NUM optional ``initial_value<i>`` inputs."""
        inputs = {
            "required": {
                "condition": ("BOOLEAN", {"default": True}),
            },
            "optional": {
            },
        }
        for i in range(MAX_FLOW_NUM):
            inputs["optional"]["initial_value%d" % i] = ("*",)
        return inputs

    RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL"] + ["*"] * MAX_FLOW_NUM))
    RETURN_NAMES = ByPassTypeTuple(tuple(["flow"] + ["value%d" % i for i in range(MAX_FLOW_NUM)]))
    FUNCTION = "while_loop_open"
    CATEGORY = "EasyUse/Logic/While Loop"

    def while_loop_open(self, condition, **kwargs):
        """Return a placeholder flow value followed by each ``initial_value<i>`` (None when absent)."""
        values = [kwargs.get("initial_value%d" % i, None) for i in range(MAX_FLOW_NUM)]
        return tuple(["stub"] + values)
class whileLoopEnd:
    """Closing node of a while loop; re-expands the loop body while the condition holds."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the flow link, the condition, MAX_FLOW_NUM optional values and hidden inputs."""
        inputs = {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
                "condition": ("BOOLEAN", {}),
            },
            "optional": {
            },
            "hidden": {
                "dynprompt": "DYNPROMPT",
                "unique_id": "UNIQUE_ID",
            }
        }
        for i in range(MAX_FLOW_NUM):
            inputs["optional"]["initial_value%d" % i] = (AlwaysEqualProxy('*'),)
        return inputs

    RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy('*')] * MAX_FLOW_NUM))
    RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(MAX_FLOW_NUM)]))
    FUNCTION = "while_loop_close"
    CATEGORY = "EasyUse/Logic/While Loop"

    def explore_dependencies(self, node_id, dynprompt, upstream):
        """
        Recursively record, for every node feeding ``node_id``, which nodes consume it.

        ``upstream`` is filled in place as {parent_id: [child_id, ...]}.
        """
        node_info = dynprompt.get_node(node_id)
        if "inputs" not in node_info:
            return
        for k, v in node_info["inputs"].items():
            if is_link(v):
                parent_id = v[0]
                if parent_id not in upstream:
                    # First visit: register the parent, then walk its own inputs.
                    upstream[parent_id] = []
                    self.explore_dependencies(parent_id, dynprompt, upstream)
                upstream[parent_id].append(node_id)

    def collect_contained(self, node_id, upstream, contained):
        """Mark in ``contained`` every node reachable downstream of ``node_id`` via ``upstream``."""
        if node_id not in upstream:
            return
        for child_id in upstream[node_id]:
            if child_id not in contained:
                contained[child_id] = True
                self.collect_contained(child_id, upstream, contained)

    def while_loop_close(self, flow, condition, dynprompt=None, unique_id=None, **kwargs):
        """
        Finish the loop, or expand another iteration.

        :return: When ``condition`` is falsy, a tuple of the ``initial_value<i>``
            inputs. Otherwise a dict with ``result`` (outputs of the cloned end
            node) and ``expand`` (the finalized sub-graph for the next iteration).
        """
        if not condition:
            # We're done with the loop
            return tuple(kwargs.get("initial_value%d" % i, None) for i in range(MAX_FLOW_NUM))

        # We want to loop
        upstream = {}
        # Get the list of all nodes between the open and close nodes
        self.explore_dependencies(unique_id, dynprompt, upstream)

        contained = {}
        open_node = flow[0]
        self.collect_contained(open_node, upstream, contained)
        contained[unique_id] = True
        contained[open_node] = True

        graph = GraphBuilder()
        # First pass: create a clone of every contained node; this node's clone is named "Recurse".
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.node(original_node["class_type"], "Recurse" if node_id == unique_id else node_id)
            node.set_override_display_id(node_id)
        # Second pass: wire inputs, pointing links between contained nodes at the clones.
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.lookup_node("Recurse" if node_id == unique_id else node_id)
            for k, v in original_node["inputs"].items():
                if is_link(v) and v[0] in contained:
                    parent = graph.lookup_node(v[0])
                    node.set_input(k, parent.out(v[1]))
                else:
                    node.set_input(k, v)

        # Feed this iteration's values into the cloned loop-start node.
        new_open = graph.lookup_node(open_node)
        for i in range(MAX_FLOW_NUM):
            key = "initial_value%d" % i
            new_open.set_input(key, kwargs.get(key, None))

        my_clone = graph.lookup_node("Recurse")
        result = tuple(my_clone.out(i) for i in range(MAX_FLOW_NUM))
        return {
            "result": result,
            "expand": graph.finalize(),
        }
class forLoopStart:
    """Opening node of a for loop, implemented by expanding to a while-loop start node."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the iteration total, optional ``initial_value1..`` inputs and hidden inputs."""
        return {
            "required": {
                "total": ("INT", {"default": 1, "min": 1, "max": 100000, "step": 1}),
            },
            "optional": {
                "initial_value%d" % i: (AlwaysEqualProxy("*"),) for i in range(1, MAX_FLOW_NUM)
            },
            "hidden": {
                "initial_value0": (AlwaysEqualProxy("*"),),
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            }
        }

    RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL", "INT"] + [AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
    RETURN_NAMES = ByPassTypeTuple(tuple(["flow", "index"] + ["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
    FUNCTION = "for_loop_start"
    CATEGORY = "EasyUse/Logic/For Loop"

    def for_loop_start(self, total, prompt=None, extra_pnginfo=None, unique_id=None, **kwargs):
        """
        Start a for loop.

        Stores ``total`` in the ``properties`` of this node's workflow entry (read
        back by forLoopEnd), then expands to an "easy whileLoopStart" node.

        :return: Dict with ``result`` ("stub", current index, values 1..) and
            ``expand`` (the finalized sub-graph).
        """
        graph = GraphBuilder()
        i = 0

        # The workflow metadata is optional; skip the bookkeeping when it is absent.
        if extra_pnginfo and unique_id is not None:
            # Dotted ids keep only their last segment for the workflow lookup.
            workflow_id = int(str(unique_id).split('.')[-1])
            node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == workflow_id), None)
            if node:
                node.setdefault('properties', {})['total'] = total

        if "initial_value0" in kwargs:
            i = kwargs["initial_value0"]

        initial_values = {("initial_value%d" % num): kwargs.get("initial_value%d" % num, None) for num in range(1, MAX_FLOW_NUM)}
        # The returned handle is not needed; the call registers the node on the graph.
        graph.node("easy whileLoopStart", condition=total, initial_value0=i, **initial_values)
        outputs = list(initial_values.values())
        return {
            "result": tuple(["stub", i] + outputs),
            "expand": graph.finalize(),
        }
class forLoopEnd:
    """Closing node of a for loop, implemented by expanding to increment, compare and while-end nodes."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the flow link, optional ``initial_value1..`` raw links and hidden inputs."""
        return {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
            },
            "optional": {
                "initial_value%d" % i: (AlwaysEqualProxy("*"), {"rawLink": True}) for i in range(1, MAX_FLOW_NUM)
            },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
        }

    RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
    RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
    FUNCTION = "for_loop_end"
    CATEGORY = "EasyUse/Logic/For Loop"

    def for_loop_end(self, flow, prompt=None, extra_pnginfo=None, my_unique_id=None, **kwargs):
        """
        Close a for loop.

        Looks up the loop total on the start node's workflow entry, then expands
        to nodes that increment the index, compare it with the total and close
        the underlying while loop.

        :return: Dict with ``result`` (outputs 1.. of the while-end node) and
            ``expand`` (the finalized sub-graph).
        :raises Exception: When the loop total cannot be determined.
        """
        graph = GraphBuilder()
        while_open = flow[0]

        total = None
        if extra_pnginfo:
            # Same normalisation as forLoopStart: dotted ids keep only their last segment.
            workflow_id = int(str(while_open).split('.')[-1])
            node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == workflow_id), None)
            if node:
                if 'properties' in node and 'total' in node['properties']:
                    total = node['properties']['total']
                else:
                    total = node['widgets_values'][0] if "widgets_values" in node else None
        if total is None:
            raise Exception("Unable to get parameters for the start of the loop")

        # index + 1, read from output 1 of the loop-start node
        sub = graph.node("easy mathInt", operation="add", a=[while_open, 1], b=1)
        cond = graph.node("easy compare", a=sub.out(0), b=total, comparison='a < b')
        input_values = {("initial_value%d" % i): kwargs.get("initial_value%d" % i, None)
                        for i in range(1, MAX_FLOW_NUM)}
        while_close = graph.node("easy whileLoopEnd",
                                 flow=flow,
                                 condition=cond.out(0),
                                 initial_value0=sub.out(0),
                                 **input_values)
        return {
            "result": tuple([while_close.out(i) for i in range(1, MAX_FLOW_NUM)]),
            "expand": graph.finalize(),
        }
# Maps each comparison label offered by the Compare node to its implementation.
COMPARE_FUNCTIONS = {
    "a == b": lambda a, b: a == b,
    "a != b": lambda a, b: a != b,
    "a < b": lambda a, b: a < b,
    "a > b": lambda a, b: a > b,
    "a <= b": lambda a, b: a <= b,
    "a >= b": lambda a, b: a >= b,
}


# Compare
class Compare:
    """Node that compares two values of any type and outputs a boolean."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the two operands and the comparison selector (keys of COMPARE_FUNCTIONS)."""
        compare_functions = list(COMPARE_FUNCTIONS.keys())
        return {
            "required": {
                "a": (AlwaysEqualProxy("*"), {"default": 0}),
                "b": (AlwaysEqualProxy("*"), {"default": 0}),
                "comparison": (compare_functions, {"default": "a == b"}),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "compare"
    CATEGORY = "EasyUse/Logic"

    def compare(self, a, b, comparison):
        """Return the result of ``comparison`` applied to ``a`` and ``b``; KeyError if unknown."""
        return (COMPARE_FUNCTIONS[comparison](a, b),)
# If / else
class IfElse:
    """Node that forwards ``on_true`` or ``on_false`` depending on a boolean."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the selecting BOOLEAN and the two branch inputs."""
        return {
            "required": {
                "boolean": ("BOOLEAN",),
                "on_true": (AlwaysEqualProxy("*"), lazy_options),
                "on_false": (AlwaysEqualProxy("*"), lazy_options),
            },
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("*",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def check_lazy_status(self, boolean, on_true=None, on_false=None):
        """Return the name of the selected branch in a list when its value is still None."""
        if boolean and on_true is None:
            return ["on_true"]
        if not boolean and on_false is None:
            return ["on_false"]

    def execute(self, *args, **kwargs):
        """Return the branch value selected by ``kwargs['boolean']``."""
        chosen = 'on_true' if kwargs['boolean'] else 'on_false'
        return (kwargs[chosen],)
# Is SDXL (CLIP model classes checked by isSDXL)
from comfy.sdxl_clip import SDXLClipModel, SDXLRefinerClipModel, SDXLClipG | |
class isNone:
    """Node that reports whether its input is None."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required input of any type named ``any``."""
        return {
            "required": {
                "any": (AlwaysEqualProxy("*"),)
            },
            "optional": {
            }
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def execute(self, any):
        """Return ``(True,)`` when ``any`` is None, otherwise ``(False,)``."""
        # The parameter name shadows the builtin but must match the declared input name.
        return (any is None,)
class isSDXL:
    """Node that reports whether a CLIP object wraps one of the SDXL CLIP model classes."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the optional pipe and CLIP inputs."""
        return {
            "required": {},
            "optional": {
                "optional_pipe": ("PIPE_LINE",),
                "optional_clip": ("CLIP",),
            }
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def execute(self, optional_pipe=None, optional_clip=None):
        """
        Check the CLIP's ``cond_stage_model`` against the SDXL model classes.

        ``optional_clip`` takes precedence; otherwise ``optional_pipe['clip']`` is used.

        :raises Exception: When neither input is provided.
        """
        if optional_pipe is None and optional_clip is None:
            raise Exception("[ERROR] optional_pipe or optional_clip is missing")
        clip = optional_clip if optional_clip is not None else optional_pipe['clip']
        sdxl_classes = (SDXLClipModel, SDXLRefinerClipModel, SDXLClipG)
        return (isinstance(clip.cond_stage_model, sdxl_classes),)
# XY matrix
class xyAny:
    """Node that expands two lists into paired lists covering every (x, y) combination."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the X and Y inputs and the traversal direction."""
        return {
            "required": {
                "X": (AlwaysEqualProxy("*"), {}),
                "Y": (AlwaysEqualProxy("*"), {}),
                "direction": (["horizontal", "vertical"], {"default": "horizontal"})
            }
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"), AlwaysEqualProxy("*"))
    RETURN_NAMES = ("X", "Y")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    CATEGORY = "EasyUse/Logic"
    FUNCTION = "to_xy"

    def to_xy(self, X, Y, direction):
        """
        Build the cartesian product of ``X`` and ``Y`` as two parallel lists.

        "horizontal" iterates X fastest (for each y, every x); any other value
        iterates Y fastest. ``direction`` arrives as a list; its first element is used.
        """
        if direction[0] == "horizontal":
            pairs = [(x, y) for y in Y for x in X]
        else:
            pairs = [(x, y) for x in X for y in Y]
        new_x = [x for x, _ in pairs]
        new_y = [y for _, y in pairs]
        return (new_x, new_y)
class batchAnything:
    """Node that combines two values of any type into one batch."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the two inputs of any type."""
        return {
            "required": {
                "any_1": (AlwaysEqualProxy("*"),{}),
                "any_2": (AlwaysEqualProxy("*"),{})
            }
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("batch",)
    FUNCTION = "batch"
    CATEGORY = "EasyUse/Logic"

    def batch(self, any_1, any_2):
        """
        Combine ``any_1`` and ``any_2``.

        - If either is a torch.Tensor: return the other when one is None;
          otherwise rescale ``any_2`` when the trailing shapes differ and
          concatenate along dim 0.
        - If ``any_1`` is a str/float/int: return it alone when ``any_2`` is
          None, append it to ``any_2`` when that is a tuple, else pair them.
        - If ``any_2`` is a str/float/int: the mirrored handling.
        - Otherwise: return the non-None one, or ``any_1 + any_2``.
        """
        if isinstance(any_1, torch.Tensor) or isinstance(any_2, torch.Tensor):
            if any_1 is None:
                return (any_2,)
            elif any_2 is None:
                return (any_1,)
            if any_1.shape[1:] != any_2.shape[1:]:
                # assumes tensors are laid out (batch, height, width, channels) — TODO confirm
                any_2 = comfy.utils.common_upscale(any_2.movedim(-1, 1), any_1.shape[2], any_1.shape[1], "bilinear", "center").movedim(1, -1)
            return (torch.cat((any_1, any_2), 0),)
        elif isinstance(any_1, (str, float, int)):
            if any_2 is None:
                return (any_1,)
            elif isinstance(any_2, tuple):
                return (any_2 + (any_1,),)
            return ((any_1, any_2),)
        elif isinstance(any_2, (str, float, int)):
            if any_1 is None:
                return (any_2,)
            elif isinstance(any_1, tuple):
                return (any_1 + (any_2,),)
            return ((any_2, any_1),)
        else:
            if any_1 is None:
                return (any_2,)
            elif any_2 is None:
                return (any_1,)
            return (any_1 + any_2,)
# Convert any type
class convertAnything:
    """Node that converts its input to a string, int, float or boolean."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the input of any type (named ``*``) and the target type selector."""
        return {"required": {
            "*": (AlwaysEqualProxy("*"),),
            "output_type": (["string", "int", "float", "boolean"], {"default": "string"}),
        }}

    RETURN_TYPES = ByPassTypeTuple((AlwaysEqualProxy("*"),))
    OUTPUT_NODE = True
    FUNCTION = "convert"
    CATEGORY = "EasyUse/Logic"

    def convert(self, *args, **kwargs):
        """
        Convert ``kwargs['*']`` with the builtin matching ``kwargs['output_type']``.

        :return: One-element tuple with the converted value, or ``(None,)`` for an unknown type.
        """
        anything = kwargs['*']
        output_type = kwargs['output_type']
        converters = {'string': str, 'int': int, 'float': float, 'boolean': bool}
        converter = converters.get(output_type)
        params = converter(anything) if converter is not None else None
        return (params,)
# Convert content of any type to a string and output it
class showAnything:
    """Output node that renders its inputs as text."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the optional ``anything`` input and the hidden workflow inputs."""
        return {"required": {}, "optional": {"anything": (AlwaysEqualProxy("*"), {}), },
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
                           }}

    RETURN_TYPES = ()
    INPUT_IS_LIST = True
    OUTPUT_NODE = True
    FUNCTION = "log_input"
    CATEGORY = "EasyUse/Logic"

    def log_input(self, unique_id=None, extra_pnginfo=None, **kwargs):
        """
        Stringify every item of ``anything`` and report the result to the UI.

        Strings pass through unchanged; other values are JSON-encoded, falling
        back to ``str()`` when that fails. When workflow metadata is available
        the texts are also stored in this node's ``widgets_values``.

        :return: ``{"ui": {"text": [...]}}``
        """
        values = []
        for val in kwargs.get('anything', ()):
            if type(val) is str:
                values.append(val)
                continue
            try:
                values.append(json.dumps(val))
            except Exception:
                # Not JSON-serialisable: fall back to the plain string form.
                values.append(str(val))

        if not extra_pnginfo:
            print("Error: extra_pnginfo is empty")
        elif (not isinstance(extra_pnginfo[0], dict) or "workflow" not in extra_pnginfo[0]):
            print("Error: extra_pnginfo[0] is not a dict or missing 'workflow' key")
        else:
            workflow = extra_pnginfo[0]["workflow"]
            # Inputs arrive as lists (INPUT_IS_LIST); guard against a missing unique_id.
            target_id = unique_id[0] if unique_id else None
            node = next((x for x in workflow["nodes"] if str(x["id"]) == target_id), None)
            if node:
                node["widgets_values"] = [values]
        return {"ui": {"text": values}}
class showTensorShape:
    """Output node that reports the shapes found inside its input."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the required ``tensor`` input and the hidden workflow inputs."""
        return {"required": {"tensor": (AlwaysEqualProxy("*"),)}, "optional": {},
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"
                           }}

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "log_input"
    CATEGORY = "EasyUse/Logic"

    def log_input(self, tensor, unique_id=None, extra_pnginfo=None):
        """
        Collect the ``shape`` of every object reachable through nested dicts and lists.

        :return: ``{"ui": {"text": [[dim, ...], ...]}}``
        """
        shapes = []

        def collect(item):
            # Recurse through containers; record anything exposing a ``shape`` attribute.
            if isinstance(item, dict):
                for value in item.values():
                    collect(value)
            elif isinstance(item, list):
                for value in item:
                    collect(value)
            elif hasattr(item, 'shape'):
                shapes.append(list(item.shape))

        collect(tensor)
        return {"ui": {"text": shapes}}
class outputToList:
    """Node that re-emits its input as a list output."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required input of any type named ``tuple``."""
        return {
            "required": {
                "tuple": (AlwaysEqualProxy("*"), {}),
            }, "optional": {},
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("list",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "output_to_List"
    CATEGORY = "EasyUse/Logic"

    def output_to_List(self, tuple):
        """Return the input unchanged; OUTPUT_IS_LIST marks the output as a list."""
        # The parameter name shadows the builtin but must match the declared input name.
        return (tuple,)
# cleanGpuUsed | |
class cleanGPUUsed:
    """Output node that runs cleanGPUUsedForce and clears every cache entry."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the pass-through trigger input and the hidden workflow inputs."""
        return {"required": {"anything": (AlwaysEqualProxy("*"), {})}, "optional": {},
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
                           }}

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
        """Call cleanGPUUsedForce() and remove_cache('*'); ``anything`` is not inspected."""
        cleanGPUUsedForce()
        remove_cache('*')
        return ()
class clearCacheKey:
    """Output node that removes the cache entries matching a key."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the trigger input, the ``cache_key`` string and the hidden workflow inputs."""
        return {"required": {
            "anything": (AlwaysEqualProxy("*"), {}),
            "cache_key": ("STRING", {"default": "*"}),
        }, "optional": {},
            "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",}
        }

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, cache_key=None, unique_id=None, extra_pnginfo=None, cache_name=None):
        """
        Call remove_cache with the given key.

        ``cache_key`` matches the input declared in INPUT_TYPES. ``cache_name``
        is the old parameter name, still accepted as a keyword for existing callers.

        :raises TypeError: When neither ``cache_key`` nor ``cache_name`` is given.
        """
        key = cache_key if cache_key is not None else cache_name
        if key is None:
            raise TypeError("empty_cache() missing required argument: 'cache_key'")
        remove_cache(key)
        return ()
class clearCacheAll:
    """Output node that removes every cache entry."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the pass-through trigger input and the hidden workflow inputs."""
        return {"required": {
            "anything": (AlwaysEqualProxy("*"), {}),
        }, "optional": {},
            "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",}
        }

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
        """Call remove_cache('*'); ``anything`` is not inspected."""
        remove_cache('*')
        return ()
# Deprecated | |
class If:
    """Deprecated node that forwards ``if`` or ``else`` depending on the truthiness of ``any``."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the three required inputs of any type: ``any``, ``if`` and ``else``."""
        return {
            "required": {
                "any": (AlwaysEqualProxy("*"),),
                "if": (AlwaysEqualProxy("*"),),
                "else": (AlwaysEqualProxy("*"),),
            },
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("?",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True

    def execute(self, *args, **kwargs):
        """Return ``kwargs['if']`` when ``kwargs['any']`` is truthy, otherwise ``kwargs['else']``."""
        # The input names are Python keywords, so they can only be read from kwargs.
        chosen = 'if' if kwargs['any'] else 'else'
        return (kwargs[chosen],)
class poseEditor:
    """Deprecated node that accepts an image string and produces no outputs."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required STRING input named ``image``."""
        return {"required": {
            "image": ("STRING", {"default":""})
        }}

    FUNCTION = "output_pose"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True
    RETURN_TYPES = ()
    RETURN_NAMES = ()

    def output_pose(self, image):
        """Return an empty tuple; ``image`` is not used."""
        return ()
class imageToMask:
    """Deprecated node that turns one colour channel of an image into a mask."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the IMAGE input and the channel selector."""
        return {"required": {
            "image": ("IMAGE",),
            "channel": (['red', 'green', 'blue'],),
        }
        }

    RETURN_TYPES = ("MASK",)
    FUNCTION = "convert"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True

    def convert_to_single_channel(self, image, channel='red'):
        """
        Return an RGB PIL image whose three bands all equal the chosen channel.

        :param image: PIL image; converted to RGB before splitting.
        :param channel: One of 'red', 'green' or 'blue'.
        :raises ValueError: For any other channel name.
        """
        from PIL import Image

        # Convert to RGB mode to access individual channels
        image = image.convert('RGB')

        band_index = {'red': 0, 'green': 1, 'blue': 2}
        if channel not in band_index:
            raise ValueError(
                "Invalid channel option. Please choose 'red', 'green', or 'blue'.")
        # Extract the desired channel and convert to greyscale
        channel_img = image.split()[band_index[channel]].convert('L')

        # Convert the greyscale channel back to RGB mode
        return Image.merge('RGB', (channel_img, channel_img, channel_img))

    def convert(self, image, channel='red'):
        """Convert ``image`` to a mask by averaging the single-channel image over axis 2."""
        from .libs.image import pil2tensor, tensor2pil
        image = self.convert_to_single_channel(tensor2pil(image), channel)
        image = pil2tensor(image)
        # presumably (height, width, channels) after squeeze — TODO confirm against pil2tensor
        return (image.squeeze().mean(2),)
# Registry mapping each node identifier to the class that implements it.
NODE_CLASS_MAPPINGS = {
    "easy string": String,
    "easy int": Int,
    "easy rangeInt": RangeInt,
    "easy float": Float,
    "easy rangeFloat": RangeFloat,
    "easy boolean": Boolean,
    "easy mathString": mathStringOperation,
    "easy mathInt": mathIntOperation,
    "easy mathFloat": mathFloatOperation,
    "easy compare": Compare,
    "easy imageSwitch": imageSwitch,
    "easy textSwitch": textSwitch,
    "easy anythingIndexSwitch": anythingIndexSwitch,
    "easy imageIndexSwitch": imageIndexSwitch,
    "easy textIndexSwitch": textIndexSwitch,
    "easy conditioningIndexSwitch": conditioningIndexSwitch,
    "easy whileLoopStart": whileLoopStart,
    "easy whileLoopEnd": whileLoopEnd,
    "easy forLoopStart": forLoopStart,
    "easy forLoopEnd": forLoopEnd,
    "easy ifElse": IfElse,
    "easy isNone": isNone,
    "easy isSDXL": isSDXL,
    "easy outputToList": outputToList,
    "easy xyAny": xyAny,
    "easy batchAnything": batchAnything,
    "easy convertAnything": convertAnything,
    "easy showAnything": showAnything,
    "easy showTensorShape": showTensorShape,
    "easy clearCacheKey": clearCacheKey,
    "easy clearCacheAll": clearCacheAll,
    "easy cleanGpuUsed": cleanGPUUsed,
    "easy if": If,
    "easy poseEditor": poseEditor,
    "easy imageToMask": imageToMask
}
# Registry mapping each node identifier to its display name.
NODE_DISPLAY_NAME_MAPPINGS = {
    "easy string": "String",
    "easy int": "Int",
    "easy rangeInt": "Range(Int)",
    "easy float": "Float",
    "easy rangeFloat": "Range(Float)",
    "easy boolean": "Boolean",
    "easy compare": "Compare",
    "easy mathString": "Math String",
    "easy mathInt": "Math Int",
    "easy mathFloat": "Math Float",
    "easy imageSwitch": "Image Switch",
    "easy textSwitch": "Text Switch",
    "easy anythingIndexSwitch": "Any Index Switch",
    "easy imageIndexSwitch": "Image Index Switch",
    "easy textIndexSwitch": "Text Index Switch",
    "easy conditioningIndexSwitch": "Conditioning Index Switch",
    "easy whileLoopStart": "While Loop Start",
    "easy whileLoopEnd": "While Loop End",
    "easy forLoopStart": "For Loop Start",
    "easy forLoopEnd": "For Loop End",
    "easy ifElse": "If else",
    "easy isNone": "Is None",
    "easy isSDXL": "Is SDXL",
    "easy outputToList": "Output to List",
    "easy xyAny": "XYAny",
    "easy batchAnything": "Batch Any",
    "easy convertAnything": "Convert Any",
    "easy showAnything": "Show Any",
    "easy showTensorShape": "Show Tensor Shape",
    "easy clearCacheKey": "Clear Cache Key",
    "easy clearCacheAll": "Clear Cache All",
    "easy cleanGpuUsed": "Clean GPU Used",
    "easy if": "If (🚫Deprecated)",
    "easy poseEditor": "PoseEditor (🚫Deprecated)",
    "easy imageToMask": "ImageToMask (🚫Deprecated)"
}