Skip to main content
Glama
test_migration_integration.pyโ€ข11.7 kB
#!/usr/bin/env python3 """ Comprehensive Migration Integration Tests Tests the actual migration functionality in the MCP server to ensure: 1. migrate_context actually migrates schemas (not just returns metadata) 2. migrate_schema works end-to-end 3. Migration counts are accurate 4. Error handling works properly 5. Dry run functionality works 6. Edge cases are handled """ import json import logging import os import sys import uuid import pytest import requests # Add parent directory to path to import the MCP server sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import kafka_schema_registry_unified_mcp as mcp_server from migration_tools import ( get_migration_status_tool, list_migrations_tool, migrate_schema_tool, ) # Setup logging for debugging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @pytest.fixture def test_env(): """Setup test environment with multi-registry configuration""" dev_url = "http://localhost:38081" prod_url = "http://localhost:38082" test_context = f"test-migration-{uuid.uuid4().hex[:8]}" # Setup environment for multi-registry mode os.environ["SCHEMA_REGISTRY_NAME_1"] = "dev" os.environ["SCHEMA_REGISTRY_URL_1"] = dev_url os.environ["VIEWONLY_1"] = "false" os.environ["SCHEMA_REGISTRY_NAME_2"] = "prod" os.environ["SCHEMA_REGISTRY_URL_2"] = prod_url os.environ["VIEWONLY_2"] = "false" # Allow writes to prod for testing # Clear any other registry configurations for i in range(3, 9): for var in [ f"SCHEMA_REGISTRY_NAME_{i}", f"SCHEMA_REGISTRY_URL_{i}", f"VIEWONLY_{i}", ]: if var in os.environ: del os.environ[var] # Clear global VIEWONLY setting if "VIEWONLY" in os.environ: del os.environ["VIEWONLY"] # Force reload the registry manager with new configuration mcp_server.registry_manager._load_multi_registries() yield {"dev_url": dev_url, "prod_url": prod_url, "test_context": test_context} # Cleanup after tests - remove any test schemas try: for registry in ["dev", "prod"]: try: subjects = mcp_server.list_subjects(context=test_context, registry=registry) if subjects and not isinstance(subjects, dict): # Not an error response for subject in subjects: mcp_server.delete_subject( subject, context=test_context, registry=registry, permanent=True, ) except Exception: pass # Ignore cleanup errors except Exception as e: logger.warning(f"Cleanup error: {e}") def create_test_schema_via_api(registry_url: str, context: str, subject: str, schema_def: dict) -> bool: """Create a test schema directly via the registry API""" try: if context and context != ".": url = f"{registry_url}/contexts/{context}/subjects/{subject}/versions" else: url = f"{registry_url}/subjects/{subject}/versions" payload = {"schema": json.dumps(schema_def)} response = requests.post( url, headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}, json=payload, timeout=10, ) return response.status_code in [200, 409] # 409 = already exists except Exception as e: logger.error(f"Failed to create schema {subject}: {e}") return False @pytest.mark.asyncio async def test_migrate_schema_functionality(test_env): """Test that migrate_schema works for individual schemas""" logger.info("Testing migrate_schema functionality...") dev_url = test_env["dev_url"] test_context = test_env["test_context"] test_subject = f"migration-test-{uuid.uuid4().hex[:8]}" # Create a test schema in dev test_schema = { "type": "record", "name": "MigrationTestEvent", "namespace": "com.example.migration.test", "fields": [ {"name": "testId", "type": "string"}, {"name": "testValue", "type": "int"}, ], } # Create schema via API success = create_test_schema_via_api(dev_url, test_context, test_subject, test_schema) assert success, f"Failed to create test schema {test_subject}" try: # Migrate schema using direct function call result = migrate_schema_tool( subject=test_subject, source_registry="dev", target_registry="prod", registry_manager=mcp_server.registry_manager, registry_mode=mcp_server.REGISTRY_MODE, source_context=test_context, target_context=test_context, migrate_all_versions=True, preserve_ids=False, dry_run=False, ) assert "error" not in result, f"Schema migration failed: {result.get('error')}" logger.info("Migration Results:") logger.info(f" Total versions: {result.get('total_versions', 0)}") logger.info(f" Successful: {result.get('successful_migrations', 0)}") logger.info(f" Failed: {result.get('failed_migrations', 0)}") logger.info(f" Skipped: {result.get('skipped_migrations', 0)}") # Verify migration was successful OR appropriately skipped successful_count = result.get("successful_migrations", 0) skipped_count = result.get("skipped_migrations", 0) assert successful_count > 0 or skipped_count > 0, "No schema versions migrated or skipped" logger.info("โœ… Schema migration test passed") finally: # Cleanup try: mcp_server.delete_subject(test_subject, context=test_context, registry="dev", permanent=True) mcp_server.delete_subject(test_subject, context=test_context, registry="prod", permanent=True) except Exception: pass @pytest.mark.asyncio async def test_migration_task_tracking(test_env): """Test that migration tasks are properly tracked""" logger.info("Testing migration task tracking...") dev_url = test_env["dev_url"] test_context = test_env["test_context"] # Get migration count before migrations_before = list_migrations_tool(mcp_server.REGISTRY_MODE) if isinstance(migrations_before, dict) and "migrations" in migrations_before: before_count = len(migrations_before["migrations"]) elif isinstance(migrations_before, list): before_count = len(migrations_before) else: before_count = 0 test_subject = f"tracking-test-{uuid.uuid4().hex[:8]}" # Create a test schema test_schema = { "type": "record", "name": "TrackingTestEvent", "namespace": "com.example.migration.test", "fields": [{"name": "trackingId", "type": "string"}], } success = create_test_schema_via_api(dev_url, test_context, test_subject, test_schema) assert success, f"Failed to create tracking test schema {test_subject}" try: # Perform migration result = migrate_schema_tool( subject=test_subject, source_registry="dev", target_registry="prod", registry_manager=mcp_server.registry_manager, registry_mode=mcp_server.REGISTRY_MODE, source_context=test_context, migrate_all_versions=False, preserve_ids=False, dry_run=False, ) # Get migration count after migrations_after = list_migrations_tool(mcp_server.REGISTRY_MODE) if isinstance(migrations_after, dict) and "migrations" in migrations_after: after_count = len(migrations_after["migrations"]) elif isinstance(migrations_after, list): after_count = len(migrations_after) else: after_count = 0 assert after_count > before_count, f"Migration task not tracked properly ({before_count} -> {after_count})" # Test getting specific migration status if migration_id is provided if "migration_id" in result: migration_id = result["migration_id"] status = get_migration_status_tool(migration_id, mcp_server.REGISTRY_MODE) assert status and isinstance(status, dict), "Failed to get migration status" if "error" not in status: logger.info("โœ… Migration status retrieval works") else: logger.warning(f"Migration status error (non-critical): {status['error']}") logger.info("โœ… Migration task tracking test passed") finally: # Cleanup try: mcp_server.delete_subject(test_subject, context=test_context, registry="dev", permanent=True) mcp_server.delete_subject(test_subject, context=test_context, registry="prod", permanent=True) except Exception: pass @pytest.mark.asyncio async def test_migration_error_handling(test_env): """Test migration error handling""" logger.info("Testing migration error handling...") test_context = test_env["test_context"] # Try to migrate non-existent subject result = migrate_schema_tool( subject="non-existent-subject", source_registry="dev", target_registry="prod", registry_manager=mcp_server.registry_manager, registry_mode=mcp_server.REGISTRY_MODE, source_context=test_context, ) # Should return a result indicating no versions to migrate, not an error assert "total_versions" in result, "Expected migration result with version count" assert result.get("total_versions", 0) == 0, "Expected 0 versions for non-existent subject" # Try to migrate to non-existent registry result = migrate_schema_tool( subject="test-subject", source_registry="dev", target_registry="non-existent-registry", registry_manager=mcp_server.registry_manager, registry_mode=mcp_server.REGISTRY_MODE, ) assert "error" in result, "Expected error for non-existent registry" logger.info("โœ… Migration error handling test passed") def test_registry_connectivity(): """Test that both registries are accessible before running tests""" logger.info("Testing registry connectivity...") dev_response = requests.get("http://localhost:38081/subjects", timeout=5) prod_response = requests.get("http://localhost:38082/subjects", timeout=5) if dev_response.status_code != 200: pytest.skip(f"DEV registry not accessible: {dev_response.status_code}") if prod_response.status_code != 200: pytest.skip(f"PROD registry not accessible: {prod_response.status_code}") logger.info("โœ… Both registries accessible") @pytest.mark.asyncio async def test_all_migration_features(test_env): """Run all migration integration tests""" logger.info("๐Ÿš€ Starting Migration Integration Tests") # Test individual features await test_migrate_schema_functionality(test_env) await test_migration_task_tracking(test_env) await test_migration_error_handling(test_env) logger.info("๐ŸŽ‰ All Migration Integration Tests passed!") if __name__ == "__main__": # Check connectivity first test_registry_connectivity() # Run the tests using pytest import subprocess import sys result = subprocess.run( [ sys.executable, "-m", "pytest", __file__ + "::test_all_migration_features", "-v", "-s", ] ) sys.exit(result.returncode)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server