v3: refactoring of image saving code

This commit is contained in:
bigcat88 2025-07-20 11:03:04 +03:00
parent 45363ad31f
commit aae60881de
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
2 changed files with 175 additions and 124 deletions

View File

@ -4,9 +4,11 @@ import json
import os import os
import random import random
from io import BytesIO from io import BytesIO
from typing import Type
import av import av
import numpy as np import numpy as np
import torch
import torchaudio import torchaudio
from PIL import Image as PILImage from PIL import Image as PILImage
from PIL.PngImagePlugin import PngInfo from PIL.PngImagePlugin import PngInfo
@ -35,32 +37,161 @@ class SavedResult(dict):
return FolderType(self["type"]) return FolderType(self["type"])
class PreviewImage(_UIOutput): def _get_directory_by_folder_type(folder_type: FolderType) -> str:
def __init__(self, image: Image.Type, animated: bool=False, cls: ComfyNodeV3=None, **kwargs): if folder_type == FolderType.input:
output_dir = folder_paths.get_temp_directory() return folder_paths.get_input_directory()
prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) if folder_type == FolderType.output:
filename_prefix = "ComfyUI" + prefix_append return folder_paths.get_output_directory()
return folder_paths.get_temp_directory()
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, output_dir, image[0].shape[1], image[0].shape[0])
results = list()
for (batch_number, image) in enumerate(image):
img = PILImage.fromarray(np.clip(255. * image.cpu().numpy(), 0, 255).astype(np.uint8))
metadata = None
if not args.disable_metadata and cls is not None:
metadata = PngInfo()
if cls.hidden.prompt is not None:
metadata.add_text("prompt", json.dumps(cls.hidden.prompt))
if cls.hidden.extra_pnginfo is not None:
for x in cls.hidden.extra_pnginfo:
metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x]))
class ImageSaveHelper:
"""A helper class with static methods to handle image saving and metadata."""
@staticmethod
def _convert_tensor_to_pil(image_tensor: torch.Tensor) -> PILImage.Image:
"""Converts a single torch tensor to a PIL Image."""
return PILImage.fromarray(np.clip(255.0 * image_tensor.cpu().numpy(), 0, 255).astype(np.uint8))
@staticmethod
def _create_png_metadata(cls: Type[ComfyNodeV3] | None) -> PngInfo | None:
"""Creates a PngInfo object with prompt and extra_pnginfo."""
if args.disable_metadata or cls is None or not cls.hidden:
return None
metadata = PngInfo()
if cls.hidden.prompt:
metadata.add_text("prompt", json.dumps(cls.hidden.prompt))
if cls.hidden.extra_pnginfo:
for x in cls.hidden.extra_pnginfo:
metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x]))
return metadata
@staticmethod
def _create_animated_png_metadata(cls: Type[ComfyNodeV3] | None) -> PngInfo | None:
"""Creates a PngInfo object with prompt and extra_pnginfo for animated PNGs (APNG)."""
if args.disable_metadata or cls is None or not cls.hidden:
return None
metadata = PngInfo()
if cls.hidden.prompt:
metadata.add(
b"comf",
"prompt".encode("latin-1", "strict")
+ b"\0"
+ json.dumps(cls.hidden.prompt).encode("latin-1", "strict"),
after_idat=True,
)
if cls.hidden.extra_pnginfo:
for x in cls.hidden.extra_pnginfo:
metadata.add(
b"comf",
x.encode("latin-1", "strict")
+ b"\0"
+ json.dumps(cls.hidden.extra_pnginfo[x]).encode("latin-1", "strict"),
after_idat=True,
)
return metadata
@staticmethod
def _create_webp_metadata(pil_image: PILImage.Image, cls: Type[ComfyNodeV3] | None) -> PILImage.Exif:
"""Creates EXIF metadata bytes for WebP images."""
exif_data = pil_image.getexif()
if args.disable_metadata or cls is None or cls.hidden is None:
return exif_data
if cls.hidden.prompt is not None:
exif_data[0x0110] = "prompt:{}".format(json.dumps(cls.hidden.prompt)) # EXIF 0x0110 = Model
if cls.hidden.extra_pnginfo is not None:
inital_exif_tag = 0x010F # EXIF 0x010f = Make
for key, value in cls.hidden.extra_pnginfo.items():
exif_data[inital_exif_tag] = "{}:{}".format(key, json.dumps(value))
inital_exif_tag -= 1
return exif_data
@staticmethod
def save_images(
images, filename_prefix: str, folder_type: FolderType, cls: Type[ComfyNodeV3] | None, compress_level = 4,
) -> list[SavedResult]:
"""Saves a batch of images as individual PNG files."""
full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path(
filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0]
)
results = []
metadata = ImageSaveHelper._create_png_metadata(cls)
for batch_number, image_tensor in enumerate(images):
img = ImageSaveHelper._convert_tensor_to_pil(image_tensor)
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
file = f"{filename_with_batch_num}_{counter:05}_.png" file = f"{filename_with_batch_num}_{counter:05}_.png"
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=1) img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=compress_level)
results.append(SavedResult(file, subfolder, FolderType.temp)) results.append(SavedResult(file, subfolder, folder_type))
counter += 1 counter += 1
return results
self.values = results @staticmethod
def save_animated_png(
images,
filename_prefix: str,
folder_type: FolderType,
cls: Type[ComfyNodeV3] | None,
fps: float,
compress_level: int
) -> SavedResult:
"""Saves a batch of images as a single animated PNG."""
full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path(
filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0]
)
pil_images = [ImageSaveHelper._convert_tensor_to_pil(img) for img in images]
metadata = ImageSaveHelper._create_animated_png_metadata(cls)
file = f"{filename}_{counter:05}_.png"
save_path = os.path.join(full_output_folder, file)
pil_images[0].save(
save_path,
pnginfo=metadata,
compress_level=compress_level,
save_all=True,
duration=int(1000.0 / fps),
append_images=pil_images[1:],
)
return SavedResult(file, subfolder, folder_type)
@staticmethod
def save_animated_webp(
images,
filename_prefix: str,
folder_type: FolderType,
cls: Type[ComfyNodeV3] | None,
fps: float,
lossless: bool,
quality: int,
method: int,
) -> SavedResult:
"""Saves a batch of images as a single animated WebP."""
full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path(
filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0]
)
pil_images = [ImageSaveHelper._convert_tensor_to_pil(img) for img in images]
pil_exif = ImageSaveHelper._create_webp_metadata(pil_images[0], cls)
file = f"{filename}_{counter:05}_.webp"
pil_images[0].save(
os.path.join(full_output_folder, file),
save_all=True,
duration=int(1000.0 / fps),
append_images=pil_images[1:],
exif=pil_exif,
lossless=lossless,
quality=quality,
method=method,
)
return SavedResult(file, subfolder, folder_type)
class PreviewImage(_UIOutput):
def __init__(self, image: Image.Type, animated: bool=False, cls: ComfyNodeV3=None, **kwargs):
self.values = ImageSaveHelper.save_images(
image,
filename_prefix="ComfyUI_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for _ in range(5)),
folder_type=FolderType.temp,
cls=cls,
compress_level=1,
)
self.animated = animated self.animated = animated
def as_dict(self): def as_dict(self):

View File

@ -1,17 +1,14 @@
import hashlib import hashlib
import json
import os import os
import numpy as np import numpy as np
import torch import torch
from PIL import Image, ImageOps, ImageSequence from PIL import Image, ImageOps, ImageSequence
from PIL.PngImagePlugin import PngInfo
import comfy.utils import comfy.utils
import folder_paths import folder_paths
import node_helpers import node_helpers
import nodes import nodes
from comfy.cli_args import args
from comfy_api.v3 import io, ui from comfy_api.v3 import io, ui
from server import PromptServer from server import PromptServer
@ -633,48 +630,15 @@ class SaveAnimatedPNG(io.ComfyNodeV3):
@classmethod @classmethod
def execute(cls, images, fps, compress_level, filename_prefix="ComfyUI") -> io.NodeOutput: def execute(cls, images, fps, compress_level, filename_prefix="ComfyUI") -> io.NodeOutput:
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( result = ui.ImageSaveHelper.save_animated_png(
filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] images=images,
) filename_prefix=filename_prefix,
results = [] folder_type=io.FolderType.output,
pil_images = [] cls=cls,
for image in images: fps=fps,
img = Image.fromarray(np.clip(255.0 * image.cpu().numpy(), 0, 255).astype(np.uint8))
pil_images.append(img)
metadata = None
if not args.disable_metadata:
metadata = PngInfo()
if cls.hidden.prompt is not None:
metadata.add(
b"comf",
"prompt".encode("latin-1", "strict")
+ b"\0"
+ json.dumps(cls.hidden.prompt).encode("latin-1", "strict"),
after_idat=True,
)
if cls.hidden.extra_pnginfo is not None:
for x in cls.hidden.extra_pnginfo:
metadata.add(
b"comf",
x.encode("latin-1", "strict")
+ b"\0"
+ json.dumps(cls.hidden.extra_pnginfo[x]).encode("latin-1", "strict"),
after_idat=True,
)
file = f"{filename}_{counter:05}_.png"
pil_images[0].save(
os.path.join(full_output_folder, file),
pnginfo=metadata,
compress_level=compress_level, compress_level=compress_level,
save_all=True,
duration=int(1000.0 / fps),
append_images=pil_images[1:],
) )
results.append(ui.SavedResult(file, subfolder, io.FolderType.output)) return io.NodeOutput(ui={"images": [result], "animated": (len(images) != 1,)})
return io.NodeOutput(ui={"images": results, "animated": (True,)})
class SaveAnimatedWEBP(io.ComfyNodeV3): class SaveAnimatedWEBP(io.ComfyNodeV3):
@ -693,53 +657,24 @@ class SaveAnimatedWEBP(io.ComfyNodeV3):
io.Boolean.Input("lossless", default=True), io.Boolean.Input("lossless", default=True),
io.Int.Input("quality", default=80, min=0, max=100), io.Int.Input("quality", default=80, min=0, max=100),
io.Combo.Input("method", options=list(cls.COMPRESS_METHODS.keys())), io.Combo.Input("method", options=list(cls.COMPRESS_METHODS.keys())),
# "num_frames": ("INT", {"default": 0, "min": 0, "max": 8192}),
], ],
hidden=[io.Hidden.prompt, io.Hidden.extra_pnginfo], hidden=[io.Hidden.prompt, io.Hidden.extra_pnginfo],
is_output_node=True, is_output_node=True,
) )
@classmethod @classmethod
def execute(cls, images, fps, filename_prefix, lossless, quality, method, num_frames=0) -> io.NodeOutput: def execute(cls, images, fps, filename_prefix, lossless, quality, method) -> io.NodeOutput:
method = cls.COMPRESS_METHODS.get(method) result = ui.ImageSaveHelper.save_animated_webp(
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( images=images,
filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] filename_prefix=filename_prefix,
folder_type=io.FolderType.output,
cls=cls,
fps=fps,
lossless=lossless,
quality=quality,
method=cls.COMPRESS_METHODS.get(method)
) )
results = [] return io.NodeOutput(ui={"images": [result], "animated": (len(images) != 1,)})
pil_images = []
for image in images:
img = Image.fromarray(np.clip(255.0 * image.cpu().numpy(), 0, 255).astype(np.uint8))
pil_images.append(img)
metadata = pil_images[0].getexif()
if not args.disable_metadata:
if cls.hidden.prompt is not None:
metadata[0x0110] = "prompt:{}".format(json.dumps(cls.hidden.prompt))
if cls.hidden.extra_pnginfo is not None:
inital_exif = 0x010F
for x in cls.hidden.extra_pnginfo:
metadata[inital_exif] = "{}:{}".format(x, json.dumps(cls.hidden.extra_pnginfo[x]))
inital_exif -= 1
if num_frames == 0:
num_frames = len(pil_images)
for i in range(0, len(pil_images), num_frames):
file = f"{filename}_{counter:05}_.webp"
pil_images[i].save(
os.path.join(full_output_folder, file),
save_all=True,
duration=int(1000.0 / fps),
append_images=pil_images[i + 1 : i + num_frames],
exif=metadata,
lossless=lossless,
quality=quality,
method=method,
)
results.append(ui.SavedResult(file, subfolder, io.FolderType.output))
counter += 1
return io.NodeOutput(ui={"images": results, "animated": (num_frames != 1,)})
class SaveImage(io.ComfyNodeV3): class SaveImage(io.ComfyNodeV3):
@ -768,28 +703,13 @@ class SaveImage(io.ComfyNodeV3):
@classmethod @classmethod
def execute(cls, images, filename_prefix="ComfyUI") -> io.NodeOutput: def execute(cls, images, filename_prefix="ComfyUI") -> io.NodeOutput:
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( results = ui.ImageSaveHelper.save_images(
filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] images,
filename_prefix=filename_prefix,
folder_type=io.FolderType.output,
cls=cls,
compress_level=4,
) )
results = []
for batch_number, image in enumerate(images):
i = 255.0 * image.cpu().numpy()
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
metadata = None
if not args.disable_metadata:
metadata = PngInfo()
if cls.hidden.prompt is not None:
metadata.add_text("prompt", json.dumps(cls.hidden.prompt))
if cls.hidden.extra_pnginfo is not None:
for x in cls.hidden.extra_pnginfo:
metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x]))
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
file = f"{filename_with_batch_num}_{counter:05}_.png"
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4)
results.append(ui.SavedResult(file, subfolder, io.FolderType.output))
counter += 1
return io.NodeOutput(ui={"images": results}) return io.NodeOutput(ui={"images": results})