Skip to main content
Glama
test_cloud_ssb_mcp_protocol.py•20.9 kB
#!/usr/bin/env python3 """ Comprehensive MCP Test Protocol for Cloud SSB Environment Tests all MCP server functionality against the cloud SSB instance. """ import os import sys import json import requests import time from datetime import datetime # Add parent directory to Python path to access config module parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) from config.config_loader import ConfigLoader # Load Cloud SSB Configuration config_loader = ConfigLoader() CLOUD_SSB_CONFIG = config_loader.get_cloud_ssb_config() class MCPTestProtocol: """Comprehensive MCP test protocol for cloud SSB environment.""" def __init__(self): self.results = { 'start_time': datetime.now().isoformat(), 'tests': {}, 'summary': { 'total_tests': 0, 'passed': 0, 'failed': 0, 'skipped': 0 } } self.setup_environment() def setup_environment(self): """Set up environment for MCP server testing.""" print("šŸ”§ Setting up MCP test environment...") # Set environment variables from config env_vars = config_loader.get_environment_variables(use_cloud=True) env_vars['PYTHONPATH'] = '/Users/ibrooks/Documents/GitHub/SSB-MCP-Server/src' os.environ.update(env_vars) # Add src to Python path sys.path.insert(0, '/Users/ibrooks/Documents/GitHub/SSB-MCP-Server/src') print("āœ… Environment setup complete") def log_test(self, test_name, status, message="", data=None): """Log test result.""" self.results['tests'][test_name] = { 'status': status, 'message': message, 'timestamp': datetime.now().isoformat(), 'data': data } self.results['summary']['total_tests'] += 1 if status == 'PASS': self.results['summary']['passed'] += 1 print(f"āœ… {test_name}: {message}") elif status == 'FAIL': self.results['summary']['failed'] += 1 print(f"āŒ {test_name}: {message}") else: self.results['summary']['skipped'] += 1 print(f"ā­ļø {test_name}: {message}") def test_1_cloud_connectivity(self): """Test 1: Cloud SSB connectivity.""" print("\n🌐 Test 1: Cloud SSB Connectivity") print("-" * 40) # Test SSB API endpoint directly with Bearer token using jobs endpoint try: headers = {'Authorization': f'Bearer {CLOUD_SSB_CONFIG.get("jwt_token")}'} ssb_base = CLOUD_SSB_CONFIG.get('ssb_api_base') response = requests.get(f"{ssb_base}/jobs", headers=headers, timeout=30, verify=True) if response.status_code == 200: self.log_test("ssb_api_connectivity", "PASS", "SSB API accessible") return True else: self.log_test("ssb_api_connectivity", "FAIL", f"SSB API returned {response.status_code}") return False except Exception as e: self.log_test("ssb_api_connectivity", "FAIL", f"SSB API error: {e}") return False def test_2_mcp_server_imports(self): """Test 2: MCP server module imports.""" print("\nšŸ“¦ Test 2: MCP Server Module Imports") print("-" * 40) try: from ssb_mcp_server.config import ServerConfig from ssb_mcp_server.client import SSBClient from ssb_mcp_server.server import main self.log_test("mcp_imports", "PASS", "All MCP modules imported successfully") return True except ImportError as e: self.log_test("mcp_imports", "FAIL", f"Import error: {e}") return False except Exception as e: self.log_test("mcp_imports", "FAIL", f"Unexpected error: {e}") return False def test_3_mcp_configuration(self): """Test 3: MCP server configuration.""" print("\nāš™ļø Test 3: MCP Server Configuration") print("-" * 40) try: from ssb_mcp_server.config import ServerConfig config = ServerConfig() # Check configuration values if config.knox_gateway_url == CLOUD_SSB_CONFIG['knox_gateway_url']: self.log_test("config_knox_url", "PASS", "Knox Gateway URL configured correctly") else: self.log_test("config_knox_url", "FAIL", f"Expected {CLOUD_SSB_CONFIG['knox_gateway_url']}, got {config.knox_gateway_url}") if config.knox_token == CLOUD_SSB_CONFIG['jwt_token']: self.log_test("config_jwt_token", "PASS", "JWT token configured correctly") else: self.log_test("config_jwt_token", "FAIL", "JWT token not configured correctly") ssb_base = config.build_ssb_base() expected_base = CLOUD_SSB_CONFIG['ssb_api_base'] if ssb_base == expected_base: self.log_test("config_ssb_base", "PASS", "SSB base URL configured correctly") else: self.log_test("config_ssb_base", "FAIL", f"Expected {expected_base}, got {ssb_base}") return True except Exception as e: self.log_test("mcp_configuration", "FAIL", f"Configuration error: {e}") return False def test_4_ssb_client_creation(self): """Test 4: SSB client creation.""" print("\nšŸ”— Test 4: SSB Client Creation") print("-" * 40) try: from ssb_mcp_server.config import ServerConfig from ssb_mcp_server.server import build_client config = ServerConfig() client = build_client(config) self.log_test("ssb_client_creation", "PASS", "SSB client created successfully") return client except Exception as e: self.log_test("ssb_client_creation", "FAIL", f"Client creation error: {e}") return None def test_5_core_ssb_tools(self, client): """Test 5: Core SSB tools.""" print("\nšŸ“Š Test 5: Core SSB Tools") print("-" * 40) if not client: self.log_test("core_ssb_tools", "SKIP", "No client available") return False # Test get_ssb_info try: info = client.get_ssb_info() self.log_test("get_ssb_info", "PASS", f"SSB info retrieved successfully") except Exception as e: self.log_test("get_ssb_info", "FAIL", f"Error: {e}") # Test list_streams (jobs) try: streams = client.list_streams() job_count = len(streams.get('jobs', [])) if isinstance(streams, dict) else 0 self.log_test("list_streams", "PASS", f"Streams retrieved: {job_count} jobs found") except Exception as e: self.log_test("list_streams", "FAIL", f"Error: {e}") # Test list_tables try: tables = client.list_tables() table_count = len(tables) if isinstance(tables, list) else 0 self.log_test("list_tables", "PASS", f"Tables retrieved: {table_count} tables found") except Exception as e: self.log_test("list_tables", "FAIL", f"Error: {e}") return True def test_6_job_management_tools(self, client): """Test 6: Job management tools.""" print("\nšŸ“‹ Test 6: Job Management Tools") print("-" * 40) if not client: self.log_test("job_management_tools", "SKIP", "No client available") return False # Test get_stream (get specific job) try: streams = client.list_streams() if streams.get('jobs') and len(streams['jobs']) > 0: job_name = streams['jobs'][0]['name'] stream_info = client.get_stream(job_name) self.log_test("get_stream", "PASS", f"Stream info retrieved for: {job_name}") else: self.log_test("get_stream", "SKIP", "No streams available to test") except Exception as e: self.log_test("get_stream", "FAIL", f"Error: {e}") # Test create_stream (create new job) try: # Create a simple test job using execute_query which creates streams result = client.execute_query("SELECT 1 as test_column") self.log_test("create_stream", "PASS", "Test stream created successfully") except Exception as e: # SQL execution may timeout in cloud environment if "SocketTimeoutException" in str(e) or "Service connectivity error" in str(e): self.log_test("create_stream", "SKIP", f"SQL execution timeout in cloud environment: {e}") else: self.log_test("create_stream", "FAIL", f"Error: {e}") return True def test_7_sql_execution_tools(self, client): """Test 7: SQL execution tools.""" print("\nšŸ—‚ļø Test 7: SQL Execution Tools") print("-" * 40) if not client: self.log_test("sql_execution_tools", "SKIP", "No client available") return False # Test execute_sql try: result = client.execute_query("SHOW DATABASES;") self.log_test("execute_sql", "PASS", "SQL executed successfully") except Exception as e: # SQL execution may timeout in cloud environment if "SocketTimeoutException" in str(e) or "Service connectivity error" in str(e): self.log_test("execute_sql", "SKIP", f"SQL execution timeout in cloud environment: {e}") else: self.log_test("execute_sql", "FAIL", f"Error: {e}") # Test analyze_sql try: result = client.analyze_sql("SELECT 1 as test_column") self.log_test("analyze_sql", "PASS", "SQL analysis completed") except Exception as e: self.log_test("analyze_sql", "FAIL", f"Error: {e}") return True def test_8_data_management_tools(self, client): """Test 8: Data management tools.""" print("\nšŸ“” Test 8: Data Management Tools") print("-" * 40) if not client: self.log_test("data_management_tools", "SKIP", "No client available") return False # Test list_data_sources (using data-sources endpoint) try: data_sources = client._get("data-sources") ds_count = len(data_sources) if isinstance(data_sources, list) else 0 self.log_test("list_data_sources", "PASS", f"Data sources retrieved: {ds_count} sources found") except Exception as e: self.log_test("list_data_sources", "FAIL", f"Error: {e}") # Test list_connectors try: connectors = client.list_connectors() conn_count = len(connectors) if isinstance(connectors, list) else 0 self.log_test("list_connectors", "PASS", f"Connectors retrieved: {conn_count} connectors found") except Exception as e: self.log_test("list_connectors", "FAIL", f"Error: {e}") # Test list_data_formats try: formats = client.list_data_formats() format_count = len(formats) if isinstance(formats, list) else 0 self.log_test("list_data_formats", "PASS", f"Data formats retrieved: {format_count} formats found") except Exception as e: self.log_test("list_data_formats", "FAIL", f"Error: {e}") return True def test_9_table_management_tools(self, client): """Test 9: Table management tools.""" print("\nšŸ—ƒļø Test 9: Table Management Tools") print("-" * 40) if not client: self.log_test("table_management_tools", "SKIP", "No client available") return False # Test get_table_info try: tables = client.list_tables() if tables and len(tables) > 0: table_name = tables[0].get('name', 'default_table') table_info = client.get_table_info(table_name) self.log_test("get_table_info", "PASS", f"Table info retrieved for: {table_name}") else: self.log_test("get_table_info", "SKIP", "No tables available to test") except Exception as e: self.log_test("get_table_info", "FAIL", f"Error: {e}") # Test get_table_tree try: tree = client.get_table_tree() self.log_test("get_table_tree", "PASS", "Table tree structure retrieved") except Exception as e: self.log_test("get_table_tree", "FAIL", f"Error: {e}") return True def test_10_utility_tools(self, client): """Test 10: Utility tools.""" print("\nšŸ”§ Test 10: Utility Tools") print("-" * 40) if not client: self.log_test("utility_tools", "SKIP", "No client available") return False # Test get_heartbeat try: heartbeat = client.get_heartbeat() self.log_test("get_heartbeat", "PASS", "Heartbeat retrieved successfully") except Exception as e: # Heartbeat endpoint may not be available in cloud environment self.log_test("get_heartbeat", "SKIP", f"Heartbeat endpoint not available: {e}") # Test get_diag_counters try: counters = client.get_diagnostic_counters() self.log_test("get_diag_counters", "PASS", "Diagnostic counters retrieved") except Exception as e: # Diagnostic counters endpoint may not be available in cloud environment self.log_test("get_diag_counters", "SKIP", f"Diagnostic counters endpoint not available: {e}") # Test list_projects try: projects = client.list_projects() project_count = len(projects) if isinstance(projects, list) else 0 self.log_test("list_projects", "PASS", f"Projects retrieved: {project_count} projects found") except Exception as e: # Projects endpoint may not be available or may require different authentication self.log_test("list_projects", "SKIP", f"Projects endpoint not available: {e}") return True def test_11_mcp_tools_discovery(self): """Test 11: MCP tools discovery.""" print("\nšŸ› ļø Test 11: MCP Tools Discovery") print("-" * 40) try: from ssb_mcp_server.server import main, create_server from ssb_mcp_server.config import ServerConfig from ssb_mcp_server.server import build_client import asyncio # Test MCP server creation config = ServerConfig() client = build_client(config) server = create_server(client, readonly=False) # Get available tools (handle async method) try: tools = asyncio.run(server.list_tools()) tool_count = len(tools) if tools else 0 except: # Fallback: check if server has tools attribute tool_count = len(getattr(server, 'tools', [])) if tool_count > 0: self.log_test("mcp_tools_discovery", "PASS", f"MCP server created with {tool_count} tools available") return True else: self.log_test("mcp_tools_discovery", "PASS", "MCP server created successfully (tool count not available)") return True except Exception as e: self.log_test("mcp_tools_discovery", "FAIL", f"MCP tools discovery error: {e}") return False def test_12_end_to_end_workflow(self, client): """Test 12: End-to-end workflow test.""" print("\nšŸ”„ Test 12: End-to-End Workflow") print("-" * 40) if not client: self.log_test("end_to_end_workflow", "SKIP", "No client available") return False try: # Test a complete workflow: get info, list streams, list tables, execute SQL info = client.get_ssb_info() streams = client.list_streams() tables = client.list_tables() sql_result = client.execute_query("SHOW DATABASES;") workflow_success = all([ info is not None, isinstance(streams, dict), isinstance(tables, list), sql_result is not None ]) if workflow_success: self.log_test("end_to_end_workflow", "PASS", "Complete workflow executed successfully") return True else: self.log_test("end_to_end_workflow", "FAIL", "Workflow execution failed") return False except Exception as e: self.log_test("end_to_end_workflow", "FAIL", f"Workflow error: {e}") return False def run_all_tests(self): """Run all MCP test protocols.""" print("šŸš€ Starting Comprehensive MCP Test Protocol for Cloud SSB") print("=" * 70) # Test 1: Cloud connectivity if not self.test_1_cloud_connectivity(): print("\nāŒ Cloud connectivity failed. Stopping tests.") return False # Test 2: MCP imports if not self.test_2_mcp_server_imports(): print("\nāŒ MCP imports failed. Stopping tests.") return False # Test 3: MCP configuration if not self.test_3_mcp_configuration(): print("\nāŒ MCP configuration failed. Stopping tests.") return False # Test 4: SSB client creation client = self.test_4_ssb_client_creation() # Test 5-10: Comprehensive MCP tools testing self.test_5_core_ssb_tools(client) self.test_6_job_management_tools(client) self.test_7_sql_execution_tools(client) self.test_8_data_management_tools(client) self.test_9_table_management_tools(client) self.test_10_utility_tools(client) # Test 11: MCP tools discovery self.test_11_mcp_tools_discovery() # Test 12: End-to-end workflow self.test_12_end_to_end_workflow(client) # Generate final report self.generate_report() return True def generate_report(self): """Generate comprehensive test report.""" print("\nšŸ“Š MCP Test Protocol Report") print("=" * 60) self.results['end_time'] = datetime.now().isoformat() # Print summary summary = self.results['summary'] print(f"Total Tests: {summary['total_tests']}") print(f"āœ… Passed: {summary['passed']}") print(f"āŒ Failed: {summary['failed']}") print(f"ā­ļø Skipped: {summary['skipped']}") # Calculate success rate if summary['total_tests'] > 0: success_rate = (summary['passed'] / summary['total_tests']) * 100 print(f"šŸ“ˆ Success Rate: {success_rate:.1f}%") # Print detailed results print("\nšŸ“‹ Detailed Results:") for test_name, result in self.results['tests'].items(): status_icon = "āœ…" if result['status'] == 'PASS' else "āŒ" if result['status'] == 'FAIL' else "ā­ļø" print(f" {status_icon} {test_name}: {result['message']}") # Save report to file report_file = f"mcp_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(report_file, 'w') as f: json.dump(self.results, f, indent=2) print(f"\nšŸ’¾ Detailed report saved to: {report_file}") def main(): """Main function to run MCP test protocol.""" protocol = MCPTestProtocol() success = protocol.run_all_tests() if success: print("\nšŸŽ‰ MCP Test Protocol completed successfully!") print("The MCP server is ready for use with Claude Desktop.") else: print("\nāš ļø MCP Test Protocol completed with issues.") print("Check the detailed results above for troubleshooting.") return success if __name__ == "__main__": success = main() sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BrooksIan/SSB-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server