JasonSmithSO's picture
Upload 578 files
8866644 verified
from typing import Iterator, List, Tuple, Dict, Any, Union, Optional
from _decimal import Context, getcontext
from decimal import Decimal
from .libs.utils import AlwaysEqualProxy, ByPassTypeTuple, cleanGPUUsedForce, compare_revision
from .libs.cache import remove_cache
import numpy as np
import re
import json
import torch
import comfy.utils
DEFAULT_FLOW_NUM = 2
MAX_FLOW_NUM = 10
lazy_options = {"lazy": True} if compare_revision(2543) else {}
def validate_list_args(args: Dict[str, List[Any]]) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Checks that if there are multiple arguments, they are all the same length or 1
:param args:
:return: Tuple (Status, mismatched_key_1, mismatched_key_2)
"""
# Only have 1 arg
if len(args) == 1:
return True, None, None
len_to_match = None
matched_arg_name = None
for arg_name, arg in args.items():
if arg_name == 'self':
# self is in locals()
continue
if len(arg) != 1:
if len_to_match is None:
len_to_match = len(arg)
matched_arg_name = arg_name
elif len(arg) != len_to_match:
return False, arg_name, matched_arg_name
return True, None, None
def error_if_mismatched_list_args(args: Dict[str, List[Any]]) -> None:
is_valid, failed_key1, failed_key2 = validate_list_args(args)
if not is_valid:
assert failed_key1 is not None
assert failed_key2 is not None
raise ValueError(
f"Mismatched list inputs received. {failed_key1}({len(args[failed_key1])}) !== {failed_key2}({len(args[failed_key2])})"
)
def zip_with_fill(*lists: Union[List[Any], None]) -> Iterator[Tuple[Any, ...]]:
"""
Zips lists together, but if a list has 1 element, it will be repeated for each element in the other lists.
If a list is None, None will be used for that element.
(Not intended for use with lists of different lengths)
:param lists:
:return: Iterator of tuples of length len(lists)
"""
max_len = max(len(lst) if lst is not None else 0 for lst in lists)
for i in range(max_len):
yield tuple(None if lst is None else (lst[0] if len(lst) == 1 else lst[i]) for lst in lists)
# ---------------------------------------------------------------类型 开始----------------------------------------------------------------------#
# 字符串
class String:
@classmethod
def INPUT_TYPES(s):
return {
"required": {"value": ("STRING", {"default": ""})},
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("string",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic/Type"
def execute(self, value):
return (value,)
# 整数
class Int:
@classmethod
def INPUT_TYPES(s):
return {
"required": {"value": ("INT", {"default": 0, "min": -999999, "max": 999999,})},
}
RETURN_TYPES = ("INT",)
RETURN_NAMES = ("int",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic/Type"
def execute(self, value):
return (value,)
# 整数范围
class RangeInt:
def __init__(self) -> None:
pass
@classmethod
def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
return {
"required": {
"range_mode": (["step", "num_steps"], {"default": "step"}),
"start": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
"stop": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
"step": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
"num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
"end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
},
}
RETURN_TYPES = ("INT", "INT")
RETURN_NAMES = ("range", "range_sizes")
INPUT_IS_LIST = True
OUTPUT_IS_LIST = (True, True)
FUNCTION = "build_range"
CATEGORY = "EasyUse/Logic/Type"
def build_range(
self, range_mode, start, stop, step, num_steps, end_mode
) -> Tuple[List[int], List[int]]:
error_if_mismatched_list_args(locals())
ranges = []
range_sizes = []
for range_mode, e_start, e_stop, e_num_steps, e_step, e_end_mode in zip_with_fill(
range_mode, start, stop, num_steps, step, end_mode
):
if range_mode == 'step':
if e_end_mode == "Inclusive":
e_stop += 1
vals = list(range(e_start, e_stop, e_step))
ranges.extend(vals)
range_sizes.append(len(vals))
elif range_mode == 'num_steps':
direction = 1 if e_stop > e_start else -1
if e_end_mode == "Exclusive":
e_stop -= direction
vals = (np.rint(np.linspace(e_start, e_stop, e_num_steps)).astype(int).tolist())
ranges.extend(vals)
range_sizes.append(len(vals))
return ranges, range_sizes
# 浮点数
class Float:
@classmethod
def INPUT_TYPES(s):
return {
"required": {"value": ("FLOAT", {"default": 0, "step": 0.01, "min": -999999, "max": 999999,})},
}
RETURN_TYPES = ("FLOAT",)
RETURN_NAMES = ("float",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic/Type"
def execute(self, value):
return (value,)
# 浮点数范围
class RangeFloat:
def __init__(self) -> None:
pass
@classmethod
def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
return {
"required": {
"range_mode": (["step", "num_steps"], {"default": "step"}),
"start": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
"stop": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
"step": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
"num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
"end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
},
}
RETURN_TYPES = ("FLOAT", "INT")
RETURN_NAMES = ("range", "range_sizes")
INPUT_IS_LIST = True
OUTPUT_IS_LIST = (True, True)
FUNCTION = "build_range"
CATEGORY = "EasyUse/Logic/Type"
@staticmethod
def _decimal_range(
range_mode: String, start: Decimal, stop: Decimal, step: Decimal, num_steps: Int, inclusive: bool
) -> Iterator[float]:
if range_mode == 'step':
ret_val = start
if inclusive:
stop = stop + step
direction = 1 if step > 0 else -1
while (ret_val - stop) * direction < 0:
yield float(ret_val)
ret_val += step
elif range_mode == 'num_steps':
step = (stop - start) / (num_steps - 1)
direction = 1 if step > 0 else -1
ret_val = start
for _ in range(num_steps):
if (ret_val - stop) * direction > 0: # Ensure we don't exceed the 'stop' value
break
yield float(ret_val)
ret_val += step
def build_range(
self,
range_mode,
start,
stop,
step,
num_steps,
end_mode,
) -> Tuple[List[float], List[int]]:
error_if_mismatched_list_args(locals())
getcontext().prec = 12
start = [Decimal(s) for s in start]
stop = [Decimal(s) for s in stop]
step = [Decimal(s) for s in step]
ranges = []
range_sizes = []
for range_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode in zip_with_fill(
range_mode, start, stop, step, num_steps, end_mode
):
vals = list(
self._decimal_range(range_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode == 'Inclusive')
)
ranges.extend(vals)
range_sizes.append(len(vals))
return ranges, range_sizes
# 布尔
class Boolean:
@classmethod
def INPUT_TYPES(s):
return {
"required": {"value": ("BOOLEAN", {"default": False})},
}
RETURN_TYPES = ("BOOLEAN",)
RETURN_NAMES = ("boolean",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic/Type"
def execute(self, value):
return (value,)
# ---------------------------------------------------------------开关 开始----------------------------------------------------------------------#
class imageSwitch:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image_a": ("IMAGE",),
"image_b": ("IMAGE",),
"boolean": ("BOOLEAN", {"default": False}),
}
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "image_switch"
CATEGORY = "EasyUse/Logic/Switch"
def image_switch(self, image_a, image_b, boolean):
if boolean:
return (image_a, )
else:
return (image_b, )
class textSwitch:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input": ("INT", {"default": 1, "min": 1, "max": 2}),
},
"optional": {
"text1": ("STRING", {"forceInput": True}),
"text2": ("STRING", {"forceInput": True}),
}
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("STRING",)
CATEGORY = "EasyUse/Logic/Switch"
FUNCTION = "switch"
def switch(self, input, text1=None, text2=None,):
if input == 1:
return (text1,)
else:
return (text2,)
# ---------------------------------------------------------------Index Switch----------------------------------------------------------------------#
class anythingIndexSwitch:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
},
"optional": {
}
}
for i in range(DEFAULT_FLOW_NUM):
inputs["optional"]["value%d" % i] = (AlwaysEqualProxy("*"),lazy_options)
return inputs
RETURN_TYPES = (AlwaysEqualProxy("*"),)
RETURN_NAMES = ("value",)
FUNCTION = "index_switch"
CATEGORY = "EasyUse/Logic/Index Switch"
def check_lazy_status(self, index, **kwargs):
key = "value%d" % index
if kwargs.get(key, None) is None:
return [key]
def index_switch(self, index, **kwargs):
key = "value%d" % index
return (kwargs[key],)
class imageIndexSwitch:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
},
"optional": {
}
}
for i in range(DEFAULT_FLOW_NUM):
inputs["optional"]["image%d" % i] = ("IMAGE",lazy_options)
return inputs
RETURN_TYPES = ("IMAGE",)
RETURN_NAMES = ("image",)
FUNCTION = "index_switch"
CATEGORY = "EasyUse/Logic/Index Switch"
def check_lazy_status(self, index, **kwargs):
key = "image%d" % index
if kwargs.get(key, None) is None:
return [key]
def index_switch(self, index, **kwargs):
key = "image%d" % index
return (kwargs[key],)
class textIndexSwitch:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
},
"optional": {
}
}
for i in range(DEFAULT_FLOW_NUM):
inputs["optional"]["text%d" % i] = ("STRING",{**lazy_options,"forceInput":True})
return inputs
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("text",)
FUNCTION = "index_switch"
CATEGORY = "EasyUse/Logic/Index Switch"
def check_lazy_status(self, index, **kwargs):
key = "text%d" % index
if kwargs.get(key, None) is None:
return [key]
def index_switch(self, index, **kwargs):
key = "text%d" % index
return (kwargs[key],)
class conditioningIndexSwitch:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
},
"optional": {
}
}
for i in range(DEFAULT_FLOW_NUM):
inputs["optional"]["cond%d" % i] = ("CONDITIONING",lazy_options)
return inputs
RETURN_TYPES = ("CONDITIONING",)
RETURN_NAMES = ("conditioning",)
FUNCTION = "index_switch"
CATEGORY = "EasyUse/Logic/Index Switch"
def check_lazy_status(self, index, **kwargs):
key = "cond%d" % index
if kwargs.get(key, None) is None:
return [key]
def index_switch(self, index, **kwargs):
key = "cond%d" % index
return (kwargs[key],)
# ---------------------------------------------------------------Math----------------------------------------------------------------------#
class mathIntOperation:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],),
},
}
RETURN_TYPES = ("INT",)
FUNCTION = "int_math_operation"
CATEGORY = "EasyUse/Logic/Math"
def int_math_operation(self, a, b, operation):
if operation == "add":
return (a + b,)
elif operation == "subtract":
return (a - b,)
elif operation == "multiply":
return (a * b,)
elif operation == "divide":
return (a // b,)
elif operation == "modulo":
return (a % b,)
elif operation == "power":
return (a ** b,)
class mathFloatOperation:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
"b": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
"operation": (["==", "!=", "<", ">", "<=", ">="],),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "float_math_operation"
CATEGORY = "EasyUse/Logic/Math"
def float_math_operation(self, a, b, operation):
if operation == "==":
return (a == b,)
elif operation == "!=":
return (a != b,)
elif operation == "<":
return (a < b,)
elif operation == ">":
return (a > b,)
elif operation == "<=":
return (a <= b,)
elif operation == ">=":
return (a >= b,)
class mathStringOperation:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("STRING", {"multiline": False}),
"b": ("STRING", {"multiline": False}),
"operation": (["a == b", "a != b", "a IN b", "a MATCH REGEX(b)", "a BEGINSWITH b", "a ENDSWITH b"],),
"case_sensitive": ("BOOLEAN", {"default": True}),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "string_math_operation"
CATEGORY = "EasyUse/Logic/Math"
def string_math_operation(self, a, b, operation, case_sensitive):
if not case_sensitive:
a = a.lower()
b = b.lower()
if operation == "a == b":
return (a == b,)
elif operation == "a != b":
return (a != b,)
elif operation == "a IN b":
return (a in b,)
elif operation == "a MATCH REGEX(b)":
try:
return (re.match(b, a) is not None,)
except:
return (False,)
elif operation == "a BEGINSWITH b":
return (a.startswith(b),)
elif operation == "a ENDSWITH b":
return (a.endswith(b),)
# ---------------------------------------------------------------Flow----------------------------------------------------------------------#
try:
from comfy_execution.graph_utils import GraphBuilder, is_link
except:
GraphBuilder = None
class whileLoopStart:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"condition": ("BOOLEAN", {"default": True}),
},
"optional": {
},
}
for i in range(MAX_FLOW_NUM):
inputs["optional"]["initial_value%d" % i] = ("*",)
return inputs
RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL"] + ["*"] * MAX_FLOW_NUM))
RETURN_NAMES = ByPassTypeTuple(tuple(["flow"] + ["value%d" % i for i in range(MAX_FLOW_NUM)]))
FUNCTION = "while_loop_open"
CATEGORY = "EasyUse/Logic/While Loop"
def while_loop_open(self, condition, **kwargs):
values = []
for i in range(MAX_FLOW_NUM):
values.append(kwargs.get("initial_value%d" % i, None))
return tuple(["stub"] + values)
class whileLoopEnd:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"flow": ("FLOW_CONTROL", {"rawLink": True}),
"condition": ("BOOLEAN", {}),
},
"optional": {
},
"hidden": {
"dynprompt": "DYNPROMPT",
"unique_id": "UNIQUE_ID",
}
}
for i in range(MAX_FLOW_NUM):
inputs["optional"]["initial_value%d" % i] = (AlwaysEqualProxy('*'),)
return inputs
RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy('*')] * MAX_FLOW_NUM))
RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(MAX_FLOW_NUM)]))
FUNCTION = "while_loop_close"
CATEGORY = "EasyUse/Logic/While Loop"
def explore_dependencies(self, node_id, dynprompt, upstream):
node_info = dynprompt.get_node(node_id)
if "inputs" not in node_info:
return
for k, v in node_info["inputs"].items():
if is_link(v):
parent_id = v[0]
if parent_id not in upstream:
upstream[parent_id] = []
self.explore_dependencies(parent_id, dynprompt, upstream)
upstream[parent_id].append(node_id)
def collect_contained(self, node_id, upstream, contained):
if node_id not in upstream:
return
for child_id in upstream[node_id]:
if child_id not in contained:
contained[child_id] = True
self.collect_contained(child_id, upstream, contained)
def while_loop_close(self, flow, condition, dynprompt=None, unique_id=None, **kwargs):
if not condition:
# We're done with the loop
values = []
for i in range(MAX_FLOW_NUM):
values.append(kwargs.get("initial_value%d" % i, None))
return tuple(values)
# We want to loop
this_node = dynprompt.get_node(unique_id)
upstream = {}
# Get the list of all nodes between the open and close nodes
self.explore_dependencies(unique_id, dynprompt, upstream)
contained = {}
open_node = flow[0]
self.collect_contained(open_node, upstream, contained)
contained[unique_id] = True
contained[open_node] = True
graph = GraphBuilder()
for node_id in contained:
original_node = dynprompt.get_node(node_id)
node = graph.node(original_node["class_type"], "Recurse" if node_id == unique_id else node_id)
node.set_override_display_id(node_id)
for node_id in contained:
original_node = dynprompt.get_node(node_id)
node = graph.lookup_node("Recurse" if node_id == unique_id else node_id)
for k, v in original_node["inputs"].items():
if is_link(v) and v[0] in contained:
parent = graph.lookup_node(v[0])
node.set_input(k, parent.out(v[1]))
else:
node.set_input(k, v)
new_open = graph.lookup_node(open_node)
for i in range(MAX_FLOW_NUM):
key = "initial_value%d" % i
new_open.set_input(key, kwargs.get(key, None))
my_clone = graph.lookup_node("Recurse")
result = map(lambda x: my_clone.out(x), range(MAX_FLOW_NUM))
return {
"result": tuple(result),
"expand": graph.finalize(),
}
class forLoopStart:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"total": ("INT", {"default": 1, "min": 1, "max": 100000, "step": 1}),
},
"optional": {
"initial_value%d" % i: (AlwaysEqualProxy("*"),) for i in range(1, MAX_FLOW_NUM)
},
"hidden": {
"initial_value0": (AlwaysEqualProxy("*"),),
"prompt": "PROMPT",
"extra_pnginfo": "EXTRA_PNGINFO",
"unique_id": "UNIQUE_ID"
}
}
RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL", "INT"] + [AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
RETURN_NAMES = ByPassTypeTuple(tuple(["flow", "index"] + ["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
FUNCTION = "for_loop_start"
CATEGORY = "EasyUse/Logic/For Loop"
def for_loop_start(self, total, prompt=None, extra_pnginfo=None, unique_id=None, **kwargs):
graph = GraphBuilder()
i = 0
unique_id = unique_id.split('.')[len(unique_id.split('.'))-1] if "." in unique_id else unique_id
node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == int(unique_id)), None)
if node:
node['properties']['total'] = total
if "initial_value0" in kwargs:
i = kwargs["initial_value0"]
initial_values = {("initial_value%d" % num): kwargs.get("initial_value%d" % num, None) for num in range(1, MAX_FLOW_NUM)}
while_open = graph.node("easy whileLoopStart", condition=total, initial_value0=i, **initial_values)
outputs = [kwargs.get("initial_value%d" % num, None) for num in range(1, MAX_FLOW_NUM)]
return {
"result": tuple(["stub", i] + outputs),
"expand": graph.finalize(),
}
class forLoopEnd:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"flow": ("FLOW_CONTROL", {"rawLink": True}),
},
"optional": {
"initial_value%d" % i: (AlwaysEqualProxy("*"), {"rawLink": True}) for i in range(1, MAX_FLOW_NUM)
},
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
FUNCTION = "for_loop_end"
CATEGORY = "EasyUse/Logic/For Loop"
def for_loop_end(self, flow, prompt=None, extra_pnginfo=None, my_unique_id=None, **kwargs):
graph = GraphBuilder()
while_open = flow[0]
total = None
if extra_pnginfo:
node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == int(while_open)), None)
if node:
if 'properties' in node and 'total' in node['properties']:
total = node['properties']['total']
else:
total = node['widgets_values'][0] if "widgets_values" in node else None
if total is None:
raise Exception("Unable to get parameters for the start of the loop")
sub = graph.node("easy mathInt", operation="add", a=[while_open, 1], b=1)
cond = graph.node("easy compare", a=sub.out(0), b=total, comparison='a < b')
input_values = {("initial_value%d" % i): kwargs.get("initial_value%d" % i, None) for i in
range(1, MAX_FLOW_NUM)}
while_close = graph.node("easy whileLoopEnd",
flow=flow,
condition=cond.out(0),
initial_value0=sub.out(0),
**input_values)
return {
"result": tuple([while_close.out(i) for i in range(1, MAX_FLOW_NUM)]),
"expand": graph.finalize(),
}
COMPARE_FUNCTIONS = {
"a == b": lambda a, b: a == b,
"a != b": lambda a, b: a != b,
"a < b": lambda a, b: a < b,
"a > b": lambda a, b: a > b,
"a <= b": lambda a, b: a <= b,
"a >= b": lambda a, b: a >= b,
}
# 比较
class Compare:
@classmethod
def INPUT_TYPES(s):
compare_functions = list(COMPARE_FUNCTIONS.keys())
return {
"required": {
"a": (AlwaysEqualProxy("*"), {"default": 0}),
"b": (AlwaysEqualProxy("*"), {"default": 0}),
"comparison": (compare_functions, {"default": "a == b"}),
},
}
RETURN_TYPES = ("BOOLEAN",)
RETURN_NAMES = ("boolean",)
FUNCTION = "compare"
CATEGORY = "EasyUse/Logic"
def compare(self, a, b, comparison):
return (COMPARE_FUNCTIONS[comparison](a, b),)
# 判断
class IfElse:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"boolean": ("BOOLEAN",),
"on_true": (AlwaysEqualProxy("*"), lazy_options),
"on_false": (AlwaysEqualProxy("*"), lazy_options),
},
}
RETURN_TYPES = (AlwaysEqualProxy("*"),)
RETURN_NAMES = ("*",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic"
def check_lazy_status(self, boolean, on_true=None, on_false=None):
if boolean and on_true is None:
return ["on_true"]
if not boolean and on_false is None:
return ["on_false"]
def execute(self, *args, **kwargs):
return (kwargs['on_true'] if kwargs['boolean'] else kwargs['on_false'],)
#是否为SDXL
from comfy.sdxl_clip import SDXLClipModel, SDXLRefinerClipModel, SDXLClipG
class isNone:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"any": (AlwaysEqualProxy("*"),)
},
"optional": {
}
}
RETURN_TYPES = ("BOOLEAN",)
RETURN_NAMES = ("boolean",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic"
def execute(self, any):
return (True if any is None else False,)
class isSDXL:
@classmethod
def INPUT_TYPES(s):
return {
"required": {},
"optional": {
"optional_pipe": ("PIPE_LINE",),
"optional_clip": ("CLIP",),
}
}
RETURN_TYPES = ("BOOLEAN",)
RETURN_NAMES = ("boolean",)
FUNCTION = "execute"
CATEGORY = "EasyUse/Logic"
def execute(self, optional_pipe=None, optional_clip=None):
if optional_pipe is None and optional_clip is None:
raise Exception(f"[ERROR] optional_pipe or optional_clip is missing")
clip = optional_clip if optional_clip is not None else optional_pipe['clip']
if isinstance(clip.cond_stage_model, (SDXLClipModel, SDXLRefinerClipModel, SDXLClipG)):
return (True,)
else:
return (False,)
#xy矩阵
class xyAny:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"X": (AlwaysEqualProxy("*"), {}),
"Y": (AlwaysEqualProxy("*"), {}),
"direction": (["horizontal", "vertical"], {"default": "horizontal"})
}
}
RETURN_TYPES = (AlwaysEqualProxy("*"), AlwaysEqualProxy("*"))
RETURN_NAMES = ("X", "Y")
INPUT_IS_LIST = True
OUTPUT_IS_LIST = (True, True)
CATEGORY = "EasyUse/Logic"
FUNCTION = "to_xy"
def to_xy(self, X, Y, direction):
new_x = list()
new_y = list()
if direction[0] == "horizontal":
for y in Y:
for x in X:
new_x.append(x)
new_y.append(y)
else:
for x in X:
for y in Y:
new_x.append(x)
new_y.append(y)
return (new_x, new_y)
class batchAnything:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"any_1": (AlwaysEqualProxy("*"),{}),
"any_2": (AlwaysEqualProxy("*"),{})
}
}
RETURN_TYPES = (AlwaysEqualProxy("*"),)
RETURN_NAMES = ("batch",)
FUNCTION = "batch"
CATEGORY = "EasyUse/Logic"
def batch(self, any_1, any_2):
if isinstance(any_1, torch.Tensor) or isinstance(any_2, torch.Tensor):
if any_1 is None:
return (any_2,)
elif any_2 is None:
return (any_1,)
if any_1.shape[1:] != any_2.shape[1:]:
any_2 = comfy.utils.common_upscale(any_2.movedim(-1, 1), any_1.shape[2], any_1.shape[1], "bilinear", "center").movedim(1, -1)
return (torch.cat((any_1, any_2), 0),)
elif isinstance(any_1, (str, float, int)):
if any_2 is None:
return (any_1,)
elif isinstance(any_2, tuple):
return (any_2 + (any_1,),)
return ((any_1, any_2),)
elif isinstance(any_2, (str, float, int)):
if any_1 is None:
return (any_2,)
elif isinstance(any_1, tuple):
return (any_1 + (any_2,),)
return ((any_2, any_1),)
else:
if any_1 is None:
return (any_2,)
elif any_2 is None:
return (any_1,)
return (any_1 + any_2,)
# 转换所有类型
class convertAnything:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"*": (AlwaysEqualProxy("*"),),
"output_type": (["string", "int", "float", "boolean"], {"default": "string"}),
}}
RETURN_TYPES = ByPassTypeTuple((AlwaysEqualProxy("*"),))
OUTPUT_NODE = True
FUNCTION = "convert"
CATEGORY = "EasyUse/Logic"
def convert(self, *args, **kwargs):
anything = kwargs['*']
output_type = kwargs['output_type']
params = None
if output_type == 'string':
params = str(anything)
elif output_type == 'int':
params = int(anything)
elif output_type == 'float':
params = float(anything)
elif output_type == 'boolean':
params = bool(anything)
return (params,)
# 将所有类型的内容都转成字符串输出
class showAnything:
@classmethod
def INPUT_TYPES(s):
return {"required": {}, "optional": {"anything": (AlwaysEqualProxy("*"), {}), },
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
}}
RETURN_TYPES = ()
INPUT_IS_LIST = True
OUTPUT_NODE = True
FUNCTION = "log_input"
CATEGORY = "EasyUse/Logic"
def log_input(self, unique_id=None, extra_pnginfo=None, **kwargs):
values = []
if "anything" in kwargs:
for val in kwargs['anything']:
try:
if type(val) is str:
values.append(val)
else:
val = json.dumps(val)
values.append(str(val))
except Exception:
values.append(str(val))
pass
if not extra_pnginfo:
print("Error: extra_pnginfo is empty")
elif (not isinstance(extra_pnginfo[0], dict) or "workflow" not in extra_pnginfo[0]):
print("Error: extra_pnginfo[0] is not a dict or missing 'workflow' key")
else:
workflow = extra_pnginfo[0]["workflow"]
node = next((x for x in workflow["nodes"] if str(x["id"]) == unique_id[0]), None)
if node:
node["widgets_values"] = [values]
return {"ui": {"text": values}}
class showTensorShape:
@classmethod
def INPUT_TYPES(s):
return {"required": {"tensor": (AlwaysEqualProxy("*"),)}, "optional": {},
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"
}}
RETURN_TYPES = ()
RETURN_NAMES = ()
OUTPUT_NODE = True
FUNCTION = "log_input"
CATEGORY = "EasyUse/Logic"
def log_input(self, tensor, unique_id=None, extra_pnginfo=None):
shapes = []
def tensorShape(tensor):
if isinstance(tensor, dict):
for k in tensor:
tensorShape(tensor[k])
elif isinstance(tensor, list):
for i in range(len(tensor)):
tensorShape(tensor[i])
elif hasattr(tensor, 'shape'):
shapes.append(list(tensor.shape))
tensorShape(tensor)
return {"ui": {"text": shapes}}
class outputToList:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"tuple": (AlwaysEqualProxy("*"), {}),
}, "optional": {},
}
RETURN_TYPES = (AlwaysEqualProxy("*"),)
RETURN_NAMES = ("list",)
OUTPUT_IS_LIST = (True,)
FUNCTION = "output_to_List"
CATEGORY = "EasyUse/Logic"
def output_to_List(self, tuple):
return (tuple,)
# cleanGpuUsed
class cleanGPUUsed:
@classmethod
def INPUT_TYPES(s):
return {"required": {"anything": (AlwaysEqualProxy("*"), {})}, "optional": {},
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
}}
RETURN_TYPES = ()
RETURN_NAMES = ()
OUTPUT_NODE = True
FUNCTION = "empty_cache"
CATEGORY = "EasyUse/Logic"
def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
cleanGPUUsedForce()
remove_cache('*')
return ()
class clearCacheKey:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"anything": (AlwaysEqualProxy("*"), {}),
"cache_key": ("STRING", {"default": "*"}),
}, "optional": {},
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",}
}
RETURN_TYPES = ()
RETURN_NAMES = ()
OUTPUT_NODE = True
FUNCTION = "empty_cache"
CATEGORY = "EasyUse/Logic"
def empty_cache(self, anything, cache_name, unique_id=None, extra_pnginfo=None):
remove_cache(cache_name)
return ()
class clearCacheAll:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"anything": (AlwaysEqualProxy("*"), {}),
}, "optional": {},
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",}
}
RETURN_TYPES = ()
RETURN_NAMES = ()
OUTPUT_NODE = True
FUNCTION = "empty_cache"
CATEGORY = "EasyUse/Logic"
def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
remove_cache('*')
return ()
# Deprecated
class If:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"any": (AlwaysEqualProxy("*"),),
"if": (AlwaysEqualProxy("*"),),
"else": (AlwaysEqualProxy("*"),),
},
}
RETURN_TYPES = (AlwaysEqualProxy("*"),)
RETURN_NAMES = ("?",)
FUNCTION = "execute"
CATEGORY = "EasyUse/🚫 Deprecated"
DEPRECATED = True
def execute(self, *args, **kwargs):
return (kwargs['if'] if kwargs['any'] else kwargs['else'],)
class poseEditor:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"image": ("STRING", {"default":""})
}}
FUNCTION = "output_pose"
CATEGORY = "EasyUse/🚫 Deprecated"
DEPRECATED = True
RETURN_TYPES = ()
RETURN_NAMES = ()
def output_pose(self, image):
return ()
class imageToMask:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"image": ("IMAGE",),
"channel": (['red', 'green', 'blue'],),
}
}
RETURN_TYPES = ("MASK",)
FUNCTION = "convert"
CATEGORY = "EasyUse/🚫 Deprecated"
DEPRECATED = True
def convert_to_single_channel(self, image, channel='red'):
from PIL import Image
# Convert to RGB mode to access individual channels
image = image.convert('RGB')
# Extract the desired channel and convert to greyscale
if channel == 'red':
channel_img = image.split()[0].convert('L')
elif channel == 'green':
channel_img = image.split()[1].convert('L')
elif channel == 'blue':
channel_img = image.split()[2].convert('L')
else:
raise ValueError(
"Invalid channel option. Please choose 'red', 'green', or 'blue'.")
# Convert the greyscale channel back to RGB mode
channel_img = Image.merge(
'RGB', (channel_img, channel_img, channel_img))
return channel_img
def convert(self, image, channel='red'):
from .libs.image import pil2tensor, tensor2pil
image = self.convert_to_single_channel(tensor2pil(image), channel)
image = pil2tensor(image)
return (image.squeeze().mean(2),)
NODE_CLASS_MAPPINGS = {
"easy string": String,
"easy int": Int,
"easy rangeInt": RangeInt,
"easy float": Float,
"easy rangeFloat": RangeFloat,
"easy boolean": Boolean,
"easy mathString": mathStringOperation,
"easy mathInt": mathIntOperation,
"easy mathFloat": mathFloatOperation,
"easy compare": Compare,
"easy imageSwitch": imageSwitch,
"easy textSwitch": textSwitch,
"easy anythingIndexSwitch": anythingIndexSwitch,
"easy imageIndexSwitch": imageIndexSwitch,
"easy textIndexSwitch": textIndexSwitch,
"easy conditioningIndexSwitch": conditioningIndexSwitch,
"easy whileLoopStart": whileLoopStart,
"easy whileLoopEnd": whileLoopEnd,
"easy forLoopStart": forLoopStart,
"easy forLoopEnd": forLoopEnd,
"easy ifElse": IfElse,
"easy isNone": isNone,
"easy isSDXL": isSDXL,
"easy outputToList": outputToList,
"easy xyAny": xyAny,
"easy batchAnything": batchAnything,
"easy convertAnything": convertAnything,
"easy showAnything": showAnything,
"easy showTensorShape": showTensorShape,
"easy clearCacheKey": clearCacheKey,
"easy clearCacheAll": clearCacheAll,
"easy cleanGpuUsed": cleanGPUUsed,
"easy if": If,
"easy poseEditor": poseEditor,
"easy imageToMask": imageToMask
}
NODE_DISPLAY_NAME_MAPPINGS = {
"easy string": "String",
"easy int": "Int",
"easy rangeInt": "Range(Int)",
"easy float": "Float",
"easy rangeFloat": "Range(Float)",
"easy boolean": "Boolean",
"easy compare": "Compare",
"easy mathString": "Math String",
"easy mathInt": "Math Int",
"easy mathFloat": "Math Float",
"easy imageSwitch": "Image Switch",
"easy textSwitch": "Text Switch",
"easy anythingIndexSwitch": "Any Index Switch",
"easy imageIndexSwitch": "Image Index Switch",
"easy textIndexSwitch": "Text Index Switch",
"easy conditioningIndexSwitch": "Conditioning Index Switch",
"easy whileLoopStart": "While Loop Start",
"easy whileLoopEnd": "While Loop End",
"easy forLoopStart": "For Loop Start",
"easy forLoopEnd": "For Loop End",
"easy ifElse": "If else",
"easy isNone": "Is None",
"easy isSDXL": "Is SDXL",
"easy outputToList": "Output to List",
"easy xyAny": "XYAny",
"easy batchAnything": "Batch Any",
"easy convertAnything": "Convert Any",
"easy showAnything": "Show Any",
"easy showTensorShape": "Show Tensor Shape",
"easy clearCacheKey": "Clear Cache Key",
"easy clearCacheAll": "Clear Cache All",
"easy cleanGpuUsed": "Clean GPU Used",
"easy if": "If (🚫Deprecated)",
"easy poseEditor": "PoseEditor (🚫Deprecated)",
"easy imageToMask": "ImageToMask (🚫Deprecated)"
}