Skip to main content
Glama

Cyber Sentinel MCP Server

by jx888-max
visualization.py (16.1 kB)
""" Security Visualization and Reporting Module Generates visual reports with Chinese/English support """ import json import logging from datetime import datetime from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) class SecurityReportGenerator: """安全报告生成器 - 支持中英文可视化""" def __init__(self): # 中英文标签映射 self.labels = { "zh": { "title": "网络安全分析报告", "summary": "摘要", "vulnerabilities": "漏洞", "recommendations": "建议", "risk_score": "风险评分", "severity": "严重程度", "critical": "严重", "high": "高", "medium": "中", "low": "低", "clean": "安全", "malicious": "恶意", "unknown": "未知", "timestamp": "时间戳", "indicator": "指标", "source": "来源", "details": "详情", "line": "行号", "file": "文件", "type": "类型", "description": "描述", "fix": "修复建议", "threat_intel": "威胁情报", "code_analysis": "代码分析", "dependency_scan": "依赖扫描", "docker_security": "Docker安全", "kubernetes_security": "Kubernetes安全", "total_issues": "问题总数", "risk_distribution": "风险分布", "security_score": "安全评分", }, "en": { "title": "Cybersecurity Analysis Report", "summary": "Summary", "vulnerabilities": "Vulnerabilities", "recommendations": "Recommendations", "risk_score": "Risk Score", "severity": "Severity", "critical": "Critical", "high": "High", "medium": "Medium", "low": "Low", "clean": "Clean", "malicious": "Malicious", "unknown": "Unknown", "timestamp": "Timestamp", "indicator": "Indicator", "source": "Source", "details": "Details", "line": "Line", "file": "File", "type": "Type", "description": "Description", "fix": "Fix Recommendation", "threat_intel": "Threat Intelligence", "code_analysis": "Code Analysis", "dependency_scan": "Dependency Scan", "docker_security": "Docker Security", "kubernetes_security": "Kubernetes Security", "total_issues": "Total Issues", "risk_distribution": "Risk Distribution", "security_score": "Security Score", }, } # 颜色配置 self.colors = { "critical": "#dc3545", # 红色 "high": "#fd7e14", # 橙色 "medium": "#ffc107", # 黄色 "low": "#28a745", # 绿色 "clean": "#6c757d", # 灰色 "malicious": "#dc3545", # 
红色 "unknown": "#6c757d", # 灰色 } def generate_mermaid_chart( self, data: Dict[str, Any], chart_type: str = "pie", locale: str = "zh" ) -> str: """生成Mermaid图表""" labels = self.labels[locale] if chart_type == "pie": return self._generate_pie_chart(data, labels) elif chart_type == "bar": return self._generate_bar_chart(data, labels) elif chart_type == "timeline": return self._generate_timeline_chart(data, labels) elif chart_type == "flowchart": return self._generate_flowchart(data, labels) else: return self._generate_pie_chart(data, labels) def _generate_pie_chart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str: """生成饼图""" vulnerabilities = data.get("vulnerabilities", []) # 统计严重程度分布 severity_count = {"critical": 0, "high": 0, "medium": 0, "low": 0} for vuln in vulnerabilities: severity = vuln.get("severity", "low") if severity in severity_count: severity_count[severity] += 1 # 生成Mermaid饼图 chart = "pie title " + labels["risk_distribution"] + "\n" for severity, count in severity_count.items(): if count > 0: chart += f' "{labels[severity]}" : {count}\n' return chart def _generate_bar_chart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str: """生成柱状图""" vulnerabilities = data.get("vulnerabilities", []) # 统计漏洞类型分布 type_count = {} for vuln in vulnerabilities: vuln_type = vuln.get("type", "unknown") type_count[vuln_type] = type_count.get(vuln_type, 0) + 1 # 生成Mermaid柱状图(使用flowchart模拟) chart = "graph TB\n" chart += f' A["{labels["vulnerabilities"]}"] --> B["{labels["type"]} {labels["distribution"]}"]\n' for i, (vuln_type, count) in enumerate(type_count.items()): node_id = f"C{i+1}" chart += f' B --> {node_id}["{vuln_type}: {count}"]\n' return chart def _generate_timeline_chart( self, data: Dict[str, Any], labels: Dict[str, str] ) -> str: """生成时间线图表""" timestamp = data.get("timestamp", datetime.now().isoformat()) analysis_type = data.get("analysis_type", "security_analysis") chart = "timeline\n" chart += f' title {labels["title"]}\n' chart += f" 
{timestamp[:10]} : {labels[analysis_type] if analysis_type in labels else analysis_type}\n" vulnerabilities = data.get("vulnerabilities", []) if vulnerabilities: chart += ( f' : {labels["total_issues"]}: {len(vulnerabilities)}\n' ) # 按严重程度分组 severity_groups = {} for vuln in vulnerabilities: severity = vuln.get("severity", "low") if severity not in severity_groups: severity_groups[severity] = 0 severity_groups[severity] += 1 for severity, count in severity_groups.items(): chart += f" : {labels[severity]}: {count}\n" return chart def _generate_flowchart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str: """生成流程图""" chart = "flowchart TD\n" chart += f' A["{labels["title"]}"] --> B["{labels["summary"]}"]\n' # 添加分析类型 analysis_type = data.get("analysis_type", "security_analysis") chart += f' B --> C["{labels.get(analysis_type, analysis_type)}"]\n' # 添加风险评分 risk_score = data.get("risk_score", 0) risk_level = self._get_risk_level(risk_score) chart += f' C --> D["{labels["risk_score"]}: {risk_score}"]\n' chart += f' D --> E["{labels["severity"]}: {labels[risk_level]}"]\n' # 添加漏洞统计 vulnerabilities = data.get("vulnerabilities", []) if vulnerabilities: chart += ( f' E --> F["{labels["vulnerabilities"]}: {len(vulnerabilities)}"]\n' ) # 按严重程度分类 severity_count = {"critical": 0, "high": 0, "medium": 0, "low": 0} for vuln in vulnerabilities: severity = vuln.get("severity", "low") if severity in severity_count: severity_count[severity] += 1 for severity, count in severity_count.items(): if count > 0: node_id = f"G{severity[0].upper()}" chart += f' F --> {node_id}["{labels[severity]}: {count}"]\n' return chart def generate_comprehensive_report( self, data: Dict[str, Any], locale: str = "zh" ) -> Dict[str, Any]: """生成综合安全报告""" labels = self.labels[locale] report = { "title": labels["title"], "timestamp": datetime.now().isoformat(), "locale": locale, "summary": self._generate_summary(data, labels), "charts": { "risk_distribution": self.generate_mermaid_chart(data, "pie", 
locale), "vulnerability_timeline": self.generate_mermaid_chart( data, "timeline", locale ), "security_overview": self.generate_mermaid_chart( data, "flowchart", locale ), }, "detailed_findings": self._format_detailed_findings(data, labels), "recommendations": self._format_recommendations(data, labels), "risk_assessment": self._generate_risk_assessment(data, labels), } return report def _generate_summary( self, data: Dict[str, Any], labels: Dict[str, str] ) -> Dict[str, Any]: """生成摘要""" vulnerabilities = data.get("vulnerabilities", []) risk_score = data.get("risk_score", 0) # 统计严重程度分布 severity_count = {"critical": 0, "high": 0, "medium": 0, "low": 0} for vuln in vulnerabilities: severity = vuln.get("severity", "low") if severity in severity_count: severity_count[severity] += 1 summary = { labels["total_issues"]: len(vulnerabilities), labels["risk_score"]: risk_score, labels["severity"] + " " + labels["distribution"]: { labels[k]: v for k, v in severity_count.items() if v > 0 }, labels["security_score"]: max(0, 100 - risk_score), } return summary def _format_detailed_findings( self, data: Dict[str, Any], labels: Dict[str, str] ) -> List[Dict[str, Any]]: """格式化详细发现""" vulnerabilities = data.get("vulnerabilities", []) formatted_findings = [] for vuln in vulnerabilities: finding = { labels["type"]: vuln.get("type", "unknown"), labels["severity"]: labels.get( vuln.get("severity", "low"), vuln.get("severity", "low") ), labels["description"]: vuln.get("description", ""), } # 添加位置信息 if "line" in vuln: finding[labels["line"]] = vuln["line"] if "file" in vuln: finding[labels["file"]] = vuln["file"] # 添加指标信息 if "indicator" in vuln: finding[labels["indicator"]] = vuln["indicator"] # 添加威胁详情 if "threat_details" in vuln: finding[labels["threat_intel"]] = vuln["threat_details"] formatted_findings.append(finding) return formatted_findings def _format_recommendations( self, data: Dict[str, Any], labels: Dict[str, str] ) -> List[str]: """格式化建议""" recommendations = 
data.get("recommendations", []) return recommendations def _generate_risk_assessment( self, data: Dict[str, Any], labels: Dict[str, str] ) -> Dict[str, Any]: """生成风险评估""" risk_score = data.get("risk_score", 0) vulnerabilities = data.get("vulnerabilities", []) # 计算风险等级 risk_level = self._get_risk_level(risk_score) # 统计关键指标 critical_count = sum( 1 for v in vulnerabilities if v.get("severity") == "critical" ) high_count = sum(1 for v in vulnerabilities if v.get("severity") == "high") assessment = { labels["risk_score"]: risk_score, labels["severity"]: labels[risk_level], "critical_issues": critical_count, "high_issues": high_count, "total_issues": len(vulnerabilities), "security_posture": self._get_security_posture(risk_score, labels), } return assessment def _get_risk_level(self, risk_score: int) -> str: """获取风险等级""" if risk_score >= 80: return "critical" elif risk_score >= 60: return "high" elif risk_score >= 30: return "medium" else: return "low" def _get_security_posture(self, risk_score: int, labels: Dict[str, str]) -> str: """获取安全态势描述""" if risk_score >= 80: return ( "需要立即采取行动" if "需要" in labels.get("critical", "") else "Immediate action required" ) elif risk_score >= 60: return ( "存在重大安全风险" if "需要" in labels.get("critical", "") else "Significant security risks present" ) elif risk_score >= 30: return ( "存在一些安全问题" if "需要" in labels.get("critical", "") else "Some security issues present" ) else: return ( "安全状况良好" if "需要" in labels.get("critical", "") else "Good security posture" ) def export_report_json(self, report: Dict[str, Any]) -> str: """导出JSON格式报告""" return json.dumps(report, ensure_ascii=False, indent=2) def export_report_markdown(self, report: Dict[str, Any]) -> str: """导出Markdown格式报告""" md = f"# {report['title']}\n\n" md += f"**{report.get('timestamp', '')}**\n\n" # 摘要 md += "## 摘要\n\n" if report["locale"] == "zh" else "## Summary\n\n" summary = report.get("summary", {}) for key, value in summary.items(): if isinstance(value, dict): md += 
f"**{key}:**\n" for k, v in value.items(): md += f"- {k}: {v}\n" else: md += f"**{key}:** {value}\n" md += "\n" # 图表 if "charts" in report: md += ( "## 可视化图表\n\n" if report["locale"] == "zh" else "## Visualization Charts\n\n" ) for chart_name, chart_content in report["charts"].items(): md += f"### {chart_name}\n\n" md += "```mermaid\n" md += chart_content md += "\n```\n\n" # 详细发现 if "detailed_findings" in report and report["detailed_findings"]: md += ( "## 详细发现\n\n" if report["locale"] == "zh" else "## Detailed Findings\n\n" ) for i, finding in enumerate(report["detailed_findings"], 1): md += f"### {i}. {finding.get('类型', finding.get('Type', 'Unknown'))}\n\n" for key, value in finding.items(): if key not in ["类型", "Type"]: md += f"**{key}:** {value}\n" md += "\n" # 建议 if "recommendations" in report and report["recommendations"]: md += ( "## 建议\n\n" if report["locale"] == "zh" else "## Recommendations\n\n" ) for rec in report["recommendations"]: md += f"- {rec}\n" md += "\n" return md

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jx888-max/cyber-sentinel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.