Skip to main content
Glama
test_compatibility_migration.py • 6.7 kB
#!/usr/bin/env python3
"""
Compatibility validation during migration

Tests schema compatibility rules and validation during migration between registries.
"""

import json
import sys

import requests

# Content type sent with every schema-bearing POST in this script.
_SCHEMA_HEADERS = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

# Per-request timeout (seconds) so an unreachable registry fails fast.
_TIMEOUT = 5


def test_test_compatibility_migration():
    """Exercise schema compatibility checks against the DEV registry.

    Steps performed:
      1. Verify that both the DEV and PROD registries answer ``GET /subjects``.
      2. Register a base ``Event`` schema (v1) in DEV.
      3. Check a schema that adds an optional field and, if the registry
         reports it as compatible, register it as v2.
      4. Check a schema that drops the ``timestamp`` field and report whether
         the registry flags it as incompatible.
      5. Print the subject-level (or, on 404, global) compatibility level.
      6. List the subject's versions in DEV and fetch the last two.

    Returns:
        bool: ``False`` when registry connectivity fails, when the base schema
        cannot be created, or when any request times out or raises; ``True``
        otherwise. Compatibility mismatches are reported as warnings only and
        do not change the return value.
    """
    # DEV Schema Registry
    dev_url = "http://localhost:38081"
    # PROD Schema Registry
    prod_url = "http://localhost:38082"

    print("🧪 Starting compatibility migration test...")

    try:
        # Check connectivity
        dev_response = requests.get(f"{dev_url}/subjects", timeout=_TIMEOUT)
        prod_response = requests.get(f"{prod_url}/subjects", timeout=_TIMEOUT)

        if dev_response.status_code != 200 or prod_response.status_code != 200:
            print("❌ Registry connectivity failed")
            return False

        print("✅ Both registries are accessible")

        # Test subject for compatibility testing
        test_subject = "compatibility-test-event"
        versions_url = f"{dev_url}/subjects/{test_subject}-value/versions"
        compat_url = (
            f"{dev_url}/compatibility/subjects/{test_subject}-value/versions/latest"
        )

        # Create base schema (v1)
        base_schema = {
            "type": "record",
            "name": "Event",
            "fields": [
                {"name": "id", "type": "string"},
                {"name": "timestamp", "type": "long"},
                {"name": "type", "type": "string"},
            ],
        }

        print("📝 Creating base schema v1 in DEV...")
        create_response = requests.post(
            versions_url,
            headers=_SCHEMA_HEADERS,
            json={"schema": json.dumps(base_schema)},
            timeout=_TIMEOUT,
        )

        # 409 is tolerated so that re-running against a registry that already
        # holds this subject does not abort the test.
        if create_response.status_code not in (200, 409):
            print(f"❌ Failed to create base schema: {create_response.status_code}")
            return False

        print("✅ Base schema v1 created")

        # Create backward compatible schema (v2) - add optional field
        compat_schema = {
            "type": "record",
            "name": "Event",
            "fields": [
                {"name": "id", "type": "string"},
                {"name": "timestamp", "type": "long"},
                {"name": "type", "type": "string"},
                {"name": "metadata", "type": ["null", "string"], "default": None},
            ],
        }
        compat_payload = {"schema": json.dumps(compat_schema)}

        print("🔍 Testing backward compatibility...")
        compat_test = requests.post(
            compat_url,
            headers=_SCHEMA_HEADERS,
            json=compat_payload,
            timeout=_TIMEOUT,
        )

        if compat_test.status_code == 200:
            compat_result = compat_test.json()
            if compat_result.get("is_compatible", False):
                print("✅ Backward compatible schema validated")

                # Register the compatible schema and report the real outcome
                # instead of assuming the POST succeeded.
                register_response = requests.post(
                    versions_url,
                    headers=_SCHEMA_HEADERS,
                    json=compat_payload,
                    timeout=_TIMEOUT,
                )
                if register_response.status_code == 200:
                    print("✅ Compatible schema v2 registered")
                else:
                    print(
                        "⚠️ Failed to register compatible schema v2: "
                        f"{register_response.status_code}"
                    )
            else:
                print("⚠️ Schema marked as incompatible")
        else:
            print(f"⚠️ Compatibility check failed: {compat_test.status_code}")

        # Test incompatible schema - remove required field
        incompat_schema = {
            "type": "record",
            "name": "Event",
            "fields": [
                {"name": "id", "type": "string"},
                {"name": "type", "type": "string"},
                # Missing required 'timestamp' field
            ],
        }

        print("🔍 Testing incompatible schema...")
        incompat_test = requests.post(
            compat_url,
            headers=_SCHEMA_HEADERS,
            json={"schema": json.dumps(incompat_schema)},
            timeout=_TIMEOUT,
        )

        if incompat_test.status_code == 200:
            incompat_result = incompat_test.json()
            if not incompat_result.get("is_compatible", True):
                print("✅ Incompatible schema correctly rejected")
            else:
                print("⚠️ Incompatible schema incorrectly accepted")
        else:
            # Mirror the backward-compatibility branch: surface the failed
            # check rather than skipping it silently.
            print(f"⚠️ Compatibility check failed: {incompat_test.status_code}")

        # Test compatibility levels
        print("🔍 Testing compatibility levels...")

        # Get current compatibility level
        compat_level_response = requests.get(
            f"{dev_url}/config/{test_subject}-value", timeout=_TIMEOUT
        )
        if compat_level_response.status_code == 200:
            level_data = compat_level_response.json()
            print(
                f"✅ Compatibility level: {level_data.get('compatibilityLevel', 'BACKWARD')}"
            )
        elif compat_level_response.status_code == 404:
            # No subject-level override: fall back to the global level
            global_compat = requests.get(f"{dev_url}/config", timeout=_TIMEOUT)
            if global_compat.status_code == 200:
                global_data = global_compat.json()
                print(
                    "✅ Global compatibility level: "
                    f"{global_data.get('compatibilityLevel', 'BACKWARD')}"
                )

        # Test cross-registry compatibility simulation
        print("🔄 Testing cross-registry compatibility...")

        # Get all versions from DEV
        versions_response = requests.get(versions_url, timeout=_TIMEOUT)
        if versions_response.status_code == 200:
            versions = versions_response.json()
            print(f"✅ Found {len(versions)} versions in DEV for compatibility testing")

            # Test each version for migration compatibility
            for version in versions[-2:]:  # Test last 2 versions
                version_response = requests.get(
                    f"{versions_url}/{version}", timeout=_TIMEOUT
                )
                if version_response.status_code == 200:
                    version_data = version_response.json()
                    print(f"✅ Version {version}: Schema ID {version_data.get('id')}")

        print("✅ Compatibility migration test completed successfully")
        return True

    except requests.exceptions.Timeout:
        print("❌ Test failed: Request timeout")
        return False
    except Exception as e:
        # Top-level boundary of the script: report any failure and map it to
        # a non-zero exit code instead of a traceback.
        print(f"❌ Test failed: {e}")
        return False


if __name__ == "__main__":
    success = test_test_compatibility_migration()
    sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.