ComfyUI/comfy_extras/nodes_v3_test.py
2025-07-24 18:23:29 -07:00

286 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import torch
import time
from comfy_api.latest import io, ui, resources, _io
import logging # noqa
import folder_paths
import comfy.utils
import comfy.sd
import asyncio
@io.comfytype(io_type="XYZ")
class XYZ(io.ComfyTypeIO):
Type = tuple[int,str]
class V3TestNode(io.ComfyNode):
# NOTE: this is here just to test that state is not leaking
def __init__(self):
super().__init__()
self.hahajkunless = ";)"
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_01_TestNode1",
display_name="V3 Test Node",
category="v3 nodes",
description="This is a funky V3 node test.",
inputs=[
io.Image.Input("image", display_name="new_image"),
XYZ.Input("xyz", optional=True),
io.Custom("JKL").Input("jkl", optional=True),
io.Mask.Input("mask", display_name="mask haha", optional=True),
io.Int.Input("some_int", display_name="new_name", min=0, max=127, default=42,
tooltip="My tooltip 😎", display_mode=io.NumberDisplay.slider),
io.Combo.Input("combo", options=["a", "b", "c"], tooltip="This is a combo input"),
io.MultiCombo.Input("combo2", options=["a","b","c"]),
io.MultiType.Input(io.Int.Input("int_multitype", display_name="haha"), types=[io.Float]),
io.MultiType.Input("multitype", types=[io.Mask, io.Float, io.Int], optional=True),
# ComboInput("combo", image_upload=True, image_folder=FolderType.output,
# remote=RemoteOptions(
# route="/internal/files/output",
# refresh_button=True,
# ),
# tooltip="This is a combo input"),
# IntegerInput("some_int", display_name="new_name", min=0, tooltip="My tooltip 😎", display=NumberDisplay.slider, ),
# ComboDynamicInput("mask", behavior=InputBehavior.optional),
# IntegerInput("some_int", display_name="new_name", min=0, tooltip="My tooltip 😎", display=NumberDisplay.slider,
# dependent_inputs=[ComboDynamicInput("mask", behavior=InputBehavior.optional)],
# dependent_values=[lambda my_value: IO.STRING if my_value < 5 else IO.NUMBER],
# ),
# ["option1", "option2". "option3"]
# ComboDynamicInput["sdfgjhl", [ComboDynamicOptions("option1", [IntegerInput("some_int", display_name="new_name", min=0, tooltip="My tooltip 😎", display=NumberDisplay.slider, ImageInput(), MaskInput(), String()]),
# CombyDynamicOptons("option2", [])
# ]]
],
outputs=[
io.Int.Output(),
io.Image.Output(display_name="img🖼", tooltip="This is an image"),
],
hidden=[
io.Hidden.prompt,
io.Hidden.auth_token_comfy_org,
io.Hidden.unique_id,
],
is_output_node=True,
)
@classmethod
def validate_inputs(cls, image: io.Image.Type, some_int: int, combo: io.Combo.Type, combo2: io.MultiCombo.Type, xyz: XYZ.Type=None, mask: io.Mask.Type=None, **kwargs):
if some_int < 0:
raise Exception("some_int must be greater than 0")
if combo == "c":
raise Exception("combo must be a or b")
return True
@classmethod
def execute(cls, image: io.Image.Type, some_int: int, combo: io.Combo.Type, combo2: io.MultiCombo.Type, xyz: XYZ.Type=None, mask: io.Mask.Type=None, **kwargs):
if hasattr(cls, "hahajkunless"):
raise Exception("The 'cls' variable leaked instance state between runs!")
if hasattr(cls, "doohickey"):
raise Exception("The 'cls' variable leaked state on class properties between runs!")
try:
cls.doohickey = "LOLJK"
except AttributeError:
pass
return io.NodeOutput(some_int, image, ui=ui.PreviewImage(image, cls=cls))
class V3LoraLoader(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_LoraLoader",
display_name="V3 LoRA Loader",
category="v3 nodes",
description="LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together.",
inputs=[
io.Model.Input("model", tooltip="The diffusion model the LoRA will be applied to."),
io.Clip.Input("clip", tooltip="The CLIP model the LoRA will be applied to."),
io.Combo.Input(
"lora_name",
options=folder_paths.get_filename_list("loras"),
tooltip="The name of the LoRA."
),
io.Float.Input(
"strength_model",
default=1.0,
min=-100.0,
max=100.0,
step=0.01,
tooltip="How strongly to modify the diffusion model. This value can be negative."
),
io.Float.Input(
"strength_clip",
default=1.0,
min=-100.0,
max=100.0,
step=0.01,
tooltip="How strongly to modify the CLIP model. This value can be negative."
),
],
outputs=[
io.Model.Output(),
io.Clip.Output(),
],
)
@classmethod
def execute(cls, model: io.Model.Type, clip: io.Clip.Type, lora_name: str, strength_model: float, strength_clip: float, **kwargs):
if strength_model == 0 and strength_clip == 0:
return io.NodeOutput(model, clip)
lora = cls.resources.get(resources.TorchDictFolderFilename("loras", lora_name))
model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
return io.NodeOutput(model_lora, clip_lora)
class NInputsTest(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_NInputsTest",
display_name="V3 N Inputs Test",
inputs=[
_io.AutogrowDynamic.Input("nmock", template_input=io.Image.Input("image"), min=1, max=3),
_io.AutogrowDynamic.Input("nmock2", template_input=io.Int.Input("int"), optional=True, min=1, max=4),
],
outputs=[
io.Image.Output(),
],
)
@classmethod
def validate_inputs(cls, nmock, nmock2):
return True
@classmethod
def fingerprint_inputs(cls, nmock, nmock2):
return time.time()
@classmethod
def check_lazy_status(cls, **kwargs) -> list[str]:
need = [name for name in kwargs if kwargs[name] is None]
return need
@classmethod
def execute(cls, nmock, nmock2):
first_image = nmock[0]
all_images = []
for img in nmock:
if img.shape != first_image.shape:
img = img.movedim(-1,1)
img = comfy.utils.common_upscale(img, first_image.shape[2], first_image.shape[1], "lanczos", "center")
img = img.movedim(1,-1)
all_images.append(img)
combined_image = torch.cat(all_images, dim=0)
return io.NodeOutput(combined_image)
class V3TestSleep(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_TestSleep",
display_name="V3 Test Sleep",
category="_for_testing",
description="Test async sleep functionality.",
inputs=[
io.AnyType.Input("value", display_name="Value"),
io.Float.Input("seconds", display_name="Seconds", default=1.0, min=0.0, max=9999.0, step=0.01, tooltip="The amount of seconds to sleep."),
],
outputs=[
io.AnyType.Output(),
],
hidden=[
io.Hidden.unique_id,
],
is_experimental=True,
)
@classmethod
async def execute(cls, value: io.AnyType.Type, seconds: io.Float.Type, **kwargs):
logging.info(f"V3TestSleep: {cls.hidden.unique_id}")
pbar = comfy.utils.ProgressBar(seconds, node_id=cls.hidden.unique_id)
start = time.time()
expiration = start + seconds
now = start
while now < expiration:
now = time.time()
pbar.update_absolute(now - start)
await asyncio.sleep(0.02)
return io.NodeOutput(value)
class V3DummyStart(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_DummyStart",
display_name="V3 Dummy Start",
category="v3 nodes",
description="This is a dummy start node.",
inputs=[],
outputs=[
io.Custom("XYZ").Output(),
],
)
@classmethod
def execute(cls):
return io.NodeOutput(None)
class V3DummyEnd(io.ComfyNode):
COOL_VALUE = 123
@classmethod
def define_schema(cls):
return io.Schema(
node_id="V3_DummyEnd",
display_name="V3 Dummy End",
category="v3 nodes",
description="This is a dummy end node.",
inputs=[
io.Custom("XYZ").Input("xyz"),
],
outputs=[],
is_output_node=True,
)
@classmethod
def custom_action(cls):
return 456
@classmethod
def execute(cls, xyz: io.Custom("XYZ").Type):
logging.info(f"V3DummyEnd: {cls.COOL_VALUE}")
logging.info(f"V3DummyEnd: {cls.custom_action()}")
return
class V3DummyEndInherit(V3DummyEnd):
@classmethod
def define_schema(cls):
schema = super().define_schema()
schema.node_id = "V3_DummyEndInherit"
schema.display_name = "V3 Dummy End Inherit"
return schema
@classmethod
def execute(cls, xyz: io.Custom("XYZ").Type):
logging.info(f"V3DummyEndInherit: {cls.COOL_VALUE}")
return super().execute(xyz)
NODES_LIST: list[type[io.ComfyNode]] = [
V3TestNode,
V3LoraLoader,
NInputsTest,
V3TestSleep,
V3DummyStart,
V3DummyEnd,
V3DummyEndInherit,
]