Skip to main content
Glama

Cyber Sentinel MCP Server

by jx888-max
server.py (14 kB)
"""
Cyber Sentinel MCP Server

Main server implementation using FastMCP.

The module registers the MCP tools and the status resource on a module-level
``FastMCP`` application and initializes the analysis components at import
time (see ``initialize_server``).
"""

import logging
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

from .cicd_integration import CICDAnalyzer
from .code_analyzer import CodeSecurityAnalyzer
from .config import get_settings, validate_api_keys
from .threat_intel import ThreatIntelAggregator
from .visualization import SecurityReportGenerator

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
app = FastMCP("CyberSentinel")

# Global instances, populated by initialize_server()
aggregator: Optional[ThreatIntelAggregator] = None
cicd_analyzer: Optional[CICDAnalyzer] = None
code_analyzer: Optional[CodeSecurityAnalyzer] = None
report_generator: Optional[SecurityReportGenerator] = None


class IndicatorAnalysisRequest(BaseModel):
    """Request model for indicator analysis"""

    indicator: str = Field(
        ..., description="The indicator to analyze (IP, domain, hash, or URL)"
    )
    force_refresh: bool = Field(
        default=False, description="Force refresh from sources, bypass cache"
    )


class IndicatorAnalysisResponse(BaseModel):
    """Response model for indicator analysis"""

    indicator: str
    type: str
    overall_reputation: str
    confidence: float
    sources_checked: int
    sources_responded: int
    malicious_sources: int
    clean_sources: int
    countries: List[str]
    isps: List[str]
    detailed_results: List[Dict[str, Any]]
    errors: List[str]
    timestamp: str


def _error_response(message: str, **extra: Any) -> Dict[str, Any]:
    """Build the error payload shared by every tool.

    Args:
        message: Human-readable error description stored under ``"error"``.
        **extra: Additional context fields (e.g. ``indicator``) placed between
            the error message and the timestamp.

    Returns:
        A dict with ``"error"``, the extra fields and an ISO-8601
        ``"timestamp"`` taken from ``datetime.now()``.
    """
    payload: Dict[str, Any] = {"error": message}
    payload.update(extra)
    payload["timestamp"] = datetime.now().isoformat()
    return payload


def _components_ready() -> bool:
    """Return True when every global component has been initialized."""
    components = (aggregator, cicd_analyzer, code_analyzer, report_generator)
    return all(component is not None for component in components)


@app.tool()
async def analyze_indicator(indicator: str) -> Dict[str, Any]:
    """
    Analyze a security indicator (IP address, domain, hash, or URL) across
    multiple threat intelligence sources.

    This tool queries various threat intelligence APIs to provide a
    comprehensive security assessment of the given indicator. It aggregates
    results from multiple sources and provides an overall reputation score
    with confidence level.

    Args:
        indicator: The security indicator to analyze. Can be:
            - IP address (e.g., "8.8.8.8")
            - Domain name (e.g., "example.com")
            - File hash (MD5, SHA1, or SHA256)
            - URL (e.g., "https://example.com/path")

    Returns:
        A comprehensive analysis report including:
        - Overall reputation (malicious/clean/unknown)
        - Confidence score (0-100)
        - Results from individual sources
        - Geographic and ISP information (for IPs)
        - Error information if any sources failed

        On failure, a dict with "error", "indicator" and "timestamp" keys.
    """
    if aggregator is None:
        return _error_response(
            "Threat intelligence aggregator not initialized. "
            "Please check API key configuration.",
            indicator=indicator,
        )

    try:
        return await aggregator.analyze_indicator(indicator)
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.exception("Error analyzing indicator %s: %s", indicator, e)
        return _error_response(f"Analysis failed: {e}", indicator=indicator)


@app.tool()
async def check_api_status() -> Dict[str, Any]:
    """
    Check the status of configured threat intelligence API keys and sources.

    Returns information about which threat intelligence sources are available
    and properly configured with valid API keys.

    Returns:
        Status information for each threat intelligence source including:
        - Which APIs are configured
        - API key validation status
        - Available functionality per source

        ``configured_sources`` counts every usable source (URLhaus included),
        while ``api_key_sources_configured`` counts only the sources that
        need an API key.
    """
    settings = get_settings()
    api_status = validate_api_keys()

    source_info = {
        "virustotal": {
            "configured": api_status["virustotal"],
            "requires_api_key": True,
            "capabilities": (
                ["ip", "domain", "hash"] if api_status["virustotal"] else []
            ),
            "description": "VirusTotal - Comprehensive malware and URL analysis",
        },
        "abuseipdb": {
            "configured": api_status["abuseipdb"],
            "requires_api_key": True,
            "capabilities": ["ip"] if api_status["abuseipdb"] else [],
            "description": "AbuseIPDB - IP address abuse reporting and checking",
        },
        "urlhaus": {
            "configured": True,  # URLhaus doesn't require API key
            "requires_api_key": False,
            "capabilities": ["url", "hash"],
            "description": "URLhaus - Malware URL and payload tracking",
        },
        "shodan": {
            "configured": api_status["shodan"],
            "requires_api_key": True,
            "capabilities": ["ip"] if api_status["shodan"] else [],
            "description": "Shodan - Internet-connected device search engine",
        },
    }

    total_configured = sum(1 for info in source_info.values() if info["configured"])
    # URLhaus is always "configured", so total_configured alone can never tell
    # whether any API key was set; count the key-based sources separately.
    keyed_configured = sum(
        1
        for info in source_info.values()
        if info["requires_api_key"] and info["configured"]
    )

    return {
        "total_sources": len(source_info),
        "configured_sources": total_configured,
        "api_key_sources_configured": keyed_configured,
        "sources": source_info,
        "rate_limit": f"{settings.max_requests_per_minute} requests/minute",
        "cache_ttl": f"{settings.cache_ttl} seconds",
        "status": "ready" if total_configured > 0 else "no_sources_configured",
    }


@app.tool()
async def analyze_code_security(
    code_content: str, language: str = "auto", locale: str = "zh"
) -> Dict[str, Any]:
    """
    Analyze code security and provide proactive security recommendations.

    This tool scans code for security vulnerabilities, including hard-coded
    secrets, SQL injection, XSS and similar issues, and checks network
    indicators found in the code (IPs, domains, URLs) for threats.

    Args:
        code_content: The code to analyze
        language: Programming language
            (auto/python/javascript/java/csharp/php/go/rust/cpp/sql)
        locale: Language setting (zh/en)

    Returns:
        A detailed analysis report containing vulnerabilities,
        recommendations and remediation guidance. On failure, a dict with
        "error" and "timestamp" keys.
    """
    if code_analyzer is None:
        return _error_response("Code security analyzer not initialized")

    try:
        return await code_analyzer.analyze_code_security(
            code_content, language, locale
        )
    except Exception as e:
        logger.exception("Error analyzing code security: %s", e)
        return _error_response(f"Code analysis failed: {e}")


@app.tool()
async def scan_project_dependencies(project_files: Dict[str, str]) -> Dict[str, Any]:
    """
    Scan project dependencies for security vulnerabilities.

    Supports dependency files from several project types, including NPM,
    Python, Maven and Cargo. Checks for known malicious packages, outdated
    dependencies and security vulnerabilities.

    Args:
        project_files: Mapping of file name to file content.
            Supported: package.json, requirements.txt, pom.xml, Cargo.toml,
            go.mod, composer.json

    Returns:
        A scan report containing the vulnerability list, recommendations and
        a risk score. On failure, a dict with "error" and "timestamp" keys.
    """
    if cicd_analyzer is None:
        return _error_response("CI/CD analyzer not initialized")

    try:
        return await cicd_analyzer.scan_project_dependencies(project_files)
    except Exception as e:
        logger.exception("Error scanning dependencies: %s", e)
        return _error_response(f"Dependency scan failed: {e}")


@app.tool()
async def analyze_docker_security(dockerfile_content: str) -> Dict[str, Any]:
    """
    Analyze the security configuration of a Dockerfile.

    Checks the Docker image build file for security problems, including base
    image safety, user privilege configuration and leaked sensitive
    information.

    Args:
        dockerfile_content: Content of the Dockerfile

    Returns:
        An analysis report containing security issues, recommendations and a
        risk score. On failure, a dict with "error" and "timestamp" keys.
    """
    if cicd_analyzer is None:
        return _error_response("CI/CD analyzer not initialized")

    try:
        return await cicd_analyzer.analyze_docker_image(dockerfile_content)
    except Exception as e:
        logger.exception("Error analyzing Docker security: %s", e)
        return _error_response(f"Docker analysis failed: {e}")


@app.tool()
async def scan_kubernetes_config(yaml_content: str) -> Dict[str, Any]:
    """
    Scan a Kubernetes configuration for security problems.

    Analyzes security risks in Kubernetes YAML configuration files, including
    Pod security contexts, service configuration, network policies and RBAC
    permissions.

    Args:
        yaml_content: Content of the Kubernetes YAML configuration file

    Returns:
        An analysis report containing security issues, recommendations and a
        risk score. On failure, a dict with "error" and "timestamp" keys.
    """
    if cicd_analyzer is None:
        return _error_response("CI/CD analyzer not initialized")

    try:
        return await cicd_analyzer.scan_kubernetes_config(yaml_content)
    except Exception as e:
        logger.exception("Error scanning Kubernetes config: %s", e)
        return _error_response(f"Kubernetes scan failed: {e}")


@app.tool()
async def generate_security_report(
    data: Dict[str, Any], chart_type: str = "comprehensive", locale: str = "zh"
) -> Dict[str, Any]:
    """
    Generate a visual report for a security analysis.

    Builds a combined report with charts and detailed information from the
    security analysis data. Supports Chinese and English output and several
    chart types.

    Args:
        data: Security analysis data
        chart_type: Chart type (pie/bar/timeline/flowchart/comprehensive)
        locale: Language setting (zh/en)

    Returns:
        A visual report containing Mermaid charts, detailed findings and
        recommendations. For any chart type other than "comprehensive" the
        result is a dict with "chart_type", "locale", "chart" and
        "timestamp". On failure, a dict with "error" and "timestamp" keys.
    """
    if report_generator is None:
        return _error_response("Report generator not initialized")

    try:
        if chart_type == "comprehensive":
            return report_generator.generate_comprehensive_report(data, locale)

        chart = report_generator.generate_mermaid_chart(data, chart_type, locale)
        return {
            "chart_type": chart_type,
            "locale": locale,
            "chart": chart,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.exception("Error generating security report: %s", e)
        return _error_response(f"Report generation failed: {e}")


@app.resource("threat-intel://status")
async def get_threat_intel_status() -> str:
    """
    Get current threat intelligence system status and configuration.

    Returns a detailed status report of the threat intelligence aggregation
    system as Markdown. When no API-key-based source is configured, the report
    ends with setup instructions.
    """
    status = await check_api_status()

    sections = [
        f"""# Cyber Sentinel Status Report

## Configuration Summary
- **Total Sources**: {status['total_sources']}
- **Configured Sources**: {status['configured_sources']}
- **Rate Limit**: {status['rate_limit']}
- **Cache TTL**: {status['cache_ttl']}
- **Overall Status**: {status['status'].upper()}

## Source Details
"""
    ]

    for source_name, info in status["sources"].items():
        status_icon = "✅" if info["configured"] else "❌"
        capabilities = (
            ", ".join(info["capabilities"]) if info["capabilities"] else "None"
        )
        sections.append(
            f"""
### {source_name.title()} {status_icon}
- **Status**: {'Configured' if info['configured'] else 'Not Configured'}
- **Capabilities**: {capabilities}
- **Description**: {info['description']}
"""
        )

    # "configured_sources" always includes URLhaus and is therefore never 0;
    # the setup hint has to key off the API-key-based sources instead.
    if status["api_key_sources_configured"] == 0:
        sections.append(
            """
## ⚠️ Configuration Required

No API-key-based threat intelligence sources are currently configured. Please set up API keys:

1. **VirusTotal**: Set `VIRUSTOTAL_API_KEY` environment variable
2. **AbuseIPDB**: Set `ABUSEIPDB_API_KEY` environment variable
3. **Shodan**: Set `SHODAN_API_KEY` environment variable

URLhaus is available without API key configuration.
"""
        )

    return "".join(sections)


def initialize_server():
    """Initialize all components.

    Reads the settings, applies the configured log level to the root logger,
    creates the aggregator and the analyzers that depend on it, and logs how
    many API keys are configured. Any exception is logged and terminates the
    process with exit status 1.
    """
    global aggregator, cicd_analyzer, code_analyzer, report_generator

    try:
        settings = get_settings()
        logger.info(
            "Initializing Cyber Sentinel with log level: %s", settings.log_level
        )

        # Set log level
        logging.getLogger().setLevel(getattr(logging, settings.log_level.upper()))

        # Initialize aggregator
        aggregator = ThreatIntelAggregator()

        # Initialize other components
        cicd_analyzer = CICDAnalyzer(aggregator)
        code_analyzer = CodeSecurityAnalyzer(aggregator)
        report_generator = SecurityReportGenerator()

        # Check API configuration
        api_status = validate_api_keys()
        configured_sources = sum(1 for configured in api_status.values() if configured)

        if configured_sources == 0:
            logger.warning(
                "No threat intelligence API keys configured. "
                "Only URLhaus will be available."
            )
        else:
            logger.info(
                "Initialized with %s configured threat intelligence sources",
                configured_sources,
            )

        logger.info("Cyber Sentinel MCP Server initialized successfully")

    except Exception as e:
        logger.exception("Failed to initialize server: %s", e)
        sys.exit(1)


# Initialize the server automatically at import time
initialize_server()


def main():
    """Main entry point for the server"""
    # The module already initializes itself on import; only initialize again
    # if a component is missing, so the components are not built twice.
    if not _components_ready():
        initialize_server()
    app.run()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jx888-max/cyber-sentinel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.