From ac05d9a5fadeedc6270b531054b1112f0960f6f6 Mon Sep 17 00:00:00 2001
From: bigcat88
Date: Sun, 13 Jul 2025 08:28:50 +0300
Subject: [PATCH] V3 Nodes: LoadAudio and PreviewAudio

---
 comfy_api/v3/ui.py             | 119 +++++++++++++++++++++++++++++++++
 comfy_extras/v3/nodes_audio.py |  88 ++++++++++++++++++++++++
 2 files changed, 207 insertions(+)
 create mode 100644 comfy_extras/v3/nodes_audio.py

diff --git a/comfy_api/v3/ui.py b/comfy_api/v3/ui.py
index a4b624f0b..976202244 100644
--- a/comfy_api/v3/ui.py
+++ b/comfy_api/v3/ui.py
@@ -1,5 +1,9 @@
 from __future__ import annotations
 
+from abc import ABC, abstractmethod
+from io import BytesIO
+import av
+import torchaudio
 from comfy_api.v3.io import Image, FolderType, _UIOutput, ComfyNodeV3
 # used for image preview
 from comfy.cli_args import args
@@ -119,6 +123,121 @@ class PreviewAudio(_UIOutput):
     def __init__(self, values: list[SavedResult | dict], **kwargs):
         self.values = values
 
+    def __init__(self, audio, cls: ComfyNodeV3=None, **kwargs):
+        """Encode each waveform in ``audio`` to a temp-directory file for preview.
+
+        NOTE(review): this is the second ``__init__`` in the class body, so it
+        supersedes the ``__init__(values)`` defined directly above it.
+
+        Args:
+            audio: mapping with a "waveform" batch (one file is written per
+                entry along its first dimension) and a "sample_rate".
+            cls: node class whose ``hidden.prompt`` / ``hidden.extra_pnginfo``
+                are stored as container metadata; skipped when ``None`` or
+                when ``args.disable_metadata`` is set.
+            **kwargs: accepted but unused.
+        """
+        output_dir = folder_paths.get_temp_directory()
+        result_type = "temp"
+        prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(5))
+        filename_prefix = "ComfyUI"
+        quality = "128k"
+        audio_format = "flac"
+
+        filename_prefix += prefix_append
+        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(
+            filename_prefix, output_dir
+        )
+
+        # Prepare metadata dictionary
+        metadata = {}
+        if not args.disable_metadata and cls is not None:
+            if cls.hidden.prompt is not None:
+                metadata["prompt"] = json.dumps(cls.hidden.prompt)
+            if cls.hidden.extra_pnginfo is not None:
+                for x in cls.hidden.extra_pnginfo:
+                    metadata[x] = json.dumps(cls.hidden.extra_pnginfo[x])
+
+        # Opus supported sample rates
+        OPUS_RATES = [8000, 12000, 16000, 24000, 48000]
+        results = []
+        for (batch_number, waveform) in enumerate(audio["waveform"].cpu()):
+            filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
+            file_name = f"{filename_with_batch_num}_{counter:05}_.{audio_format}"
+            output_path = os.path.join(full_output_folder, file_name)
+
+            # Use original sample rate initially
+            sample_rate = audio["sample_rate"]
+
+            # Handle Opus sample rate requirements
+            if audio_format == "opus":
+                if sample_rate > 48000:
+                    sample_rate = 48000
+                elif sample_rate not in OPUS_RATES:
+                    # Find the next highest supported rate
+                    for rate in sorted(OPUS_RATES):
+                        if rate > sample_rate:
+                            sample_rate = rate
+                            break
+                    if sample_rate not in OPUS_RATES:  # Fallback if still not supported
+                        sample_rate = 48000
+
+                # Resample if necessary
+                if sample_rate != audio["sample_rate"]:
+                    waveform = torchaudio.functional.resample(waveform, audio["sample_rate"], sample_rate)
+
+            # Create output with specified format; the context manager closes the
+            # container even when encoding raises, so it is never left open.
+            output_buffer = BytesIO()
+            with av.open(output_buffer, mode='w', format=audio_format) as output_container:
+                # Set metadata on the container
+                for key, value in metadata.items():
+                    output_container.metadata[key] = value
+
+                # Set up the output stream with appropriate properties
+                if audio_format == "opus":
+                    out_stream = output_container.add_stream("libopus", rate=sample_rate)
+                    if quality == "64k":
+                        out_stream.bit_rate = 64000
+                    elif quality == "96k":
+                        out_stream.bit_rate = 96000
+                    elif quality == "128k":
+                        out_stream.bit_rate = 128000
+                    elif quality == "192k":
+                        out_stream.bit_rate = 192000
+                    elif quality == "320k":
+                        out_stream.bit_rate = 320000
+                elif audio_format == "mp3":
+                    out_stream = output_container.add_stream("libmp3lame", rate=sample_rate)
+                    if quality == "V0":
+                        # TODO i would really love to support V3 and V5 but there doesn't seem to be a way to set the qscale level, the property below is a bool
+                        out_stream.codec_context.qscale = 1
+                    elif quality == "128k":
+                        out_stream.bit_rate = 128000
+                    elif quality == "320k":
+                        out_stream.bit_rate = 320000
+                else:  # audio_format == "flac"
+                    out_stream = output_container.add_stream("flac", rate=sample_rate)
+
+                frame = av.AudioFrame.from_ndarray(waveform.movedim(0, 1).reshape(1, -1).float().numpy(), format='flt',
+                                                   layout='mono' if waveform.shape[0] == 1 else 'stereo')
+                frame.sample_rate = sample_rate
+                frame.pts = 0
+                output_container.mux(out_stream.encode(frame))
+
+                # Flush encoder
+                output_container.mux(out_stream.encode(None))
+
+            # Write the output to file
+            output_buffer.seek(0)
+            with open(output_path, 'wb') as f:
+                f.write(output_buffer.getbuffer())
+
+            results.append(SavedResult(file_name, subfolder, result_type))
+            counter += 1
+
+        self.values = results
+
     def as_dict(self):
         return {"audio": self.values}
 
diff --git a/comfy_extras/v3/nodes_audio.py b/comfy_extras/v3/nodes_audio.py
new file mode 100644
index 000000000..ae20a1da9
--- /dev/null
+++ b/comfy_extras/v3/nodes_audio.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+import torchaudio
+import folder_paths
+import os
+import io
+import hashlib
+from comfy_api.v3 import io, ui
+
+
+class PreviewAudio_V3(io.ComfyNodeV3):
+    """Output node that hands its audio input to ``ui.PreviewAudio``."""
+
+    @classmethod
+    def DEFINE_SCHEMA(cls):
+        return io.SchemaV3(
+            node_id="PreviewAudio_V3",
+            display_name="Preview Audio _V3",
+            category="audio",
+            inputs=[
+                io.Audio.Input("audio"),
+            ],
+            hidden=[io.Hidden.prompt, io.Hidden.extra_pnginfo],
+            is_output_node=True,
+        )
+
+    @classmethod
+    def execute(cls, audio):
+        return io.NodeOutput(ui=ui.PreviewAudio(audio, cls=cls))
+
+
+class LoadAudio_V3(io.ComfyNodeV3):
+    """Node that loads the selected file with ``torchaudio.load``."""
+
+    @classmethod
+    def DEFINE_SCHEMA(cls):
+        return io.SchemaV3(
+            node_id="LoadAudio_V3",
+            display_name="Load Audio _V3",
+            category="audio",
+            inputs=[
+                io.Combo.Input("audio", upload=io.UploadType.audio, options=cls.get_files_options()),
+            ],
+            outputs=[io.Audio.Output()],
+        )
+
+    @classmethod
+    def get_files_options(cls) -> list[str]:
+        """Return the sorted audio/video entries listed in the input directory."""
+        input_dir = folder_paths.get_input_directory()
+        return sorted(folder_paths.filter_files_content_types(os.listdir(input_dir), ["audio", "video"]))
+
+    @classmethod
+    def execute(cls, audio) -> io.NodeOutput:
+        waveform, sample_rate = torchaudio.load(folder_paths.get_annotated_filepath(audio))
+        # Add a leading dimension in front of the loaded waveform.
+        return io.NodeOutput({"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate})
+
+    @classmethod
+    def fingerprint_inputs(s, audio):
+        """Return the SHA-256 hex digest of the selected file's contents."""
+        audio_path = folder_paths.get_annotated_filepath(audio)
+        m = hashlib.sha256()
+        with open(audio_path, "rb") as f:
+            # Hash in 1 MiB chunks so a large media file is never read whole.
+            for chunk in iter(lambda: f.read(1 << 20), b""):
+                m.update(chunk)
+        return m.hexdigest()
+
+    @classmethod
+    def validate_inputs(s, audio):
+        """Return True if the annotated path exists, else an error message."""
+        if not folder_paths.exists_annotated_filepath(audio):
+            return f"Invalid audio file: {audio}"
+        return True
+
+
+NODES_LIST: list[type[io.ComfyNodeV3]] = [
+    # EmptyLatentAudio_V3,
+    # VAEEncodeAudio_V3,
+    # VAEDecodeAudio_V3,
+    # SaveAudio_V3,
+    # SaveAudioMP3_V3,
+    # SaveAudioOpus_V3,
+    LoadAudio_V3,
+    PreviewAudio_V3,
+    # ConditioningStableAudio_V3,
+]