#!/usr/bin/env python3
"""
Cyber Sentinel MCP Server - Interactive Setup Wizard
"""
import asyncio
import os
import sys
from pathlib import Path
from typing import Any, Dict, Optional
import httpx
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm, Prompt
from rich.table import Table
console = Console()
class SetupWizard:
"""Interactive setup wizard for Cyber Sentinel MCP Server"""
def __init__(self):
self.config = {}
def welcome(self):
"""Display welcome message"""
console.print(
Panel.fit(
"[bold blue]🛡️ Cyber Sentinel MCP Server Setup Wizard[/bold blue]\n\n"
"此向导将帮助您配置威胁情报源。\n"
"您需要至少一个支持的服务的API密钥。\n"
"配置将通过环境变量进行,无需.env文件。",
title="欢迎",
)
)
def show_api_sources(self):
"""Show available API sources and their benefits"""
table = Table(title="可用的威胁情报源")
table.add_column("服务", style="cyan")
table.add_column("免费额度", style="green")
table.add_column("推荐程度", style="yellow")
table.add_column("注册URL", style="blue")
table.add_row(
"VirusTotal",
"1,000 req/day",
"⭐ 强烈推荐",
"https://www.virustotal.com/gui/join-us",
)
table.add_row(
"AbuseIPDB",
"1,000 req/day",
"⭐ 推荐",
"https://www.abuseipdb.com/register",
)
table.add_row("Shodan", "100 req/month", "可选", "https://account.shodan.io/")
table.add_row("URLhaus", "无限制", "可选", "https://urlhaus.abuse.ch/api/")
table.add_row(
"AlienVault OTX", "10,000 req/month", "推荐", "https://otx.alienvault.com/"
)
console.print(table)
console.print(
"\n[yellow]💡 提示: 您需要至少一个API密钥才能使用Cyber Sentinel[/yellow]"
)
async def validate_api_key(self, service: str, api_key: str) -> bool:
"""Validate an API key by making a test request"""
if not api_key or api_key.strip() == "":
return False
try:
async with httpx.AsyncClient(timeout=10.0) as client:
if service == "virustotal":
response = await client.get(
"https://www.virustotal.com/vtapi/v2/file/report",
params={"apikey": api_key, "resource": "test"},
)
return response.status_code in [200, 204]
elif service == "abuseipdb":
response = await client.get(
"https://api.abuseipdb.com/api/v2/check",
headers={"Key": api_key, "Accept": "application/json"},
params={"ipAddress": "8.8.8.8", "maxAgeInDays": "90"},
)
return response.status_code == 200
elif service == "shodan":
response = await client.get(
f"https://api.shodan.io/api-info?key={api_key}"
)
return response.status_code == 200
except Exception as e:
console.print(f"[red]验证错误: {e}[/red]")
return False
return True
async def collect_api_keys(self):
"""Collect and validate API keys from user"""
console.print("\n[bold]步骤 1: API密钥配置[/bold]")
# VirusTotal
console.print("\n[cyan]VirusTotal API密钥[/cyan] (强烈推荐)")
vt_key = Prompt.ask("输入您的VirusTotal API密钥 (或按Enter跳过)", default="")
if vt_key:
with Progress(
SpinnerColumn(), TextColumn("[progress.description]{task.description}")
) as progress:
task = progress.add_task("正在验证VirusTotal API密钥...", total=None)
if await self.validate_api_key("virustotal", vt_key):
console.print("[green]✅ VirusTotal API密钥有效![/green]")
self.config["VIRUSTOTAL_API_KEY"] = vt_key
else:
console.print("[red]❌ VirusTotal API密钥验证失败[/red]")
if Confirm.ask("仍然继续?"):
self.config["VIRUSTOTAL_API_KEY"] = vt_key
# AbuseIPDB
console.print("\n[cyan]AbuseIPDB API密钥[/cyan] (推荐)")
abuse_key = Prompt.ask("输入您的AbuseIPDB API密钥 (或按Enter跳过)", default="")
if abuse_key:
with Progress(
SpinnerColumn(), TextColumn("[progress.description]{task.description}")
) as progress:
task = progress.add_task("正在验证AbuseIPDB API密钥...", total=None)
if await self.validate_api_key("abuseipdb", abuse_key):
console.print("[green]✅ AbuseIPDB API密钥有效![/green]")
self.config["ABUSEIPDB_API_KEY"] = abuse_key
else:
console.print("[red]❌ AbuseIPDB API密钥验证失败[/red]")
if Confirm.ask("仍然继续?"):
self.config["ABUSEIPDB_API_KEY"] = abuse_key
# Shodan (optional)
if Confirm.ask("\n您想配置Shodan API密钥吗? (可选)"):
shodan_key = Prompt.ask("输入您的Shodan API密钥")
if shodan_key:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
) as progress:
task = progress.add_task("正在验证Shodan API密钥...", total=None)
if await self.validate_api_key("shodan", shodan_key):
console.print("[green]✅ Shodan API密钥有效![/green]")
self.config["SHODAN_API_KEY"] = shodan_key
else:
console.print("[red]❌ Shodan API密钥验证失败[/red]")
if Confirm.ask("仍然继续?"):
self.config["SHODAN_API_KEY"] = shodan_key
# URLhaus (optional, no validation needed)
if Confirm.ask("\n您想配置URLhaus API密钥吗? (可选)"):
urlhaus_key = Prompt.ask("输入您的URLhaus API密钥")
if urlhaus_key:
self.config["URLHAUS_API_KEY"] = urlhaus_key
# AlienVault OTX (optional)
if Confirm.ask("\n您想配置AlienVault OTX API密钥吗? (推荐)"):
otx_key = Prompt.ask("输入您的AlienVault OTX API密钥")
if otx_key:
self.config["OTX_API_KEY"] = otx_key
def configure_advanced_settings(self):
"""Configure advanced settings"""
console.print("\n[bold]步骤 2: 高级配置[/bold]")
if Confirm.ask("您想配置高级设置吗?", default=False):
self.config["MAX_REQUESTS_PER_MINUTE"] = Prompt.ask(
"每分钟最大请求数", default="60"
)
self.config["REQUEST_TIMEOUT"] = Prompt.ask(
"请求超时时间 (秒)", default="30"
)
self.config["CACHE_TTL"] = Prompt.ask("缓存TTL (秒)", default="3600")
self.config["LOG_LEVEL"] = Prompt.ask(
"日志级别",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
)
else:
# Use defaults
self.config.update(
{
"MAX_REQUESTS_PER_MINUTE": "60",
"REQUEST_TIMEOUT": "30",
"CACHE_TTL": "3600",
"LOG_LEVEL": "INFO",
"DEBUG": "false",
}
)
def save_config(self):
"""显示环境变量设置指令"""
console.print("\n[bold]步骤 3: 环境变量配置[/bold]")
# Check if at least one API key is configured
api_keys = [
k for k in self.config.keys() if k.endswith("_API_KEY") and self.config[k]
]
if not api_keys:
console.print("[red]❌ 错误: 您必须配置至少一个API密钥![/red]")
return False
# 显示环境变量设置指令
console.print("\n[cyan]请设置以下环境变量:[/cyan]")
console.print("\n[bold]Windows (PowerShell):[/bold]")
for key, value in self.config.items():
console.print(f"$env:{key}='{value}'")
console.print("\n[bold]Windows (CMD):[/bold]")
for key, value in self.config.items():
console.print(f"set {key}={value}")
console.print("\n[bold]Linux/macOS (Bash):[/bold]")
for key, value in self.config.items():
console.print(f"export {key}='{value}'")
console.print(
"\n[yellow]💡 提示: 您可以将这些命令添加到您的shell配置文件中(.bashrc, .zshrc, 或PowerShell配置文件)以便永久保存。[/yellow]"
)
return True
def show_next_steps(self):
"""Show next steps to user"""
console.print(
Panel.fit(
"[bold green]🎉 设置完成![/bold green]\n\n"
"下一步:\n"
"1. 设置上述环境变量\n"
"2. 测试您的配置: [cyan]python -m cyber_sentinel.diagnostics[/cyan]\n"
"3. 启动服务器: [cyan]python -m cyber_sentinel.server[/cyan]\n"
"4. 在您的MCP客户端中配置 (Claude Desktop, Cursor等)\n\n"
"获取帮助: [blue]https://github.com/jx888-max/cyber-sentinel-mcp[/blue]",
title="成功",
)
)
async def run(self):
"""Run the setup wizard"""
try:
self.welcome()
self.show_api_sources()
if not Confirm.ask("\n准备开始配置?"):
console.print("设置已取消。")
return
await self.collect_api_keys()
self.configure_advanced_settings()
if self.save_config():
self.show_next_steps()
else:
console.print("[red]设置失败。请重试。[/red]")
except KeyboardInterrupt:
console.print("\n[yellow]用户取消了设置。[/yellow]")
except Exception as e:
console.print(f"[red]设置错误: {e}[/red]")
def main():
"""Main entry point"""
wizard = SetupWizard()
asyncio.run(wizard.run())
if __name__ == "__main__":
main()