#!/usr/bin/env python3
"""
Cyber Sentinel MCP Server - Configuration Diagnostics
"""
import asyncio
import os
import sys
from pathlib import Path
from typing import Dict, List, Tuple
import httpx
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from .config import get_settings
console = Console()
class DiagnosticTool:
"""Configuration diagnostic and validation tool"""
def __init__(self):
self.settings = get_settings()
self.results = {}
def check_environment(self) -> Dict[str, bool]:
"""Check basic environment setup"""
checks = {}
# Check Python version
python_version = sys.version_info
checks["python_version"] = python_version >= (3, 8)
# 不再检查.env文件,直接从环境变量读取
# 检查是否有环境变量设置
env_vars_set = any(
[
os.getenv("VIRUSTOTAL_API_KEY"),
os.getenv("ABUSEIPDB_API_KEY"),
os.getenv("SHODAN_API_KEY"),
os.getenv("URLHAUS_API_KEY"),
]
)
checks["env_vars_configured"] = env_vars_set
# Check required packages
try:
import mcp
checks["mcp_installed"] = True
except ImportError:
checks["mcp_installed"] = False
try:
import httpx
checks["httpx_installed"] = True
except ImportError:
checks["httpx_installed"] = False
return checks
def check_api_keys(self) -> Dict[str, bool]:
"""Check if API keys are configured"""
checks = {}
checks["virustotal"] = bool(self.settings.virustotal_api_key)
checks["abuseipdb"] = bool(self.settings.abuseipdb_api_key)
checks["shodan"] = bool(self.settings.shodan_api_key)
checks["urlhaus"] = bool(self.settings.urlhaus_api_key)
# Check if at least one API key is configured
checks["has_any_api_key"] = any(
[
checks["virustotal"],
checks["abuseipdb"],
checks["shodan"],
checks["urlhaus"],
]
)
return checks
async def validate_api_connectivity(self) -> Dict[str, Tuple[bool, str]]:
"""Validate API connectivity and authentication"""
results = {}
async with httpx.AsyncClient(timeout=10.0) as client:
# Test VirusTotal
if self.settings.virustotal_api_key:
try:
response = await client.get(
"https://www.virustotal.com/vtapi/v2/file/report",
params={
"apikey": self.settings.virustotal_api_key,
"resource": "test",
},
)
if response.status_code in [200, 204]:
results["virustotal"] = (True, "API key valid")
else:
results["virustotal"] = (False, f"HTTP {response.status_code}")
except Exception as e:
results["virustotal"] = (False, str(e))
else:
results["virustotal"] = (False, "No API key configured")
# Test AbuseIPDB
if self.settings.abuseipdb_api_key:
try:
response = await client.get(
"https://api.abuseipdb.com/api/v2/check",
headers={
"Key": self.settings.abuseipdb_api_key,
"Accept": "application/json",
},
params={"ipAddress": "8.8.8.8", "maxAgeInDays": "90"},
)
if response.status_code == 200:
results["abuseipdb"] = (True, "API key valid")
else:
results["abuseipdb"] = (False, f"HTTP {response.status_code}")
except Exception as e:
results["abuseipdb"] = (False, str(e))
else:
results["abuseipdb"] = (False, "No API key configured")
# Test Shodan
if self.settings.shodan_api_key:
try:
response = await client.get(
f"https://api.shodan.io/api-info?key={self.settings.shodan_api_key}"
)
if response.status_code == 200:
results["shodan"] = (True, "API key valid")
else:
results["shodan"] = (False, f"HTTP {response.status_code}")
except Exception as e:
results["shodan"] = (False, str(e))
else:
results["shodan"] = (False, "No API key configured")
# URLhaus doesn't require authentication for basic queries
if self.settings.urlhaus_api_key:
results["urlhaus"] = (True, "API key configured")
else:
results["urlhaus"] = (False, "No API key configured")
return results
def display_environment_results(self, results: Dict[str, bool]):
"""Display environment check results"""
table = Table(title="Environment Check")
table.add_column("Check", style="cyan")
table.add_column("Status", style="bold")
table.add_column("Details", style="dim")
for check, passed in results.items():
status = "[green]✅ PASS[/green]" if passed else "[red]❌ FAIL[/red]"
details = {
"python_version": f"Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"env_vars_configured": "环境变量已配置" if passed else "未配置环境变量",
"mcp_installed": (
"MCP package available" if passed else "MCP package not found"
),
"httpx_installed": (
"httpx package available" if passed else "httpx package not found"
),
}
table.add_row(
check.replace("_", " ").title(), status, details.get(check, "")
)
console.print(table)
def display_api_key_results(self, results: Dict[str, bool]):
"""Display API key check results"""
table = Table(title="API Key Configuration")
table.add_column("Service", style="cyan")
table.add_column("Configured", style="bold")
table.add_column("Recommendation", style="dim")
recommendations = {
"virustotal": "Highly recommended for comprehensive analysis",
"abuseipdb": "Recommended for IP reputation checks",
"shodan": "Optional for device intelligence",
"urlhaus": "Optional for URL analysis",
"has_any_api_key": "At least one API key required",
}
for service, configured in results.items():
if service == "has_any_api_key":
continue
status = "[green]✅ YES[/green]" if configured else "[yellow]⚠️ NO[/yellow]"
table.add_row(service.title(), status, recommendations.get(service, ""))
console.print(table)
if not results["has_any_api_key"]:
console.print(
Panel(
"[red]❌ 未配置API密钥![/red]\n\n"
"您需要至少配置一个API密钥才能使用Cyber Sentinel。\n"
"请设置环境变量,例如: [cyan]export VIRUSTOTAL_API_KEY=your_key_here[/cyan]",
title="Configuration Error",
)
)
def display_connectivity_results(self, results: Dict[str, Tuple[bool, str]]):
"""Display API connectivity results"""
table = Table(title="API Connectivity Test")
table.add_column("Service", style="cyan")
table.add_column("Status", style="bold")
table.add_column("Details", style="dim")
for service, (success, message) in results.items():
status = (
"[green]✅ CONNECTED[/green]" if success else "[red]❌ FAILED[/red]"
)
table.add_row(service.title(), status, message)
console.print(table)
def provide_recommendations(
self,
env_results: Dict[str, bool],
api_results: Dict[str, bool],
connectivity_results: Dict[str, Tuple[bool, str]],
):
"""Provide recommendations based on diagnostic results"""
recommendations = []
# Environment issues
if not env_results.get("python_version", True):
recommendations.append("❌ 升级到Python 3.8或更高版本")
if not env_results.get("env_vars_configured", True):
recommendations.append(
"❌ 设置环境变量,例如: export VIRUSTOTAL_API_KEY=your_key"
)
if not env_results.get("mcp_installed", True):
recommendations.append("❌ 安装MCP: pip install mcp")
if not env_results.get("httpx_installed", True):
recommendations.append("❌ 安装httpx: pip install httpx")
# API key issues
if not api_results.get("has_any_api_key", True):
recommendations.append("❌ 至少配置一个API密钥")
# Connectivity issues
failed_apis = [
service
for service, (success, _) in connectivity_results.items()
if not success and api_results.get(service, False)
]
if failed_apis:
recommendations.append(f"❌ 修复连接问题: {', '.join(failed_apis)}")
if recommendations:
console.print(Panel("\n".join(recommendations), title="Recommendations"))
else:
console.print(
Panel(
"[green]🎉 所有检查都通过了!您的Cyber Sentinel已准备就绪。[/green]",
title="Success",
)
)
async def run_full_diagnostic(self):
"""Run complete diagnostic check"""
console.print(
Panel.fit(
"[bold blue]🔍 Cyber Sentinel 诊断工具[/bold blue]\n\n"
"正在检查您的配置和连接性...",
title="Diagnostics",
)
)
# Environment checks
console.print("\n[bold]1. 环境检查[/bold]")
env_results = self.check_environment()
self.display_environment_results(env_results)
# API key checks
console.print("\n[bold]2. API密钥配置[/bold]")
api_results = self.check_api_keys()
self.display_api_key_results(api_results)
# Connectivity checks
if api_results["has_any_api_key"]:
console.print("\n[bold]3. API连接测试[/bold]")
with Progress(
SpinnerColumn(), TextColumn("[progress.description]{task.description}")
) as progress:
task = progress.add_task("正在测试API连接...", total=None)
connectivity_results = await self.validate_api_connectivity()
self.display_connectivity_results(connectivity_results)
else:
connectivity_results = {}
# Recommendations
console.print("\n[bold]4. 建议[/bold]")
self.provide_recommendations(env_results, api_results, connectivity_results)
async def main():
"""Main entry point for diagnostics"""
diagnostic = DiagnosticTool()
await diagnostic.run_full_diagnostic()
if __name__ == "__main__":
asyncio.run(main())