diff --git a/.github/workflows/test-execution.yml b/.github/workflows/test-execution.yml new file mode 100644 index 000000000..af20594c1 --- /dev/null +++ b/.github/workflows/test-execution.yml @@ -0,0 +1,30 @@ +name: Execution Tests + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + test: + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + continue-on-error: true + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.12' + - name: Install requirements + run: | + python -m pip install --upgrade pip + pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu + pip install -r requirements.txt + pip install websocket-client + - name: Run Execution Tests + run: | + python -m pytest tests/execution -v \ No newline at end of file diff --git a/comfy_execution/progress.py b/comfy_execution/progress.py index e8f5ede1e..f951a3350 100644 --- a/comfy_execution/progress.py +++ b/comfy_execution/progress.py @@ -181,8 +181,9 @@ class WebUIProgressHandler(ProgressHandler): } # Send a combined progress_state message with all node states + # Include client_id to ensure message is only sent to the initiating client self.server_instance.send_sync( - "progress_state", {"prompt_id": prompt_id, "nodes": active_nodes} + "progress_state", {"prompt_id": prompt_id, "nodes": active_nodes}, self.server_instance.client_id ) @override diff --git a/tests/inference/extra_model_paths.yaml b/tests/execution/extra_model_paths.yaml similarity index 100% rename from tests/inference/extra_model_paths.yaml rename to tests/execution/extra_model_paths.yaml diff --git a/tests/inference/test_async_nodes.py b/tests/execution/test_async_nodes.py similarity index 99% rename from tests/inference/test_async_nodes.py rename to tests/execution/test_async_nodes.py index f029953dd..1e170e4f6 100644 --- a/tests/inference/test_async_nodes.py +++ b/tests/execution/test_async_nodes.py @@ -7,7 +7,7 @@ import subprocess from pytest import fixture from comfy_execution.graph_utils import GraphBuilder -from tests.inference.test_execution import ComfyClient, run_warmup +from tests.execution.test_execution import ComfyClient, run_warmup @pytest.mark.execution @@ -23,7 +23,7 @@ class TestAsyncNodes: '--output-directory', args_pytest["output_dir"], '--listen', args_pytest["listen"], '--port', str(args_pytest["port"]), - '--extra-model-paths-config', 'tests/inference/extra_model_paths.yaml', + '--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml', '--cpu', ] use_lru, lru_size = request.param diff --git a/tests/inference/test_execution.py b/tests/execution/test_execution.py similarity index 99% rename from tests/inference/test_execution.py rename to tests/execution/test_execution.py index e7b29302e..e33555a09 100644 --- a/tests/inference/test_execution.py +++ b/tests/execution/test_execution.py @@ -149,7 +149,7 @@ class TestExecution: '--output-directory', args_pytest["output_dir"], '--listen', args_pytest["listen"], '--port', str(args_pytest["port"]), - '--extra-model-paths-config', 'tests/inference/extra_model_paths.yaml', + '--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml', '--cpu', ] use_lru, lru_size = request.param diff --git a/tests/execution/test_progress_isolation.py b/tests/execution/test_progress_isolation.py new file mode 100644 index 000000000..93dc0d41b --- /dev/null +++ b/tests/execution/test_progress_isolation.py @@ -0,0 +1,233 @@ +"""Test that progress updates are properly isolated between WebSocket clients.""" + +import json +import pytest +import time +import threading +import uuid +import websocket +from typing import List, Dict, Any +from comfy_execution.graph_utils import GraphBuilder +from tests.execution.test_execution import ComfyClient + + +class ProgressTracker: + """Tracks progress messages received by a WebSocket client.""" + + def __init__(self, client_id: str): + self.client_id = client_id + self.progress_messages: List[Dict[str, Any]] = [] + self.lock = threading.Lock() + + def add_message(self, message: Dict[str, Any]): + """Thread-safe addition of progress messages.""" + with self.lock: + self.progress_messages.append(message) + + def get_messages_for_prompt(self, prompt_id: str) -> List[Dict[str, Any]]: + """Get all progress messages for a specific prompt_id.""" + with self.lock: + return [ + msg for msg in self.progress_messages + if msg.get('data', {}).get('prompt_id') == prompt_id + ] + + def has_cross_contamination(self, own_prompt_id: str) -> bool: + """Check if this client received progress for other prompts.""" + with self.lock: + for msg in self.progress_messages: + msg_prompt_id = msg.get('data', {}).get('prompt_id') + if msg_prompt_id and msg_prompt_id != own_prompt_id: + return True + return False + + +class IsolatedClient(ComfyClient): + """Extended ComfyClient that tracks all WebSocket messages.""" + + def __init__(self): + super().__init__() + self.progress_tracker = None + self.all_messages: List[Dict[str, Any]] = [] + + def connect(self, listen='127.0.0.1', port=8188, client_id=None): + """Connect with a specific client_id and set up message tracking.""" + if client_id is None: + client_id = str(uuid.uuid4()) + super().connect(listen, port, client_id) + self.progress_tracker = ProgressTracker(client_id) + + def listen_for_messages(self, duration: float = 5.0): + """Listen for WebSocket messages for a specified duration.""" + end_time = time.time() + duration + self.ws.settimeout(0.5) # Non-blocking with timeout + + while time.time() < end_time: + try: + out = self.ws.recv() + if isinstance(out, str): + message = json.loads(out) + self.all_messages.append(message) + + # Track progress_state messages + if message.get('type') == 'progress_state': + self.progress_tracker.add_message(message) + except websocket.WebSocketTimeoutException: + continue + except Exception: + # Log error silently in test context + break + + +@pytest.mark.execution +class TestProgressIsolation: + """Test suite for verifying progress update isolation between clients.""" + + @pytest.fixture(scope="class", autouse=True) + def _server(self, args_pytest): + """Start the ComfyUI server for testing.""" + import subprocess + pargs = [ + 'python', 'main.py', + '--output-directory', args_pytest["output_dir"], + '--listen', args_pytest["listen"], + '--port', str(args_pytest["port"]), + '--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml', + '--cpu', + ] + p = subprocess.Popen(pargs) + yield + p.kill() + + def start_client_with_retry(self, listen: str, port: int, client_id: str = None): + """Start client with connection retries.""" + client = IsolatedClient() + # Connect to server (with retries) + n_tries = 5 + for i in range(n_tries): + time.sleep(4) + try: + client.connect(listen, port, client_id) + return client + except ConnectionRefusedError as e: + print(e) # noqa: T201 + print(f"({i+1}/{n_tries}) Retrying...") # noqa: T201 + raise ConnectionRefusedError(f"Failed to connect after {n_tries} attempts") + + def test_progress_isolation_between_clients(self, args_pytest): + """Test that progress updates are isolated between different clients.""" + listen = args_pytest["listen"] + port = args_pytest["port"] + + # Create two separate clients with unique IDs + client_a_id = "client_a_" + str(uuid.uuid4()) + client_b_id = "client_b_" + str(uuid.uuid4()) + + try: + # Connect both clients with retries + client_a = self.start_client_with_retry(listen, port, client_a_id) + client_b = self.start_client_with_retry(listen, port, client_b_id) + + # Create simple workflows for both clients + graph_a = GraphBuilder(prefix="client_a") + image_a = graph_a.node("StubImage", content="BLACK", height=256, width=256, batch_size=1) + graph_a.node("PreviewImage", images=image_a.out(0)) + + graph_b = GraphBuilder(prefix="client_b") + image_b = graph_b.node("StubImage", content="WHITE", height=256, width=256, batch_size=1) + graph_b.node("PreviewImage", images=image_b.out(0)) + + # Submit workflows from both clients + prompt_a = graph_a.finalize() + prompt_b = graph_b.finalize() + + response_a = client_a.queue_prompt(prompt_a) + prompt_id_a = response_a['prompt_id'] + + response_b = client_b.queue_prompt(prompt_b) + prompt_id_b = response_b['prompt_id'] + + # Start threads to listen for messages on both clients + def listen_client_a(): + client_a.listen_for_messages(duration=10.0) + + def listen_client_b(): + client_b.listen_for_messages(duration=10.0) + + thread_a = threading.Thread(target=listen_client_a) + thread_b = threading.Thread(target=listen_client_b) + + thread_a.start() + thread_b.start() + + # Wait for threads to complete + thread_a.join() + thread_b.join() + + # Verify isolation + # Client A should only receive progress for prompt_id_a + assert not client_a.progress_tracker.has_cross_contamination(prompt_id_a), \ + f"Client A received progress updates for other clients' workflows. " \ + f"Expected only {prompt_id_a}, but got messages for multiple prompts." + + # Client B should only receive progress for prompt_id_b + assert not client_b.progress_tracker.has_cross_contamination(prompt_id_b), \ + f"Client B received progress updates for other clients' workflows. " \ + f"Expected only {prompt_id_b}, but got messages for multiple prompts." + + # Verify each client received their own progress updates + client_a_messages = client_a.progress_tracker.get_messages_for_prompt(prompt_id_a) + client_b_messages = client_b.progress_tracker.get_messages_for_prompt(prompt_id_b) + + assert len(client_a_messages) > 0, \ + "Client A did not receive any progress updates for its own workflow" + assert len(client_b_messages) > 0, \ + "Client B did not receive any progress updates for its own workflow" + + # Ensure no cross-contamination + client_a_other = client_a.progress_tracker.get_messages_for_prompt(prompt_id_b) + client_b_other = client_b.progress_tracker.get_messages_for_prompt(prompt_id_a) + + assert len(client_a_other) == 0, \ + f"Client A incorrectly received {len(client_a_other)} progress updates for Client B's workflow" + assert len(client_b_other) == 0, \ + f"Client B incorrectly received {len(client_b_other)} progress updates for Client A's workflow" + + finally: + # Clean up connections + if hasattr(client_a, 'ws'): + client_a.ws.close() + if hasattr(client_b, 'ws'): + client_b.ws.close() + + def test_progress_with_missing_client_id(self, args_pytest): + """Test that progress updates handle missing client_id gracefully.""" + listen = args_pytest["listen"] + port = args_pytest["port"] + + try: + # Connect client with retries + client = self.start_client_with_retry(listen, port) + + # Create a simple workflow + graph = GraphBuilder(prefix="test_missing_id") + image = graph.node("StubImage", content="BLACK", height=128, width=128, batch_size=1) + graph.node("PreviewImage", images=image.out(0)) + + # Submit workflow + prompt = graph.finalize() + response = client.queue_prompt(prompt) + prompt_id = response['prompt_id'] + + # Listen for messages + client.listen_for_messages(duration=5.0) + + # Should still receive progress updates for own workflow + messages = client.progress_tracker.get_messages_for_prompt(prompt_id) + assert len(messages) > 0, \ + "Client did not receive progress updates even though it initiated the workflow" + + finally: + if hasattr(client, 'ws'): + client.ws.close() + diff --git a/tests/inference/testing_nodes/testing-pack/__init__.py b/tests/execution/testing_nodes/testing-pack/__init__.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/__init__.py rename to tests/execution/testing_nodes/testing-pack/__init__.py diff --git a/tests/inference/testing_nodes/testing-pack/api_test_nodes.py b/tests/execution/testing_nodes/testing-pack/api_test_nodes.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/api_test_nodes.py rename to tests/execution/testing_nodes/testing-pack/api_test_nodes.py diff --git a/tests/inference/testing_nodes/testing-pack/async_test_nodes.py b/tests/execution/testing_nodes/testing-pack/async_test_nodes.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/async_test_nodes.py rename to tests/execution/testing_nodes/testing-pack/async_test_nodes.py diff --git a/tests/inference/testing_nodes/testing-pack/conditions.py b/tests/execution/testing_nodes/testing-pack/conditions.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/conditions.py rename to tests/execution/testing_nodes/testing-pack/conditions.py diff --git a/tests/inference/testing_nodes/testing-pack/flow_control.py b/tests/execution/testing_nodes/testing-pack/flow_control.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/flow_control.py rename to tests/execution/testing_nodes/testing-pack/flow_control.py diff --git a/tests/inference/testing_nodes/testing-pack/specific_tests.py b/tests/execution/testing_nodes/testing-pack/specific_tests.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/specific_tests.py rename to tests/execution/testing_nodes/testing-pack/specific_tests.py diff --git a/tests/inference/testing_nodes/testing-pack/stubs.py b/tests/execution/testing_nodes/testing-pack/stubs.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/stubs.py rename to tests/execution/testing_nodes/testing-pack/stubs.py diff --git a/tests/inference/testing_nodes/testing-pack/tools.py b/tests/execution/testing_nodes/testing-pack/tools.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/tools.py rename to tests/execution/testing_nodes/testing-pack/tools.py diff --git a/tests/inference/testing_nodes/testing-pack/util.py b/tests/execution/testing_nodes/testing-pack/util.py similarity index 100% rename from tests/inference/testing_nodes/testing-pack/util.py rename to tests/execution/testing_nodes/testing-pack/util.py