Skip to main content
Glama
test_integration.py46.9 kB
import asyncio import json import os import httpx import pytest import pytest_asyncio # Test configuration MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "http://localhost:38000") SCHEMA_REGISTRY_URL = os.getenv("SCHEMA_REGISTRY_URL", "http://localhost:38081") # Test schemas AVRO_SCHEMA_V1 = { "type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}], } AVRO_SCHEMA_V2 = { "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": None}, ], } INCOMPATIBLE_SCHEMA = { "type": "record", "name": "User", "fields": [ {"name": "user_id", "type": "string"}, # Changed field name and type {"name": "full_name", "type": "string"}, ], } @pytest_asyncio.fixture async def client(): """Create an async HTTP client for testing.""" async with httpx.AsyncClient() as client: yield client @pytest.fixture def test_subject(): """Generate a unique test subject name.""" import uuid return f"test-subject-{uuid.uuid4().hex[:8]}" async def wait_for_service(url: str, timeout: int = 60): """Wait for a service to be available.""" start_time = asyncio.get_event_loop().time() while True: try: async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code == 200: return True except httpx.RequestError: pass if asyncio.get_event_loop().time() - start_time > timeout: raise TimeoutError(f"Service at {url} did not become available within {timeout} seconds") await asyncio.sleep(1) @pytest.mark.asyncio async def test_services_healthy(client: httpx.AsyncClient): """Test that both MCP server and Schema Registry are accessible.""" # Test MCP server health response = await client.get(f"{MCP_SERVER_URL}/") assert response.status_code == 200 assert ( response.json()["message"] == "Kafka Schema Registry MCP Server with Context Support, Configuration Management, Mode Control, and Schema Export" ) @pytest.mark.asyncio async def test_register_schema(client: httpx.AsyncClient, test_subject: str): """Test registering a new schema.""" response = await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) assert response.status_code == 200 data = response.json() assert "id" in data assert data["id"] > 0 @pytest.mark.asyncio async def test_get_schema(client: httpx.AsyncClient, test_subject: str): """Test retrieving a schema.""" # First register a schema await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Then retrieve it response = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}") assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert data["version"] == 1 assert "schema" in data @pytest.mark.asyncio async def test_get_schema_versions(client: httpx.AsyncClient, test_subject: str): """Test getting all versions of a schema.""" # Register two versions await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) # Get versions response = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}/versions") assert response.status_code == 200 versions = response.json() assert isinstance(versions, list) assert 1 in versions assert 2 in versions @pytest.mark.asyncio async def test_check_compatibility_compatible(client: httpx.AsyncClient, test_subject: str): """Test checking compatibility with a compatible schema.""" # Register initial schema await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Check compatibility with backward compatible schema response = await client.post( f"{MCP_SERVER_URL}/compatibility", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) assert response.status_code == 200 data = response.json() assert data.get("is_compatible", False) is True @pytest.mark.asyncio async def test_check_compatibility_incompatible(client: httpx.AsyncClient, test_subject: str): """Test checking compatibility with an incompatible schema.""" # Register initial schema await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Check compatibility with incompatible schema response = await client.post( f"{MCP_SERVER_URL}/compatibility", json={ "subject": test_subject, "schema": INCOMPATIBLE_SCHEMA, "schemaType": "AVRO", }, ) assert response.status_code == 200 data = response.json() assert data.get("is_compatible", True) is False @pytest.mark.asyncio async def test_schema_not_found(client: httpx.AsyncClient): """Test retrieving a non-existent schema.""" response = await client.get(f"{MCP_SERVER_URL}/schemas/non-existent-subject") assert response.status_code == 404 @pytest.mark.asyncio async def test_invalid_schema_format(client: httpx.AsyncClient, test_subject: str): """Test registering an invalid schema.""" response = await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": {"invalid": "schema"}, "schemaType": "AVRO", }, ) assert response.status_code == 500 # Test with authentication (if configured) @pytest.mark.asyncio @pytest.mark.skipif(not os.getenv("SCHEMA_REGISTRY_USER"), reason="Authentication not configured") async def test_with_authentication(): """Test that authentication is working when configured.""" # This test assumes authentication is configured in the environment async with httpx.AsyncClient() as client: response = await client.get(f"{MCP_SERVER_URL}/") assert response.status_code == 200 # Schema Context Tests @pytest.mark.asyncio async def test_list_contexts(client: httpx.AsyncClient): """Test listing all available contexts.""" response = await client.get(f"{MCP_SERVER_URL}/contexts") assert response.status_code == 200 data = response.json() assert "contexts" in data assert isinstance(data["contexts"], list) @pytest.mark.asyncio async def test_create_context(client: httpx.AsyncClient): """Test creating a new schema context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" response = await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") assert response.status_code == 200 data = response.json() assert "message" in data assert context_name in data["message"] assert "created successfully" in data["message"] @pytest.mark.asyncio async def test_context_aware_schema_registration(client: httpx.AsyncClient, test_subject: str): """Test registering a schema within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context first await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register schema in context via request body response = await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) assert response.status_code == 200 data = response.json() assert "id" in data assert data["id"] > 0 @pytest.mark.asyncio async def test_context_aware_schema_registration_query_param(client: httpx.AsyncClient, test_subject: str): """Test registering a schema with context via query parameter.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context first await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register schema in context via query parameter response = await client.post( f"{MCP_SERVER_URL}/schemas?context={context_name}", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) assert response.status_code == 200 data = response.json() assert "id" in data assert data["id"] > 0 @pytest.mark.asyncio async def test_context_aware_schema_retrieval(client: httpx.AsyncClient, test_subject: str): """Test retrieving a schema from a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Retrieve schema from context response = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}?context={context_name}") assert response.status_code == 200 data = response.json() # Schema Registry returns subject with context prefix: ":.context-name:subject-name" expected_subject = f":.{context_name}:{test_subject}" assert data["subject"] == expected_subject assert "id" in data assert "schema" in data @pytest.mark.asyncio async def test_context_aware_schema_versions(client: httpx.AsyncClient, test_subject: str): """Test getting schema versions within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register multiple schema versions await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context_name, }, ) # Get versions within context response = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}/versions?context={context_name}") assert response.status_code == 200 versions = response.json() assert isinstance(versions, list) assert 1 in versions assert 2 in versions @pytest.mark.asyncio async def test_context_aware_compatibility_check(client: httpx.AsyncClient, test_subject: str): """Test compatibility checking within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register initial schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Check compatibility within context response = await client.post( f"{MCP_SERVER_URL}/compatibility", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context_name, }, ) assert response.status_code == 200 data = response.json() assert "is_compatible" in data @pytest.mark.asyncio async def test_context_aware_compatibility_check_query_param(client: httpx.AsyncClient, test_subject: str): """Test compatibility checking with context via query parameter.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register initial schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas?context={context_name}", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Check compatibility within context via query param response = await client.post( f"{MCP_SERVER_URL}/compatibility?context={context_name}", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) assert response.status_code == 200 data = response.json() assert "is_compatible" in data @pytest.mark.asyncio async def test_list_subjects_with_context(client: httpx.AsyncClient, test_subject: str): """Test listing subjects within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register a schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # List subjects in context response = await client.get(f"{MCP_SERVER_URL}/subjects?context={context_name}") assert response.status_code == 200 subjects = response.json() assert isinstance(subjects, list) # Schema Registry returns subject with context prefix: ":.context-name:subject-name" expected_subject = f":.{context_name}:{test_subject}" assert expected_subject in subjects @pytest.mark.asyncio async def test_list_subjects_default_context(client: httpx.AsyncClient, test_subject: str): """Test listing subjects in default context (no context specified).""" # Register a schema in default context await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # List subjects (default context) response = await client.get(f"{MCP_SERVER_URL}/subjects") assert response.status_code == 200 subjects = response.json() assert isinstance(subjects, list) assert test_subject in subjects @pytest.mark.asyncio async def test_delete_subject_with_context(client: httpx.AsyncClient, test_subject: str): """Test deleting a subject within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register a schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Delete subject within context response = await client.delete(f"{MCP_SERVER_URL}/subjects/{test_subject}?context={context_name}") assert response.status_code == 200 # Verify deletion response = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}?context={context_name}") assert response.status_code == 404 @pytest.mark.asyncio async def test_delete_context(client: httpx.AsyncClient): """Test deleting a schema context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context first create_response = await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") assert create_response.status_code == 200 # Delete context (should work for empty context) response = await client.delete(f"{MCP_SERVER_URL}/contexts/{context_name}") # Note: Some Schema Registry versions may not support context deletion # or may require the context to be empty if response.status_code == 200: data = response.json() assert "message" in data assert context_name in data["message"] assert "deleted successfully" in data["message"] elif response.status_code == 404: # Context already deleted or doesn't exist - this is also acceptable pass elif response.status_code == 500: # Some Schema Registry versions may not support context deletion # This is acceptable for testing purposes pass else: # Unexpected status code assert False, f"Unexpected status code: {response.status_code}, response: {response.text}" @pytest.mark.asyncio async def test_context_isolation(client: httpx.AsyncClient, test_subject: str): """Test that contexts provide proper isolation between schemas.""" import uuid context1 = f"test-context-1-{uuid.uuid4().hex[:8]}" context2 = f"test-context-2-{uuid.uuid4().hex[:8]}" # Create both contexts await client.post(f"{MCP_SERVER_URL}/contexts/{context1}") await client.post(f"{MCP_SERVER_URL}/contexts/{context2}") # Register same subject in different contexts with different schemas await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context1, }, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context2, }, ) # Verify schemas are different in each context response1 = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}?context={context1}") response2 = await client.get(f"{MCP_SERVER_URL}/schemas/{test_subject}?context={context2}") assert response1.status_code == 200 assert response2.status_code == 200 schema1 = response1.json()["schema"] schema2 = response2.json()["schema"] # Schemas should be different (one has email field, one doesn't) assert schema1 != schema2 # Configuration Management Tests @pytest.mark.asyncio async def test_get_global_config(client: httpx.AsyncClient): """Test getting global configuration settings.""" response = await client.get(f"{MCP_SERVER_URL}/config") assert response.status_code == 200 data = response.json() assert "compatibilityLevel" in data # Common compatibility levels: BACKWARD, FORWARD, FULL, NONE assert data["compatibilityLevel"] in [ "BACKWARD", "FORWARD", "FULL", "NONE", "BACKWARD_TRANSITIVE", "FORWARD_TRANSITIVE", "FULL_TRANSITIVE", ] @pytest.mark.asyncio async def test_update_global_config(client: httpx.AsyncClient): """Test updating global configuration settings.""" # Get current config first current_response = await client.get(f"{MCP_SERVER_URL}/config") assert current_response.status_code == 200 current_config = current_response.json() # Update to a different compatibility level new_compatibility = "FULL" if current_config["compatibilityLevel"] != "FULL" else "BACKWARD" response = await client.put(f"{MCP_SERVER_URL}/config", json={"compatibility": new_compatibility}) assert response.status_code == 200 data = response.json() assert "compatibility" in data assert data["compatibility"] == new_compatibility @pytest.mark.asyncio async def test_get_subject_config(client: httpx.AsyncClient, test_subject: str): """Test getting configuration for a specific subject.""" # First register a schema to ensure subject exists await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Try to get subject config (might return 404 if no specific config set) response = await client.get(f"{MCP_SERVER_URL}/config/{test_subject}") # Either returns the subject-specific config or inherits from global assert response.status_code in [200, 404] if response.status_code == 200: data = response.json() assert "compatibilityLevel" in data @pytest.mark.asyncio async def test_update_subject_config(client: httpx.AsyncClient, test_subject: str): """Test updating configuration for a specific subject.""" # First register a schema to ensure subject exists await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Set subject-specific config response = await client.put(f"{MCP_SERVER_URL}/config/{test_subject}", json={"compatibility": "FORWARD"}) assert response.status_code == 200 data = response.json() assert "compatibility" in data assert data["compatibility"] == "FORWARD" @pytest.mark.asyncio async def test_config_with_context(client: httpx.AsyncClient, test_subject: str): """Test configuration management within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context first await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register schema in context await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Get global config for context (may not be supported by all Schema Registry versions) response = await client.get(f"{MCP_SERVER_URL}/config?context={context_name}") # Context-aware config might not be supported, so we accept both outcomes assert response.status_code in [200, 404, 500] # Mode Management Tests @pytest.mark.asyncio async def test_get_mode(client: httpx.AsyncClient): """Test getting the current Schema Registry mode.""" response = await client.get(f"{MCP_SERVER_URL}/mode") assert response.status_code == 200 data = response.json() assert "mode" in data # Common modes: IMPORT, READONLY, READWRITE assert data["mode"] in ["IMPORT", "READONLY", "READWRITE"] @pytest.mark.asyncio async def test_update_mode(client: httpx.AsyncClient): """Test updating the Schema Registry mode.""" # Get current mode first current_response = await client.get(f"{MCP_SERVER_URL}/mode") assert current_response.status_code == 200 current_mode = current_response.json() # Try to update mode (this might fail in some Schema Registry setups) response = await client.put(f"{MCP_SERVER_URL}/mode", json={"mode": "READWRITE"}) # Mode changes might be restricted in some configurations assert response.status_code in [200, 403, 422, 500] if response.status_code == 200: data = response.json() assert "mode" in data @pytest.mark.asyncio async def test_get_subject_mode(client: httpx.AsyncClient, test_subject: str): """Test getting mode for a specific subject.""" # First register a schema to ensure subject exists await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Try to get subject mode (might return 404 if no specific mode set) response = await client.get(f"{MCP_SERVER_URL}/mode/{test_subject}") # Either returns the subject-specific mode or 404/500 if not supported assert response.status_code in [200, 404, 500] if response.status_code == 200: data = response.json() assert "mode" in data @pytest.mark.asyncio async def test_update_subject_mode(client: httpx.AsyncClient, test_subject: str): """Test updating mode for a specific subject.""" # First register a schema to ensure subject exists await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Try to set subject-specific mode (might not be supported) response = await client.put(f"{MCP_SERVER_URL}/mode/{test_subject}", json={"mode": "READWRITE"}) # Subject-level mode might not be supported in all Schema Registry versions assert response.status_code in [200, 404, 422, 500] @pytest.mark.asyncio async def test_mode_with_context(client: httpx.AsyncClient, test_subject: str): """Test mode management within a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context first await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register schema in context await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Get mode for context (may not be supported by all Schema Registry versions) response = await client.get(f"{MCP_SERVER_URL}/mode?context={context_name}") # Context-aware mode might not be supported, so we accept various outcomes assert response.status_code in [200, 404, 500] # Integration Tests for Config and Mode @pytest.mark.asyncio async def test_config_mode_integration(client: httpx.AsyncClient, test_subject: str): """Test integration between configuration and mode settings.""" # Register a schema await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Get both config and mode to ensure they're accessible config_response = await client.get(f"{MCP_SERVER_URL}/config") mode_response = await client.get(f"{MCP_SERVER_URL}/mode") assert config_response.status_code == 200 assert mode_response.status_code == 200 config_data = config_response.json() mode_data = mode_response.json() assert "compatibilityLevel" in config_data assert "mode" in mode_data # Error Handling Tests for Config and Mode @pytest.mark.asyncio async def test_invalid_compatibility_level(client: httpx.AsyncClient): """Test handling of invalid compatibility levels.""" response = await client.put(f"{MCP_SERVER_URL}/config", json={"compatibility": "INVALID_LEVEL"}) # Should return error for invalid compatibility level assert response.status_code in [400, 422, 500] @pytest.mark.asyncio async def test_invalid_mode(client: httpx.AsyncClient): """Test handling of invalid modes.""" response = await client.put(f"{MCP_SERVER_URL}/mode", json={"mode": "INVALID_MODE"}) # Should return error for invalid mode assert response.status_code in [400, 422, 500] @pytest.mark.asyncio async def test_config_nonexistent_subject(client: httpx.AsyncClient): """Test getting config for non-existent subject.""" response = await client.get(f"{MCP_SERVER_URL}/config/non-existent-subject") # Should return 404 for non-existent subject assert response.status_code == 404 @pytest.mark.asyncio async def test_mode_nonexistent_subject(client: httpx.AsyncClient): """Test getting mode for non-existent subject.""" response = await client.get(f"{MCP_SERVER_URL}/mode/non-existent-subject") # Should return 404 for non-existent subject or inherit global mode assert response.status_code in [404, 500] # Schema Export Tests @pytest.mark.asyncio async def test_export_single_schema_json(client: httpx.AsyncClient, test_subject: str): """Test exporting a single schema in JSON format.""" # Register a schema first await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Export schema in JSON format response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?format=json") assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert data["version"] == 1 assert "id" in data assert "schema" in data assert data["schemaType"] == "AVRO" assert "metadata" in data assert data["metadata"]["exported_at"] @pytest.mark.asyncio async def test_export_single_schema_avro_idl(client: httpx.AsyncClient, test_subject: str): """Test exporting a single schema in Avro IDL format.""" # Register a schema first await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Export schema in IDL format response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?format=avro_idl") assert response.status_code == 200 assert response.headers["content-type"] == "text/plain; charset=utf-8" idl_content = response.text assert "record User" in idl_content assert "int id" in idl_content assert "string name" in idl_content @pytest.mark.asyncio async def test_export_schema_with_context(client: httpx.AsyncClient, test_subject: str): """Test exporting a schema from a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Export schema from context response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?context={context_name}") assert response.status_code == 200 data = response.json() # Schema Registry returns subject with context prefix expected_subject = f":.{context_name}:{test_subject}" assert data["subject"] == expected_subject assert data["metadata"]["context"] == context_name @pytest.mark.asyncio async def test_export_subject_all_versions(client: httpx.AsyncClient, test_subject: str): """Test exporting all versions of a subject.""" # Register multiple versions await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) # Export all versions response = await client.post( f"{MCP_SERVER_URL}/export/subjects/{test_subject}", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert len(data["versions"]) == 2 assert data["versions"][0]["version"] == 1 assert data["versions"][1]["version"] == 2 # Check that both schemas are different schema1 = data["versions"][0]["schema"] schema2 = data["versions"][1]["schema"] assert schema1 != schema2 @pytest.mark.asyncio async def test_export_subject_latest_only(client: httpx.AsyncClient, test_subject: str): """Test exporting only the latest version of a subject.""" # Register multiple versions await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) # Export latest version only response = await client.post( f"{MCP_SERVER_URL}/export/subjects/{test_subject}", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "latest", }, ) assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert len(data["versions"]) == 1 assert data["versions"][0]["version"] == 2 # Should be the latest @pytest.mark.asyncio async def test_export_context_json(client: httpx.AsyncClient, test_subject: str): """Test exporting all schemas from a context in JSON format.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register multiple subjects await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register first subject await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Register second subject test_subject_2 = f"{test_subject}-2" await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject_2, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context_name, }, ) # Export entire context response = await client.post( f"{MCP_SERVER_URL}/export/contexts/{context_name}", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 data = response.json() assert data["context"] == context_name assert "exported_at" in data assert isinstance(data["subjects"], list) assert len(data["subjects"]) >= 2 # Should have at least our two subjects # Find our subjects in the export exported_subjects = {subj["subject"] for subj in data["subjects"]} assert test_subject in exported_subjects assert test_subject_2 in exported_subjects @pytest.mark.asyncio async def test_export_context_bundle(client: httpx.AsyncClient, test_subject: str): """Test exporting a context as a ZIP bundle.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # Export context as bundle response = await client.post( f"{MCP_SERVER_URL}/export/contexts/{context_name}", json={ "format": "bundle", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 assert response.headers["content-type"] == "application/zip" assert "attachment" in response.headers["content-disposition"] assert f"{context_name}_export.zip" in response.headers["content-disposition"] # Check that we got a ZIP file zip_content = response.content assert len(zip_content) > 0 assert zip_content[:2] == b"PK" # ZIP file magic number @pytest.mark.asyncio async def test_list_exportable_subjects_default_context(client: httpx.AsyncClient, test_subject: str): """Test listing exportable subjects in default context.""" # Register a schema in default context await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # List exportable subjects response = await client.get(f"{MCP_SERVER_URL}/export/subjects") assert response.status_code == 200 data = response.json() assert data["context"] is None # Default context assert isinstance(data["subjects"], list) assert data["total_subjects"] > 0 # Find our subject subject_found = False for subj in data["subjects"]: if subj["subject"] == test_subject: subject_found = True assert subj["version_count"] == 1 assert subj["latest_version"] == 1 break assert subject_found, f"Subject {test_subject} not found in export list" @pytest.mark.asyncio async def test_list_exportable_subjects_with_context(client: httpx.AsyncClient, test_subject: str): """Test listing exportable subjects in a specific context.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context and register schema await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) # List exportable subjects in context response = await client.get(f"{MCP_SERVER_URL}/export/subjects?context={context_name}") assert response.status_code == 200 data = response.json() assert data["context"] == context_name assert isinstance(data["subjects"], list) assert data["total_subjects"] > 0 # Find our subject subject_found = False for subj in data["subjects"]: if subj["subject"] == test_subject: subject_found = True assert subj["context"] == context_name assert subj["version_count"] == 1 assert subj["latest_version"] == 1 break assert subject_found, f"Subject {test_subject} not found in context export list" @pytest.mark.asyncio async def test_export_global_json(client: httpx.AsyncClient, test_subject: str): """Test exporting all schemas globally in JSON format.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Register schema in default context await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Create context and register schema there too await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": f"{test_subject}-context", "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context_name, }, ) # Export everything globally response = await client.post( f"{MCP_SERVER_URL}/export/global", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 data = response.json() assert "exported_at" in data assert "contexts" in data assert "default_context" in data # Should have our created context context_names = [ctx["context"] for ctx in data["contexts"]] assert context_name in context_names # Default context should have our default schema default_subjects = [subj["subject"] for subj in data["default_context"]["subjects"]] assert test_subject in default_subjects @pytest.mark.asyncio async def test_export_global_bundle(client: httpx.AsyncClient, test_subject: str): """Test exporting all schemas globally as a ZIP bundle.""" # Register a schema in default context await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Export everything as bundle response = await client.post( f"{MCP_SERVER_URL}/export/global", json={ "format": "bundle", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 assert response.headers["content-type"] == "application/zip" assert "attachment" in response.headers["content-disposition"] assert "schema_registry_export_" in response.headers["content-disposition"] # Check that we got a ZIP file zip_content = response.content assert len(zip_content) > 0 assert zip_content[:2] == b"PK" # ZIP file magic number @pytest.mark.asyncio async def test_export_schema_specific_version(client: httpx.AsyncClient, test_subject: str): """Test exporting a specific version of a schema.""" # Register multiple versions await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) # Export version 1 specifically response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?version=1") assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert data["version"] == 1 # The schema should be the V1 schema (no email field) schema_obj = json.loads(data["schema"]) field_names = [field["name"] for field in schema_obj["fields"]] assert "id" in field_names assert "name" in field_names assert "email" not in field_names # V1 doesn't have email @pytest.mark.asyncio async def test_export_subject_specific_version(client: httpx.AsyncClient, test_subject: str): """Test exporting a specific version using subject export.""" # Register multiple versions await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO"}, ) # Export version 2 specifically response = await client.post( f"{MCP_SERVER_URL}/export/subjects/{test_subject}", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "2", }, ) assert response.status_code == 200 data = response.json() assert data["subject"] == test_subject assert len(data["versions"]) == 1 assert data["versions"][0]["version"] == 2 # The schema should be the V2 schema (has email field) schema_obj = json.loads(data["versions"][0]["schema"]) field_names = [field["name"] for field in schema_obj["fields"]] assert "id" in field_names assert "name" in field_names assert "email" in field_names # V2 has email @pytest.mark.asyncio async def test_export_nonexistent_schema(client: httpx.AsyncClient): """Test exporting a non-existent schema.""" response = await client.get(f"{MCP_SERVER_URL}/export/schemas/non-existent-subject") assert response.status_code == 500 # Should fail with server error @pytest.mark.asyncio async def test_export_nonexistent_context(client: httpx.AsyncClient): """Test exporting from a non-existent context.""" response = await client.post( f"{MCP_SERVER_URL}/export/contexts/non-existent-context", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 # Should handle gracefully data = response.json() assert data["context"] == "non-existent-context" assert data["subjects"] == [] # Should be empty @pytest.mark.asyncio async def test_export_metadata_inclusion(client: httpx.AsyncClient, test_subject: str): """Test that export metadata is properly included.""" # Register a schema await client.post( f"{MCP_SERVER_URL}/schemas", json={"subject": test_subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO"}, ) # Export with metadata response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?include_metadata=true") assert response.status_code == 200 data = response.json() assert "metadata" in data assert "exported_at" in data["metadata"] assert "registry_url" in data["metadata"] assert data["metadata"]["registry_url"] == SCHEMA_REGISTRY_URL # Export without metadata response = await client.get(f"{MCP_SERVER_URL}/export/schemas/{test_subject}?include_metadata=false") assert response.status_code == 200 data = response.json() # Metadata should still be present as it's part of the model, but might be minimal assert "metadata" in data # Performance and Edge Case Tests @pytest.mark.asyncio async def test_export_large_context(client: httpx.AsyncClient): """Test exporting a context with multiple subjects and versions.""" import uuid context_name = f"test-context-{uuid.uuid4().hex[:8]}" # Create context await client.post(f"{MCP_SERVER_URL}/contexts/{context_name}") # Register multiple subjects with multiple versions subjects = [f"subject-{i}" for i in range(5)] for subject in subjects: # Register 2 versions for each subject await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": subject, "schema": AVRO_SCHEMA_V1, "schemaType": "AVRO", "context": context_name, }, ) await client.post( f"{MCP_SERVER_URL}/schemas", json={ "subject": subject, "schema": AVRO_SCHEMA_V2, "schemaType": "AVRO", "context": context_name, }, ) # Export the entire context response = await client.post( f"{MCP_SERVER_URL}/export/contexts/{context_name}", json={ "format": "json", "include_metadata": True, "include_config": True, "include_versions": "all", }, ) assert response.status_code == 200 data = response.json() assert data["context"] == context_name assert len(data["subjects"]) >= 5 # Should have at least our 5 subjects # Each subject should have 2 versions for subject_export in data["subjects"]: if subject_export["subject"] in subjects: assert len(subject_export["versions"]) == 2

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server