Skip to main content
Glama

Cyber Sentinel MCP Server

by jx888-max
diagnostics.py11.8 kB
#!/usr/bin/env python3 """ Cyber Sentinel MCP Server - Configuration Diagnostics """ import asyncio import os import sys from pathlib import Path from typing import Dict, List, Tuple import httpx from rich.console import Console from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table from .config import get_settings console = Console() class DiagnosticTool: """Configuration diagnostic and validation tool""" def __init__(self): self.settings = get_settings() self.results = {} def check_environment(self) -> Dict[str, bool]: """Check basic environment setup""" checks = {} # Check Python version python_version = sys.version_info checks["python_version"] = python_version >= (3, 8) # 不再检查.env文件,直接从环境变量读取 # 检查是否有环境变量设置 env_vars_set = any( [ os.getenv("VIRUSTOTAL_API_KEY"), os.getenv("ABUSEIPDB_API_KEY"), os.getenv("SHODAN_API_KEY"), os.getenv("URLHAUS_API_KEY"), ] ) checks["env_vars_configured"] = env_vars_set # Check required packages try: import mcp checks["mcp_installed"] = True except ImportError: checks["mcp_installed"] = False try: import httpx checks["httpx_installed"] = True except ImportError: checks["httpx_installed"] = False return checks def check_api_keys(self) -> Dict[str, bool]: """Check if API keys are configured""" checks = {} checks["virustotal"] = bool(self.settings.virustotal_api_key) checks["abuseipdb"] = bool(self.settings.abuseipdb_api_key) checks["shodan"] = bool(self.settings.shodan_api_key) checks["urlhaus"] = bool(self.settings.urlhaus_api_key) # Check if at least one API key is configured checks["has_any_api_key"] = any( [ checks["virustotal"], checks["abuseipdb"], checks["shodan"], checks["urlhaus"], ] ) return checks async def validate_api_connectivity(self) -> Dict[str, Tuple[bool, str]]: """Validate API connectivity and authentication""" results = {} async with httpx.AsyncClient(timeout=10.0) as client: # Test VirusTotal if self.settings.virustotal_api_key: try: response = await client.get( "https://www.virustotal.com/vtapi/v2/file/report", params={ "apikey": self.settings.virustotal_api_key, "resource": "test", }, ) if response.status_code in [200, 204]: results["virustotal"] = (True, "API key valid") else: results["virustotal"] = (False, f"HTTP {response.status_code}") except Exception as e: results["virustotal"] = (False, str(e)) else: results["virustotal"] = (False, "No API key configured") # Test AbuseIPDB if self.settings.abuseipdb_api_key: try: response = await client.get( "https://api.abuseipdb.com/api/v2/check", headers={ "Key": self.settings.abuseipdb_api_key, "Accept": "application/json", }, params={"ipAddress": "8.8.8.8", "maxAgeInDays": "90"}, ) if response.status_code == 200: results["abuseipdb"] = (True, "API key valid") else: results["abuseipdb"] = (False, f"HTTP {response.status_code}") except Exception as e: results["abuseipdb"] = (False, str(e)) else: results["abuseipdb"] = (False, "No API key configured") # Test Shodan if self.settings.shodan_api_key: try: response = await client.get( f"https://api.shodan.io/api-info?key={self.settings.shodan_api_key}" ) if response.status_code == 200: results["shodan"] = (True, "API key valid") else: results["shodan"] = (False, f"HTTP {response.status_code}") except Exception as e: results["shodan"] = (False, str(e)) else: results["shodan"] = (False, "No API key configured") # URLhaus doesn't require authentication for basic queries if self.settings.urlhaus_api_key: results["urlhaus"] = (True, "API key configured") else: results["urlhaus"] = (False, "No API key configured") return results def display_environment_results(self, results: Dict[str, bool]): """Display environment check results""" table = Table(title="Environment Check") table.add_column("Check", style="cyan") table.add_column("Status", style="bold") table.add_column("Details", style="dim") for check, passed in results.items(): status = "[green]✅ PASS[/green]" if passed else "[red]❌ FAIL[/red]" details = { "python_version": f"Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", "env_vars_configured": "环境变量已配置" if passed else "未配置环境变量", "mcp_installed": ( "MCP package available" if passed else "MCP package not found" ), "httpx_installed": ( "httpx package available" if passed else "httpx package not found" ), } table.add_row( check.replace("_", " ").title(), status, details.get(check, "") ) console.print(table) def display_api_key_results(self, results: Dict[str, bool]): """Display API key check results""" table = Table(title="API Key Configuration") table.add_column("Service", style="cyan") table.add_column("Configured", style="bold") table.add_column("Recommendation", style="dim") recommendations = { "virustotal": "Highly recommended for comprehensive analysis", "abuseipdb": "Recommended for IP reputation checks", "shodan": "Optional for device intelligence", "urlhaus": "Optional for URL analysis", "has_any_api_key": "At least one API key required", } for service, configured in results.items(): if service == "has_any_api_key": continue status = "[green]✅ YES[/green]" if configured else "[yellow]⚠️ NO[/yellow]" table.add_row(service.title(), status, recommendations.get(service, "")) console.print(table) if not results["has_any_api_key"]: console.print( Panel( "[red]❌ 未配置API密钥![/red]\n\n" "您需要至少配置一个API密钥才能使用Cyber Sentinel。\n" "请设置环境变量,例如: [cyan]export VIRUSTOTAL_API_KEY=your_key_here[/cyan]", title="Configuration Error", ) ) def display_connectivity_results(self, results: Dict[str, Tuple[bool, str]]): """Display API connectivity results""" table = Table(title="API Connectivity Test") table.add_column("Service", style="cyan") table.add_column("Status", style="bold") table.add_column("Details", style="dim") for service, (success, message) in results.items(): status = ( "[green]✅ CONNECTED[/green]" if success else "[red]❌ FAILED[/red]" ) table.add_row(service.title(), status, message) console.print(table) def provide_recommendations( self, env_results: Dict[str, bool], api_results: Dict[str, bool], connectivity_results: Dict[str, Tuple[bool, str]], ): """Provide recommendations based on diagnostic results""" recommendations = [] # Environment issues if not env_results.get("python_version", True): recommendations.append("❌ 升级到Python 3.8或更高版本") if not env_results.get("env_vars_configured", True): recommendations.append( "❌ 设置环境变量,例如: export VIRUSTOTAL_API_KEY=your_key" ) if not env_results.get("mcp_installed", True): recommendations.append("❌ 安装MCP: pip install mcp") if not env_results.get("httpx_installed", True): recommendations.append("❌ 安装httpx: pip install httpx") # API key issues if not api_results.get("has_any_api_key", True): recommendations.append("❌ 至少配置一个API密钥") # Connectivity issues failed_apis = [ service for service, (success, _) in connectivity_results.items() if not success and api_results.get(service, False) ] if failed_apis: recommendations.append(f"❌ 修复连接问题: {', '.join(failed_apis)}") if recommendations: console.print(Panel("\n".join(recommendations), title="Recommendations")) else: console.print( Panel( "[green]🎉 所有检查都通过了!您的Cyber Sentinel已准备就绪。[/green]", title="Success", ) ) async def run_full_diagnostic(self): """Run complete diagnostic check""" console.print( Panel.fit( "[bold blue]🔍 Cyber Sentinel 诊断工具[/bold blue]\n\n" "正在检查您的配置和连接性...", title="Diagnostics", ) ) # Environment checks console.print("\n[bold]1. 环境检查[/bold]") env_results = self.check_environment() self.display_environment_results(env_results) # API key checks console.print("\n[bold]2. API密钥配置[/bold]") api_results = self.check_api_keys() self.display_api_key_results(api_results) # Connectivity checks if api_results["has_any_api_key"]: console.print("\n[bold]3. API连接测试[/bold]") with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}") ) as progress: task = progress.add_task("正在测试API连接...", total=None) connectivity_results = await self.validate_api_connectivity() self.display_connectivity_results(connectivity_results) else: connectivity_results = {} # Recommendations console.print("\n[bold]4. 建议[/bold]") self.provide_recommendations(env_results, api_results, connectivity_results) async def main(): """Main entry point for diagnostics""" diagnostic = DiagnosticTool() await diagnostic.run_full_diagnostic() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jx888-max/cyber-sentinel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server