Skip to main content
Glama

Cyber Sentinel MCP Server

by jx888-max
setup_wizard.py (10.9 kB)
#!/usr/bin/env python3
"""
Cyber Sentinel MCP Server - Interactive Setup Wizard

Walks the user through entering threat-intelligence API keys, optionally
validates them against the provider, and prints the environment-variable
commands needed to configure the server. Nothing is written to disk.
"""

import asyncio
import os
import shlex
import sys
from pathlib import Path
from typing import Any, Dict, Optional

import httpx
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm, IntPrompt, Prompt
from rich.table import Table

console = Console()

# Settings applied when the user declines (or before they customise) the
# advanced configuration step. Values are strings because they end up in
# environment variables.
DEFAULT_SETTINGS: Dict[str, str] = {
    "MAX_REQUESTS_PER_MINUTE": "60",
    "REQUEST_TIMEOUT": "30",
    "CACHE_TTL": "3600",
    "LOG_LEVEL": "INFO",
    "DEBUG": "false",
}


class SetupWizard:
    """Interactive setup wizard for Cyber Sentinel MCP Server.

    Collected settings are accumulated in ``self.config`` as a mapping of
    environment-variable name to string value.
    """

    def __init__(self):
        self.config: Dict[str, str] = {}

    def welcome(self):
        """Display the welcome panel."""
        console.print(
            Panel.fit(
                "[bold blue]🛡️ Cyber Sentinel MCP Server Setup Wizard[/bold blue]\n\n"
                "此向导将帮助您配置威胁情报源。\n"
                "您需要至少一个支持的服务的API密钥。\n"
                "配置将通过环境变量进行,无需.env文件。",
                title="欢迎",
            )
        )

    def show_api_sources(self):
        """Show the supported API sources, their free quotas and sign-up URLs."""
        table = Table(title="可用的威胁情报源")
        table.add_column("服务", style="cyan")
        table.add_column("免费额度", style="green")
        table.add_column("推荐程度", style="yellow")
        table.add_column("注册URL", style="blue")

        table.add_row(
            "VirusTotal",
            "1,000 req/day",
            "⭐ 强烈推荐",
            "https://www.virustotal.com/gui/join-us",
        )
        table.add_row(
            "AbuseIPDB",
            "1,000 req/day",
            "⭐ 推荐",
            "https://www.abuseipdb.com/register",
        )
        table.add_row("Shodan", "100 req/month", "可选", "https://account.shodan.io/")
        table.add_row("URLhaus", "无限制", "可选", "https://urlhaus.abuse.ch/api/")
        table.add_row(
            "AlienVault OTX", "10,000 req/month", "推荐", "https://otx.alienvault.com/"
        )

        console.print(table)
        console.print(
            "\n[yellow]💡 提示: 您需要至少一个API密钥才能使用Cyber Sentinel[/yellow]"
        )

    async def validate_api_key(self, service: str, api_key: str) -> bool:
        """Validate an API key by making a test request.

        Args:
            service: One of ``"virustotal"``, ``"abuseipdb"`` or ``"shodan"``.
                Any other value is not checked remotely.
            api_key: The key to test.

        Returns:
            ``False`` for an empty/blank key, a rejected key, or any error
            while contacting the service; ``True`` when the service accepted
            the key or when ``service`` has no validation request defined.
        """
        if not api_key or not api_key.strip():
            return False

        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                if service == "virustotal":
                    response = await client.get(
                        "https://www.virustotal.com/vtapi/v2/file/report",
                        params={"apikey": api_key, "resource": "test"},
                    )
                    # 204 is accepted alongside 200; presumably it signals an
                    # exhausted quota rather than a bad key -- confirm against
                    # the VirusTotal v2 API documentation.
                    return response.status_code in (200, 204)

                if service == "abuseipdb":
                    response = await client.get(
                        "https://api.abuseipdb.com/api/v2/check",
                        headers={"Key": api_key, "Accept": "application/json"},
                        params={"ipAddress": "8.8.8.8", "maxAgeInDays": "90"},
                    )
                    return response.status_code == 200

                if service == "shodan":
                    # Pass the key via params so httpx URL-encodes it instead
                    # of splicing raw user input into the query string.
                    response = await client.get(
                        "https://api.shodan.io/api-info",
                        params={"key": api_key},
                    )
                    return response.status_code == 200
        except Exception as e:
            # Validation is best-effort: any failure (network, TLS, timeout)
            # is reported and treated as "not validated" so the wizard can
            # continue. The message is escaped because exception text may
            # contain '[' which Rich would otherwise parse as markup.
            console.print(f"[red]验证错误: {escape(str(e))}[/red]")
            return False

        # Services without a validation request are accepted as-is.
        return True

    async def _validate_and_store(
        self, label: str, service: str, env_var: str, api_key: str
    ) -> None:
        """Validate ``api_key`` with a spinner and store it under ``env_var``.

        A key that fails validation is stored only if the user confirms.
        """
        with Progress(
            SpinnerColumn(), TextColumn("[progress.description]{task.description}")
        ) as progress:
            progress.add_task(f"正在验证{label} API密钥...", total=None)
            is_valid = await self.validate_api_key(service, api_key)

        # The spinner is stopped before printing the result and prompting, so
        # the live display cannot interfere with the confirmation prompt.
        if is_valid:
            console.print(f"[green]✅ {label} API密钥有效![/green]")
            self.config[env_var] = api_key
            return

        console.print(f"[red]❌ {label} API密钥验证失败[/red]")
        if Confirm.ask("仍然继续?"):
            self.config[env_var] = api_key

    async def collect_api_keys(self):
        """Collect API keys from the user, validating those that support it."""
        console.print("\n[bold]步骤 1: API密钥配置[/bold]")

        # VirusTotal
        console.print("\n[cyan]VirusTotal API密钥[/cyan] (强烈推荐)")
        vt_key = Prompt.ask("输入您的VirusTotal API密钥 (或按Enter跳过)", default="")
        if vt_key:
            await self._validate_and_store(
                "VirusTotal", "virustotal", "VIRUSTOTAL_API_KEY", vt_key
            )

        # AbuseIPDB
        console.print("\n[cyan]AbuseIPDB API密钥[/cyan] (推荐)")
        abuse_key = Prompt.ask("输入您的AbuseIPDB API密钥 (或按Enter跳过)", default="")
        if abuse_key:
            await self._validate_and_store(
                "AbuseIPDB", "abuseipdb", "ABUSEIPDB_API_KEY", abuse_key
            )

        # Shodan (optional)
        if Confirm.ask("\n您想配置Shodan API密钥吗? (可选)"):
            shodan_key = Prompt.ask("输入您的Shodan API密钥")
            if shodan_key:
                await self._validate_and_store(
                    "Shodan", "shodan", "SHODAN_API_KEY", shodan_key
                )

        # URLhaus (optional, no validation needed)
        if Confirm.ask("\n您想配置URLhaus API密钥吗? (可选)"):
            urlhaus_key = Prompt.ask("输入您的URLhaus API密钥")
            if urlhaus_key:
                self.config["URLHAUS_API_KEY"] = urlhaus_key

        # AlienVault OTX (optional, stored without validation)
        if Confirm.ask("\n您想配置AlienVault OTX API密钥吗? (推荐)"):
            otx_key = Prompt.ask("输入您的AlienVault OTX API密钥")
            if otx_key:
                self.config["OTX_API_KEY"] = otx_key

    @staticmethod
    def _ask_positive_int(prompt: str, default: str) -> str:
        """Prompt until a positive integer is entered; return it as a string."""
        while True:
            value = IntPrompt.ask(prompt, default=int(default))
            if value > 0:
                return str(value)
            console.print("[red]请输入大于0的整数[/red]")

    def configure_advanced_settings(self):
        """Configure rate limit, timeout, cache TTL and log level.

        Defaults are applied first so both paths produce the same set of
        keys; the user may then override the individual values.
        """
        console.print("\n[bold]步骤 2: 高级配置[/bold]")

        self.config.update(DEFAULT_SETTINGS)
        if not Confirm.ask("您想配置高级设置吗?", default=False):
            return

        self.config["MAX_REQUESTS_PER_MINUTE"] = self._ask_positive_int(
            "每分钟最大请求数", DEFAULT_SETTINGS["MAX_REQUESTS_PER_MINUTE"]
        )
        self.config["REQUEST_TIMEOUT"] = self._ask_positive_int(
            "请求超时时间 (秒)", DEFAULT_SETTINGS["REQUEST_TIMEOUT"]
        )
        self.config["CACHE_TTL"] = self._ask_positive_int(
            "缓存TTL (秒)", DEFAULT_SETTINGS["CACHE_TTL"]
        )
        self.config["LOG_LEVEL"] = Prompt.ask(
            "日志级别",
            choices=["DEBUG", "INFO", "WARNING", "ERROR"],
            default=DEFAULT_SETTINGS["LOG_LEVEL"],
        )

    @staticmethod
    def _print_command(command: str) -> None:
        """Print a shell command verbatim.

        Markup, emoji codes and highlighting are disabled and soft wrapping
        is enabled so Rich neither reinterprets the key material nor inserts
        line breaks into text the user is expected to copy and paste.
        """
        console.print(
            command, markup=False, emoji=False, highlight=False, soft_wrap=True
        )

    def save_config(self):
        """Print the commands that set the collected environment variables.

        Returns:
            ``False`` if no non-empty ``*_API_KEY`` entry has been collected,
            ``True`` after the instructions have been printed.
        """
        console.print("\n[bold]步骤 3: 环境变量配置[/bold]")

        # At least one API key must be configured.
        has_api_key = any(
            name.endswith("_API_KEY") and value
            for name, value in self.config.items()
        )
        if not has_api_key:
            console.print("[red]❌ 错误: 您必须配置至少一个API密钥![/red]")
            return False

        # Show how to set the environment variables in each common shell.
        console.print("\n[cyan]请设置以下环境变量:[/cyan]")

        console.print("\n[bold]Windows (PowerShell):[/bold]")
        for name, value in self.config.items():
            # PowerShell escapes a single quote inside '...' by doubling it.
            ps_value = str(value).replace("'", "''")
            self._print_command(f"$env:{name}='{ps_value}'")

        console.print("\n[bold]Windows (CMD):[/bold]")
        for name, value in self.config.items():
            self._print_command(f"set {name}={value}")

        console.print("\n[bold]Linux/macOS (Bash):[/bold]")
        for name, value in self.config.items():
            self._print_command(f"export {name}={shlex.quote(str(value))}")

        console.print(
            "\n[yellow]💡 提示: 您可以将这些命令添加到您的shell配置文件中(.bashrc, .zshrc, 或PowerShell配置文件)以便永久保存。[/yellow]"
        )

        return True

    def show_next_steps(self):
        """Show the steps to take after the wizard has finished."""
        console.print(
            Panel.fit(
                "[bold green]🎉 设置完成![/bold green]\n\n"
                "下一步:\n"
                "1. 设置上述环境变量\n"
                "2. 测试您的配置: [cyan]python -m cyber_sentinel.diagnostics[/cyan]\n"
                "3. 启动服务器: [cyan]python -m cyber_sentinel.server[/cyan]\n"
                "4. 在您的MCP客户端中配置 (Claude Desktop, Cursor等)\n\n"
                "获取帮助: [blue]https://github.com/jx888-max/cyber-sentinel-mcp[/blue]",
                title="成功",
            )
        )

    async def run(self):
        """Run the wizard end to end, reporting cancellation and errors."""
        try:
            self.welcome()
            self.show_api_sources()

            if not Confirm.ask("\n准备开始配置?"):
                console.print("设置已取消。")
                return

            await self.collect_api_keys()
            self.configure_advanced_settings()

            if self.save_config():
                self.show_next_steps()
            else:
                console.print("[red]设置失败。请重试。[/red]")

        except KeyboardInterrupt:
            console.print("\n[yellow]用户取消了设置。[/yellow]")
        except Exception as e:
            # Top-level boundary: report instead of dumping a traceback.
            # Escaped so '[' in the message is not parsed as Rich markup.
            console.print(f"[red]设置错误: {escape(str(e))}[/red]")


def main():
    """Main entry point."""
    wizard = SetupWizard()
    try:
        asyncio.run(wizard.run())
    except KeyboardInterrupt:
        # On newer Python versions asyncio.run() cancels the task on Ctrl+C
        # and re-raises KeyboardInterrupt here, bypassing the handler in
        # run(); report it the same way instead of showing a traceback.
        console.print("\n[yellow]用户取消了设置。[/yellow]")


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jx888-max/cyber-sentinel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.