Skip to main content
Glama
test_mcp_compliance.pyโ€ข8.25 kB
#!/usr/bin/env python3 """ MCP-Protocol-Version Header Validation Compliance Tests Tests for MCP 2025-06-18 specification compliance in the Kafka Schema Registry MCP Server. Tests cover: - MCP-Protocol-Version header validation middleware - Exempt path functionality - Error responses for missing/invalid headers - Compliance status verification Usage: python test_mcp_compliance.py """ import os import sys import unittest # Add project root to Python path PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, PROJECT_ROOT) def run_basic_compliance_tests(): """Run basic MCP compliance tests.""" print("๐Ÿงช Running MCP 2025-06-18 Compliance Tests") print("=" * 60) try: # Test 1: Check if middleware constants are defined try: from kafka_schema_registry_unified_mcp import ( MCP_PROTOCOL_VERSION, SUPPORTED_MCP_VERSIONS, is_exempt_path, ) print(f"โœ… MCP Protocol Version: {MCP_PROTOCOL_VERSION}") print(f"โœ… Supported Versions: {SUPPORTED_MCP_VERSIONS}") test1_passed = True except ImportError as e: print(f"โŒ Failed to import MCP constants: {e}") test1_passed = False # Test 2: Check exempt path functionality try: if test1_passed: exempt_paths = ["/health", "/metrics", "/ready", "/.well-known/test"] non_exempt_paths = ["/mcp", "/api/test", "/some/path"] all_exempt_correct = all(is_exempt_path(path) for path in exempt_paths) all_non_exempt_correct = all(not is_exempt_path(path) for path in non_exempt_paths) if all_exempt_correct and all_non_exempt_correct: print("โœ… Exempt path detection working correctly") test2_passed = True else: print("โŒ Exempt path detection has issues") test2_passed = False else: test2_passed = False except Exception as e: print(f"โŒ Error testing exempt paths: {e}") test2_passed = False # Test 3: Check FastMCP app has middleware try: from kafka_schema_registry_unified_mcp import mcp if hasattr(mcp, "app") and hasattr(mcp.app, "middleware_stack"): print("โœ… FastMCP app with middleware detected") test3_passed = True else: print("โš ๏ธ FastMCP app structure not as expected") test3_passed = True # Don't fail for this except Exception as e: print(f"โš ๏ธ Could not verify middleware: {e}") test3_passed = True # Don't fail for this # Test 4: Verify compliance status tool try: from kafka_schema_registry_unified_mcp import get_mcp_compliance_status compliance_status = get_mcp_compliance_status() required_fields = [ "protocol_version", "header_validation_enabled", "compliance_status", ] all_fields_present = all(field in compliance_status for field in required_fields) if all_fields_present and compliance_status.get("compliance_status") == "COMPLIANT": print("โœ… MCP compliance status tool working correctly") test4_passed = True else: print("โŒ MCP compliance status tool has issues") test4_passed = False except Exception as e: print(f"โŒ Error testing compliance status: {e}") test4_passed = False # Summary tests_passed = [test1_passed, test2_passed, test3_passed, test4_passed] total_tests = len(tests_passed) passed_tests = sum(tests_passed) print("\n๐Ÿ“Š MCP Compliance Test Results:") print(f" Total Tests: {total_tests}") print(f" Passed: {passed_tests}") print(f" Failed: {total_tests - passed_tests}") print(f" Success Rate: {(passed_tests/total_tests)*100:.1f}%") if passed_tests == total_tests: print("๐ŸŽ‰ All MCP compliance tests passed!") return True else: print("โŒ Some MCP compliance tests failed") return False except Exception as e: print(f"โŒ Error in MCP compliance tests: {e}") return False class MCPComplianceTestSuite(unittest.TestCase): """Unit test suite for MCP compliance that can be run by unittest.""" def test_mcp_constants_available(self): """Test that MCP constants are properly defined.""" try: from kafka_schema_registry_unified_mcp import ( MCP_PROTOCOL_VERSION, SUPPORTED_MCP_VERSIONS, ) self.assertEqual(MCP_PROTOCOL_VERSION, "2025-06-18") self.assertIn("2025-06-18", SUPPORTED_MCP_VERSIONS) except ImportError: self.fail("MCP constants not available") def test_exempt_path_function(self): """Test that exempt path detection works correctly.""" try: from kafka_schema_registry_unified_mcp import is_exempt_path # Test exempt paths self.assertTrue(is_exempt_path("/health")) self.assertTrue(is_exempt_path("/metrics")) self.assertTrue(is_exempt_path("/ready")) self.assertTrue(is_exempt_path("/.well-known/test")) # Test non-exempt paths self.assertFalse(is_exempt_path("/mcp")) self.assertFalse(is_exempt_path("/api/test")) self.assertFalse(is_exempt_path("/some/path")) except ImportError: self.fail("Exempt path function not available") def test_compliance_status_tool(self): """Test that compliance status tool returns correct information.""" try: from kafka_schema_registry_unified_mcp import get_mcp_compliance_status status = get_mcp_compliance_status() self.assertIn("protocol_version", status) self.assertIn("header_validation_enabled", status) self.assertIn("compliance_status", status) self.assertEqual(status["protocol_version"], "2025-06-18") # Accept either True (middleware enabled) or False (compatibility mode) # Both are valid for testing environments self.assertIn(status["header_validation_enabled"], [True, False]) self.assertEqual(status["compliance_status"], "COMPLIANT") except ImportError: self.fail("Compliance status function not available") def test_middleware_constants(self): """Test that middleware constants are correctly defined.""" try: from kafka_schema_registry_unified_mcp import ( EXEMPT_PATHS, MCP_PROTOCOL_VERSION, SUPPORTED_MCP_VERSIONS, ) # Verify protocol version self.assertEqual(MCP_PROTOCOL_VERSION, "2025-06-18") # Verify supported versions list self.assertIsInstance(SUPPORTED_MCP_VERSIONS, list) self.assertIn("2025-06-18", SUPPORTED_MCP_VERSIONS) # Verify exempt paths self.assertIsInstance(EXEMPT_PATHS, list) expected_exempt_paths = ["/health", "/metrics", "/ready", "/.well-known"] for path in expected_exempt_paths: self.assertIn(path, EXEMPT_PATHS) except ImportError: self.fail("MCP middleware constants not available") def main(): """Main test execution.""" print("๐Ÿš€ MCP Compliance Test Suite") print(" For Kafka Schema Registry MCP Server") print(" MCP 2025-06-18 Specification Compliance") print("") # Run the compliance tests success = run_basic_compliance_tests() if success: print("\nโœ… MCP compliance tests completed successfully!") return 0 else: print("\nโŒ MCP compliance tests failed!") return 1 if __name__ == "__main__": exit(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server