"""
Cyber Sentinel MCP Server
Main server implementation using FastMCP
"""
import logging
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from .cicd_integration import CICDAnalyzer
from .code_analyzer import CodeSecurityAnalyzer
from .config import get_settings, validate_api_keys
from .threat_intel import ThreatIntelAggregator
from .visualization import SecurityReportGenerator
# Configure logging at import time: INFO level with timestamped records.
# initialize_server() later resets the root logger level from settings.log_level.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger used by every tool and by initialize_server().
logger = logging.getLogger(__name__)
# Initialize FastMCP server; the tools and the resource below register on it
# through the @app.tool() / @app.resource() decorators.
app = FastMCP("CyberSentinel")
# Global component instances. They start as None and are assigned by
# initialize_server(); each tool checks its component and returns an error
# payload while it is still unset.
aggregator: Optional[ThreatIntelAggregator] = None
cicd_analyzer: Optional[CICDAnalyzer] = None
code_analyzer: Optional[CodeSecurityAnalyzer] = None
report_generator: Optional[SecurityReportGenerator] = None
class IndicatorAnalysisRequest(BaseModel):
    """Request model for indicator analysis.

    NOTE(review): no tool in the visible part of this module references this
    model (``analyze_indicator`` takes a plain ``indicator: str``) -- confirm
    whether it is used elsewhere before relying on ``force_refresh``.
    """

    # Required: Field(...) declares no default value.
    indicator: str = Field(
        ..., description="The indicator to analyze (IP, domain, hash, or URL)"
    )
    # Optional flag; defaults to False.
    force_refresh: bool = Field(
        default=False, description="Force refresh from sources, bypass cache"
    )
class IndicatorAnalysisResponse(BaseModel):
    """Response model for indicator analysis.

    Every field is required (none declares a default).

    NOTE(review): the tools in the visible part of this module return plain
    dictionaries and never construct this model, so the per-field notes below
    are inferred from the field names -- confirm against the aggregator.
    """

    # The indicator that was analyzed (presumably echoed from the request).
    indicator: str
    # Presumably the detected indicator kind (IP, domain, hash or URL) -- TODO confirm.
    type: str
    # Presumably one of malicious/clean/unknown, per the analyze_indicator docstring.
    overall_reputation: str
    # Presumably a 0-100 score, per the analyze_indicator docstring -- TODO confirm.
    confidence: float
    # Source counters; exact counting rules are defined by the aggregator.
    sources_checked: int
    sources_responded: int
    malicious_sources: int
    clean_sources: int
    # Geographic / ISP details (the analyze_indicator docstring mentions these for IPs).
    countries: List[str]
    isps: List[str]
    # Per-source result entries; the entry schema is not visible in this file.
    detailed_results: List[Dict[str, Any]]
    # Error messages, presumably one per failed source -- TODO confirm.
    errors: List[str]
    # Timestamp as a string; the format is not established in this file.
    timestamp: str
@app.tool()
async def analyze_indicator(indicator: str) -> Dict[str, Any]:
    """
    Analyze a security indicator (IP address, domain, hash, or URL) across multiple threat intelligence sources.

    This tool queries various threat intelligence APIs to provide a comprehensive security assessment
    of the given indicator. It aggregates results from multiple sources and provides an overall
    reputation score with confidence level.

    Args:
        indicator: The security indicator to analyze. Can be:
            - IP address (e.g., "8.8.8.8")
            - Domain name (e.g., "example.com")
            - File hash (MD5, SHA1, or SHA256)
            - URL (e.g., "https://example.com/path")

    Returns:
        A comprehensive analysis report including:
        - Overall reputation (malicious/clean/unknown)
        - Confidence score (0-100)
        - Results from individual sources
        - Geographic and ISP information (for IPs)
        - Error information if any sources failed

        If the aggregator is not initialized or the analysis raises, a
        dictionary with "error", "indicator" and an ISO-8601 "timestamp" is
        returned instead; exceptions are never propagated to the caller.
    """
    if not aggregator:
        return {
            "error": "Threat intelligence aggregator not initialized. Please check API key configuration.",
            "indicator": indicator,
            # Real timestamp, matching the error payloads of the other tools.
            "timestamp": datetime.now().isoformat(),
        }
    try:
        return await aggregator.analyze_indicator(indicator)
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising,
        # but keep the traceback in the server log for diagnosis.
        logger.exception("Error analyzing indicator %s: %s", indicator, e)
        return {
            "error": f"Analysis failed: {str(e)}",
            "indicator": indicator,
            "timestamp": datetime.now().isoformat(),
        }
@app.tool()
async def check_api_status() -> Dict[str, Any]:
    """
    Check the status of configured threat intelligence API keys and sources.

    Reports, for VirusTotal, AbuseIPDB, URLhaus and Shodan, whether the source
    is configured and which indicator types it can handle. URLhaus needs no
    API key and is always reported as configured.

    Returns:
        A dictionary with:
        - total_sources / configured_sources: number of known and configured sources
        - sources: per-source "configured" flag, "capabilities" and "description"
        - rate_limit / cache_ttl: formatted from the current settings
        - status: "ready" when at least one source is configured,
          otherwise "no_sources_configured"
    """
    settings = get_settings()
    key_state = validate_api_keys()

    def capabilities_for(source: str, supported: List[str]) -> List[str]:
        # A source without a configured key offers no capabilities.
        if key_state[source]:
            return supported
        return []

    sources = {
        "virustotal": {
            "configured": key_state["virustotal"],
            "capabilities": capabilities_for("virustotal", ["ip", "domain", "hash"]),
            "description": "VirusTotal - Comprehensive malware and URL analysis",
        },
        "abuseipdb": {
            "configured": key_state["abuseipdb"],
            "capabilities": capabilities_for("abuseipdb", ["ip"]),
            "description": "AbuseIPDB - IP address abuse reporting and checking",
        },
        # URLhaus doesn't require an API key, so it is unconditionally available.
        "urlhaus": {
            "configured": True,
            "capabilities": ["url", "hash"],
            "description": "URLhaus - Malware URL and payload tracking",
        },
        "shodan": {
            "configured": key_state["shodan"],
            "capabilities": capabilities_for("shodan", ["ip"]),
            "description": "Shodan - Internet-connected device search engine",
        },
    }

    configured_count = len([entry for entry in sources.values() if entry["configured"]])
    if configured_count > 0:
        overall = "ready"
    else:
        overall = "no_sources_configured"

    return {
        "total_sources": len(sources),
        "configured_sources": configured_count,
        "sources": sources,
        "rate_limit": f"{settings.max_requests_per_minute} requests/minute",
        "cache_ttl": f"{settings.cache_ttl} seconds",
        "status": overall,
    }
@app.tool()
async def analyze_code_security(
    code_content: str, language: str = "auto", locale: str = "zh"
) -> Dict[str, Any]:
    """
    Analyze code for security issues and provide proactive security advice.

    This tool scans code for security vulnerabilities, including hard-coded
    secrets, SQL injection and XSS, and checks whether network indicators
    (IPs, domains, URLs) found in the code are associated with threats.

    Args:
        code_content: The code to analyze.
        language: Programming language (auto/python/javascript/java/csharp/php/go/rust/cpp/sql).
        locale: Language setting (zh/en).

    Returns:
        A detailed analysis report containing vulnerabilities, recommendations
        and remediation steps. If the analyzer is not initialized or raises,
        a dictionary with "error" and "timestamp" is returned instead.
    """
    if not code_analyzer:
        failure = "Code security analyzer not initialized"
    else:
        try:
            return await code_analyzer.analyze_code_security(
                code_content, language, locale
            )
        except Exception as exc:
            logger.error(f"Error analyzing code security: {exc!s}")
            failure = f"Code analysis failed: {exc!s}"
    return {"error": failure, "timestamp": datetime.now().isoformat()}
@app.tool()
async def scan_project_dependencies(project_files: Dict[str, str]) -> Dict[str, Any]:
    """
    Scan project dependencies for security vulnerabilities.

    Supports dependency files from several project types, including NPM,
    Python, Maven and Cargo. Checks for known malicious packages, outdated
    dependencies and security vulnerabilities.

    Args:
        project_files: Mapping of file name to file content. Supported:
            package.json, requirements.txt, pom.xml, Cargo.toml, go.mod,
            composer.json

    Returns:
        A scan report containing the vulnerability list, recommendations and
        a risk score. If the CI/CD analyzer is not initialized or raises, a
        dictionary with "error" and "timestamp" is returned instead.
    """
    if not cicd_analyzer:
        failure = "CI/CD analyzer not initialized"
    else:
        try:
            return await cicd_analyzer.scan_project_dependencies(project_files)
        except Exception as exc:
            logger.error(f"Error scanning dependencies: {exc!s}")
            failure = f"Dependency scan failed: {exc!s}"
    return {"error": failure, "timestamp": datetime.now().isoformat()}
@app.tool()
async def analyze_docker_security(dockerfile_content: str) -> Dict[str, Any]:
    """
    Analyze the security configuration of a Dockerfile.

    Checks the Docker image build file for security problems, including base
    image safety, user privilege configuration and leakage of sensitive
    information.

    Args:
        dockerfile_content: Content of the Dockerfile.

    Returns:
        An analysis report containing security issues, recommendations and a
        risk score. If the CI/CD analyzer is not initialized or raises, a
        dictionary with "error" and "timestamp" is returned instead.
    """
    if not cicd_analyzer:
        failure = "CI/CD analyzer not initialized"
    else:
        try:
            return await cicd_analyzer.analyze_docker_image(dockerfile_content)
        except Exception as exc:
            logger.error(f"Error analyzing Docker security: {exc!s}")
            failure = f"Docker analysis failed: {exc!s}"
    return {"error": failure, "timestamp": datetime.now().isoformat()}
@app.tool()
async def scan_kubernetes_config(yaml_content: str) -> Dict[str, Any]:
    """
    Scan a Kubernetes configuration for security problems.

    Analyzes security risks in Kubernetes YAML configuration files, including
    Pod security context, service configuration, network policies and RBAC
    permissions.

    Args:
        yaml_content: Content of the Kubernetes YAML configuration file.

    Returns:
        An analysis report containing security issues, recommendations and a
        risk score. If the CI/CD analyzer is not initialized or raises, a
        dictionary with "error" and "timestamp" is returned instead.
    """
    if not cicd_analyzer:
        failure = "CI/CD analyzer not initialized"
    else:
        try:
            return await cicd_analyzer.scan_kubernetes_config(yaml_content)
        except Exception as exc:
            logger.error(f"Error scanning Kubernetes config: {exc!s}")
            failure = f"Kubernetes scan failed: {exc!s}"
    return {"error": failure, "timestamp": datetime.now().isoformat()}
@app.tool()
async def generate_security_report(
    data: Dict[str, Any], chart_type: str = "comprehensive", locale: str = "zh"
) -> Dict[str, Any]:
    """
    Generate a visual report for a security analysis.

    Builds a combined report with charts and detailed information from
    security analysis data. Supports Chinese and English output and several
    chart types.

    Args:
        data: Security analysis data.
        chart_type: Chart type (pie/bar/timeline/flowchart/comprehensive).
        locale: Language setting (zh/en).

    Returns:
        A visual report containing Mermaid charts, detailed findings and
        recommendations. For any chart type other than "comprehensive" the
        result is a dictionary with "chart_type", "locale", "chart" and
        "timestamp". If the report generator is not initialized or raises, a
        dictionary with "error" and "timestamp" is returned instead.
    """
    if not report_generator:
        return {
            "error": "Report generator not initialized",
            "timestamp": datetime.now().isoformat(),
        }
    try:
        if chart_type != "comprehensive":
            # Single-chart request: wrap the rendered chart with its metadata.
            rendered = report_generator.generate_mermaid_chart(
                data, chart_type, locale
            )
            return {
                "chart_type": chart_type,
                "locale": locale,
                "chart": rendered,
                "timestamp": datetime.now().isoformat(),
            }
        return report_generator.generate_comprehensive_report(data, locale)
    except Exception as exc:
        logger.error(f"Error generating security report: {exc!s}")
        return {
            "error": f"Report generation failed: {exc!s}",
            "timestamp": datetime.now().isoformat(),
        }
@app.resource("threat-intel://status")
async def get_threat_intel_status() -> str:
    """
    Get current threat intelligence system status and configuration.

    Renders the result of ``check_api_status()`` as a Markdown report: a
    configuration summary, one section per source, and a
    "Configuration Required" section when no API-key-based source is
    configured.

    Returns:
        The Markdown status report as a single string.
    """
    status = await check_api_status()
    sources = status["sources"]

    sections = [
        "\n".join(
            [
                "# Cyber Sentinel Status Report",
                "## Configuration Summary",
                f"- **Total Sources**: {status['total_sources']}",
                f"- **Configured Sources**: {status['configured_sources']}",
                f"- **Rate Limit**: {status['rate_limit']}",
                f"- **Cache TTL**: {status['cache_ttl']}",
                f"- **Overall Status**: {status['status'].upper()}",
                "## Source Details",
                "",
            ]
        )
    ]

    for source_name, info in sources.items():
        status_icon = "✅" if info["configured"] else "❌"
        state_label = "Configured" if info["configured"] else "Not Configured"
        capabilities = (
            ", ".join(info["capabilities"]) if info["capabilities"] else "None"
        )
        sections.append(
            "\n".join(
                [
                    "",
                    f"### {source_name.title()} {status_icon}",
                    f"- **Status**: {state_label}",
                    f"- **Capabilities**: {capabilities}",
                    f"- **Description**: {info['description']}",
                    "",
                ]
            )
        )

    # check_api_status() always reports URLhaus as configured, so
    # status["configured_sources"] can never be 0. Decide on the sources that
    # actually need an API key, otherwise this guidance is never shown.
    keyless_sources = {"urlhaus"}
    any_key_configured = any(
        info["configured"]
        for source_name, info in sources.items()
        if source_name not in keyless_sources
    )
    if not any_key_configured:
        sections.append(
            "\n".join(
                [
                    "",
                    "## ⚠️ Configuration Required",
                    "No API-key-based threat intelligence sources are currently configured. Please set up API keys:",
                    "1. **VirusTotal**: Set `VIRUSTOTAL_API_KEY` environment variable",
                    "2. **AbuseIPDB**: Set `ABUSEIPDB_API_KEY` environment variable",
                    "3. **Shodan**: Set `SHODAN_API_KEY` environment variable",
                    "URLhaus is available without API key configuration.",
                    "",
                ]
            )
        )

    return "".join(sections)
def initialize_server():
    """Initialize all components.

    Reads the settings, applies the configured log level to the root logger,
    and assigns the module-level ``aggregator``, ``cicd_analyzer``,
    ``code_analyzer`` and ``report_generator`` instances. Logs a warning when
    ``validate_api_keys()`` reports no configured API key.

    Any exception raised during initialization is logged with its traceback
    and the process exits with status 1 (``SystemExit``).
    """
    global aggregator, cicd_analyzer, code_analyzer, report_generator
    try:
        settings = get_settings()
        logger.info(
            "Initializing Cyber Sentinel with log level: %s", settings.log_level
        )
        # Set log level. setLevel() accepts a level name and raises ValueError
        # for an unknown one, whereas getattr(logging, name) could resolve to
        # an unrelated module attribute.
        logging.getLogger().setLevel(settings.log_level.upper())

        # The aggregator is created first because the analyzers receive it.
        aggregator = ThreatIntelAggregator()
        cicd_analyzer = CICDAnalyzer(aggregator)
        code_analyzer = CodeSecurityAnalyzer(aggregator)
        report_generator = SecurityReportGenerator()

        # Check API configuration
        api_status = validate_api_keys()
        configured_sources = sum(1 for configured in api_status.values() if configured)
        if configured_sources == 0:
            logger.warning(
                "No threat intelligence API keys configured. Only URLhaus will be available."
            )
        else:
            logger.info(
                "Initialized with %s configured threat intelligence sources",
                configured_sources,
            )
        logger.info("Cyber Sentinel MCP Server initialized successfully")
    except Exception as e:
        # Startup boundary: keep the traceback so the failure can be diagnosed,
        # then abort because the server cannot run without its components.
        logger.exception("Failed to initialize server: %s", e)
        sys.exit(1)
# Initialize the server automatically when this module is imported
initialize_server()
def main():
    """Main entry point for the server.

    The module already calls ``initialize_server()`` at import time, so the
    components are only initialized here if any of them is still unset. This
    avoids building every component a second time before the server starts.
    """
    components = (aggregator, cicd_analyzer, code_analyzer, report_generator)
    if any(component is None for component in components):
        initialize_server()
    app.run()


if __name__ == "__main__":
    main()