Skip to main content
Glama
test_all_mcp_features.py19.1 kB
#!/usr/bin/env python3 """ Comprehensive test suite for SSB MCP Server features. Tests all 80+ MCP tools across 15 functional categories. """ import asyncio import json import os import sys from typing import Dict, Any, List from datetime import datetime # Add the src directory to the path (go up one level from Testing/) sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) from ssb_mcp_server.server import build_client from ssb_mcp_server.config import ServerConfig class MCPFeatureTester: """Comprehensive tester for all MCP features.""" def __init__(self): self.config = ServerConfig() self.client = build_client(self.config) self.results = { 'total_tests': 0, 'passed': 0, 'failed': 0, 'errors': [], 'categories': {} } self.test_data = {} def log_result(self, category: str, test_name: str, success: bool, error: str = None): """Log test result.""" self.results['total_tests'] += 1 if success: self.results['passed'] += 1 print(f"✅ {category} - {test_name}") else: self.results['failed'] += 1 error_msg = f"❌ {category} - {test_name}: {error}" print(error_msg) self.results['errors'].append(error_msg) if category not in self.results['categories']: self.results['categories'][category] = {'passed': 0, 'failed': 0} if success: self.results['categories'][category]['passed'] += 1 else: self.results['categories'][category]['failed'] += 1 def test_basic_connectivity(self): """Test basic connectivity and authentication.""" print("\n🔗 Testing Basic Connectivity...") try: result = self.client.get_ssb_info() self.log_result("Connectivity", "get_ssb_info", True) self.test_data['ssb_info'] = result except Exception as e: self.log_result("Connectivity", "get_ssb_info", False, str(e)) try: result = self.client.get_heartbeat() self.log_result("Connectivity", "get_heartbeat", True) except Exception as e: self.log_result("Connectivity", "get_heartbeat", False, str(e)) def test_monitoring_diagnostics(self): """Test monitoring and diagnostics features.""" print("\n📊 Testing Monitoring & Diagnostics...") try: result = self.client.get_diagnostic_counters() self.log_result("Monitoring", "get_diagnostic_counters", True) except Exception as e: self.log_result("Monitoring", "get_diagnostic_counters", False, str(e)) try: result = self.client.analyze_sql("SELECT * FROM NVDA LIMIT 10") self.log_result("Monitoring", "analyze_sql", True) except Exception as e: self.log_result("Monitoring", "analyze_sql", False, str(e)) def test_job_management(self): """Test job management features.""" print("\n🔧 Testing Advanced Job Management...") # First, get a list of jobs to work with try: jobs_result = self.client.list_streams() jobs = jobs_result.get('jobs', []) if jobs: job_id = jobs[0]['job_id'] self.test_data['job_id'] = job_id try: result = self.client.get_job_events(job_id) self.log_result("Job Management", "get_job_events", True) except Exception as e: self.log_result("Job Management", "get_job_events", False, str(e)) try: result = self.client.get_job_state(job_id) self.log_result("Job Management", "get_job_state", True) except Exception as e: self.log_result("Job Management", "get_job_state", False, str(e)) try: result = self.client.get_job_mv_endpoints(job_id) self.log_result("Job Management", "get_job_mv_endpoints", True) except Exception as e: self.log_result("Job Management", "get_job_mv_endpoints", False, str(e)) try: result = self.client.copy_job(job_id) self.log_result("Job Management", "copy_job", True) except Exception as e: self.log_result("Job Management", "copy_job", False, str(e)) else: print("⚠️ No jobs available for job management tests") except Exception as e: self.log_result("Job Management", "list_streams_for_job_tests", False, str(e)) def test_table_management(self): """Test table management features.""" print("\n🗂️ Testing Enhanced Table Management...") try: result = self.client.list_tables_detailed() self.log_result("Table Management", "list_tables_detailed", True) self.test_data['tables'] = result.get('tables', []) except Exception as e: self.log_result("Table Management", "list_tables_detailed", False, str(e)) try: result = self.client.get_table_tree() self.log_result("Table Management", "get_table_tree", True) except Exception as e: self.log_result("Table Management", "get_table_tree", False, str(e)) try: result = self.client.list_tables() self.log_result("Table Management", "list_tables", True) except Exception as e: self.log_result("Table Management", "list_tables", False, str(e)) # Test table schema if we have tables if self.test_data.get('tables'): try: table_name = self.test_data['tables'][0].get('table_name', 'NVDA') result = self.client.get_table_schema(table_name) self.log_result("Table Management", "get_table_schema", True) except Exception as e: self.log_result("Table Management", "get_table_schema", False, str(e)) def test_connector_format_management(self): """Test connector and format management features.""" print("\n🔌 Testing Connector & Format Management...") try: result = self.client.list_data_formats() self.log_result("Connector Management", "list_data_formats", True) except Exception as e: self.log_result("Connector Management", "list_data_formats", False, str(e)) try: result = self.client.list_connectors() self.log_result("Connector Management", "list_connectors", True) except Exception as e: self.log_result("Connector Management", "list_connectors", False, str(e)) try: result = self.client.validate_kafka_connector("local-kafka") self.log_result("Connector Management", "validate_kafka_connector", True) except Exception as e: self.log_result("Connector Management", "validate_kafka_connector", False, str(e)) def test_user_project_management(self): """Test user and project management features.""" print("\n👤 Testing User & Project Management...") try: result = self.client.get_user_info() self.log_result("User Management", "get_user_info", True) except Exception as e: self.log_result("User Management", "get_user_info", False, str(e)) try: result = self.client.get_user_settings() self.log_result("User Management", "get_user_settings", True) except Exception as e: self.log_result("User Management", "get_user_settings", False, str(e)) try: result = self.client.list_projects() self.log_result("User Management", "list_projects", True) except Exception as e: self.log_result("User Management", "list_projects", False, str(e)) def test_api_key_management(self): """Test API key management features.""" print("\n🔑 Testing API Key Management...") try: result = self.client.list_api_keys() self.log_result("API Key Management", "list_api_keys", True) except Exception as e: self.log_result("API Key Management", "list_api_keys", False, str(e)) def test_environment_management(self): """Test environment management features.""" print("\n🌍 Testing Environment Management...") try: result = self.client.list_environments() self.log_result("Environment Management", "list_environments", True) except Exception as e: self.log_result("Environment Management", "list_environments", False, str(e)) def test_sync_configuration(self): """Test sync and configuration features.""" print("\n🔄 Testing Sync & Configuration...") try: result = self.client.get_sync_config() self.log_result("Sync Configuration", "get_sync_config", True) except Exception as e: self.log_result("Sync Configuration", "get_sync_config", False, str(e)) def test_udf_management(self): """Test UDF management features.""" print("\n📈 Testing UDF Management...") try: result = self.client.list_udfs_detailed() self.log_result("UDF Management", "list_udfs_detailed", True) except Exception as e: self.log_result("UDF Management", "list_udfs_detailed", False, str(e)) try: result = self.client.list_udfs() self.log_result("UDF Management", "list_udfs", True) except Exception as e: self.log_result("UDF Management", "list_udfs", False, str(e)) try: result = self.client.get_udf_artifacts() self.log_result("UDF Management", "get_udf_artifacts", True) except Exception as e: self.log_result("UDF Management", "get_udf_artifacts", False, str(e)) def test_stream_management(self): """Test stream management features.""" print("\n🌊 Testing Stream Management...") try: result = self.client.list_streams() self.log_result("Stream Management", "list_streams", True) self.test_data['streams'] = result.get('jobs', []) except Exception as e: self.log_result("Stream Management", "list_streams", False, str(e)) if self.test_data.get('streams'): try: stream_name = self.test_data['streams'][0]['name'] result = self.client.get_stream(stream_name) self.log_result("Stream Management", "get_stream", True) except Exception as e: self.log_result("Stream Management", "get_stream", False, str(e)) def test_query_execution(self): """Test query execution features.""" print("\n⚡ Testing Query Execution...") try: result = self.client.execute_query("SELECT 1 as test_column") self.log_result("Query Execution", "execute_query", True) except Exception as e: self.log_result("Query Execution", "execute_query", False, str(e)) try: result = self.client.execute_query_with_sampling("SELECT 1 as test_column", sample_all_messages=True) self.log_result("Query Execution", "execute_query_with_sampling", True) except Exception as e: self.log_result("Query Execution", "execute_query_with_sampling", False, str(e)) def test_job_control(self): """Test job control features.""" print("\n🎮 Testing Job Control...") if self.test_data.get('job_id'): try: result = self.client.get_job_status(self.test_data['job_id']) self.log_result("Job Control", "get_job_status", True) except Exception as e: self.log_result("Job Control", "get_job_status", False, str(e)) try: result = self.client.get_job_sample_by_id(self.test_data['job_id']) self.log_result("Job Control", "get_job_sample_by_id", True) except Exception as e: self.log_result("Job Control", "get_job_sample_by_id", False, str(e)) try: result = self.client.list_jobs_with_samples() self.log_result("Job Control", "list_jobs_with_samples", True) except Exception as e: self.log_result("Job Control", "list_jobs_with_samples", False, str(e)) def test_kafka_integration(self): """Test Kafka integration features.""" print("\n📨 Testing Kafka Integration...") try: result = self.client.list_topics() self.log_result("Kafka Integration", "list_topics", True) except Exception as e: self.log_result("Kafka Integration", "list_topics", False, str(e)) def test_cluster_management(self): """Test cluster management features.""" print("\n🏢 Testing Cluster Management...") try: result = self.client.get_cluster_info() self.log_result("Cluster Management", "get_cluster_info", True) except Exception as e: self.log_result("Cluster Management", "get_cluster_info", False, str(e)) try: result = self.client.get_cluster_health() self.log_result("Cluster Management", "get_cluster_health", True) except Exception as e: self.log_result("Cluster Management", "get_cluster_health", False, str(e)) def test_kafka_table_management(self): """Test Kafka table management features.""" print("\n🗃️ Testing Kafka Table Management...") try: result = self.client.create_kafka_table("test_table", "test-topic", "local-kafka") self.log_result("Kafka Table Management", "create_kafka_table", True) except Exception as e: self.log_result("Kafka Table Management", "create_kafka_table", False, str(e)) try: result = self.client.register_kafka_table("test_register", "test-topic") self.log_result("Kafka Table Management", "register_kafka_table", True) except Exception as e: self.log_result("Kafka Table Management", "register_kafka_table", False, str(e)) def run_all_tests(self): """Run all test categories.""" print("🚀 Starting Comprehensive MCP Feature Testing") print("=" * 60) start_time = datetime.now() # Run all test categories self.test_basic_connectivity() self.test_monitoring_diagnostics() self.test_job_management() self.test_table_management() self.test_connector_format_management() self.test_user_project_management() self.test_api_key_management() self.test_environment_management() self.test_sync_configuration() self.test_udf_management() self.test_stream_management() self.test_query_execution() self.test_job_control() self.test_kafka_integration() self.test_cluster_management() self.test_kafka_table_management() end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Print summary self.print_summary(duration) return self.results def print_summary(self, duration: float): """Print test summary.""" print("\n" + "=" * 60) print("📊 TEST SUMMARY") print("=" * 60) total = self.results['total_tests'] passed = self.results['passed'] failed = self.results['failed'] success_rate = (passed / total * 100) if total > 0 else 0 print(f"⏱️ Duration: {duration:.2f} seconds") print(f"📈 Total Tests: {total}") print(f"✅ Passed: {passed}") print(f"❌ Failed: {failed}") print(f"📊 Success Rate: {success_rate:.1f}%") print(f"\n📋 Category Breakdown:") for category, stats in self.results['categories'].items(): cat_total = stats['passed'] + stats['failed'] cat_success = (stats['passed'] / cat_total * 100) if cat_total > 0 else 0 print(f" {category}: {stats['passed']}/{cat_total} ({cat_success:.1f}%)") if self.results['errors']: print(f"\n❌ Errors ({len(self.results['errors'])}):") for error in self.results['errors'][:10]: # Show first 10 errors print(f" {error}") if len(self.results['errors']) > 10: print(f" ... and {len(self.results['errors']) - 10} more errors") print(f"\n🎯 MCP Server Status: {'✅ HEALTHY' if success_rate >= 80 else '⚠️ NEEDS ATTENTION'}") # Save detailed results self.save_results() def save_results(self): """Save test results to file.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"mcp_test_results_{timestamp}.json" with open(filename, 'w') as f: json.dump(self.results, f, indent=2, default=str) print(f"\n💾 Detailed results saved to: {filename}") def main(): """Main test runner.""" print("🧪 SSB MCP Server Comprehensive Feature Test") print("Testing all 80+ MCP tools across 15 functional categories") print("=" * 60) # Check if we're in the right directory if not os.path.exists('src/ssb_mcp_server'): print("❌ Error: Please run this script from the SSB-MCP-Server root directory") sys.exit(1) # Set up environment variables if not already set if not os.getenv('SSB_API_BASE') and not os.getenv('KNOX_GATEWAY_URL'): print("⚠️ Warning: No SSB_API_BASE or KNOX_GATEWAY_URL set. Using defaults.") os.environ['SSB_API_BASE'] = 'http://localhost:18121' os.environ['SSB_USER'] = 'admin' os.environ['SSB_PASSWORD'] = 'admin' # Run tests tester = MCPFeatureTester() results = tester.run_all_tests() # Exit with appropriate code success_rate = (results['passed'] / results['total_tests'] * 100) if results['total_tests'] > 0 else 0 sys.exit(0 if success_rate >= 80 else 1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BrooksIan/SSB-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server