From aae60881de2f05e634a8354af0fc0c6ef0d6eeba Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Sun, 20 Jul 2025 11:03:04 +0300 Subject: [PATCH] v3: refactoring of image saving code --- comfy_api/v3/ui.py | 171 ++++++++++++++++++++++++++++---- comfy_extras/v3/nodes_images.py | 128 +++++------------------- 2 files changed, 175 insertions(+), 124 deletions(-) diff --git a/comfy_api/v3/ui.py b/comfy_api/v3/ui.py index 390b986d4..16873b4b2 100644 --- a/comfy_api/v3/ui.py +++ b/comfy_api/v3/ui.py @@ -4,9 +4,11 @@ import json import os import random from io import BytesIO +from typing import Type import av import numpy as np +import torch import torchaudio from PIL import Image as PILImage from PIL.PngImagePlugin import PngInfo @@ -35,32 +37,161 @@ class SavedResult(dict): return FolderType(self["type"]) -class PreviewImage(_UIOutput): - def __init__(self, image: Image.Type, animated: bool=False, cls: ComfyNodeV3=None, **kwargs): - output_dir = folder_paths.get_temp_directory() - prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) - filename_prefix = "ComfyUI" + prefix_append +def _get_directory_by_folder_type(folder_type: FolderType) -> str: + if folder_type == FolderType.input: + return folder_paths.get_input_directory() + if folder_type == FolderType.output: + return folder_paths.get_output_directory() + return folder_paths.get_temp_directory() - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, output_dir, image[0].shape[1], image[0].shape[0]) - results = list() - for (batch_number, image) in enumerate(image): - img = PILImage.fromarray(np.clip(255. * image.cpu().numpy(), 0, 255).astype(np.uint8)) - metadata = None - if not args.disable_metadata and cls is not None: - metadata = PngInfo() - if cls.hidden.prompt is not None: - metadata.add_text("prompt", json.dumps(cls.hidden.prompt)) - if cls.hidden.extra_pnginfo is not None: - for x in cls.hidden.extra_pnginfo: - metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x])) +class ImageSaveHelper: + """A helper class with static methods to handle image saving and metadata.""" + + @staticmethod + def _convert_tensor_to_pil(image_tensor: torch.Tensor) -> PILImage.Image: + """Converts a single torch tensor to a PIL Image.""" + return PILImage.fromarray(np.clip(255.0 * image_tensor.cpu().numpy(), 0, 255).astype(np.uint8)) + + @staticmethod + def _create_png_metadata(cls: Type[ComfyNodeV3] | None) -> PngInfo | None: + """Creates a PngInfo object with prompt and extra_pnginfo.""" + if args.disable_metadata or cls is None or not cls.hidden: + return None + metadata = PngInfo() + if cls.hidden.prompt: + metadata.add_text("prompt", json.dumps(cls.hidden.prompt)) + if cls.hidden.extra_pnginfo: + for x in cls.hidden.extra_pnginfo: + metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x])) + return metadata + + @staticmethod + def _create_animated_png_metadata(cls: Type[ComfyNodeV3] | None) -> PngInfo | None: + """Creates a PngInfo object with prompt and extra_pnginfo for animated PNGs (APNG).""" + if args.disable_metadata or cls is None or not cls.hidden: + return None + metadata = PngInfo() + if cls.hidden.prompt: + metadata.add( + b"comf", + "prompt".encode("latin-1", "strict") + + b"\0" + + json.dumps(cls.hidden.prompt).encode("latin-1", "strict"), + after_idat=True, + ) + if cls.hidden.extra_pnginfo: + for x in cls.hidden.extra_pnginfo: + metadata.add( + b"comf", + x.encode("latin-1", "strict") + + b"\0" + + json.dumps(cls.hidden.extra_pnginfo[x]).encode("latin-1", "strict"), + after_idat=True, + ) + return metadata + + @staticmethod + def _create_webp_metadata(pil_image: PILImage.Image, cls: Type[ComfyNodeV3] | None) -> PILImage.Exif: + """Creates EXIF metadata bytes for WebP images.""" + exif_data = pil_image.getexif() + if args.disable_metadata or cls is None or cls.hidden is None: + return exif_data + if cls.hidden.prompt is not None: + exif_data[0x0110] = "prompt:{}".format(json.dumps(cls.hidden.prompt)) # EXIF 0x0110 = Model + if cls.hidden.extra_pnginfo is not None: + inital_exif_tag = 0x010F # EXIF 0x010f = Make + for key, value in cls.hidden.extra_pnginfo.items(): + exif_data[inital_exif_tag] = "{}:{}".format(key, json.dumps(value)) + inital_exif_tag -= 1 + return exif_data + + @staticmethod + def save_images( + images, filename_prefix: str, folder_type: FolderType, cls: Type[ComfyNodeV3] | None, compress_level = 4, + ) -> list[SavedResult]: + """Saves a batch of images as individual PNG files.""" + full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path( + filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0] + ) + results = [] + metadata = ImageSaveHelper._create_png_metadata(cls) + for batch_number, image_tensor in enumerate(images): + img = ImageSaveHelper._convert_tensor_to_pil(image_tensor) filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) file = f"{filename_with_batch_num}_{counter:05}_.png" - img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=1) - results.append(SavedResult(file, subfolder, FolderType.temp)) + img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=compress_level) + results.append(SavedResult(file, subfolder, folder_type)) counter += 1 + return results - self.values = results + @staticmethod + def save_animated_png( + images, + filename_prefix: str, + folder_type: FolderType, + cls: Type[ComfyNodeV3] | None, + fps: float, + compress_level: int + ) -> SavedResult: + """Saves a batch of images as a single animated PNG.""" + full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path( + filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0] + ) + pil_images = [ImageSaveHelper._convert_tensor_to_pil(img) for img in images] + metadata = ImageSaveHelper._create_animated_png_metadata(cls) + file = f"{filename}_{counter:05}_.png" + save_path = os.path.join(full_output_folder, file) + pil_images[0].save( + save_path, + pnginfo=metadata, + compress_level=compress_level, + save_all=True, + duration=int(1000.0 / fps), + append_images=pil_images[1:], + ) + return SavedResult(file, subfolder, folder_type) + + @staticmethod + def save_animated_webp( + images, + filename_prefix: str, + folder_type: FolderType, + cls: Type[ComfyNodeV3] | None, + fps: float, + lossless: bool, + quality: int, + method: int, + ) -> SavedResult: + """Saves a batch of images as a single animated WebP.""" + full_output_folder, filename, counter, subfolder, _ = folder_paths.get_save_image_path( + filename_prefix, _get_directory_by_folder_type(folder_type), images[0].shape[1], images[0].shape[0] + ) + pil_images = [ImageSaveHelper._convert_tensor_to_pil(img) for img in images] + pil_exif = ImageSaveHelper._create_webp_metadata(pil_images[0], cls) + file = f"{filename}_{counter:05}_.webp" + pil_images[0].save( + os.path.join(full_output_folder, file), + save_all=True, + duration=int(1000.0 / fps), + append_images=pil_images[1:], + exif=pil_exif, + lossless=lossless, + quality=quality, + method=method, + ) + return SavedResult(file, subfolder, folder_type) + + +class PreviewImage(_UIOutput): + def __init__(self, image: Image.Type, animated: bool=False, cls: ComfyNodeV3=None, **kwargs): + self.values = ImageSaveHelper.save_images( + image, + filename_prefix="ComfyUI_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for _ in range(5)), + folder_type=FolderType.temp, + cls=cls, + compress_level=1, + ) self.animated = animated def as_dict(self): diff --git a/comfy_extras/v3/nodes_images.py b/comfy_extras/v3/nodes_images.py index 297719e7c..16e77de4a 100644 --- a/comfy_extras/v3/nodes_images.py +++ b/comfy_extras/v3/nodes_images.py @@ -1,17 +1,14 @@ import hashlib -import json import os import numpy as np import torch from PIL import Image, ImageOps, ImageSequence -from PIL.PngImagePlugin import PngInfo import comfy.utils import folder_paths import node_helpers import nodes -from comfy.cli_args import args from comfy_api.v3 import io, ui from server import PromptServer @@ -633,48 +630,15 @@ class SaveAnimatedPNG(io.ComfyNodeV3): @classmethod def execute(cls, images, fps, compress_level, filename_prefix="ComfyUI") -> io.NodeOutput: - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( - filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] - ) - results = [] - pil_images = [] - for image in images: - img = Image.fromarray(np.clip(255.0 * image.cpu().numpy(), 0, 255).astype(np.uint8)) - pil_images.append(img) - - metadata = None - if not args.disable_metadata: - metadata = PngInfo() - if cls.hidden.prompt is not None: - metadata.add( - b"comf", - "prompt".encode("latin-1", "strict") - + b"\0" - + json.dumps(cls.hidden.prompt).encode("latin-1", "strict"), - after_idat=True, - ) - if cls.hidden.extra_pnginfo is not None: - for x in cls.hidden.extra_pnginfo: - metadata.add( - b"comf", - x.encode("latin-1", "strict") - + b"\0" - + json.dumps(cls.hidden.extra_pnginfo[x]).encode("latin-1", "strict"), - after_idat=True, - ) - - file = f"{filename}_{counter:05}_.png" - pil_images[0].save( - os.path.join(full_output_folder, file), - pnginfo=metadata, + result = ui.ImageSaveHelper.save_animated_png( + images=images, + filename_prefix=filename_prefix, + folder_type=io.FolderType.output, + cls=cls, + fps=fps, compress_level=compress_level, - save_all=True, - duration=int(1000.0 / fps), - append_images=pil_images[1:], ) - results.append(ui.SavedResult(file, subfolder, io.FolderType.output)) - - return io.NodeOutput(ui={"images": results, "animated": (True,)}) + return io.NodeOutput(ui={"images": [result], "animated": (len(images) != 1,)}) class SaveAnimatedWEBP(io.ComfyNodeV3): @@ -693,53 +657,24 @@ class SaveAnimatedWEBP(io.ComfyNodeV3): io.Boolean.Input("lossless", default=True), io.Int.Input("quality", default=80, min=0, max=100), io.Combo.Input("method", options=list(cls.COMPRESS_METHODS.keys())), - # "num_frames": ("INT", {"default": 0, "min": 0, "max": 8192}), ], hidden=[io.Hidden.prompt, io.Hidden.extra_pnginfo], is_output_node=True, ) @classmethod - def execute(cls, images, fps, filename_prefix, lossless, quality, method, num_frames=0) -> io.NodeOutput: - method = cls.COMPRESS_METHODS.get(method) - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( - filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] + def execute(cls, images, fps, filename_prefix, lossless, quality, method) -> io.NodeOutput: + result = ui.ImageSaveHelper.save_animated_webp( + images=images, + filename_prefix=filename_prefix, + folder_type=io.FolderType.output, + cls=cls, + fps=fps, + lossless=lossless, + quality=quality, + method=cls.COMPRESS_METHODS.get(method) ) - results = [] - pil_images = [] - for image in images: - img = Image.fromarray(np.clip(255.0 * image.cpu().numpy(), 0, 255).astype(np.uint8)) - pil_images.append(img) - - metadata = pil_images[0].getexif() - if not args.disable_metadata: - if cls.hidden.prompt is not None: - metadata[0x0110] = "prompt:{}".format(json.dumps(cls.hidden.prompt)) - if cls.hidden.extra_pnginfo is not None: - inital_exif = 0x010F - for x in cls.hidden.extra_pnginfo: - metadata[inital_exif] = "{}:{}".format(x, json.dumps(cls.hidden.extra_pnginfo[x])) - inital_exif -= 1 - - if num_frames == 0: - num_frames = len(pil_images) - - for i in range(0, len(pil_images), num_frames): - file = f"{filename}_{counter:05}_.webp" - pil_images[i].save( - os.path.join(full_output_folder, file), - save_all=True, - duration=int(1000.0 / fps), - append_images=pil_images[i + 1 : i + num_frames], - exif=metadata, - lossless=lossless, - quality=quality, - method=method, - ) - results.append(ui.SavedResult(file, subfolder, io.FolderType.output)) - counter += 1 - - return io.NodeOutput(ui={"images": results, "animated": (num_frames != 1,)}) + return io.NodeOutput(ui={"images": [result], "animated": (len(images) != 1,)}) class SaveImage(io.ComfyNodeV3): @@ -768,28 +703,13 @@ class SaveImage(io.ComfyNodeV3): @classmethod def execute(cls, images, filename_prefix="ComfyUI") -> io.NodeOutput: - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path( - filename_prefix, folder_paths.get_output_directory(), images[0].shape[1], images[0].shape[0] + results = ui.ImageSaveHelper.save_images( + images, + filename_prefix=filename_prefix, + folder_type=io.FolderType.output, + cls=cls, + compress_level=4, ) - results = [] - for batch_number, image in enumerate(images): - i = 255.0 * image.cpu().numpy() - img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) - metadata = None - if not args.disable_metadata: - metadata = PngInfo() - if cls.hidden.prompt is not None: - metadata.add_text("prompt", json.dumps(cls.hidden.prompt)) - if cls.hidden.extra_pnginfo is not None: - for x in cls.hidden.extra_pnginfo: - metadata.add_text(x, json.dumps(cls.hidden.extra_pnginfo[x])) - - filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) - file = f"{filename_with_batch_num}_{counter:05}_.png" - img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4) - results.append(ui.SavedResult(file, subfolder, io.FolderType.output)) - counter += 1 - return io.NodeOutput(ui={"images": results})