Skip to main content
Glama
test_mcp_server.py • 4.46 kB
#!/usr/bin/env python3
"""
Test script for the Kafka Schema Registry MCP Server

This script demonstrates how to connect to and test the MCP server
using the FastMCP Python client.
"""

import asyncio
import os
import sys
import traceback

# Server script launched over stdio; also probed by the dependency check.
SERVER_SCRIPT = "kafka_schema_registry_unified_mcp.py"

# Upper bound, in seconds, for the whole connectivity test.
TEST_TIMEOUT_SECONDS = 60


async def test_mcp_server():
    """Test the MCP server functionality.

    Starts the unified server script through the FastMCP client, lists its
    tools and resources, reads ``registry://status`` and calls the
    ``list_subjects`` tool. Failures of those last two probes are printed as
    warnings only; any other error is printed with a traceback and re-raised.

    Raises:
        Exception: whatever the client raised while connecting or listing.
    """
    # Imported here rather than at module level so that a missing FastMCP
    # installation is reported by test_dependencies() instead of crashing
    # the script at import time.
    from fastmcp import Client
    from fastmcp.client.transports import PythonStdioTransport

    # Environment variables for the server
    env = {
        "SCHEMA_REGISTRY_URL": "http://localhost:38081",  # Actual Schema Registry port
        "SCHEMA_REGISTRY_USER": "",
        "SCHEMA_REGISTRY_PASSWORD": "",
    }

    # Update the current environment
    os.environ.update(env)

    print("🚀 Starting MCP Server test...")
    print(f"🔍 Python version: {sys.version}")
    print(f"📁 Current directory: {os.getcwd()}")

    try:
        # Create a client that will run the server script. The environment is
        # handed over explicitly: per the FastMCP transport documentation,
        # stdio servers do not inherit the parent's environment by default,
        # so the SCHEMA_REGISTRY_* values above would otherwise not reach
        # the server process.
        transport = PythonStdioTransport(SERVER_SCRIPT, env=dict(os.environ))
        client = Client(transport)

        # No timeout here; main() bounds the whole run with asyncio.wait_for.
        async with client:
            print("📡 Connection established successfully!")

            # List available tools
            print("\n🔧 Available tools:")
            tools = await client.list_tools()
            for tool in tools:
                print(f" - {tool.name}: {tool.description}")

            # List available resources
            print("\n📦 Available resources:")
            resources = await client.list_resources()
            for resource in resources:
                print(f" - {resource.uri}: {resource.description}")

            # Test a simple resource read first - check server status
            print("\n🔍 Testing server status...")
            try:
                result = await client.read_resource("registry://status")
                print(f"✅ Server status: {result[0].text}")
            except Exception as e:
                print(f"⚠️ Server status check failed: {e}")

            # Test basic tool call (list_subjects is simple and safe)
            print("\n📄 Testing basic tool call (list_subjects)...")
            try:
                result = await client.call_tool("list_subjects", {})
                # Older FastMCP clients return a plain list of content items;
                # newer ones return a result object exposing them as
                # ``.content``. Accept both so a successful call is not
                # misreported as a failure.
                items = getattr(result, "content", result)
                print(f"✅ Tool call successful: {len(items)} items")
                if items:
                    print(f"📋 Response: {items[0].text[:200]}...")
            except Exception as e:
                print(f"⚠️ Tool call failed (might be expected if no schema registry): {e}")

            print("\n✅ MCP Server basic connectivity test completed!")

    except Exception as e:
        print(f"❌ Error during test: {e}")
        traceback.print_exc()
        raise


async def test_dependencies():
    """Test if required dependencies are available.

    Returns:
        bool: True when ``fastmcp`` and ``requests`` import successfully and
        the server script exists in the current directory; False otherwise.
    """
    print("🔍 Checking dependencies...")

    try:
        import fastmcp

        version = getattr(fastmcp, "__version__", "Unknown version")
        print(f"✅ FastMCP available: {version}")
    except ImportError as e:
        print(f"❌ FastMCP not available: {e}")
        return False

    try:
        import requests

        print(f"✅ Requests available: {requests.__version__}")
    except ImportError as e:
        print(f"❌ Requests not available: {e}")
        return False

    # asyncio is already imported at module level, so it cannot be missing
    # at this point; the line is kept for output parity.
    print("✅ Asyncio available")

    # Check if the unified MCP server file exists
    if os.path.exists(SERVER_SCRIPT):
        print(f"✅ Unified MCP server file found: {SERVER_SCRIPT}")
    else:
        print(f"❌ Unified MCP server file not found: {SERVER_SCRIPT}")
        return False

    return True


async def main():
    """Main function with overall timeout.

    Returns:
        bool: True when the connectivity test completed, False when the
        dependency check failed and the test was skipped.

    Raises:
        asyncio.TimeoutError: the test exceeded ``TEST_TIMEOUT_SECONDS``.
        Exception: any failure propagated from ``test_mcp_server``.
    """
    try:
        # Test dependencies first
        deps_ok = await test_dependencies()
        if not deps_ok:
            print("❌ Dependency check failed, aborting test")
            return False

        print("\n" + "=" * 50)
        await asyncio.wait_for(test_mcp_server(), timeout=TEST_TIMEOUT_SECONDS)

    except asyncio.TimeoutError:
        print(f"❌ Test timed out after {TEST_TIMEOUT_SECONDS} seconds")
        raise
    except Exception as e:
        print(f"❌ Test failed: {e}")
        raise

    return True


if __name__ == "__main__":
    # Exit non-zero when the run was aborted so CI does not treat a skipped
    # test as a success; raised exceptions already produce a failing exit.
    if not asyncio.run(main()):
        sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.