v3: small nodes(pag, perpneg, morph, optimsteps)

This commit is contained in:
bigcat88 2025-07-19 11:55:43 +03:00
parent b6a4a4c664
commit edc8f06770
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
5 changed files with 346 additions and 0 deletions

View File

@ -0,0 +1,108 @@
from __future__ import annotations
import kornia.color
import torch
from kornia.morphology import (
bottom_hat,
closing,
dilation,
erosion,
gradient,
opening,
top_hat,
)
import comfy.model_management
from comfy_api.v3 import io
class ImageRGBToYUV(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="ImageRGBToYUV_V3",
category="image/batch",
inputs=[
io.Image.Input(id="image"),
],
outputs=[
io.Image.Output(id="Y", display_name="Y"),
io.Image.Output(id="U", display_name="U"),
io.Image.Output(id="V", display_name="V"),
],
)
@classmethod
def execute(cls, image):
out = kornia.color.rgb_to_ycbcr(image.movedim(-1, 1)).movedim(1, -1)
return io.NodeOutput(out[..., 0:1].expand_as(image), out[..., 1:2].expand_as(image), out[..., 2:3].expand_as(image))
class ImageYUVToRGB(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="ImageYUVToRGB_V3",
category="image/batch",
inputs=[
io.Image.Input(id="Y"),
io.Image.Input(id="U"),
io.Image.Input(id="V"),
],
outputs=[
io.Image.Output(),
],
)
@classmethod
def execute(cls, Y, U, V):
image = torch.cat([torch.mean(Y, dim=-1, keepdim=True), torch.mean(U, dim=-1, keepdim=True), torch.mean(V, dim=-1, keepdim=True)], dim=-1)
return io.NodeOutput(kornia.color.ycbcr_to_rgb(image.movedim(-1, 1)).movedim(1, -1))
class Morphology(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="Morphology_V3",
display_name="ImageMorphology _V3",
category="image/postprocessing",
inputs=[
io.Image.Input(id="image"),
io.Combo.Input(id="operation", options=["erode", "dilate", "open", "close", "gradient", "bottom_hat", "top_hat"]),
io.Int.Input(id="kernel_size", default=3, min=3, max=999, step=1),
],
outputs=[
io.Image.Output(),
],
)
@classmethod
def execute(cls, image, operation, kernel_size):
device = comfy.model_management.get_torch_device()
kernel = torch.ones(kernel_size, kernel_size, device=device)
image_k = image.to(device).movedim(-1, 1)
if operation == "erode":
output = erosion(image_k, kernel)
elif operation == "dilate":
output = dilation(image_k, kernel)
elif operation == "open":
output = opening(image_k, kernel)
elif operation == "close":
output = closing(image_k, kernel)
elif operation == "gradient":
output = gradient(image_k, kernel)
elif operation == "top_hat":
output = top_hat(image_k, kernel)
elif operation == "bottom_hat":
output = bottom_hat(image_k, kernel)
else:
raise ValueError(f"Invalid operation {operation} for morphology. Must be one of 'erode', 'dilate', 'open', 'close', 'gradient', 'tophat', 'bottomhat'")
return io.NodeOutput(output.to(comfy.model_management.intermediate_device()).movedim(1, -1))
NODES_LIST = [
ImageRGBToYUV,
ImageYUVToRGB,
Morphology,
]

View File

@ -0,0 +1,62 @@
from __future__ import annotations
import numpy as np
import torch
from comfy_api.v3 import io
# from https://github.com/bebebe666/OptimalSteps
def loglinear_interp(t_steps, num_steps):
"""Performs log-linear interpolation of a given array of decreasing numbers."""
xs = np.linspace(0, 1, len(t_steps))
ys = np.log(t_steps[::-1])
new_xs = np.linspace(0, 1, num_steps)
new_ys = np.interp(new_xs, xs, ys)
return np.exp(new_ys)[::-1].copy()
NOISE_LEVELS = {
"FLUX": [0.9968, 0.9886, 0.9819, 0.975, 0.966, 0.9471, 0.9158, 0.8287, 0.5512, 0.2808, 0.001],
"Wan": [1.0, 0.997, 0.995, 0.993, 0.991, 0.989, 0.987, 0.985, 0.98, 0.975, 0.973, 0.968, 0.96, 0.946, 0.927, 0.902, 0.864, 0.776, 0.539, 0.208, 0.001],
"Chroma": [0.992, 0.99, 0.988, 0.985, 0.982, 0.978, 0.973, 0.968, 0.961, 0.953, 0.943, 0.931, 0.917, 0.9, 0.881, 0.858, 0.832, 0.802, 0.769, 0.731, 0.69, 0.646, 0.599, 0.55, 0.501, 0.451, 0.402, 0.355, 0.311, 0.27, 0.232, 0.199, 0.169, 0.143, 0.12, 0.101, 0.084, 0.07, 0.058, 0.048, 0.001],
}
class OptimalStepsScheduler(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="OptimalStepsScheduler_V3",
category="sampling/custom_sampling/schedulers",
inputs=[
io.Combo.Input(id="model_type", options=["FLUX", "Wan", "Chroma"]),
io.Int.Input(id="steps", default=20, min=3, max=1000),
io.Float.Input(id="denoise", default=1.0, min=0.0, max=1.0, step=0.01),
],
outputs=[
io.Sigmas.Output(),
],
)
@classmethod
def execute(cls, model_type, steps, denoise):
total_steps = steps
if denoise < 1.0:
if denoise <= 0.0:
return io.NodeOutput(torch.FloatTensor([]))
total_steps = round(steps * denoise)
sigmas = NOISE_LEVELS[model_type][:]
if (steps + 1) != len(sigmas):
sigmas = loglinear_interp(sigmas, steps + 1)
sigmas = sigmas[-(total_steps + 1):]
sigmas[-1] = 0
return io.NodeOutput(torch.FloatTensor(sigmas))
NODES_LIST = [OptimalStepsScheduler]

View File

@ -0,0 +1,60 @@
from __future__ import annotations
import comfy.model_patcher
import comfy.samplers
from comfy_api.v3 import io
#Modified/simplified version of the node from: https://github.com/pamparamm/sd-perturbed-attention
#If you want the one with more options see the above repo.
#My modified one here is more basic but has fewer chances of breaking with ComfyUI updates.
class PerturbedAttentionGuidance(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="PerturbedAttentionGuidance_V3",
category="model_patches/unet",
inputs=[
io.Model.Input(id="model"),
io.Float.Input(id="scale", default=3.0, min=0.0, max=100.0, step=0.01, round=0.01),
],
outputs=[
io.Model.Output(),
],
)
@classmethod
def execute(cls, model, scale):
unet_block = "middle"
unet_block_id = 0
m = model.clone()
def perturbed_attention(q, k, v, extra_options, mask=None):
return v
def post_cfg_function(args):
model = args["model"]
cond_pred = args["cond_denoised"]
cond = args["cond"]
cfg_result = args["denoised"]
sigma = args["sigma"]
model_options = args["model_options"].copy()
x = args["input"]
if scale == 0:
return cfg_result
# Replace Self-attention with PAG
model_options = comfy.model_patcher.set_model_options_patch_replace(model_options, perturbed_attention, "attn1", unet_block, unet_block_id)
(pag,) = comfy.samplers.calc_cond_batch(model, [cond], x, sigma, model_options)
return cfg_result + (cond_pred - pag) * scale
m.set_model_sampler_post_cfg_function(post_cfg_function)
return io.NodeOutput(m)
NODES_LIST = [PerturbedAttentionGuidance]

View File

@ -0,0 +1,112 @@
from __future__ import annotations
import math
import torch
import comfy.model_management
import comfy.sampler_helpers
import comfy.samplers
import comfy.utils
import node_helpers
from comfy_api.v3 import io
def perp_neg(x, noise_pred_pos, noise_pred_neg, noise_pred_nocond, neg_scale, cond_scale):
pos = noise_pred_pos - noise_pred_nocond
neg = noise_pred_neg - noise_pred_nocond
perp = neg - ((torch.mul(neg, pos).sum())/(torch.norm(pos)**2)) * pos
perp_neg = perp * neg_scale
return noise_pred_nocond + cond_scale*(pos - perp_neg)
class Guider_PerpNeg(comfy.samplers.CFGGuider):
def set_conds(self, positive, negative, empty_negative_prompt):
empty_negative_prompt = node_helpers.conditioning_set_values(empty_negative_prompt, {"prompt_type": "negative"})
self.inner_set_conds({"positive": positive, "empty_negative_prompt": empty_negative_prompt, "negative": negative})
def set_cfg(self, cfg, neg_scale):
self.cfg = cfg
self.neg_scale = neg_scale
def predict_noise(self, x, timestep, model_options={}, seed=None):
# in CFGGuider.predict_noise, we call sampling_function(), which uses cfg_function() to compute pos & neg
# but we'd rather do a single batch of sampling pos, neg, and empty, so we call calc_cond_batch([pos,neg,empty]) directly
positive_cond = self.conds.get("positive", None)
negative_cond = self.conds.get("negative", None)
empty_cond = self.conds.get("empty_negative_prompt", None)
if not model_options.get("disable_cfg1_optimization", False):
if math.isclose(self.neg_scale, 0.0):
negative_cond = None
if math.isclose(self.cfg, 1.0):
empty_cond = None
conds = [positive_cond, negative_cond, empty_cond]
out = comfy.samplers.calc_cond_batch(self.inner_model, conds, x, timestep, model_options)
# Apply pre_cfg_functions since sampling_function() is skipped
for fn in model_options.get("sampler_pre_cfg_function", []):
args = {"conds":conds, "conds_out": out, "cond_scale": self.cfg, "timestep": timestep,
"input": x, "sigma": timestep, "model": self.inner_model, "model_options": model_options}
out = fn(args)
noise_pred_pos, noise_pred_neg, noise_pred_empty = out
cfg_result = perp_neg(x, noise_pred_pos, noise_pred_neg, noise_pred_empty, self.neg_scale, self.cfg)
# normally this would be done in cfg_function, but we skipped
# that for efficiency: we can compute the noise predictions in
# a single call to calc_cond_batch() (rather than two)
# so we replicate the hook here
for fn in model_options.get("sampler_post_cfg_function", []):
args = {
"denoised": cfg_result,
"cond": positive_cond,
"uncond": negative_cond,
"cond_scale": self.cfg,
"model": self.inner_model,
"uncond_denoised": noise_pred_neg,
"cond_denoised": noise_pred_pos,
"sigma": timestep,
"model_options": model_options,
"input": x,
# not in the original call in samplers.py:cfg_function, but made available for future hooks
"empty_cond": empty_cond,
"empty_cond_denoised": noise_pred_empty,}
cfg_result = fn(args)
return cfg_result
class PerpNegGuider(io.ComfyNodeV3):
@classmethod
def define_schema(cls):
return io.SchemaV3(
node_id="PerpNegGuider_V3",
category="_for_testing",
inputs=[
io.Model.Input(id="model"),
io.Conditioning.Input(id="positive"),
io.Conditioning.Input(id="negative"),
io.Conditioning.Input(id="empty_conditioning"),
io.Float.Input(id="cfg", default=8.0, min=0.0, max=100.0, step=0.1, round=0.01),
io.Float.Input(id="neg_scale", default=1.0, min=0.0, max=100.0, step=0.01),
],
outputs=[
io.Guider.Output(),
],
is_experimental=True,
)
@classmethod
def execute(cls, model, positive, negative, empty_conditioning, cfg, neg_scale):
guider = Guider_PerpNeg(model)
guider.set_conds(positive, negative, empty_conditioning)
guider.set_cfg(cfg, neg_scale)
return io.NodeOutput(guider)
NODES_LIST = [PerpNegGuider]

View File

@ -2320,6 +2320,10 @@ def init_builtin_extra_nodes():
"v3/nodes_gits.py",
"v3/nodes_images.py",
"v3/nodes_mask.py",
"v3/nodes_morphology.py",
"v3/nodes_optimalsteps.py",
"v3/nodes_pag.py",
"v3/nodes_perpneg.py",
"v3/nodes_preview_any.py",
"v3/nodes_primitive.py",
"v3/nodes_rebatch.py",