diff --git a/tests-api/conftest.py b/tests-api/conftest.py index fa64bb53..e124279b 100644 --- a/tests-api/conftest.py +++ b/tests-api/conftest.py @@ -21,13 +21,13 @@ DEFAULT_SERVER_URL = "http://127.0.0.1:8188" def api_spec_path() -> str: """ Get the path to the OpenAPI specification file - + Returns: Path to the OpenAPI specification file """ return os.path.abspath(os.path.join( - os.path.dirname(__file__), - "..", + os.path.dirname(__file__), + "..", "openapi.yaml" )) @@ -36,10 +36,10 @@ def api_spec_path() -> str: def api_spec(api_spec_path: str) -> Dict[str, Any]: """ Load the OpenAPI specification - + Args: api_spec_path: Path to the spec file - + Returns: Parsed OpenAPI specification """ @@ -51,7 +51,7 @@ def api_spec(api_spec_path: str) -> Dict[str, Any]: def base_url() -> str: """ Get the base URL for the API server - + Returns: Base URL string """ @@ -63,10 +63,10 @@ def base_url() -> str: def server_available(base_url: str) -> bool: """ Check if the server is available - + Args: base_url: Base URL for the API - + Returns: True if the server is available, False otherwise """ @@ -82,24 +82,24 @@ def server_available(base_url: str) -> bool: def api_client(base_url: str) -> Generator[Optional[requests.Session], None, None]: """ Create a requests session for API testing - + Args: base_url: Base URL for the API - + Yields: Requests session configured for the API """ session = requests.Session() - + # Helper function to construct URLs def get_url(path: str) -> str: return urljoin(base_url, path) - + # Add url helper to the session session.get_url = get_url # type: ignore - + yield session - + # Cleanup session.close() @@ -108,24 +108,24 @@ def api_client(base_url: str) -> Generator[Optional[requests.Session], None, Non def api_get_json(api_client: requests.Session): """ Helper fixture for making GET requests and parsing JSON responses - + Args: api_client: API client session - + Returns: Function that makes GET requests and returns JSON """ def _get_json(path: str, **kwargs): url = api_client.get_url(path) # type: ignore response = api_client.get(url, **kwargs) - + if response.status_code == 200: try: return response.json() except ValueError: return None return None - + return _get_json @@ -133,9 +133,9 @@ def api_get_json(api_client: requests.Session): def require_server(server_available): """ Skip tests if server is not available - + Args: server_available: Whether the server is available """ if not server_available: - pytest.skip("Server is not available") \ No newline at end of file + pytest.skip("Server is not available") diff --git a/tests-api/test_api_by_tag.py b/tests-api/test_api_by_tag.py index cc22fc38..54c97826 100644 --- a/tests-api/test_api_by_tag.py +++ b/tests-api/test_api_by_tag.py @@ -5,7 +5,7 @@ import pytest import logging import sys import os -from typing import Dict, Any, List, Set +from typing import Dict, Any, Set # Use a direct import with the full path current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -17,12 +17,12 @@ def get_all_endpoints(spec): Extract all endpoints from an OpenAPI spec """ endpoints = [] - + for path, path_item in spec['paths'].items(): for method, operation in path_item.items(): if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']: continue - + endpoints.append({ 'path': path, 'method': method.lower(), @@ -30,7 +30,7 @@ def get_all_endpoints(spec): 'operation_id': operation.get('operationId', ''), 'summary': operation.get('summary', '') }) - + return endpoints def get_all_tags(spec): @@ -38,12 +38,12 @@ def get_all_tags(spec): Get all tags used in the API spec """ tags = set() - + for path_item in spec['paths'].values(): for operation in path_item.values(): if isinstance(operation, dict) and 'tags' in operation: tags.update(operation['tags']) - + return tags def extract_endpoints_by_tag(spec, tag): @@ -51,12 +51,12 @@ def extract_endpoints_by_tag(spec, tag): Extract all endpoints with a specific tag """ endpoints = [] - + for path, path_item in spec['paths'].items(): for method, operation in path_item.items(): if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']: continue - + if tag in operation.get('tags', []): endpoints.append({ 'path': path, @@ -64,7 +64,7 @@ def extract_endpoints_by_tag(spec, tag): 'operation_id': operation.get('operationId', ''), 'summary': operation.get('summary', '') }) - + return endpoints # Setup logging @@ -76,10 +76,10 @@ logger = logging.getLogger(__name__) def api_tags(api_spec: Dict[str, Any]) -> Set[str]: """ Get all tags from the API spec - + Args: api_spec: Loaded OpenAPI spec - + Returns: Set of tag names """ @@ -89,12 +89,12 @@ def api_tags(api_spec: Dict[str, Any]) -> Set[str]: def test_api_has_tags(api_tags: Set[str]): """ Test that the API has defined tags - + Args: api_tags: Set of tags """ assert len(api_tags) > 0, "API spec should have at least one tag" - + # Log the tags logger.info(f"API spec has the following tags: {sorted(api_tags)}") @@ -109,7 +109,7 @@ def test_api_has_tags(api_tags: Set[str]): def test_core_tags_exist(api_tags: Set[str], tag: str): """ Test that core tags exist in the API spec - + Args: api_tags: Set of tags tag: Tag to check @@ -120,18 +120,18 @@ def test_core_tags_exist(api_tags: Set[str], tag: str): def test_workflow_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'workflow' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "workflow") - + assert len(endpoints) > 0, "No endpoints found with 'workflow' tag" - + # Check for key workflow endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/prompt" in endpoint_paths, "Workflow tag should include /prompt endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'workflow' tag:") for e in endpoints: @@ -141,19 +141,19 @@ def test_workflow_tag_has_endpoints(api_spec: Dict[str, Any]): def test_image_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'image' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "image") - + assert len(endpoints) > 0, "No endpoints found with 'image' tag" - + # Check for key image endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/upload/image" in endpoint_paths, "Image tag should include /upload/image endpoint" assert "/view" in endpoint_paths, "Image tag should include /view endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'image' tag:") for e in endpoints: @@ -163,18 +163,18 @@ def test_image_tag_has_endpoints(api_spec: Dict[str, Any]): def test_model_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'model' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "model") - + assert len(endpoints) > 0, "No endpoints found with 'model' tag" - + # Check for key model endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/models" in endpoint_paths, "Model tag should include /models endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'model' tag:") for e in endpoints: @@ -184,18 +184,18 @@ def test_model_tag_has_endpoints(api_spec: Dict[str, Any]): def test_node_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'node' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "node") - + assert len(endpoints) > 0, "No endpoints found with 'node' tag" - + # Check for key node endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/object_info" in endpoint_paths, "Node tag should include /object_info endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'node' tag:") for e in endpoints: @@ -205,18 +205,18 @@ def test_node_tag_has_endpoints(api_spec: Dict[str, Any]): def test_system_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'system' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "system") - + assert len(endpoints) > 0, "No endpoints found with 'system' tag" - + # Check for key system endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/system_stats" in endpoint_paths, "System tag should include /system_stats endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'system' tag:") for e in endpoints: @@ -226,18 +226,18 @@ def test_system_tag_has_endpoints(api_spec: Dict[str, Any]): def test_internal_tag_has_endpoints(api_spec: Dict[str, Any]): """ Test that the 'internal' tag has appropriate endpoints - + Args: api_spec: Loaded OpenAPI spec """ endpoints = extract_endpoints_by_tag(api_spec, "internal") - + assert len(endpoints) > 0, "No endpoints found with 'internal' tag" - + # Check for key internal endpoints endpoint_paths = [e["path"] for e in endpoints] assert "/internal/logs" in endpoint_paths, "Internal tag should include /internal/logs endpoint" - + # Log the endpoints logger.info(f"Found {len(endpoints)} endpoints with 'internal' tag:") for e in endpoints: @@ -247,22 +247,22 @@ def test_internal_tag_has_endpoints(api_spec: Dict[str, Any]): def test_operation_ids_match_tag(api_spec: Dict[str, Any]): """ Test that operation IDs follow a consistent pattern with their tag - + Args: api_spec: Loaded OpenAPI spec """ failures = [] - + for path, path_item in api_spec['paths'].items(): for method, operation in path_item.items(): if method in ['get', 'post', 'put', 'delete', 'patch']: if 'operationId' in operation and 'tags' in operation and operation['tags']: op_id = operation['operationId'] primary_tag = operation['tags'][0].lower() - + # Check if operationId starts with primary tag prefix # This is a common convention, but might need adjusting - if not (op_id.startswith(primary_tag) or + if not (op_id.startswith(primary_tag) or any(op_id.lower().startswith(f"{tag.lower()}") for tag in operation['tags'])): failures.append({ 'path': path, @@ -270,10 +270,10 @@ def test_operation_ids_match_tag(api_spec: Dict[str, Any]): 'operationId': op_id, 'primary_tag': primary_tag }) - + # Log failures for diagnosis but don't fail the test # as this is a style/convention check if failures: logger.warning(f"Found {len(failures)} operationIds that don't align with their tags:") for f in failures: - logger.warning(f" {f['method'].upper()} {f['path']} - operationId: {f['operationId']}, primary tag: {f['primary_tag']}") \ No newline at end of file + logger.warning(f" {f['method'].upper()} {f['path']} - operationId: {f['operationId']}, primary tag: {f['primary_tag']}") diff --git a/tests-api/test_endpoint_existence.py b/tests-api/test_endpoint_existence.py index 3b5111ab..6cf13c3f 100644 --- a/tests-api/test_endpoint_existence.py +++ b/tests-api/test_endpoint_existence.py @@ -17,20 +17,20 @@ sys.path.insert(0, current_dir) def get_all_endpoints(spec): """ Extract all endpoints from an OpenAPI spec - + Args: spec: Parsed OpenAPI specification - + Returns: List of dicts with path, method, and tags for each endpoint """ endpoints = [] - + for path, path_item in spec['paths'].items(): for method, operation in path_item.items(): if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']: continue - + endpoints.append({ 'path': path, 'method': method.lower(), @@ -38,7 +38,7 @@ def get_all_endpoints(spec): 'operation_id': operation.get('operationId', ''), 'summary': operation.get('summary', '') }) - + return endpoints # Setup logging @@ -50,10 +50,10 @@ logger = logging.getLogger(__name__) def all_endpoints(api_spec: Dict[str, Any]) -> List[Dict[str, Any]]: """ Get all endpoints from the API spec - + Args: api_spec: Loaded OpenAPI spec - + Returns: List of endpoint information """ @@ -63,13 +63,13 @@ def all_endpoints(api_spec: Dict[str, Any]) -> List[Dict[str, Any]]: def test_endpoints_exist(all_endpoints: List[Dict[str, Any]]): """ Test that endpoints are defined in the spec - + Args: all_endpoints: List of endpoint information """ # Simple check that we have endpoints defined assert len(all_endpoints) > 0, "No endpoints defined in the OpenAPI spec" - + # Log the endpoints for informational purposes logger.info(f"Found {len(all_endpoints)} endpoints in the OpenAPI spec") for endpoint in all_endpoints: @@ -87,23 +87,23 @@ def test_endpoints_exist(all_endpoints: List[Dict[str, Any]]): def test_basic_get_endpoints(require_server, api_client, endpoint_path: str): """ Test that basic GET endpoints exist and respond - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture endpoint_path: Path to test """ url = api_client.get_url(endpoint_path) # type: ignore - + try: response = api_client.get(url) - + # We're just checking that the endpoint exists and returns some kind of response # Not necessarily a 200 status code assert response.status_code not in [404, 405], f"Endpoint {endpoint_path} does not exist" - + logger.info(f"Endpoint {endpoint_path} exists with status code {response.status_code}") - + except requests.RequestException as e: pytest.fail(f"Request to {endpoint_path} failed: {str(e)}") @@ -111,24 +111,24 @@ def test_basic_get_endpoints(require_server, api_client, endpoint_path: str): def test_websocket_endpoint_exists(require_server, base_url: str): """ Test that the WebSocket endpoint exists - + Args: require_server: Fixture that skips if server is not available base_url: Base server URL """ ws_url = urljoin(base_url, "/ws") - + # For WebSocket, we can't use a normal GET request # Instead, we make a HEAD request to check if the endpoint exists try: response = requests.head(ws_url) - + # WebSocket endpoints often return a 400 Bad Request for HEAD requests # but a 404 would indicate the endpoint doesn't exist assert response.status_code != 404, "WebSocket endpoint /ws does not exist" - + logger.info(f"WebSocket endpoint exists with status code {response.status_code}") - + except requests.RequestException as e: pytest.fail(f"Request to WebSocket endpoint failed: {str(e)}") @@ -136,35 +136,35 @@ def test_websocket_endpoint_exists(require_server, base_url: str): def test_api_models_folder_endpoint(require_server, api_client): """ Test that the /models/{folder} endpoint exists and responds - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture """ # First get available model types models_url = api_client.get_url("/models") # type: ignore - + try: models_response = api_client.get(models_url) assert models_response.status_code == 200, "Failed to get model types" - + model_types = models_response.json() - + # Skip if no model types available if not model_types: pytest.skip("No model types available to test") - + # Test with the first model type model_type = model_types[0] models_folder_url = api_client.get_url(f"/models/{model_type}") # type: ignore - + folder_response = api_client.get(models_folder_url) - + # We're just checking that the endpoint exists assert folder_response.status_code != 404, f"Endpoint /models/{model_type} does not exist" - + logger.info(f"Endpoint /models/{model_type} exists with status code {folder_response.status_code}") - + except requests.RequestException as e: pytest.fail(f"Request failed: {str(e)}") except (ValueError, KeyError, IndexError) as e: @@ -174,35 +174,35 @@ def test_api_models_folder_endpoint(require_server, api_client): def test_api_object_info_node_endpoint(require_server, api_client): """ Test that the /object_info/{node_class} endpoint exists and responds - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture """ # First get available node classes objects_url = api_client.get_url("/object_info") # type: ignore - + try: objects_response = api_client.get(objects_url) assert objects_response.status_code == 200, "Failed to get object info" - + node_classes = objects_response.json() - + # Skip if no node classes available if not node_classes: pytest.skip("No node classes available to test") - + # Test with the first node class node_class = next(iter(node_classes.keys())) node_url = api_client.get_url(f"/object_info/{node_class}") # type: ignore - + node_response = api_client.get(node_url) - + # We're just checking that the endpoint exists assert node_response.status_code != 404, f"Endpoint /object_info/{node_class} does not exist" - + logger.info(f"Endpoint /object_info/{node_class} exists with status code {node_response.status_code}") - + except requests.RequestException as e: pytest.fail(f"Request failed: {str(e)}") except (ValueError, KeyError, StopIteration) as e: @@ -212,7 +212,7 @@ def test_api_object_info_node_endpoint(require_server, api_client): def test_internal_endpoints_exist(require_server, api_client): """ Test that internal endpoints exist - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture @@ -223,18 +223,18 @@ def test_internal_endpoints_exist(require_server, api_client): "/internal/folder_paths", "/internal/files/output" ] - + for endpoint in internal_endpoints: url = api_client.get_url(endpoint) # type: ignore - + try: response = api_client.get(url) - + # We're just checking that the endpoint exists assert response.status_code != 404, f"Endpoint {endpoint} does not exist" - + logger.info(f"Endpoint {endpoint} exists with status code {response.status_code}") - + except requests.RequestException as e: logger.warning(f"Request to {endpoint} failed: {str(e)}") - # Don't fail the test as internal endpoints might be restricted \ No newline at end of file + # Don't fail the test as internal endpoints might be restricted diff --git a/tests-api/test_schema_validation.py b/tests-api/test_schema_validation.py index 87a7f27c..4273f81d 100644 --- a/tests-api/test_schema_validation.py +++ b/tests-api/test_schema_validation.py @@ -7,7 +7,7 @@ import logging import sys import os import json -from typing import Dict, Any, List +from typing import Dict, Any # Use a direct import with the full path current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -15,40 +15,40 @@ sys.path.insert(0, current_dir) # Define validation functions inline to avoid import issues def get_endpoint_schema( - spec, - path, - method, + spec, + path, + method, status_code = '200' ): """ Extract response schema for a specific endpoint from OpenAPI spec """ method = method.lower() - + # Handle path not found if path not in spec['paths']: return None - + # Handle method not found if method not in spec['paths'][path]: return None - + # Handle status code not found responses = spec['paths'][path][method].get('responses', {}) if status_code not in responses: return None - + # Handle no content defined if 'content' not in responses[status_code]: return None - + # Get schema from first content type content_types = responses[status_code]['content'] first_content_type = next(iter(content_types)) - + if 'schema' not in content_types[first_content_type]: return None - + return content_types[first_content_type]['schema'] def resolve_schema_refs(schema, spec): @@ -57,9 +57,9 @@ def resolve_schema_refs(schema, spec): """ if not isinstance(schema, dict): return schema - + result = {} - + for key, value in schema.items(): if key == '$ref' and isinstance(value, str) and value.startswith('#/'): # Handle reference @@ -67,7 +67,7 @@ def resolve_schema_refs(schema, spec): ref_value = spec for path_part in ref_path: ref_value = ref_value.get(path_part, {}) - + # Recursively resolve any refs in the referenced schema ref_value = resolve_schema_refs(ref_value, spec) result.update(ref_value) @@ -83,7 +83,7 @@ def resolve_schema_refs(schema, spec): else: # Pass through other values result[key] = value - + return result def validate_response( @@ -97,16 +97,16 @@ def validate_response( Validate a response against the OpenAPI schema """ schema = get_endpoint_schema(spec, path, method, status_code) - + if schema is None: return { 'valid': False, 'errors': [f"No schema found for {method.upper()} {path} with status {status_code}"] } - + # Resolve any $ref in the schema resolved_schema = resolve_schema_refs(schema, spec) - + try: import jsonschema jsonschema.validate(instance=response_data, schema=resolved_schema) @@ -116,14 +116,14 @@ def validate_response( path = ".".join(str(p) for p in e.path) if e.path else "root" instance = e.instance if not isinstance(e.instance, dict) else "..." schema_path = ".".join(str(p) for p in e.schema_path) if e.schema_path else "unknown" - + detailed_error = ( f"Validation error at path: {path}\n" f"Schema path: {schema_path}\n" f"Error message: {e.message}\n" f"Failed instance: {instance}\n" ) - + return {'valid': False, 'errors': [detailed_error]} # Setup logging @@ -139,15 +139,15 @@ logger = logging.getLogger(__name__) ("/embeddings", "get") ]) def test_response_schema_validation( - require_server, - api_client, + require_server, + api_client, api_spec: Dict[str, Any], endpoint_path: str, method: str ): """ Test that API responses match the defined schema - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture @@ -156,47 +156,47 @@ def test_response_schema_validation( method: HTTP method to test """ url = api_client.get_url(endpoint_path) # type: ignore - + # Skip if no schema defined schema = get_endpoint_schema(api_spec, endpoint_path, method) if not schema: pytest.skip(f"No schema defined for {method.upper()} {endpoint_path}") - + try: if method.lower() == "get": response = api_client.get(url) else: pytest.skip(f"Method {method} not implemented for automated testing") return - + # Skip if response is not 200 if response.status_code != 200: pytest.skip(f"Endpoint {endpoint_path} returned status {response.status_code}") return - + # Skip if response is not JSON try: response_data = response.json() except ValueError: pytest.skip(f"Endpoint {endpoint_path} did not return valid JSON") return - + # Validate the response validation_result = validate_response( - response_data, - api_spec, - endpoint_path, + response_data, + api_spec, + endpoint_path, method ) - + if validation_result['valid']: logger.info(f"Response from {method.upper()} {endpoint_path} matches schema") else: for error in validation_result['errors']: logger.error(f"Validation error for {method.upper()} {endpoint_path}: {error}") - + assert validation_result['valid'], f"Response from {method.upper()} {endpoint_path} does not match schema" - + except requests.RequestException as e: pytest.fail(f"Request to {endpoint_path} failed: {str(e)}") @@ -204,67 +204,67 @@ def test_response_schema_validation( def test_system_stats_response(require_server, api_client, api_spec: Dict[str, Any]): """ Test the system_stats endpoint response in detail - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture api_spec: Loaded OpenAPI spec """ url = api_client.get_url("/system_stats") # type: ignore - + try: response = api_client.get(url) - + assert response.status_code == 200, "Failed to get system stats" - + # Parse response stats = response.json() - + # Validate high-level structure assert 'system' in stats, "Response missing 'system' field" assert 'devices' in stats, "Response missing 'devices' field" - + # Validate system fields system = stats['system'] assert 'os' in system, "System missing 'os' field" assert 'ram_total' in system, "System missing 'ram_total' field" assert 'ram_free' in system, "System missing 'ram_free' field" assert 'comfyui_version' in system, "System missing 'comfyui_version' field" - + # Validate devices fields devices = stats['devices'] assert isinstance(devices, list), "Devices should be a list" - + if devices: device = devices[0] assert 'name' in device, "Device missing 'name' field" assert 'type' in device, "Device missing 'type' field" assert 'vram_total' in device, "Device missing 'vram_total' field" assert 'vram_free' in device, "Device missing 'vram_free' field" - + # Perform schema validation validation_result = validate_response( - stats, - api_spec, - "/system_stats", + stats, + api_spec, + "/system_stats", "get" ) - + # Print detailed error if validation fails if not validation_result['valid']: for error in validation_result['errors']: logger.error(f"Validation error for /system_stats: {error}") - + # Print schema details for debugging schema = get_endpoint_schema(api_spec, "/system_stats", "get") if schema: logger.error(f"Schema structure:\n{json.dumps(schema, indent=2)}") - + # Print sample of the response logger.error(f"Response:\n{json.dumps(stats, indent=2)}") - + assert validation_result['valid'], "System stats response does not match schema" - + except requests.RequestException as e: pytest.fail(f"Request to /system_stats failed: {str(e)}") @@ -272,53 +272,53 @@ def test_system_stats_response(require_server, api_client, api_spec: Dict[str, A def test_models_listing_response(require_server, api_client, api_spec: Dict[str, Any]): """ Test the models endpoint response - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture api_spec: Loaded OpenAPI spec """ url = api_client.get_url("/models") # type: ignore - + try: response = api_client.get(url) - + assert response.status_code == 200, "Failed to get models" - + # Parse response models = response.json() - + # Validate it's a list assert isinstance(models, list), "Models response should be a list" - + # Each item should be a string for model in models: assert isinstance(model, str), "Each model type should be a string" - + # Perform schema validation validation_result = validate_response( - models, - api_spec, - "/models", + models, + api_spec, + "/models", "get" ) - + # Print detailed error if validation fails if not validation_result['valid']: for error in validation_result['errors']: logger.error(f"Validation error for /models: {error}") - + # Print schema details for debugging schema = get_endpoint_schema(api_spec, "/models", "get") if schema: logger.error(f"Schema structure:\n{json.dumps(schema, indent=2)}") - + # Print response sample_models = models[:5] if isinstance(models, list) else models logger.error(f"Models response:\n{json.dumps(sample_models, indent=2)}") - + assert validation_result['valid'], "Models response does not match schema" - + except requests.RequestException as e: pytest.fail(f"Request to /models failed: {str(e)}") @@ -326,60 +326,60 @@ def test_models_listing_response(require_server, api_client, api_spec: Dict[str, def test_object_info_response(require_server, api_client, api_spec: Dict[str, Any]): """ Test the object_info endpoint response - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture api_spec: Loaded OpenAPI spec """ url = api_client.get_url("/object_info") # type: ignore - + try: response = api_client.get(url) - + assert response.status_code == 200, "Failed to get object info" - + # Parse response objects = response.json() - + # Validate it's an object assert isinstance(objects, dict), "Object info response should be an object" - + # Check if we have any objects if objects: # Get the first object first_obj_name = next(iter(objects.keys())) first_obj = objects[first_obj_name] - + # Validate first object has required fields assert 'input' in first_obj, "Object missing 'input' field" assert 'output' in first_obj, "Object missing 'output' field" assert 'name' in first_obj, "Object missing 'name' field" - + # Perform schema validation validation_result = validate_response( - objects, - api_spec, - "/object_info", + objects, + api_spec, + "/object_info", "get" ) - + # Print detailed error if validation fails if not validation_result['valid']: for error in validation_result['errors']: logger.error(f"Validation error for /object_info: {error}") - + # Print schema details for debugging schema = get_endpoint_schema(api_spec, "/object_info", "get") if schema: logger.error(f"Schema structure:\n{json.dumps(schema, indent=2)}") - + # Also print a small sample of the response sample = dict(list(objects.items())[:1]) if objects else {} logger.error(f"Sample response:\n{json.dumps(sample, indent=2)}") - + assert validation_result['valid'], "Object info response does not match schema" - + except requests.RequestException as e: pytest.fail(f"Request to /object_info failed: {str(e)}") except (KeyError, StopIteration) as e: @@ -389,52 +389,52 @@ def test_object_info_response(require_server, api_client, api_spec: Dict[str, An def test_queue_response(require_server, api_client, api_spec: Dict[str, Any]): """ Test the queue endpoint response - + Args: require_server: Fixture that skips if server is not available api_client: API client fixture api_spec: Loaded OpenAPI spec """ url = api_client.get_url("/queue") # type: ignore - + try: response = api_client.get(url) - + assert response.status_code == 200, "Failed to get queue" - + # Parse response queue = response.json() - + # Validate structure assert 'queue_running' in queue, "Queue missing 'queue_running' field" assert 'queue_pending' in queue, "Queue missing 'queue_pending' field" - + # Each should be a list assert isinstance(queue['queue_running'], list), "queue_running should be a list" assert isinstance(queue['queue_pending'], list), "queue_pending should be a list" - + # Perform schema validation validation_result = validate_response( - queue, - api_spec, - "/queue", + queue, + api_spec, + "/queue", "get" ) - + # Print detailed error if validation fails if not validation_result['valid']: for error in validation_result['errors']: logger.error(f"Validation error for /queue: {error}") - + # Print schema details for debugging schema = get_endpoint_schema(api_spec, "/queue", "get") if schema: logger.error(f"Schema structure:\n{json.dumps(schema, indent=2)}") - + # Print response logger.error(f"Queue response:\n{json.dumps(queue, indent=2)}") - + assert validation_result['valid'], "Queue response does not match schema" - + except requests.RequestException as e: - pytest.fail(f"Request to /queue failed: {str(e)}") \ No newline at end of file + pytest.fail(f"Request to /queue failed: {str(e)}") diff --git a/tests-api/test_spec_validation.py b/tests-api/test_spec_validation.py index 9fc9db6f..57c493a2 100644 --- a/tests-api/test_spec_validation.py +++ b/tests-api/test_spec_validation.py @@ -10,7 +10,7 @@ from typing import Dict, Any def test_openapi_spec_is_valid(api_spec: Dict[str, Any]): """ Test that the OpenAPI specification is valid - + Args: api_spec: Loaded OpenAPI spec """ @@ -23,7 +23,7 @@ def test_openapi_spec_is_valid(api_spec: Dict[str, Any]): def test_spec_has_info(api_spec: Dict[str, Any]): """ Test that the OpenAPI spec has the required info section - + Args: api_spec: Loaded OpenAPI spec """ @@ -35,7 +35,7 @@ def test_spec_has_info(api_spec: Dict[str, Any]): def test_spec_has_paths(api_spec: Dict[str, Any]): """ Test that the OpenAPI spec has paths defined - + Args: api_spec: Loaded OpenAPI spec """ @@ -46,7 +46,7 @@ def test_spec_has_paths(api_spec: Dict[str, Any]): def test_spec_has_components(api_spec: Dict[str, Any]): """ Test that the OpenAPI spec has components defined - + Args: api_spec: Loaded OpenAPI spec """ @@ -57,7 +57,7 @@ def test_spec_has_components(api_spec: Dict[str, Any]): def test_workflow_endpoints_exist(api_spec: Dict[str, Any]): """ Test that core workflow endpoints are defined - + Args: api_spec: Loaded OpenAPI spec """ @@ -69,7 +69,7 @@ def test_workflow_endpoints_exist(api_spec: Dict[str, Any]): def test_image_endpoints_exist(api_spec: Dict[str, Any]): """ Test that core image endpoints are defined - + Args: api_spec: Loaded OpenAPI spec """ @@ -80,7 +80,7 @@ def test_image_endpoints_exist(api_spec: Dict[str, Any]): def test_model_endpoints_exist(api_spec: Dict[str, Any]): """ Test that core model endpoints are defined - + Args: api_spec: Loaded OpenAPI spec """ @@ -91,18 +91,18 @@ def test_model_endpoints_exist(api_spec: Dict[str, Any]): def test_operation_ids_are_unique(api_spec: Dict[str, Any]): """ Test that all operationIds are unique - + Args: api_spec: Loaded OpenAPI spec """ operation_ids = [] - + for path, path_item in api_spec['paths'].items(): for method, operation in path_item.items(): if method in ['get', 'post', 'put', 'delete', 'patch']: if 'operationId' in operation: operation_ids.append(operation['operationId']) - + # Check for duplicates duplicates = set([op_id for op_id in operation_ids if operation_ids.count(op_id) > 1]) assert len(duplicates) == 0, f"Found duplicate operationIds: {duplicates}" @@ -111,34 +111,34 @@ def test_operation_ids_are_unique(api_spec: Dict[str, Any]): def test_all_endpoints_have_operation_ids(api_spec: Dict[str, Any]): """ Test that all endpoints have operationIds - + Args: api_spec: Loaded OpenAPI spec """ missing = [] - + for path, path_item in api_spec['paths'].items(): for method, operation in path_item.items(): if method in ['get', 'post', 'put', 'delete', 'patch']: if 'operationId' not in operation: missing.append(f"{method.upper()} {path}") - + assert len(missing) == 0, f"Found endpoints without operationIds: {missing}" def test_all_endpoints_have_tags(api_spec: Dict[str, Any]): """ Test that all endpoints have tags - + Args: api_spec: Loaded OpenAPI spec """ missing = [] - + for path, path_item in api_spec['paths'].items(): for method, operation in path_item.items(): if method in ['get', 'post', 'put', 'delete', 'patch']: if 'tags' not in operation or not operation['tags']: missing.append(f"{method.upper()} {path}") - - assert len(missing) == 0, f"Found endpoints without tags: {missing}" \ No newline at end of file + + assert len(missing) == 0, f"Found endpoints without tags: {missing}" diff --git a/tests-api/utils/schema_utils.py b/tests-api/utils/schema_utils.py index c354f11b..862d29d6 100644 --- a/tests-api/utils/schema_utils.py +++ b/tests-api/utils/schema_utils.py @@ -1,111 +1,109 @@ """ Utilities for working with OpenAPI schemas """ -import json -import os from typing import Any, Dict, List, Optional, Set, Tuple def extract_required_parameters( - spec: Dict[str, Any], - path: str, + spec: Dict[str, Any], + path: str, method: str ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: """ Extract required parameters for a specific endpoint - + Args: spec: Parsed OpenAPI specification path: API path (e.g., '/prompt') method: HTTP method (e.g., 'get', 'post') - + Returns: Tuple of (path_params, query_params) containing required parameters """ method = method.lower() path_params = [] query_params = [] - + # Handle path not found if path not in spec['paths']: return path_params, query_params - + # Handle method not found if method not in spec['paths'][path]: return path_params, query_params - + # Get parameters params = spec['paths'][path][method].get('parameters', []) - + for param in params: if param.get('required', False): if param.get('in') == 'path': path_params.append(param) elif param.get('in') == 'query': query_params.append(param) - + return path_params, query_params def get_request_body_schema( - spec: Dict[str, Any], - path: str, + spec: Dict[str, Any], + path: str, method: str ) -> Optional[Dict[str, Any]]: """ Get request body schema for a specific endpoint - + Args: spec: Parsed OpenAPI specification path: API path (e.g., '/prompt') method: HTTP method (e.g., 'get', 'post') - + Returns: Request body schema or None if not found """ method = method.lower() - + # Handle path not found if path not in spec['paths']: return None - + # Handle method not found if method not in spec['paths'][path]: return None - + # Handle no request body request_body = spec['paths'][path][method].get('requestBody', {}) if not request_body or 'content' not in request_body: return None - + # Get schema from first content type content_types = request_body['content'] first_content_type = next(iter(content_types)) - + if 'schema' not in content_types[first_content_type]: return None - + return content_types[first_content_type]['schema'] def extract_endpoints_by_tag(spec: Dict[str, Any], tag: str) -> List[Dict[str, Any]]: """ Extract all endpoints with a specific tag - + Args: spec: Parsed OpenAPI specification tag: Tag to filter by - + Returns: List of endpoint details """ endpoints = [] - + for path, path_item in spec['paths'].items(): for method, operation in path_item.items(): if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']: continue - + if tag in operation.get('tags', []): endpoints.append({ 'path': path, @@ -113,47 +111,47 @@ def extract_endpoints_by_tag(spec: Dict[str, Any], tag: str) -> List[Dict[str, A 'operation_id': operation.get('operationId', ''), 'summary': operation.get('summary', '') }) - + return endpoints def get_all_tags(spec: Dict[str, Any]) -> Set[str]: """ Get all tags used in the API spec - + Args: spec: Parsed OpenAPI specification - + Returns: Set of tag names """ tags = set() - + for path_item in spec['paths'].values(): for operation in path_item.values(): if isinstance(operation, dict) and 'tags' in operation: tags.update(operation['tags']) - + return tags def get_schema_examples(spec: Dict[str, Any]) -> Dict[str, Any]: """ Extract all examples from component schemas - + Args: spec: Parsed OpenAPI specification - + Returns: Dict mapping schema names to examples """ examples = {} - + if 'components' not in spec or 'schemas' not in spec['components']: return examples - + for name, schema in spec['components']['schemas'].items(): if 'example' in schema: examples[name] = schema['example'] - - return examples \ No newline at end of file + + return examples diff --git a/tests-api/utils/validation.py b/tests-api/utils/validation.py index 9e07663a..2ed9a5a4 100644 --- a/tests-api/utils/validation.py +++ b/tests-api/utils/validation.py @@ -1,8 +1,6 @@ """ Utilities for API response validation against OpenAPI spec """ -import json -import os import yaml import jsonschema from typing import Any, Dict, List, Optional, Union @@ -11,10 +9,10 @@ from typing import Any, Dict, List, Optional, Union def load_openapi_spec(spec_path: str) -> Dict[str, Any]: """ Load the OpenAPI specification from a YAML file - + Args: spec_path: Path to the OpenAPI specification file - + Returns: Dict containing the parsed OpenAPI spec """ @@ -23,68 +21,68 @@ def load_openapi_spec(spec_path: str) -> Dict[str, Any]: def get_endpoint_schema( - spec: Dict[str, Any], - path: str, - method: str, + spec: Dict[str, Any], + path: str, + method: str, status_code: str = '200' ) -> Optional[Dict[str, Any]]: """ Extract response schema for a specific endpoint from OpenAPI spec - + Args: spec: Parsed OpenAPI specification path: API path (e.g., '/prompt') method: HTTP method (e.g., 'get', 'post') status_code: HTTP status code to get schema for - + Returns: Schema dict or None if not found """ method = method.lower() - + # Handle path not found if path not in spec['paths']: return None - + # Handle method not found if method not in spec['paths'][path]: return None - + # Handle status code not found responses = spec['paths'][path][method].get('responses', {}) if status_code not in responses: return None - + # Handle no content defined if 'content' not in responses[status_code]: return None - + # Get schema from first content type content_types = responses[status_code]['content'] first_content_type = next(iter(content_types)) - + if 'schema' not in content_types[first_content_type]: return None - + return content_types[first_content_type]['schema'] def resolve_schema_refs(schema: Dict[str, Any], spec: Dict[str, Any]) -> Dict[str, Any]: """ Resolve $ref references in a schema - + Args: schema: Schema that may contain references spec: Full OpenAPI spec with component definitions - + Returns: Schema with references resolved """ if not isinstance(schema, dict): return schema - + result = {} - + for key, value in schema.items(): if key == '$ref' and isinstance(value, str) and value.startswith('#/'): # Handle reference @@ -92,7 +90,7 @@ def resolve_schema_refs(schema: Dict[str, Any], spec: Dict[str, Any]) -> Dict[st ref_value = spec for path_part in ref_path: ref_value = ref_value.get(path_part, {}) - + # Recursively resolve any refs in the referenced schema ref_value = resolve_schema_refs(ref_value, spec) result.update(ref_value) @@ -108,7 +106,7 @@ def resolve_schema_refs(schema: Dict[str, Any], spec: Dict[str, Any]) -> Dict[st else: # Pass through other values result[key] = value - + return result @@ -121,30 +119,30 @@ def validate_response( ) -> Dict[str, Any]: """ Validate a response against the OpenAPI schema - + Args: response_data: Response data to validate spec: Parsed OpenAPI specification path: API path (e.g., '/prompt') method: HTTP method (e.g., 'get', 'post') status_code: HTTP status code to validate against - + Returns: Dict with validation result containing: - valid: bool indicating if validation passed - errors: List of validation errors if any """ schema = get_endpoint_schema(spec, path, method, status_code) - + if schema is None: return { 'valid': False, 'errors': [f"No schema found for {method.upper()} {path} with status {status_code}"] } - + # Resolve any $ref in the schema resolved_schema = resolve_schema_refs(schema, spec) - + try: jsonschema.validate(instance=response_data, schema=resolved_schema) return {'valid': True, 'errors': []} @@ -155,20 +153,20 @@ def validate_response( def get_all_endpoints(spec: Dict[str, Any]) -> List[Dict[str, Any]]: """ Extract all endpoints from an OpenAPI spec - + Args: spec: Parsed OpenAPI specification - + Returns: List of dicts with path, method, and tags for each endpoint """ endpoints = [] - + for path, path_item in spec['paths'].items(): for method, operation in path_item.items(): if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']: continue - + endpoints.append({ 'path': path, 'method': method.lower(), @@ -176,5 +174,5 @@ def get_all_endpoints(spec: Dict[str, Any]) -> List[Dict[str, Any]]: 'operation_id': operation.get('operationId', ''), 'summary': operation.get('summary', '') }) - - return endpoints \ No newline at end of file + + return endpoints