Skip to main content
Glama
test_batch_cleanup.py (20 kB)
#!/usr/bin/env python3
"""
Test Batch Context Cleanup Tools

This script tests the new batch cleanup tools for efficiently removing all subjects
from contexts in both single-registry and multi-registry modes.

It can be collected by pytest or run directly as a script. When run as a script,
the process exit status reflects the overall result (0 = all passed, 1 = failure).
"""

import asyncio
import json
import os
import sys
import traceback

import pytest
import requests
from fastmcp import Client
from mcp import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client

# Add project root to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# NOTE(review): the test functions below report their outcome by returning a
# bool because main() consumes those values. pytest does not treat a returned
# False as a failure, so under pytest only raised exceptions fail a test.

_DEV_REGISTRY_URL = "http://localhost:38081"
_PROD_REGISTRY_URL = "http://localhost:38082"
_SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"
_SERVER_SCRIPT_NAME = "kafka_schema_registry_unified_mcp.py"

# System variables copied from the parent process into the server subprocess.
_PASSTHROUGH_ENV_KEYS = (
    "PATH",
    "PYTHONPATH",
    "HOME",
    "USER",
    "SHELL",
    "LANG",
    "LC_ALL",
    "VIRTUAL_ENV",
)


def _server_script_path() -> str:
    """Return the absolute path of the MCP server script.

    The script is expected one directory above the directory containing this file.
    """
    this_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(os.path.dirname(this_dir), _SERVER_SCRIPT_NAME)


def _build_server_env() -> dict:
    """Build a clean environment for the server subprocess with the dev/prod registry config."""
    env = {key: os.environ[key] for key in _PASSTHROUGH_ENV_KEYS if key in os.environ}

    # Add our registry configuration
    env["SCHEMA_REGISTRY_NAME_1"] = "dev"
    env["SCHEMA_REGISTRY_URL_1"] = _DEV_REGISTRY_URL
    env["SCHEMA_REGISTRY_NAME_2"] = "prod"
    env["SCHEMA_REGISTRY_URL_2"] = _PROD_REGISTRY_URL

    # Force test mode to allow localhost
    env["TESTING"] = "true"
    return env


def _build_server_params(env: dict) -> StdioServerParameters:
    """Create stdio parameters that launch the MCP server with the given environment."""
    # Use the interpreter running this test so the server runs in the same
    # installation; a bare "python" may resolve elsewhere or not exist at all.
    return StdioServerParameters(
        command=sys.executable or "python",
        args=[_server_script_path()],
        env=env,
    )


def _register_schema(registry_url: str, context: str, subject: str, schema: dict) -> requests.Response:
    """POST an Avro schema as a new version of ``subject`` inside ``context``."""
    url = f"{registry_url}/contexts/{context}/subjects/{subject}/versions"
    return requests.post(
        url,
        headers={"Content-Type": _SCHEMA_CONTENT_TYPE},
        json={"schema": json.dumps(schema)},
        timeout=10,
    )


def _cleanup_tool_args(tool_name: str) -> dict:
    """Pick minimal call arguments for a cleanup tool based on its name."""
    if "pattern" in tool_name:
        return {"pattern": "test-*"}
    if "bulk" in tool_name:
        return {"subjects": ["test-subject-1", "test-subject-2"]}
    if "batch" in tool_name:
        return {"subjects": ["test-subject"]}
    return {"dry_run": True}


def _reset_task_manager(success_message: str, failure_prefix: str) -> None:
    """Best-effort reset of the server module's task manager; failures are only reported."""
    try:
        import kafka_schema_registry_unified_mcp

        kafka_schema_registry_unified_mcp.task_manager.reset_for_testing()
        print(success_message)
    except Exception as e:
        print(f"{failure_prefix}: {e}")


@pytest.mark.asyncio
async def test_single_registry_batch_cleanup_helper() -> bool:
    """Helper function to test batch cleanup with MCP client and timeout protection.

    Starts the MCP server over stdio, calls ``clear_context_batch`` first as a
    dry run and then for real on the ``test-cleanup-single`` context.

    Returns:
        True when both calls return content without an "error" key, else False.
    """
    try:
        env = _build_server_env()

        print("🔍 Environment variables being passed to subprocess:")
        for k, v in sorted(env.items()):
            if "SCHEMA" in k or "TESTING" in k:
                print(f" {k}={v}")

        server_params = _build_server_params(env)

        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                test_context = "test-cleanup-single"

                # Test dry run first
                print("🔍 Testing dry run mode...")
                dry_run_result = await session.call_tool(
                    "clear_context_batch",
                    {
                        "context": test_context,
                        "delete_context_after": True,
                        "dry_run": True,
                    },
                )

                if not dry_run_result.content:
                    print("❌ No response from dry run")
                    return False

                content = json.loads(dry_run_result.content[0].text)
                if "error" in content:
                    print(f"❌ Dry run failed: {content['error']}")
                    return False
                print("✅ Dry run completed")
                print(f" Response keys: {list(content.keys())}")

                # Test actual cleanup
                print("\n🗑️ Testing actual batch cleanup...")
                cleanup_result = await session.call_tool(
                    "clear_context_batch",
                    {
                        "context": test_context,
                        "delete_context_after": True,
                        "dry_run": False,
                    },
                )

                if not cleanup_result.content:
                    print("❌ No response from cleanup")
                    return False

                content = json.loads(cleanup_result.content[0].text)
                if "error" in content:
                    print(f"❌ Cleanup failed: {content['error']}")
                    return False
                print("✅ Batch cleanup completed")
                print(f" Response keys: {list(content.keys())}")
                return True

    except Exception as e:
        print(f"❌ Single-registry cleanup test failed: {e}")
        return False


@pytest.mark.asyncio
async def test_single_registry_batch_cleanup() -> bool:
    """Test batch cleanup in single-registry mode.

    Registers two test subjects in the ``test-cleanup-single`` context of the
    DEV registry, then runs the MCP cleanup helper with a 30 second timeout.

    Returns:
        True when the cleanup helper succeeds; False when no subject could be
        created, the helper fails, or the helper times out.
    """
    print("🧪 Testing Single-Registry Batch Cleanup")
    print("=" * 50)

    # First, create a test context with some subjects
    test_context = "test-cleanup-single"
    dev_url = _DEV_REGISTRY_URL

    print(f"📝 Setting up test context '{test_context}' with test subjects...")

    # Create test schemas
    test_schemas = [
        {
            "subject": "cleanup-test-user",
            "schema": {
                "type": "record",
                "name": "CleanupUser",
                "namespace": "com.example.cleanup.test",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"},
                ],
            },
        },
        {
            "subject": "cleanup-test-order",
            "schema": {
                "type": "record",
                "name": "CleanupOrder",
                "namespace": "com.example.cleanup.test",
                "fields": [
                    {"name": "orderId", "type": "string"},
                    {"name": "amount", "type": "double"},
                ],
            },
        },
    ]

    # Create subjects in the test context
    created_subjects = []
    for schema_def in test_schemas:
        # Bound before the try so the error message always names the right subject.
        subject = schema_def["subject"]
        try:
            response = _register_schema(dev_url, test_context, subject, schema_def["schema"])

            # 409 is accepted alongside 200 as a usable starting state.
            if response.status_code in (200, 409):
                created_subjects.append(subject)
                print(f" ✅ Created {subject} in context '{test_context}'")
            else:
                print(f" ❌ Failed to create {subject}: {response.status_code}")
        except Exception as e:
            print(f" ❌ Error creating {subject}: {e}")

    if not created_subjects:
        print("❌ No test subjects created - skipping cleanup test")
        return False

    print(f"📊 Created {len(created_subjects)} test subjects")

    # Now test the single-registry batch cleanup with timeout protection
    print("\n🧪 Testing single-registry batch cleanup...")

    try:
        # Run the async helper with timeout protection
        return await asyncio.wait_for(test_single_registry_batch_cleanup_helper(), timeout=30.0)
    except asyncio.TimeoutError:
        print("❌ Single-registry cleanup test timed out after 30 seconds")
        return False
    except Exception as e:
        print(f"❌ Single-registry cleanup test failed: {e}")
        return False


@pytest.mark.asyncio
async def test_multi_registry_batch_cleanup_helper() -> bool:
    """Helper function to test multi-registry batch cleanup with MCP client and timeout protection.

    Starts the MCP server over stdio and calls ``clear_context_batch`` for the
    ``test-cleanup-multi`` context, targeting the ``dev`` registry.

    Returns:
        True when the call returns content without an "error" key, else False.
    """
    try:
        server_params = _build_server_params(_build_server_env())

        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                test_context = "test-cleanup-multi"

                # Test multi-registry context cleanup in DEV registry
                print("🔍 Testing context cleanup in DEV registry...")
                cleanup_result = await session.call_tool(
                    "clear_context_batch",
                    {
                        "context": test_context,
                        "registry": "dev",
                        "delete_context_after": True,
                        "dry_run": False,
                    },
                )

                if not cleanup_result.content:
                    print("❌ No response from multi-registry cleanup")
                    return False

                content = json.loads(cleanup_result.content[0].text)
                if "error" in content:
                    print(f"❌ Multi-registry cleanup failed: {content['error']}")
                    return False
                print("✅ Multi-registry cleanup task started")
                print(f" Response keys: {list(content.keys())}")
                return True

    except Exception as e:
        print(f"❌ Multi-registry cleanup test failed: {e}")
        return False


@pytest.mark.asyncio
async def test_multi_registry_batch_cleanup() -> bool:
    """Test batch cleanup in multi-registry mode.

    Registers one test subject in the ``test-cleanup-multi`` context of the
    DEV registry, then runs the multi-registry helper with a 30 second timeout.

    Returns:
        True when the cleanup helper succeeds; False on setup failure, helper
        failure, or timeout.
    """
    print("\n🧪 Testing Multi-Registry Batch Cleanup")
    print("=" * 50)

    # Test context for multi-registry
    test_context = "test-cleanup-multi"

    print(f"📝 Setting up test context '{test_context}' in DEV registry...")

    # Create test schemas in DEV registry
    dev_url = _DEV_REGISTRY_URL
    test_schema = {
        "type": "record",
        "name": "MultiCleanupTest",
        "namespace": "com.example.multi.cleanup",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "data", "type": "string"},
        ],
    }

    test_subject = "multi-cleanup-test"

    try:
        # Create subject in DEV registry context
        response = _register_schema(dev_url, test_context, test_subject, test_schema)

        if response.status_code not in (200, 409):
            print(f"❌ Failed to create test subject: {response.status_code}")
            return False

        print(f" ✅ Created {test_subject} in context '{test_context}' (DEV registry)")

    except Exception as e:
        print(f"❌ Error creating test subject: {e}")
        return False

    # Test multi-registry batch cleanup with timeout protection
    print("\n🧪 Testing multi-registry batch cleanup...")

    try:
        # Run the async helper with timeout protection
        return await asyncio.wait_for(test_multi_registry_batch_cleanup_helper(), timeout=30.0)
    except asyncio.TimeoutError:
        print("❌ Multi-registry cleanup test timed out after 30 seconds")
        return False
    except Exception as e:
        print(f"❌ Multi-registry cleanup test failed: {e}")
        return False


def test_performance_characteristics() -> bool:
    """Test and demonstrate performance characteristics.

    Only prints a feature summary and usage examples; it performs no
    measurement and always returns True.
    """
    print("\n📊 Testing Performance Characteristics")
    print("=" * 50)

    print("🏃 Performance features of batch cleanup tools:")
    print(" • Parallel deletion (up to 10 concurrent deletions)")
    print(" • Progress reporting with real-time feedback")
    print(" • Comprehensive error handling and retry logic")
    print(" • Detailed performance metrics (subjects/second)")
    print(" • Dry run mode for safe testing")
    print(" • Context deletion after subject cleanup")
    print(" • Support for both single and multi-registry modes")
    print(" • Cross-registry cleanup for consistent maintenance")

    print("\n💡 Usage Examples:")
    print(" Single Registry:")
    print(" • clear_context_batch('test-context', dry_run=True)")
    print(" • clear_multiple_contexts_batch(['ctx1', 'ctx2'])")
    print("\n Multi Registry:")
    print(" • clear_context_batch('test-context', 'dev-registry')")
    print(" • clear_context_across_registries_batch('ctx', ['dev', 'prod'])")

    return True


@pytest.mark.asyncio
async def test_batch_cleanup() -> bool:
    """Test batch cleanup operations via MCP.

    Connects with the fastmcp ``Client``, lists the server's tools, and invokes
    whichever of the known cleanup and counting tools are available. Errors
    from individual tools are reported but do not fail the test.

    Returns:
        True when the session completes; False on a connection-level error.
    """
    print("🧹 Testing Batch Cleanup Operations")
    print("=" * 50)

    # Setup environment; the previous values are restored on exit so these
    # settings do not leak into tests that run later in the same process.
    env_overrides = {
        "SCHEMA_REGISTRY_URL": _DEV_REGISTRY_URL,
        "VIEWONLY": "false",
    }
    previous_env = {name: os.environ.get(name) for name in env_overrides}
    os.environ.update(env_overrides)

    try:
        # Create client
        client = Client(_server_script_path())

        try:
            async with client:
                print("✅ MCP connection established")

                # Get available tools
                tools = await client.list_tools()
                tool_names = [tool.name for tool in tools]
                print(f"📋 Available tools: {len(tool_names)}")

                # Test batch cleanup tools if available
                cleanup_tools = [
                    "cleanup_schemas",
                    "bulk_cleanup",
                    "batch_delete_schemas",
                    "cleanup_by_pattern",
                ]

                available_cleanup_tools = [tool for tool in cleanup_tools if tool in tool_names]
                print(f"🧹 Cleanup tools available: {available_cleanup_tools}")

                for tool_name in available_cleanup_tools:
                    print(f"\n🧪 Testing: {tool_name}")
                    try:
                        # Use minimal test arguments
                        result = await client.call_tool(tool_name, _cleanup_tool_args(tool_name))
                        print(f"✅ {tool_name}: Executed successfully")

                        # Check if result contains cleanup info
                        result_text = str(result).lower()
                        if "cleanup" in result_text or "deleted" in result_text:
                            print(" Result appears to be cleanup-related")

                    except Exception as e:
                        error_text = str(e).lower()
                        if "connection" in error_text or "registry" in error_text:
                            print(f"⚠️ {tool_name}: Connection error (expected): {e}")
                        else:
                            print(f"❌ {tool_name}: Unexpected error: {e}")

                # Test non-destructive operations
                safe_tools = [
                    "count_schemas_by_subject",
                    "count_total_schemas",
                ]

                available_safe_tools = [tool for tool in safe_tools if tool in tool_names]
                print(f"\n📊 Testing safe operations: {available_safe_tools}")

                for tool_name in available_safe_tools:
                    try:
                        await client.call_tool(tool_name, {})
                        print(f"✅ {tool_name}: Working")
                    except Exception as e:
                        print(f"⚠️ {tool_name}: {e}")

                print("\n🎉 Batch cleanup test completed!")
                return True

        except Exception as e:
            print(f"❌ Critical error during batch cleanup test: {e}")
            traceback.print_exc()
            return False
    finally:
        for name, value in previous_env.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value


async def main() -> bool:
    """Main test runner.

    Verifies that both registries answer ``GET /subjects``, then runs every
    test in order, resetting the task manager before and after each one.

    Returns:
        True when all tests passed, False otherwise (including when the
        registries are unreachable).
    """
    print("🚀 Batch Context Cleanup Tools Test Suite")
    print("=" * 60)

    # Check registry connectivity
    print("🔍 Checking registry connectivity...")

    try:
        dev_response = requests.get(f"{_DEV_REGISTRY_URL}/subjects", timeout=5)
        prod_response = requests.get(f"{_PROD_REGISTRY_URL}/subjects", timeout=5)

        if dev_response.status_code != 200:
            print(f"❌ DEV registry not accessible: {dev_response.status_code}")
            return False

        if prod_response.status_code != 200:
            print(f"❌ PROD registry not accessible: {prod_response.status_code}")
            return False

        print("✅ Both registries accessible")

    except Exception as e:
        print(f"❌ Registry connectivity failed: {e}")
        print("💡 Make sure multi-registry environment is running:")
        print(" ./tests/start_multi_registry_environment.sh")
        return False

    # Run tests: (display name, callable, whether the callable is a coroutine function)
    tests = [
        ("Single-Registry Batch Cleanup", test_single_registry_batch_cleanup, True),
        ("Multi-Registry Batch Cleanup", test_multi_registry_batch_cleanup, True),
        ("Performance Characteristics", test_performance_characteristics, False),
        ("Batch Cleanup via MCP Client", test_batch_cleanup, True),
    ]

    passed = 0
    total = len(tests)

    for test_name, test_func, is_async in tests:
        print(f"\n{'='*20} {test_name} {'='*20}")
        try:
            # Reset task manager before each test for isolation
            _reset_task_manager(
                "🔄 Task manager reset for test isolation",
                "⚠️ Warning: Could not reset task manager",
            )

            outcome = await test_func() if is_async else test_func()
            if outcome:
                passed += 1
                print(f"✅ {test_name} PASSED")
            else:
                print(f"❌ {test_name} FAILED")
        except Exception as e:
            print(f"❌ {test_name} FAILED with exception: {e}")

        # Additional cleanup after each test
        _reset_task_manager(
            "🧹 Post-test cleanup completed",
            "⚠️ Warning: Post-test cleanup failed",
        )

    print(f"\n📊 Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 ALL BATCH CLEANUP TESTS PASSED!")
        print("\n🚀 Batch cleanup tools are ready for use!")
        print(" • Use dry_run=True for safe testing")
        print(" • Tools support both single and multi-registry modes")
        print(" • Performance optimized with parallel execution")
        print(" • Comprehensive error handling and reporting")
        return True

    print(f"⚠️ {total - passed} tests failed")
    return False


if __name__ == "__main__":
    # Propagate the overall result so shells and CI can detect failures.
    sys.exit(0 if asyncio.run(main()) else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.