Skip to main content
Glama
test_error_handling.pyโ€ข38.3 kB
#!/usr/bin/env python3 """ Error Handling and Edge Case Integration Tests for unified server in multi-registry mode Tests various error conditions and edge cases: - Network connectivity issues - Authentication failures - Invalid configurations - Registry downtime scenarios - VIEWONLY mode enforcement - Resource limits and timeouts - Malformed schema definitions - Cross-registry operation failures """ import asyncio import json import os import sys # Add parent directory to Python path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import pytest from fastmcp import Client from mcp import ClientSession from mcp.client.stdio import StdioServerParameters, stdio_client # Invalid schemas for testing error handling INVALID_SCHEMAS = { "malformed_json": "this is not valid json", "missing_fields": { "type": "record" # Missing name and fields }, "invalid_field_type": { "type": "record", "name": "TestRecord", "fields": [{"name": "id", "type": "invalid_type"}], }, "circular_reference": { "type": "record", "name": "TestRecord", "fields": [{"name": "self", "type": "TestRecord"}], }, } # Valid test schema for baseline tests VALID_SCHEMA = { "type": "record", "name": "TestUser", "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}], } async def test_invalid_registry_configuration(): """Test behavior with invalid registry configurations.""" print("\nโŒ Testing Invalid Registry Configuration") print("-" * 50) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") # Test with non-existent registry URL env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) env["SCHEMA_REGISTRY_NAME_1"] = "invalid_registry" env["SCHEMA_REGISTRY_URL_1"] = "http://nonexistent.registry:9999" env["VIEWONLY_1"] = "false" server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test 1: Connection test should fail gracefully print("Test 1: Connection test to invalid registry") result = await session.call_tool("test_registry_connection", {"registry_name": "invalid_registry"}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Expected failure: {response.get('status', 'unknown')}") assert response.get("status") == "error", "Expected connection test to fail" # Test 2: List subjects should handle connection failure print("\nTest 2: List subjects with invalid registry") result = await session.call_tool("list_subjects", {"registry": "invalid_registry"}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Error handled: {response.get('error', 'No error field')}") # Test 3: Registry info should show connection failure print("\nTest 3: Registry info with connection failure") result = await session.call_tool("get_registry_info", {"registry_name": "invalid_registry"}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Connection status: {response.get('connection_status', 'unknown')}") print("\n๐ŸŽ‰ Invalid Registry Configuration Tests Complete!") except Exception as e: print(f"โŒ Invalid registry configuration test failed: {e}") async def test_viewonly_mode_enforcement(): """Test VIEWONLY mode enforcement across operations.""" print("\n๐Ÿ”’ Testing VIEWONLY Mode Enforcement") print("-" * 40) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") # Setup with viewonly registry env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) # Remove any existing numbered registries for i in range(1, 10): for var in [f"SCHEMA_REGISTRY_NAME_{i}", f"SCHEMA_REGISTRY_URL_{i}", f"VIEWONLY_{i}"]: env.pop(var, None) env["SCHEMA_REGISTRY_NAME_1"] = "viewonly_test" env["SCHEMA_REGISTRY_URL_1"] = "http://localhost:8081" env["VIEWONLY_1"] = "true" # Set to viewonly server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test operations that should be blocked viewonly_operations = [ ( "register_schema", { "subject": "test-subject", "schema_definition": {"type": "string"}, "registry": "viewonly_test", }, ), ("create_context", {"context": "test-context", "registry": "viewonly_test"}), ("delete_context", {"context": "default", "registry": "viewonly_test"}), ("delete_subject", {"subject": "test-subject", "registry": "viewonly_test"}), ("update_global_config", {"compatibility": "BACKWARD", "registry": "viewonly_test"}), ("update_mode", {"mode": "READWRITE", "registry": "viewonly_test"}), ] for operation_name, params in viewonly_operations: try: response = await session.call_tool(operation_name, params) result = json.loads(response.content[0].text) if response.content else {} if "viewonly" in result.get("error", "").lower() or result.get("viewonly_mode"): print(f" โœ… {operation_name} properly blocked by viewonly mode") else: print(f" โš ๏ธ {operation_name} not blocked (may not be implemented)") except Exception as e: print(f" โš ๏ธ {operation_name} error: {e}") # Test operations that should work read_operations = [ ("list_subjects", {"registry": "viewonly_test"}), ("list_contexts", {"registry": "viewonly_test"}), ("get_global_config", {"registry": "viewonly_test"}), ("get_mode", {"registry": "viewonly_test"}), ] for operation_name, params in read_operations: try: response = await session.call_tool(operation_name, params) print(f" โœ… {operation_name} works in viewonly mode") except Exception as e: print(f" โš ๏ธ {operation_name} failed: {e}") print("\n๐ŸŽ‰ VIEWONLY Mode Enforcement Tests Complete!") except Exception as e: print(f"โŒ VIEWONLY mode enforcement test failed: {e}") raise async def test_invalid_parameters(): """Test handling of invalid parameters and edge cases.""" print("\n๐Ÿ”ง Testing Invalid Parameters and Edge Cases") print("-" * 50) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) env["SCHEMA_REGISTRY_NAME_1"] = "param_test" env["SCHEMA_REGISTRY_URL_1"] = "http://localhost:38081" env["VIEWONLY_1"] = "false" server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test 1: Non-existent registry parameter print("Test 1: Non-existent registry") result = await session.call_tool("list_subjects", {"registry": "nonexistent_registry"}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Non-existent registry error: {response.get('error', 'No error')}") # Test 2: Invalid schema definitions print("\nTest 2: Invalid schema definitions") for schema_name, invalid_schema in INVALID_SCHEMAS.items(): print(f" Testing {schema_name}:") try: result = await session.call_tool( "register_schema", { "subject": f"test-{schema_name}", "schema_definition": invalid_schema, "registry": "param_test", }, ) response = json.loads(result.content[0].text) if result.content else {} if response.get("error"): print(f" โœ… Properly rejected: {response['error'][:50]}...") else: print(" โš ๏ธ Unexpectedly accepted invalid schema") except Exception as e: print(f" โœ… Exception caught: {str(e)[:50]}...") # Test 3: Invalid compatibility levels print("\nTest 3: Invalid compatibility levels") invalid_compatibility_levels = ["INVALID", "UNKNOWN", "", "123"] for level in invalid_compatibility_levels: result = await session.call_tool( "update_global_config", {"compatibility": level, "registry": "param_test"}, ) response = json.loads(result.content[0].text) if result.content else {} print(f" Testing '{level}': {response.get('error', 'Accepted')[:50]}") # Test 4: Invalid modes print("\nTest 4: Invalid modes") invalid_modes = ["INVALID", "UNKNOWN", "", "123"] for mode in invalid_modes: result = await session.call_tool("update_mode", {"mode": mode, "registry": "param_test"}) response = json.loads(result.content[0].text) if result.content else {} print(f" Testing '{mode}': {response.get('error', 'Accepted')[:50]}") # Test 5: Empty and special character subjects print("\nTest 5: Edge case subject names") edge_case_subjects = [ "", " ", "subject with spaces", "subject-with-special-chars!@#", ] for subject in edge_case_subjects: result = await session.call_tool("get_schema", {"subject": subject, "registry": "param_test"}) response = json.loads(result.content[0].text) if result.content else {} print(f" Subject '{subject}': {response.get('error', 'No error')[:50]}") print("\n๐ŸŽ‰ Invalid Parameters and Edge Cases Tests Complete!") except Exception as e: print(f"โŒ Invalid parameters test failed: {e}") async def test_cross_registry_error_scenarios(): """Test error scenarios in cross-registry operations.""" print("\n๐Ÿ”„ Testing Cross-Registry Error Scenarios") print("-" * 50) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) # Setup one valid and one invalid registry env["SCHEMA_REGISTRY_NAME_1"] = "valid_registry" env["SCHEMA_REGISTRY_URL_1"] = "http://localhost:38081" env["VIEWONLY_1"] = "false" env["SCHEMA_REGISTRY_NAME_2"] = "invalid_registry" env["SCHEMA_REGISTRY_URL_2"] = "http://invalid.host:9999" env["VIEWONLY_2"] = "false" env["SCHEMA_REGISTRY_NAME_3"] = "viewonly_registry" env["SCHEMA_REGISTRY_URL_3"] = "http://localhost:38081" env["VIEWONLY_3"] = "true" server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test 1: Compare valid with invalid registry print("Test 1: Compare valid with invalid registry") result = await session.call_tool( "compare_registries", { "source_registry": "valid_registry", "target_registry": "invalid_registry", }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Comparison result: {response.get('error', 'Success')}") # Test 2: Migration from valid to invalid registry print("\nTest 2: Migration from valid to invalid registry") result = await session.call_tool( "migrate_schema", { "subject": "test-subject", "source_registry": "valid_registry", "target_registry": "invalid_registry", "dry_run": True, }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Migration result: {response.get('error', 'Success')}") # Test 3: Migration to viewonly registry print("\nTest 3: Migration to viewonly registry") result = await session.call_tool( "migrate_schema", { "subject": "error-test-subject", "source_registry": "valid_registry", "target_registry": "viewonly_registry", "dry_run": False, }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Viewonly migration result: {response.get('error', 'Success')}") # Test 4: Find missing schemas with connection issues print("\nTest 4: Find missing schemas with connection issues") result = await session.call_tool( "find_missing_schemas", { "source_registry": "valid_registry", "target_registry": "invalid_registry", }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Missing schemas result: {response.get('error', 'Success')}") # Test 5: Sync operations with mixed registry states print("\nTest 5: Sync operations with mixed registry states") result = await session.call_tool( "sync_schema", { "subject": "test-subject", "source_registry": "valid_registry", "target_registry": "viewonly_registry", "dry_run": True, }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Sync result: {response.get('error', 'Success')}") print("\n๐ŸŽ‰ Cross-Registry Error Scenarios Tests Complete!") except Exception as e: print(f"โŒ Cross-registry error scenarios test failed: {e}") async def test_resource_limits_and_timeouts(): """Test behavior under resource constraints and timeouts.""" print("\nโฑ๏ธ Testing Resource Limits and Timeouts") print("-" * 50) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) env["SCHEMA_REGISTRY_NAME_1"] = "timeout_test" env["SCHEMA_REGISTRY_URL_1"] = "http://localhost:38081" env["VIEWONLY_1"] = "false" server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test 1: Very large schema definition print("Test 1: Large schema definition") large_schema = { "type": "record", "name": "LargeRecord", "fields": [{"name": f"field_{i}", "type": "string"} for i in range(100)], # 100 fields } result = await session.call_tool( "register_schema", { "subject": "large-schema-test", "schema_definition": large_schema, "registry": "timeout_test", }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Large schema result: {response.get('error', 'Success')}") # Test 2: Rapid sequential operations print("\nTest 2: Rapid sequential operations") rapid_test_results = [] for i in range(10): result = await session.call_tool("list_subjects", {"registry": "timeout_test"}) response = json.loads(result.content[0].text) if result.content else {} rapid_test_results.append("success" if not response.get("error") else "error") success_count = rapid_test_results.count("success") print(f" โœ… Rapid operations: {success_count}/10 successful") # Test 3: Multiple registries test (stress test) print("\nTest 3: Test all registries simultaneously") result = await session.call_tool("test_all_registries", {}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Multi-registry test: {response.get('connected', 0)} connected") print("\n๐ŸŽ‰ Resource Limits and Timeouts Tests Complete!") except Exception as e: print(f"โŒ Resource limits and timeouts test failed: {e}") async def test_authentication_errors(): """Test authentication and authorization error handling.""" print("\n๐Ÿ” Testing Authentication Error Handling") print("-" * 50) # Clean up any existing task manager state to prevent deadlocks try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Task manager state cleaned up") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager: {e}") # Test with invalid credentials env = os.environ.copy() env.pop("SCHEMA_REGISTRY_URL", None) env["SCHEMA_REGISTRY_NAME_1"] = "auth_test" env["SCHEMA_REGISTRY_URL_1"] = "http://localhost:38081" env["SCHEMA_REGISTRY_USER_1"] = "invalid_user" env["SCHEMA_REGISTRY_PASSWORD_1"] = "invalid_password" env["VIEWONLY_1"] = "false" server_params = StdioServerParameters(command="python", args=["kafka_schema_registry_unified_mcp.py"], env=env) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() # Test 1: Connection test with invalid auth print("Test 1: Connection test with invalid credentials") result = await session.call_tool("test_registry_connection", {"registry_name": "auth_test"}) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Auth test result: {response.get('status', 'unknown')}") # Test 2: List subjects with auth issues print("\nTest 2: List subjects with invalid credentials") result = await session.call_tool("list_subjects", {"registry": "auth_test"}) response = json.loads(result.content[0].text) if result.content else {} if response.get("error") or isinstance(response, list) and len(response) == 0: print(" โœ… Auth error properly handled") else: print(f" โš ๏ธ Unexpected response: {response}") # Test 3: Schema registration with auth issues print("\nTest 3: Schema registration with invalid credentials") result = await session.call_tool( "register_schema", { "subject": "auth-test-subject", "schema_definition": VALID_SCHEMA, "registry": "auth_test", }, ) response = json.loads(result.content[0].text) if result.content else {} print(f" โœ… Auth registration result: {response.get('error', 'Success')}") print("\n๐ŸŽ‰ Authentication Error Handling Tests Complete!") except Exception as e: print(f"โŒ Authentication error handling test failed: {e}") async def test_error_handling(): """Test error handling and recovery mechanisms.""" # Get the path to the parent directory where the server script is located parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) server_script = os.path.join(parent_dir, "kafka_schema_registry_unified_mcp.py") print("โš ๏ธ Testing Error Handling and Recovery...") try: client = Client( server_script, env={ "SCHEMA_REGISTRY_URL": "http://localhost:38081", "MULTI_REGISTRY_CONFIG": json.dumps( { "dev": {"url": "http://localhost:38081"}, "invalid": {"url": "http://localhost:99999"}, # Invalid port for testing } ), }, ) async with client: print("โœ… Connected to MCP server!") # Test 1: Invalid schema registration print("\nโŒ Test 1: Invalid schema registration...") try: result = await client.call_tool( "register_schema", { "subject": "test-invalid-schema", "schema_definition": {"invalid": "schema"}, # Invalid Avro schema "schema_type": "AVRO", }, ) if result and "error" in result.lower(): print(" โœ… Error properly handled for invalid schema") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") # Test 2: Non-existent subject operations print("\nโŒ Test 2: Non-existent subject operations...") try: result = await client.call_tool( "get_schema", {"subject": "non-existent-subject-12345", "version": "latest"}, ) if result and ("error" in result.lower() or "not found" in result.lower()): print(" โœ… Error properly handled for non-existent subject") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") # Test 3: Invalid registry operations print("\nโŒ Test 3: Invalid registry operations...") try: result = await client.call_tool("list_subjects", {"registry": "invalid"}) # This should fail if result and "error" in result.lower(): print(" โœ… Error properly handled for invalid registry") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") # Test 4: Invalid tool parameters print("\nโŒ Test 4: Invalid tool parameters...") try: result = await client.call_tool( "register_schema", { "subject": "", # Empty subject "schema_definition": {"type": "string"}, "schema_type": "AVRO", }, ) if result and "error" in result.lower(): print(" โœ… Error properly handled for empty subject") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") # Test 5: Tool call with missing required parameters print("\nโŒ Test 5: Missing required parameters...") try: result = await client.call_tool( "register_schema", { "subject": "test-subject" # Missing schema_definition and schema_type }, ) if result and "error" in result.lower(): print(" โœ… Error properly handled for missing parameters") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") # Test 6: Recovery after errors print("\n๐Ÿ”„ Test 6: Recovery after errors...") try: # First, cause an error await client.call_tool("get_schema", {"subject": "non-existent", "version": "latest"}) # Then, perform a valid operation result = await client.call_tool("list_subjects", {}) print(" โœ… Server recovered and handles valid operations after errors") except Exception as e: print(f" โœ… Server continues to function: {e}") # Test 7: Invalid JSON in schema definitions print("\nโŒ Test 7: Invalid JSON handling...") try: result = await client.call_tool( "register_schema", { "subject": "test-invalid-json", "schema_definition": "not-valid-json", # String instead of dict "schema_type": "AVRO", }, ) if result and "error" in result.lower(): print(" โœ… Error properly handled for invalid JSON") else: print(f" โš ๏ธ Unexpected result: {result}") except Exception as e: print(f" โœ… Exception properly raised: {e}") print("\n๐ŸŽ‰ Error handling testing completed!") print("โœ… Server demonstrates robust error handling and recovery") except Exception as e: print(f"โŒ Error during error handling test: {e}") raise @pytest.mark.asyncio async def test_connection_error_handling(): """Test MCP connection error handling""" print("๐Ÿ”Œ Testing Connection Error Handling") print("=" * 50) # Setup environment with invalid registry URL os.environ["SCHEMA_REGISTRY_URL"] = "http://localhost:99999" # Invalid port os.environ["VIEWONLY"] = "false" # Get server script path script_dir = os.path.dirname(os.path.abspath(__file__)) server_script = os.path.join(os.path.dirname(script_dir), "kafka_schema_registry_unified_mcp.py") # Create client client = Client(server_script) try: async with client: print("โœ… MCP connection established") # Test operations that should fail gracefully due to connection issues error_prone_operations = [ ("list_subjects", {}), ("get_global_config", {}), ("list_contexts", {}), ( "register_schema", { "subject": "test-subject", "schema_definition": {"type": "string"}, "schema_type": "AVRO", }, ), ] connection_errors = 0 graceful_failures = 0 for operation, args in error_prone_operations: print(f"\n๐Ÿงช Testing: {operation}") try: result = await client.call_tool(operation, args) print(f"โš ๏ธ {operation}: Unexpected success - {result}") except Exception as e: error_text = str(e).lower() if any( keyword in error_text for keyword in [ "connection", "refused", "timeout", "unreachable", ] ): print(f"โœ… {operation}: Graceful connection error - {e}") graceful_failures += 1 else: print(f"โŒ {operation}: Non-connection error - {e}") connection_errors += 1 print("\n๐Ÿ“Š Connection Error Summary:") print(f" Graceful failures: {graceful_failures}") print(f" Unexpected errors: {connection_errors}") return graceful_failures > 0 and connection_errors == 0 except Exception as e: print(f"โŒ Critical error during connection error test: {e}") return False @pytest.mark.asyncio async def test_invalid_input_handling(): """Test handling of invalid inputs""" print("๐Ÿšซ Testing Invalid Input Handling") print("=" * 50) # Setup environment with valid registry URL os.environ["SCHEMA_REGISTRY_URL"] = "http://localhost:38081" os.environ["VIEWONLY"] = "false" # Get server script path script_dir = os.path.dirname(os.path.abspath(__file__)) server_script = os.path.join(os.path.dirname(script_dir), "kafka_schema_registry_unified_mcp.py") # Create client client = Client(server_script) try: async with client: print("โœ… MCP connection established") # Test operations with invalid arguments invalid_operations = [ ( "register_schema", {"subject": "", "schema_definition": {}}, ), # Empty subject ( "register_schema", {"subject": "test", "schema_definition": "invalid"}, ), # Invalid schema ("get_schema_by_id", {"schema_id": -1}), # Invalid ID ("export_subject", {"subject": ""}), # Empty subject ( "check_compatibility", {"subject": "test", "schema_definition": None}, ), # Null schema ] validation_errors = 0 unexpected_successes = 0 for operation, args in invalid_operations: print(f"\n๐Ÿงช Testing: {operation} with invalid args") try: result = await client.call_tool(operation, args) print(f"โš ๏ธ {operation}: Unexpected success with invalid input") unexpected_successes += 1 except Exception as e: error_text = str(e).lower() if any(keyword in error_text for keyword in ["invalid", "validation", "error", "bad request"]): print(f"โœ… {operation}: Proper validation error - {e}") validation_errors += 1 else: print(f"โš ๏ธ {operation}: Other error - {e}") print("\n๐Ÿ“Š Input Validation Summary:") print(f" Proper validation errors: {validation_errors}") print(f" Unexpected successes: {unexpected_successes}") return validation_errors > 0 and unexpected_successes == 0 except Exception as e: print(f"โŒ Critical error during input validation test: {e}") return False @pytest.mark.asyncio async def test_error_recovery(): """Test error recovery mechanisms""" print("๐Ÿ”„ Testing Error Recovery") print("=" * 50) # Setup environment os.environ["SCHEMA_REGISTRY_URL"] = "http://localhost:38081" os.environ["VIEWONLY"] = "false" # Get server script path script_dir = os.path.dirname(os.path.abspath(__file__)) server_script = os.path.join(os.path.dirname(script_dir), "kafka_schema_registry_unified_mcp.py") # Create client client = Client(server_script) try: async with client: print("โœ… MCP connection established") # Test recovery after error print("\n๐Ÿงช Testing recovery after error...") # First, try an operation that might fail try: await client.call_tool( "register_schema", { "subject": "test-recovery", "schema_definition": "invalid-schema", # Invalid schema }, ) print("โš ๏ธ Invalid schema registration unexpectedly succeeded") except Exception as e: print(f"โœ… Expected error occurred: {e}") # Then, try a valid operation to test recovery try: result = await client.call_tool("list_subjects", {}) print("โœ… Recovery successful: Can still perform operations after error") return True except Exception as e: print(f"โŒ Recovery failed: {e}") return False except Exception as e: print(f"โŒ Critical error during recovery test: {e}") return False async def main(): """Run all error handling and edge case tests.""" print("๐Ÿงช Starting Error Handling and Edge Case Integration Tests") print("=" * 70) # Clean up any existing task manager state before starting try: import kafka_schema_registry_unified_mcp kafka_schema_registry_unified_mcp.task_manager.reset_for_testing() print("๐Ÿงน Initial task manager cleanup completed") except Exception as e: print(f"โš ๏ธ Warning: Could not cleanup task manager initially: {e}") try: await test_invalid_registry_configuration() await test_viewonly_mode_enforcement() await test_invalid_parameters() await test_cross_registry_error_scenarios() await test_resource_limits_and_timeouts() await test_authentication_errors() await test_error_handling() await test_connection_error_handling() await test_invalid_input_handling() await test_error_recovery() print("\n" + "=" * 70) print("๐ŸŽ‰ All Error Handling and Edge Case Tests Complete!") print("\nโœ… **Error Scenarios Tested:**") print("โ€ข Invalid registry configurations") print("โ€ข VIEWONLY mode enforcement") print("โ€ข Invalid parameters and edge cases") print("โ€ข Cross-registry operation failures") print("โ€ข Resource limits and timeouts") print("โ€ข Authentication and authorization errors") except KeyboardInterrupt: print("\nโš ๏ธ Tests interrupted by user") except Exception as e: print(f"\nโŒ Error handling tests failed: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server