Skip to main content
Glama
test_elicitation_integration.py37.9 kB
#!/usr/bin/env python3 """ Integration tests for the elicitation system with MCP """ import asyncio import logging import os import sys from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest # Add project root to Python path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from elicitation import ( ElicitationField, ElicitationManager, ElicitationRequest, ElicitationResponse, ElicitationType, ) from interactive_tools import ( check_compatibility_interactive, create_context_interactive, export_global_interactive, migrate_context_interactive, migrate_schema_interactive, register_schema_interactive, ) logger = logging.getLogger(__name__) class TestElicitationIntegration: """Test elicitation integration with the full system.""" def setup_method(self): """Set up test fixtures for each test method.""" # Create a fresh elicitation manager for each test self.manager = ElicitationManager() # Mock MCP instance self.mock_mcp = MagicMock() self.mock_mcp.send_elicitation_request = AsyncMock() self.mock_mcp.send_notification = AsyncMock() self.mock_mcp.call_method = AsyncMock() self.mock_mcp.create_resource = AsyncMock() # Mock registry components self.mock_registry_manager = MagicMock() self.mock_auth = MagicMock() self.mock_headers = {"Content-Type": "application/json"} self.registry_mode = "multi" self.schema_registry_url = "http://test-registry:8081" @pytest.mark.asyncio async def test_end_to_end_schema_registration_workflow(self): """Test complete interactive schema registration workflow.""" # Test scenario: User wants to register a schema but provides incomplete information # Step 1: Call interactive schema registration with minimal info subject = "test-user-events" incomplete_schema = { "type": "record", "name": "UserEvent", "fields": [], # No fields defined - should trigger elicitation } # Mock the underlying schema registration tool mock_register_tool = MagicMock(return_value={"success": True, "id": 123, "subject": subject, "version": 1}) # Mock elicitation response simulating user input with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="test-request-123", values={ "field_name": "user_id", "field_type": "string", "nullable": "false", "default_value": "", "documentation": "Unique identifier for the user", }, complete=True, metadata={"source": "user_input"}, ) mock_elicit.return_value = mock_response # Execute the interactive registration result = await register_schema_interactive( subject=subject, schema_definition=incomplete_schema, schema_type="AVRO", context="user-events", registry="test-registry", register_schema_tool=mock_register_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Verify elicitation was triggered mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Define Schema Field" # Verify result indicates elicitation was used assert result["success"] is True assert result["elicitation_used"] is True assert "user_id" in result["elicited_fields"] # Verify the underlying tool was called with enhanced schema mock_register_tool.assert_called_once() call_args = mock_register_tool.call_args enhanced_schema = call_args[1] # Second positional argument is schema_definition # The schema should now have the user-defined field assert len(enhanced_schema["fields"]) == 1 assert enhanced_schema["fields"][0]["name"] == "user_id" assert enhanced_schema["fields"][0]["type"] == "string" @pytest.mark.asyncio async def test_migration_preferences_elicitation_workflow(self): """Test interactive migration with preference elicitation.""" # Test scenario: User wants to migrate context but doesn't specify preferences source_registry = "prod-registry" target_registry = "dev-registry" context = "user-schemas" # Mock the underlying migration tool mock_migrate_tool = AsyncMock( return_value={ "success": True, "migrated_schemas": 5, "dry_run": True, "preserved_ids": True, } ) # Mock elicitation response with user preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="migration-request-456", values={ "preserve_ids": "true", "dry_run": "false", "migrate_all_versions": "true", "conflict_resolution": "overwrite", "batch_size": "25", }, complete=True, metadata={"source": "user_preferences"}, ) mock_elicit.return_value = mock_response # Execute interactive migration with missing preferences result = await migrate_context_interactive( source_registry=source_registry, target_registry=target_registry, context=context, preserve_ids=None, # Not specified - should trigger elicitation dry_run=None, # Not specified migrate_all_versions=None, # Not specified migrate_context_tool=mock_migrate_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Verify elicitation was triggered for migration preferences mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Migration Preferences" assert source_registry in elicitation_request.description assert target_registry in elicitation_request.description # Verify the migration tool was called with elicited preferences mock_migrate_tool.assert_called_once() call_kwargs = mock_migrate_tool.call_args.kwargs assert call_kwargs["preserve_ids"] is True assert call_kwargs["dry_run"] is False assert call_kwargs["migrate_all_versions"] is True # Verify result indicates elicitation was used assert result["success"] is True assert result["elicitation_used"] is True assert result["elicited_preferences"]["preserve_ids"] is True @pytest.mark.asyncio async def test_compatibility_resolution_guidance(self): """Test interactive compatibility checking with resolution guidance.""" # Test scenario: Schema has compatibility issues, system guides user to resolution subject = "payment-events" incompatible_schema = { "type": "record", "name": "PaymentEvent", "fields": [ {"name": "payment_id", "type": "string"}, # This field removal would cause compatibility issues # {"name": "amount", "type": "double"}, # Missing field ], } # Mock compatibility check that finds issues mock_compatibility_tool = MagicMock( return_value={ "compatible": False, "messages": [ "Field 'amount' was removed, violating BACKWARD compatibility", "Schema evolution requires all existing fields to be preserved", ], } ) # Mock elicitation response with resolution strategy with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="compatibility-request-789", values={ "resolution_strategy": "add_default_values", "compatibility_level": "FORWARD", "notes": "Add default values to maintain backward compatibility", }, complete=True, metadata={"source": "resolution_guidance"}, ) mock_elicit.return_value = mock_response # Execute interactive compatibility check result = await check_compatibility_interactive( subject=subject, schema_definition=incompatible_schema, schema_type="AVRO", context="payments", registry="prod-registry", check_compatibility_tool=mock_compatibility_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Verify compatibility check was performed mock_compatibility_tool.assert_called_once() # Verify elicitation was triggered for resolution mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Resolve Compatibility Issues" assert subject in elicitation_request.description # Verify result contains resolution guidance assert result["compatible"] is False assert result["resolution_guidance"]["strategy"] == "add_default_values" assert result["resolution_guidance"]["compatibility_level"] == "FORWARD" assert result["resolution_guidance"]["elicitation_used"] is True @pytest.mark.asyncio async def test_context_metadata_collection(self): """Test interactive context creation with metadata collection.""" # Test scenario: Creating a new context with guided metadata collection context_name = "analytics-events" # Mock context creation tool mock_create_tool = MagicMock(return_value={"success": True, "context": context_name, "created": True}) # Mock elicitation response with metadata with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="context-metadata-101", values={ "description": "Analytics and behavioral event schemas for data pipeline", "owner": "data-platform-team", "environment": "production", "tags": "analytics,events,pipeline,gdpr-compliant", }, complete=True, metadata={"source": "context_setup"}, ) mock_elicit.return_value = mock_response # Execute interactive context creation result = await create_context_interactive( context=context_name, registry="analytics-registry", # All metadata parameters are None - should trigger elicitation description=None, owner=None, environment=None, tags=None, create_context_tool=mock_create_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Verify elicitation was triggered for metadata mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Context Metadata" assert context_name in elicitation_request.description # Verify context creation tool was called mock_create_tool.assert_called_once() # Verify result contains collected metadata assert result["success"] is True assert result["elicitation_used"] is True assert result["metadata"]["description"] == "Analytics and behavioral event schemas for data pipeline" assert result["metadata"]["owner"] == "data-platform-team" assert result["metadata"]["environment"] == "production" assert result["metadata"]["tags"] == [ "analytics", "events", "pipeline", "gdpr-compliant", ] @pytest.mark.asyncio async def test_export_preferences_workflow(self): """Test interactive export with format preferences.""" # Test scenario: User wants to export all schemas but needs guidance on format options registry = "compliance-registry" # Mock export tool mock_export_tool = MagicMock( return_value={ "success": True, "exported_schemas": 25, "export_format": "json", "file_size": "2.4MB", } ) # Mock elicitation response with export preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="export-prefs-202", values={ "format": "yaml", "include_metadata": "true", "include_versions": "latest", "compression": "gzip", }, complete=True, metadata={"source": "export_configuration"}, ) mock_elicit.return_value = mock_response # Execute interactive export with missing preferences result = await export_global_interactive( registry=registry, # All preferences are None - should trigger elicitation include_metadata=None, include_config=None, include_versions=None, format=None, compression=None, export_global_tool=mock_export_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Verify elicitation was triggered for export preferences mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Export Preferences" assert "global_export" in elicitation_request.description # Verify export tool was called mock_export_tool.assert_called_once() # Verify result contains export preferences assert result["success"] is True assert result["elicitation_used"] is True assert result["export_preferences"]["format"] == "yaml" assert result["export_preferences"]["compression"] == "gzip" @pytest.mark.asyncio async def test_migrate_schema_interactive_workflow(self): """Test complete interactive schema migration workflow.""" # Test scenario: User wants to migrate a schema that already exists in target registry subject = "user-profile-events" source_registry = "development" target_registry = "staging" # Mock the underlying migration tool mock_migrate_tool = MagicMock( return_value={ "success": True, "total_versions": 3, "successful_migrations": 3, "failed_migrations": 0, "migration_details": [ {"version": 1, "status": "migrated", "source_id": 101, "target_id": 201}, {"version": 2, "status": "migrated", "source_id": 102, "target_id": 202}, {"version": 3, "status": "migrated", "source_id": 103, "target_id": 203}, ], } ) # Mock export tool for backup mock_export_tool = MagicMock( return_value={ "success": True, "subject": subject, "backup_location": "/backups/user-profile-events.json", "timestamp": "2024-01-15T10:30:00Z", } ) # Mock registry clients mock_source_client = MagicMock() mock_target_client = MagicMock() mock_target_client.config.url = "http://staging-registry:8081" mock_target_client.auth = self.mock_auth mock_target_client.headers = self.mock_headers # Mock schema verification responses mock_source_client.get_schema.return_value = { "schema": { "type": "record", "name": "UserProfileEvent", "fields": [ {"name": "user_id", "type": "string"}, {"name": "profile_data", "type": "string"}, {"name": "timestamp", "type": "long"}, ], }, "schemaType": "AVRO", "id": 103, "version": 3, } mock_target_client.get_schema.return_value = { "schema": { "type": "record", "name": "UserProfileEvent", "fields": [ {"name": "user_id", "type": "string"}, {"name": "profile_data", "type": "string"}, {"name": "timestamp", "type": "long"}, ], }, "schemaType": "AVRO", "id": 203, # Different ID (not preserved) "version": 3, } def get_registry_side_effect(name): if name == source_registry: return mock_source_client elif name == target_registry: return mock_target_client return None self.mock_registry_manager.get_registry.side_effect = get_registry_side_effect # Mock requests to simulate schema existence check with patch("interactive_tools.requests.get") as mock_get: # First call: check if schema exists in target (returns 200 with versions) mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = [1, 2, 3] # Mock elicitation response with complete user preferences with patch("interactive_tools.elicit_with_fallback") as mock_elicit: mock_response = ElicitationResponse( request_id="migrate-schema-integration-test", values={ "replace_existing": "true", # User agrees to replace "backup_before_replace": "true", # User wants backup "preserve_ids": "false", # Don't preserve IDs "compare_after_migration": "true", # Verify after migration "migrate_all_versions": "true", # Migrate all versions "dry_run": "false", # Perform actual migration }, complete=True, metadata={"source": "integration_test_user"}, ) mock_elicit.return_value = mock_response # Execute the interactive migration result = await migrate_schema_interactive( subject=subject, source_registry=source_registry, target_registry=target_registry, source_context="user-events", target_context="user-events", migrate_schema_tool=mock_migrate_tool, export_schema_tool=mock_export_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, ) # Verify elicitation was triggered with correct request mock_elicit.assert_called_once() elicitation_request = mock_elicit.call_args[0][0] assert elicitation_request.type == ElicitationType.FORM assert elicitation_request.title == "Schema Migration Preferences" assert subject in elicitation_request.description assert "already exists" in elicitation_request.description # Verify schema existence was properly detected assert elicitation_request.context["schema_exists_in_target"] is True assert elicitation_request.context["existing_versions"] == [1, 2, 3] # Verify backup was performed mock_export_tool.assert_called_once() export_call_kwargs = mock_export_tool.call_args.kwargs assert export_call_kwargs["subject"] == subject assert export_call_kwargs["registry"] == target_registry assert export_call_kwargs["include_metadata"] is True # Verify migration was executed with correct preferences mock_migrate_tool.assert_called_once() migrate_call_kwargs = mock_migrate_tool.call_args.kwargs assert migrate_call_kwargs["subject"] == subject assert migrate_call_kwargs["source_registry"] == source_registry assert migrate_call_kwargs["target_registry"] == target_registry assert migrate_call_kwargs["preserve_ids"] is False assert migrate_call_kwargs["dry_run"] is False assert migrate_call_kwargs["migrate_all_versions"] is True # Verify result structure assert result["success"] is True assert result["elicitation_used"] is True assert result["schema_existed_in_target"] is True # Verify elicited preferences are recorded elicited_prefs = result["elicited_preferences"] assert elicited_prefs["replace_existing"] is True assert elicited_prefs["backup_before_replace"] is True assert elicited_prefs["preserve_ids"] is False assert elicited_prefs["compare_after_migration"] is True # Verify backup result is included assert result["backup_result"]["success"] is True assert result["backup_result"]["subject"] == subject # Verify post-migration verification was performed verification_result = result["verification_result"] assert verification_result["verification_type"] == "basic" assert verification_result["overall_success"] is True # Check that verification included expected checks checks = verification_result["checks"] check_names = [check["check"] for check in checks] assert "schema_exists_in_target" in check_names assert "schema_content_match" in check_names assert "schema_type_match" in check_names # ID preservation check should NOT be present since preserve_ids=false assert "id_preservation" not in check_names # Verify specific check results schema_exists_check = next(c for c in checks if c["check"] == "schema_exists_in_target") assert schema_exists_check["passed"] is True content_match_check = next(c for c in checks if c["check"] == "schema_content_match") assert content_match_check["passed"] is True type_match_check = next(c for c in checks if c["check"] == "schema_type_match") assert type_match_check["passed"] is True logger.info("✅ Complete interactive schema migration workflow test passed") @pytest.mark.asyncio async def test_elicitation_timeout_handling(self): """Test elicitation timeout and cleanup behavior.""" # Test scenario: Elicitation request times out and is properly cleaned up manager = ElicitationManager() # Create request with very short timeout request = ElicitationRequest( title="Timeout Test", fields=[ElicitationField("test_field", "text", required=True)], timeout_seconds=0.1, # 100ms timeout ) # Create and track the request request_id = await manager.create_request(request) assert request_id in manager.pending_requests # Wait for timeout to occur await asyncio.sleep(0.2) # Request should be automatically cleaned up assert request_id not in manager.pending_requests # Verify we can't submit a response to expired request response = ElicitationResponse(request_id=request_id, values={"test_field": "too_late"}) success = await manager.submit_response(response) assert success is False @pytest.mark.asyncio async def test_multi_round_elicitation_workflow(self): """Test complex workflow requiring multiple elicitation rounds.""" # Test scenario: Schema registration that requires multiple rounds of user input subject = "complex-user-profile" # Mock registration tool mock_register_tool = MagicMock(return_value={"success": True, "id": 456, "subject": subject, "version": 1}) # First round: Basic field definition first_response = ElicitationResponse( request_id="round-1", values={ "field_name": "user_id", "field_type": "string", "nullable": "false", "documentation": "Primary user identifier", }, complete=True, ) # Simulate multiple elicitation rounds elicitation_call_count = 0 async def mock_elicit_multiple_rounds(request): nonlocal elicitation_call_count elicitation_call_count += 1 if elicitation_call_count == 1: # First round: Add user_id field return first_response elif elicitation_call_count == 2: # Second round: Add email field return ElicitationResponse( request_id="round-2", values={ "field_name": "email", "field_type": "string", "nullable": "true", "documentation": "User email address", }, complete=True, ) else: # No more fields needed return None with patch( "interactive_tools.elicit_with_fallback", side_effect=mock_elicit_multiple_rounds, ): result = await register_schema_interactive( subject=subject, schema_definition={ "type": "record", "name": "UserProfile", "fields": [], }, register_schema_tool=mock_register_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Verify multiple elicitation rounds occurred assert elicitation_call_count >= 1 assert result["success"] is True assert result["elicitation_used"] is True @pytest.mark.asyncio async def test_fallback_when_elicitation_fails(self): """Test graceful fallback when elicitation fails or times out.""" # Test scenario: Elicitation system fails, but operation should continue with defaults subject = "fallback-test-schema" incomplete_schema = {"type": "record", "name": "FallbackTest", "fields": []} # Mock registration tool to expect a call even with fallback mock_register_tool = MagicMock( return_value={ "error": "Schema validation failed", "details": "No fields defined in schema", } ) # Mock elicitation to fail/timeout with patch("interactive_tools.elicit_with_fallback", return_value=None): result = await register_schema_interactive( subject=subject, schema_definition=incomplete_schema, register_schema_tool=mock_register_tool, registry_manager=self.mock_registry_manager, registry_mode=self.registry_mode, auth=self.mock_auth, headers=self.mock_headers, schema_registry_url=self.schema_registry_url, ) # Should return error indicating elicitation failed assert "error" in result assert "INCOMPLETE_SCHEMA_DEFINITION" in result.get("error_code", "") assert "elicitation_status" in result.get("details", {}) @pytest.mark.asyncio async def test_real_mcp_elicitation_integration(self): """Test real MCP protocol elicitation integration.""" # Test scenario: Verify MCP protocol integration works correctly from elicitation_mcp_integration import real_mcp_elicit, set_mcp_instance # Set up mock MCP instance set_mcp_instance(self.mock_mcp) # Configure mock MCP to return successful elicitation response self.mock_mcp.send_elicitation_request.return_value = { "values": {"test_field": "mcp_response"}, "complete": True, "metadata": {"source": "mcp_client"}, } # Create test elicitation request request = ElicitationRequest( title="MCP Protocol Test", fields=[ElicitationField("test_field", "text", required=True)], ) # Execute real MCP elicitation response = await real_mcp_elicit(request) # Verify MCP method was called self.mock_mcp.send_elicitation_request.assert_called_once() # Verify response was processed correctly assert response is not None assert response.values["test_field"] == "mcp_response" assert response.complete is True @pytest.mark.asyncio async def test_elicitation_response_submission(self): """Test client response submission through MCP protocol.""" # Test scenario: Client submits elicitation response via MCP tool manager = ElicitationManager() # Create pending request request = ElicitationRequest( title="Response Submission Test", fields=[ElicitationField("user_input", "text", required=True)], ) request_id = await manager.create_request(request) # Simulate client response submission response_data = { "values": {"user_input": "client_provided_value"}, "complete": True, "metadata": {"source": "mcp_client_submission"}, } # Test the response handling success = await handle_elicitation_response(request_id, response_data) # Verify response was processed assert success is True # Verify response is stored and request is cleaned up stored_response = manager.get_response(request_id) assert stored_response is not None assert stored_response.values["user_input"] == "client_provided_value" assert request_id not in manager.pending_requests def test_elicitation_manager_concurrent_requests(self): """Test elicitation manager handles concurrent requests correctly.""" # Test scenario: Multiple concurrent elicitation requests async def test_concurrent(): manager = ElicitationManager() # Create multiple requests concurrently request1 = ElicitationRequest(title="Concurrent Test 1") request2 = ElicitationRequest(title="Concurrent Test 2") request3 = ElicitationRequest(title="Concurrent Test 3") # Submit all requests ids = await asyncio.gather( manager.create_request(request1), manager.create_request(request2), manager.create_request(request3), ) # Verify all requests are tracked assert len(manager.pending_requests) == 3 assert all(req_id in manager.pending_requests for req_id in ids) # Submit responses concurrently responses = [ElicitationResponse(req_id, {"test": f"value_{i}"}) for i, req_id in enumerate(ids)] results = await asyncio.gather(*[manager.submit_response(resp) for resp in responses]) # Verify all responses were processed assert all(results) assert len(manager.pending_requests) == 0 # Verify responses are stored for req_id in ids: stored = manager.get_response(req_id) assert stored is not None # Run the concurrent test asyncio.run(test_concurrent()) @pytest.mark.asyncio async def test_enhanced_elicitation_with_fallback_chain(self): """Test the enhanced elicitation function's fallback chain.""" # Test scenario: Verify fallback chain from real MCP to mock works correctly from elicitation_mcp_integration import ( enhanced_elicit_with_fallback, set_mcp_instance, ) # Test 1: Real MCP elicitation succeeds mock_mcp = MagicMock() mock_mcp.send_elicitation_request = AsyncMock( return_value={"values": {"field": "real_mcp_value"}, "complete": True} ) set_mcp_instance(mock_mcp) request = ElicitationRequest( title="Fallback Chain Test", fields=[ElicitationField("field", "text", default="fallback_value")], ) response = await enhanced_elicit_with_fallback(request) assert response.values["field"] == "real_mcp_value" # Test 2: Real MCP fails, fallback to mock mock_mcp.send_elicitation_request.side_effect = Exception("MCP failed") response = await enhanced_elicit_with_fallback(request) assert response.values["field"] == "fallback_value" # From mock fallback assert response.metadata["source"] == "mock_fallback" class TestElicitationPerformance: """Test elicitation system performance and resource usage.""" @pytest.mark.asyncio async def test_large_scale_elicitation_handling(self): """Test system behavior with many concurrent elicitation requests.""" manager = ElicitationManager() # Create many requests quickly num_requests = 100 requests = [ ElicitationRequest( title=f"Performance Test {i}", fields=[ElicitationField(f"field_{i}", "text")], ) for i in range(num_requests) ] start_time = datetime.utcnow() # Submit all requests request_ids = await asyncio.gather(*[manager.create_request(req) for req in requests]) submission_time = datetime.utcnow() - start_time # Verify all requests were created quickly (< 1 second) assert submission_time.total_seconds() < 1.0 assert len(manager.pending_requests) == num_requests # Submit responses responses = [ElicitationResponse(req_id, {f"field_{i}": f"value_{i}"}) for i, req_id in enumerate(request_ids)] start_time = datetime.utcnow() results = await asyncio.gather(*[manager.submit_response(resp) for resp in responses]) processing_time = datetime.utcnow() - start_time # Verify all responses processed quickly assert processing_time.total_seconds() < 2.0 assert all(results) assert len(manager.pending_requests) == 0 @pytest.mark.asyncio async def test_memory_cleanup_after_completion(self): """Test that completed elicitation requests are properly cleaned up.""" manager = ElicitationManager() # Create and complete several requests for i in range(10): request = ElicitationRequest(title=f"Cleanup Test {i}") request_id = await manager.create_request(request) response = ElicitationResponse(request_id, {"test": f"value_{i}"}) await manager.submit_response(response) # Verify all requests are cleaned up from pending assert len(manager.pending_requests) == 0 # Verify responses are still accessible (for audit/debugging) assert len(manager.responses) == 10 # Mock functions for testing async def handle_elicitation_response(request_id: str, response_data: dict) -> bool: """Mock elicitation response handler for testing.""" manager = ElicitationManager() response = ElicitationResponse( request_id=request_id, values=response_data.get("values", {}), complete=response_data.get("complete", True), metadata=response_data.get("metadata", {}), ) return await manager.submit_response(response) if __name__ == "__main__": # Run the integration tests pytest.main([__file__, "-v", "--asyncio-mode=auto"])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server