Skip to main content
Glama
test_elicitation.py45.1 kB
#!/usr/bin/env python3 """ Tests for the elicitation system functionality """ import asyncio import os import sys from datetime import datetime, timedelta from unittest.mock import Mock, patch import pytest # Add parent directory to path to import modules from project root sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import elicitation modules from elicitation import ( ElicitationField, ElicitationManager, ElicitationPriority, ElicitationRequest, ElicitationResponse, ElicitationType, create_compatibility_resolution_elicitation, create_context_metadata_elicitation, create_export_preferences_elicitation, create_migrate_schema_elicitation, create_migration_preferences_elicitation, create_schema_field_elicitation, elicit_with_fallback, is_elicitation_supported, mock_elicit, ) from interactive_tools import ( check_compatibility_interactive, create_context_interactive, export_global_interactive, migrate_context_interactive, migrate_schema_interactive, register_schema_interactive, ) class TestElicitationCore: """Test core elicitation functionality.""" def test_elicitation_field_creation(self): """Test creating elicitation fields with various configurations.""" # Basic field field = ElicitationField(name="test_field", type="text", required=True) assert field.name == "test_field" assert field.type == "text" assert field.required is True assert field.label is None assert field.default is None # Complex field with all options field = ElicitationField( name="complex_field", type="choice", label="Complex Field", description="A complex field for testing", required=False, default="option1", options=["option1", "option2", "option3"], validation={"min_length": 3}, placeholder="Select an option", ) assert field.name == "complex_field" assert field.type == "choice" assert field.label == "Complex Field" assert field.description == "A complex field for testing" assert field.required is False assert field.default == "option1" assert field.options == ["option1", "option2", "option3"] assert field.validation == {"min_length": 3} assert field.placeholder == "Select an option" def test_elicitation_request_creation(self): """Test creating elicitation requests.""" fields = [ ElicitationField("field1", "text", required=True), ElicitationField("field2", "choice", options=["a", "b", "c"]), ] request = ElicitationRequest( type=ElicitationType.FORM, title="Test Request", description="A test elicitation request", fields=fields, priority=ElicitationPriority.HIGH, timeout_seconds=300, ) assert request.type == ElicitationType.FORM assert request.title == "Test Request" assert request.description == "A test elicitation request" assert len(request.fields) == 2 assert request.priority == ElicitationPriority.HIGH assert request.timeout_seconds == 300 assert request.expires_at is not None assert not request.is_expired() def test_elicitation_request_expiration(self): """Test elicitation request expiration logic.""" # Create request that expires in 1 second request = ElicitationRequest(title="Expiring Request", timeout_seconds=1) # Should not be expired immediately assert not request.is_expired() # Manually set expiration to past request.expires_at = datetime.utcnow() - timedelta(seconds=1) assert request.is_expired() def test_elicitation_request_to_dict(self): """Test serialization of elicitation requests.""" fields = [ ElicitationField( name="test_field", type="text", label="Test Field", required=True, default="default_value", ) ] request = ElicitationRequest( type=ElicitationType.TEXT, title="Test Request", fields=fields, context={"test": "context"}, ) data = request.to_dict() assert data["type"] == "text" assert data["title"] == "Test Request" assert len(data["fields"]) == 1 assert data["fields"][0]["name"] == "test_field" assert data["fields"][0]["type"] == "text" assert data["fields"][0]["label"] == "Test Field" assert data["fields"][0]["required"] is True assert data["fields"][0]["default"] == "default_value" assert data["context"] == {"test": "context"} def test_elicitation_response_creation(self): """Test creating elicitation responses.""" response = ElicitationResponse( request_id="test-request-123", values={"field1": "value1", "field2": "value2"}, complete=True, metadata={"source": "user"}, ) assert response.request_id == "test-request-123" assert response.values == {"field1": "value1", "field2": "value2"} assert response.complete is True assert response.metadata == {"source": "user"} assert response.timestamp is not None def test_elicitation_response_to_dict(self): """Test serialization of elicitation responses.""" response = ElicitationResponse( request_id="test-request-123", values={"test": "value"}, metadata={"source": "test"}, ) data = response.to_dict() assert data["request_id"] == "test-request-123" assert data["values"] == {"test": "value"} assert data["complete"] is True assert data["metadata"] == {"source": "test"} assert "timestamp" in data class TestElicitationManager: """Test elicitation manager functionality.""" @pytest.fixture def manager(self): """Create a fresh elicitation manager for each test.""" return ElicitationManager() @pytest.mark.asyncio async def test_create_request(self, manager): """Test creating and storing elicitation requests.""" request = ElicitationRequest(title="Test Request", timeout_seconds=300) request_id = await manager.create_request(request) assert request_id == request.id assert request_id in manager.pending_requests assert manager.get_request(request_id) == request @pytest.mark.asyncio async def test_submit_response(self, manager): """Test submitting responses to elicitation requests.""" request = ElicitationRequest( title="Test Request", fields=[ElicitationField("test", "text", required=True)], ) await manager.create_request(request) response = ElicitationResponse(request_id=request.id, values={"test": "value"}) success = await manager.submit_response(response) assert success is True assert manager.get_response(request.id) == response assert request.id not in manager.pending_requests @pytest.mark.asyncio async def test_submit_invalid_response(self, manager): """Test submitting invalid responses.""" request = ElicitationRequest( title="Test Request", fields=[ElicitationField("required_field", "text", required=True)], ) await manager.create_request(request) # Response missing required field response = ElicitationResponse(request_id=request.id, values={"wrong_field": "value"}) success = await manager.submit_response(response) assert success is False assert manager.get_response(request.id) is None assert request.id in manager.pending_requests @pytest.mark.asyncio async def test_wait_for_response(self, manager): """Test waiting for responses with timeout.""" request = ElicitationRequest(title="Test Request", timeout_seconds=1) # Short timeout for testing await manager.create_request(request) # Test timeout response = await manager.wait_for_response(request.id, timeout=0.5) assert response is None # Test successful response test_response = ElicitationResponse(request_id=request.id, values={"test": "value"}) # Submit response in background async def submit_response(): await asyncio.sleep(0.1) await manager.submit_response(test_response) # Start both tasks response_task = manager.wait_for_response(request.id, timeout=1.0) submit_task = submit_response() response, _ = await asyncio.gather(response_task, submit_task) assert response == test_response def test_list_pending_requests(self, manager): """Test listing pending requests.""" request1 = ElicitationRequest(title="Request 1") request2 = ElicitationRequest(title="Request 2") asyncio.run(manager.create_request(request1)) asyncio.run(manager.create_request(request2)) pending = manager.list_pending_requests() assert len(pending) == 2 assert request1 in pending assert request2 in pending def test_cancel_request(self, manager): """Test cancelling requests.""" request = ElicitationRequest(title="Test Request") asyncio.run(manager.create_request(request)) assert request.id in manager.pending_requests cancelled = manager.cancel_request(request.id) assert cancelled is True assert request.id not in manager.pending_requests @pytest.mark.asyncio async def test_timeout_handling(self, manager): """Test automatic timeout handling.""" request = ElicitationRequest(title="Timeout Test", timeout_seconds=0.1) # Very short timeout await manager.create_request(request) # Wait for timeout to trigger await asyncio.sleep(0.2) # Request should be automatically cleaned up assert request.id not in manager.pending_requests class TestElicitationHelpers: """Test elicitation helper functions.""" def test_create_schema_field_elicitation(self): """Test creating schema field elicitation requests.""" request = create_schema_field_elicitation( context="test-context", existing_fields=["existing_field1", "existing_field2"], ) assert request.type == ElicitationType.FORM assert request.title == "Define Schema Field" assert request.allow_multiple is True assert request.timeout_seconds == 600 assert request.context["existing_fields"] == [ "existing_field1", "existing_field2", ] assert request.context["schema_context"] == "test-context" # Check required fields field_names = [f.name for f in request.fields] assert "field_name" in field_names assert "field_type" in field_names assert "nullable" in field_names def test_create_migration_preferences_elicitation(self): """Test creating migration preferences elicitation requests.""" request = create_migration_preferences_elicitation( source_registry="source", target_registry="target", context="test-context" ) assert request.type == ElicitationType.FORM assert request.title == "Migration Preferences" assert "from source to target" in request.description assert request.context["source_registry"] == "source" assert request.context["target_registry"] == "target" assert request.context["context"] == "test-context" # Check required fields field_names = [f.name for f in request.fields] assert "preserve_ids" in field_names assert "migrate_all_versions" in field_names assert "conflict_resolution" in field_names assert "dry_run" in field_names def test_create_compatibility_resolution_elicitation(self): """Test creating compatibility resolution elicitation requests.""" errors = ["Error 1", "Error 2"] request = create_compatibility_resolution_elicitation(subject="test-subject", compatibility_errors=errors) assert request.type == ElicitationType.FORM assert request.title == "Resolve Compatibility Issues" assert "test-subject" in request.description assert request.context["subject"] == "test-subject" assert request.context["compatibility_errors"] == errors # Check required fields field_names = [f.name for f in request.fields] assert "resolution_strategy" in field_names assert "compatibility_level" in field_names def test_create_context_metadata_elicitation(self): """Test creating context metadata elicitation requests.""" request = create_context_metadata_elicitation("test-context") assert request.type == ElicitationType.FORM assert request.title == "Context Metadata" assert "test-context" in request.description assert request.context["context_name"] == "test-context" # Check fields field_names = [f.name for f in request.fields] assert "description" in field_names assert "owner" in field_names assert "environment" in field_names assert "tags" in field_names def test_create_export_preferences_elicitation(self): """Test creating export preferences elicitation requests.""" request = create_export_preferences_elicitation("global_export") assert request.type == ElicitationType.FORM assert request.title == "Export Preferences" assert "global_export" in request.description assert request.context["operation"] == "global_export" # Check fields field_names = [f.name for f in request.fields] assert "format" in field_names assert "include_metadata" in field_names assert "include_versions" in field_names assert "compression" in field_names def test_create_migrate_schema_elicitation_new_schema(self): """Test creating migrate schema elicitation for new schema (doesn't exist in target).""" request = create_migrate_schema_elicitation( subject="test-subject", source_registry="source-reg", target_registry="target-reg", schema_exists_in_target=False, context="test-context", ) assert request.type == ElicitationType.FORM assert request.title == "Schema Migration Preferences" assert "test-subject" in request.description assert "source-reg" in request.description assert "target-reg" in request.description assert request.context["subject"] == "test-subject" assert request.context["source_registry"] == "source-reg" assert request.context["target_registry"] == "target-reg" assert request.context["schema_exists_in_target"] is False # Check fields - should NOT include replacement fields field_names = [f.name for f in request.fields] assert "replace_existing" not in field_names assert "backup_before_replace" not in field_names assert "preserve_ids" in field_names assert "compare_after_migration" in field_names assert "migrate_all_versions" in field_names assert "dry_run" in field_names def test_create_migrate_schema_elicitation_existing_schema(self): """Test creating migrate schema elicitation for existing schema in target.""" existing_versions = [1, 2, 3] request = create_migrate_schema_elicitation( subject="existing-subject", source_registry="source-reg", target_registry="target-reg", schema_exists_in_target=True, existing_versions=existing_versions, context="test-context", ) assert request.type == ElicitationType.FORM assert request.title == "Schema Migration Preferences" assert "already exists" in request.description assert str(existing_versions) in request.description assert request.context["schema_exists_in_target"] is True assert request.context["existing_versions"] == existing_versions # Check fields - should include replacement fields field_names = [f.name for f in request.fields] assert "replace_existing" in field_names assert "backup_before_replace" in field_names assert "preserve_ids" in field_names assert "compare_after_migration" in field_names assert "migrate_all_versions" in field_names assert "dry_run" in field_names # Check replace_existing field details replace_field = next(f for f in request.fields if f.name == "replace_existing") assert replace_field.required is True assert replace_field.default == "false" assert "already exists" in replace_field.description # Check backup field backup_field = next(f for f in request.fields if f.name == "backup_before_replace") assert backup_field.required is False assert backup_field.default == "true" @pytest.mark.asyncio async def test_mock_elicit(self): """Test mock elicitation function.""" request = ElicitationRequest( title="Test Mock", fields=[ ElicitationField("text_field", "text", placeholder="test placeholder"), ElicitationField("choice_field", "choice", options=["a", "b", "c"]), ElicitationField("default_field", "text", default="default_value"), ], ) response = await mock_elicit(request) assert response is not None assert response.request_id == request.id assert response.complete is True assert response.metadata["source"] == "mock_fallback" assert response.metadata["auto_generated"] is True # Check that defaults are applied assert response.values["choice_field"] == "a" # First option assert response.values["default_field"] == "default_value" assert response.values["text_field"] == "test placeholder" @pytest.mark.asyncio async def test_elicit_with_fallback(self): """Test elicitation with fallback mechanism.""" request = ElicitationRequest( title="Fallback Test", fields=[ElicitationField("test", "text", default="fallback_value")], ) # Should use fallback (mock) implementation response = await elicit_with_fallback(request) assert response is not None assert response.values["test"] == "fallback_value" assert response.metadata["source"] == "mock_fallback" def test_is_elicitation_supported(self): """Test elicitation support detection.""" # Currently always returns True in our implementation assert is_elicitation_supported() is True class TestInteractiveTools: """Test interactive tool implementations.""" def setup_method(self): """Set up test fixtures for each test method.""" # Mock registry components self.mock_registry_manager = Mock() self.mock_auth = Mock() self.mock_headers = {"Content-Type": "application/json"} self.registry_mode = "multi" self.schema_registry_url = "http://test-registry:8081" @pytest.mark.asyncio async def test_register_schema_interactive_complete_schema(self): """Test interactive schema registration with complete schema.""" # Mock the core register_schema_tool mock_register_tool = Mock(return_value={"success": True, "id": 123}) complete_schema = { "type": "record", "name": "TestSchema", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, ], } result = await register_schema_interactive( subject="test-subject", schema_definition=complete_schema, register_schema_tool=mock_register_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should call the original tool directly without elicitation mock_register_tool.assert_called_once() assert result["success"] is True assert result["elicitation_used"] is False @pytest.mark.asyncio async def test_register_schema_interactive_incomplete_schema(self): """Test interactive schema registration with incomplete schema.""" # Mock the core register_schema_tool mock_register_tool = Mock(return_value={"success": True, "id": 123}) # Schema with no fields incomplete_schema = {"type": "record", "name": "TestSchema", "fields": []} # Mock elicitation to return field definition with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "field_name": "test_field", "field_type": "string", "nullable": "false", "documentation": "Test field", } mock_elicit.return_value = mock_response result = await register_schema_interactive( subject="test-subject", schema_definition=incomplete_schema, register_schema_tool=mock_register_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should have used elicitation assert result["elicitation_used"] is True assert "test_field" in result["elicited_fields"] mock_register_tool.assert_called_once() @pytest.mark.asyncio async def test_migrate_context_interactive_with_preferences(self): """Test interactive context migration with all preferences provided.""" # Mock the core migrate_context_tool mock_migrate_tool = Mock(return_value={"success": True, "migrated": 5}) result = await migrate_context_interactive( source_registry="source", target_registry="target", preserve_ids=True, dry_run=False, migrate_all_versions=True, migrate_context_tool=mock_migrate_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should call the original tool directly without elicitation mock_migrate_tool.assert_called_once() assert result["success"] is True assert result["elicitation_used"] is False @pytest.mark.asyncio async def test_migrate_context_interactive_missing_preferences(self): """Test interactive context migration with missing preferences.""" # Mock the core migrate_context_tool mock_migrate_tool = Mock(return_value={"success": True, "migrated": 5}) # Mock elicitation to return preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "preserve_ids": "true", "dry_run": "false", "migrate_all_versions": "true", } mock_elicit.return_value = mock_response result = await migrate_context_interactive( source_registry="source", target_registry="target", preserve_ids=None, # Missing dry_run=None, # Missing migrate_all_versions=None, # Missing migrate_context_tool=mock_migrate_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should have used elicitation assert result["elicitation_used"] is True assert result["elicited_preferences"]["preserve_ids"] is True assert result["elicited_preferences"]["dry_run"] is False assert result["elicited_preferences"]["migrate_all_versions"] is True mock_migrate_tool.assert_called_once() @pytest.mark.asyncio async def test_check_compatibility_interactive_compatible(self): """Test interactive compatibility check with compatible schema.""" # Mock compatibility tool to return compatible result mock_compatibility_tool = Mock(return_value={"compatible": True, "messages": []}) result = await check_compatibility_interactive( subject="test-subject", schema_definition={"type": "string"}, check_compatibility_tool=mock_compatibility_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should not use elicitation for compatible schemas assert result["compatible"] is True assert result["resolution_guidance"]["strategy"] == "none_needed" assert result["resolution_guidance"]["elicitation_used"] is False @pytest.mark.asyncio async def test_check_compatibility_interactive_incompatible(self): """Test interactive compatibility check with incompatible schema.""" # Mock compatibility tool to return incompatible result mock_compatibility_tool = Mock( return_value={ "compatible": False, "messages": ["Field removed", "Type changed"], } ) # Mock elicitation to return resolution strategy with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "resolution_strategy": "modify_schema", "compatibility_level": "FORWARD", "notes": "Make fields optional", } mock_elicit.return_value = mock_response result = await check_compatibility_interactive( subject="test-subject", schema_definition={"type": "string"}, check_compatibility_tool=mock_compatibility_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should have used elicitation for resolution guidance assert result["compatible"] is False assert result["resolution_guidance"]["strategy"] == "modify_schema" assert result["resolution_guidance"]["compatibility_level"] == "FORWARD" assert result["resolution_guidance"]["notes"] == "Make fields optional" assert result["resolution_guidance"]["elicitation_used"] is True @pytest.mark.asyncio async def test_create_context_interactive_with_metadata(self): """Test interactive context creation with metadata elicitation.""" # Mock the core create_context_tool mock_create_tool = Mock(return_value={"success": True, "context": "test-context"}) # Mock elicitation to return metadata with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "description": "Test context for unit tests", "owner": "test-team", "environment": "testing", "tags": "unit-test,schema", } mock_elicit.return_value = mock_response result = await create_context_interactive( context="test-context", create_context_tool=mock_create_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should have used elicitation for metadata assert result["elicitation_used"] is True assert result["metadata"]["description"] == "Test context for unit tests" assert result["metadata"]["owner"] == "test-team" assert result["metadata"]["environment"] == "testing" assert result["metadata"]["tags"] == ["unit-test", "schema"] mock_create_tool.assert_called_once() @pytest.mark.asyncio async def test_export_global_interactive_with_preferences(self): """Test interactive global export with preference elicitation.""" # Mock the core export_global_tool mock_export_tool = Mock(return_value={"success": True, "exported": 10}) # Mock elicitation to return export preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "format": "yaml", "include_metadata": "true", "include_versions": "latest", "compression": "gzip", } mock_elicit.return_value = mock_response result = await export_global_interactive( registry="test-registry", export_global_tool=mock_export_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should have used elicitation for preferences assert result["elicitation_used"] is True assert result["export_preferences"]["format"] == "yaml" assert result["export_preferences"]["compression"] == "gzip" assert result["export_preferences"]["include_metadata"] is True assert result["export_preferences"]["include_versions"] == "latest" mock_export_tool.assert_called_once() @pytest.mark.asyncio async def test_migrate_schema_interactive_no_elicitation_needed(self): """Test interactive schema migration when all preferences are provided.""" # Mock the core migrate_schema_tool mock_migrate_tool = Mock(return_value={"success": True, "migrated_versions": [1, 2]}) # Mock registry client for checking schema existence mock_target_client = Mock() mock_target_client.config.url = "http://target-registry:8081" mock_target_client.auth = self.mock_auth mock_target_client.headers = self.mock_headers self.mock_registry_manager.get_registry.return_value = mock_target_client # Mock requests.get to simulate schema doesn't exist (404) with patch("interactive_tools.requests.get") as mock_get: mock_get.return_value.status_code = 404 result = await migrate_schema_interactive( subject="test-subject", source_registry="source", target_registry="target", preserve_ids=True, dry_run=False, migrate_all_versions=True, migrate_schema_tool=mock_migrate_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should call the original tool directly without elicitation mock_migrate_tool.assert_called_once() assert result["success"] is True assert result["elicitation_used"] is False assert result["schema_existed_in_target"] is False @pytest.mark.asyncio async def test_migrate_schema_interactive_schema_exists_allow_replace(self): """Test interactive schema migration when schema exists and user allows replacement.""" # Mock the core migrate_schema_tool mock_migrate_tool = Mock(return_value={"success": True, "migrated_versions": [1, 2]}) mock_export_tool = Mock(return_value={"success": True, "backup_created": True}) # Mock registry client for checking schema existence mock_target_client = Mock() mock_target_client.config.url = "http://target-registry:8081" mock_target_client.auth = self.mock_auth mock_target_client.headers = self.mock_headers self.mock_registry_manager.get_registry.return_value = mock_target_client # Mock requests.get to simulate schema exists with patch("interactive_tools.requests.get") as mock_get: mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = [1, 2, 3] # Mock elicitation to return user preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "replace_existing": "true", "backup_before_replace": "true", "preserve_ids": "false", "compare_after_migration": "false", "migrate_all_versions": "true", "dry_run": "false", } mock_elicit.return_value = mock_response result = await migrate_schema_interactive( subject="existing-subject", source_registry="source", target_registry="target", migrate_schema_tool=mock_migrate_tool, export_schema_tool=mock_export_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should have used elicitation and allowed replacement assert result["elicitation_used"] is True assert result["schema_existed_in_target"] is True assert result["elicited_preferences"]["replace_existing"] is True assert result["elicited_preferences"]["backup_before_replace"] is True assert result["backup_result"]["success"] is True mock_migrate_tool.assert_called_once() mock_export_tool.assert_called_once() @pytest.mark.asyncio async def test_migrate_schema_interactive_schema_exists_decline_replace(self): """Test interactive schema migration when schema exists and user declines replacement.""" # Mock registry client for checking schema existence mock_target_client = Mock() mock_target_client.config.url = "http://target-registry:8081" mock_target_client.auth = self.mock_auth mock_target_client.headers = self.mock_headers self.mock_registry_manager.get_registry.return_value = mock_target_client # Mock requests.get to simulate schema exists with patch("interactive_tools.requests.get") as mock_get: mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = [1, 2] # Mock elicitation to return user declining replacement with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "replace_existing": "false", # User declines "backup_before_replace": "true", "preserve_ids": "true", "compare_after_migration": "true", "migrate_all_versions": "false", "dry_run": "true", } mock_elicit.return_value = mock_response result = await migrate_schema_interactive( subject="existing-subject", source_registry="source", target_registry="target", migrate_schema_tool=Mock(), # Should not be called registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should return error indicating replacement was declined assert "error" in result assert "MIGRATION_DECLINED_EXISTING_SCHEMA" in result["error_code"] assert "existing_versions" in result["details"] @pytest.mark.asyncio async def test_migrate_schema_interactive_with_verification(self): """Test interactive schema migration with post-migration verification.""" # Mock the core migrate_schema_tool mock_migrate_tool = Mock(return_value={"success": True, "migrated_versions": [1]}) # Mock registry clients for verification mock_source_client = Mock() mock_target_client = Mock() mock_source_client.get_schema.return_value = { "schema": {"type": "record", "name": "Test"}, "schemaType": "AVRO", "id": 123, } mock_target_client.get_schema.return_value = { "schema": {"type": "record", "name": "Test"}, "schemaType": "AVRO", "id": 123, } def get_registry_side_effect(name): if name == "source": return mock_source_client elif name == "target": return mock_target_client return None self.mock_registry_manager.get_registry.side_effect = get_registry_side_effect # Mock requests.get to simulate schema doesn't exist initially with patch("interactive_tools.requests.get") as mock_get: mock_get.return_value.status_code = 404 # Mock elicitation to request verification with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = Mock() mock_response.complete = True mock_response.values = { "preserve_ids": "true", "compare_after_migration": "true", "migrate_all_versions": "false", "dry_run": "false", } mock_elicit.return_value = mock_response result = await migrate_schema_interactive( subject="verify-subject", source_registry="source", target_registry="target", migrate_schema_tool=mock_migrate_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should have performed verification assert result["elicitation_used"] is True assert result["elicited_preferences"]["compare_after_migration"] is True assert "verification_result" in result assert result["verification_result"]["verification_type"] == "basic" assert result["verification_result"]["overall_success"] is True # Check verification details checks = result["verification_result"]["checks"] check_names = [check["check"] for check in checks] assert "schema_exists_in_target" in check_names assert "schema_content_match" in check_names assert "schema_type_match" in check_names assert "id_preservation" in check_names @pytest.mark.asyncio async def test_migrate_schema_interactive_elicitation_fails(self): """Test graceful fallback when elicitation fails.""" # Mock registry client mock_target_client = Mock() mock_target_client.config.url = "http://target-registry:8081" mock_target_client.auth = self.mock_auth mock_target_client.headers = self.mock_headers self.mock_registry_manager.get_registry.return_value = mock_target_client # Mock requests.get to simulate schema exists (trigger elicitation) with patch("interactive_tools.requests.get") as mock_get: mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = [1] # Mock elicitation to fail with patch("interactive_tools.elicit_with_fallback", return_value=None): result = await migrate_schema_interactive( subject="fail-subject", source_registry="source", target_registry="target", migrate_schema_tool=Mock(), # Should not be called registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Should return error indicating elicitation failed assert "error" in result assert "INCOMPLETE_MIGRATION_PREFERENCES" in result["error_code"] assert "elicitation_status" in result["details"] @pytest.mark.asyncio async def test_migrate_schema_interactive_single_registry_mode(self): """Test that interactive migration works in single registry mode.""" result = await migrate_schema_interactive( subject="test-subject", source_registry="source", target_registry="target", migrate_schema_tool=Mock(), registry_manager=self.mock_registry_manager, registry_mode="single", # Single registry mode ) # Should work but without existence checking (no elicitation triggered) # The actual behavior depends on the migrate_schema_tool implementation # In this test we're just ensuring no errors occur in single mode assert result is not None # Ensure the function completes without error class TestElicitationIntegration: """Test elicitation integration with MCP server.""" def test_elicitation_management_tools_structure(self): """Test that elicitation management tools are properly structured.""" # This would test the actual MCP tool implementations # For now, we'll test the function signatures and basic structure from kafka_schema_registry_unified_mcp import ( cancel_elicitation_request, get_elicitation_request, get_elicitation_status, list_elicitation_requests, ) # Test that functions exist and are callable assert callable(list_elicitation_requests) assert callable(get_elicitation_request) assert callable(cancel_elicitation_request) assert callable(get_elicitation_status) @pytest.mark.asyncio async def test_elicitation_timeout_cleanup(self): """Test that expired elicitation requests are properly cleaned up.""" manager = ElicitationManager() # Create request with very short timeout request = ElicitationRequest(title="Cleanup Test", timeout_seconds=0.1) await manager.create_request(request) assert request.id in manager.pending_requests # Wait for cleanup await asyncio.sleep(0.2) # Request should be cleaned up assert request.id not in manager.pending_requests def test_elicitation_field_validation(self): """Test field validation in elicitation responses.""" manager = ElicitationManager() # Create request with validation rules fields = [ ElicitationField("email", "email", required=True), ElicitationField("choice", "choice", options=["a", "b", "c"], required=True), ] request = ElicitationRequest(title="Validation Test", fields=fields) # Test invalid email response = ElicitationResponse(request_id=request.id, values={"email": "invalid-email", "choice": "a"}) assert not manager._validate_response(request, response) # Test invalid choice response = ElicitationResponse( request_id=request.id, values={"email": "test@example.com", "choice": "invalid"}, ) assert not manager._validate_response(request, response) # Test valid response response = ElicitationResponse(request_id=request.id, values={"email": "test@example.com", "choice": "a"}) assert manager._validate_response(request, response) if __name__ == "__main__": # Run the tests pytest.main([__file__, "-v"])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server