"""
Security Visualization and Reporting Module
Generates visual reports with Chinese/English support
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SecurityReportGenerator:
    """Generate localized (Chinese/English) security reports and Mermaid charts.

    The generator consumes a plain ``dict`` of analysis results. The keys it
    reads are ``vulnerabilities`` (a list of dicts), ``risk_score``,
    ``analysis_type``, ``timestamp`` and ``recommendations``; all are optional.
    """

    # Severity buckets in display order, most severe first.
    _SEVERITY_LEVELS = ("critical", "high", "medium", "low")

    def __init__(self):
        """Create the per-locale label tables and the severity colour palette."""
        # Label lookup tables, keyed by locale code and then by label id.
        self.labels = {
            "zh": {
                "title": "网络安全分析报告",
                "summary": "摘要",
                "vulnerabilities": "漏洞",
                "recommendations": "建议",
                "risk_score": "风险评分",
                "severity": "严重程度",
                "critical": "严重",
                "high": "高",
                "medium": "中",
                "low": "低",
                "clean": "安全",
                "malicious": "恶意",
                "unknown": "未知",
                "timestamp": "时间戳",
                "indicator": "指标",
                "source": "来源",
                "details": "详情",
                "line": "行号",
                "file": "文件",
                "type": "类型",
                "description": "描述",
                "fix": "修复建议",
                "threat_intel": "威胁情报",
                "code_analysis": "代码分析",
                "dependency_scan": "依赖扫描",
                "docker_security": "Docker安全",
                "kubernetes_security": "Kubernetes安全",
                "total_issues": "问题总数",
                "risk_distribution": "风险分布",
                "security_score": "安全评分",
                # Used by the bar chart and the summary; it was missing
                # before, which made both raise KeyError.
                "distribution": "分布",
                # Security posture sentence for each risk level.
                "posture_critical": "需要立即采取行动",
                "posture_high": "存在重大安全风险",
                "posture_medium": "存在一些安全问题",
                "posture_low": "安全状况良好",
            },
            "en": {
                "title": "Cybersecurity Analysis Report",
                "summary": "Summary",
                "vulnerabilities": "Vulnerabilities",
                "recommendations": "Recommendations",
                "risk_score": "Risk Score",
                "severity": "Severity",
                "critical": "Critical",
                "high": "High",
                "medium": "Medium",
                "low": "Low",
                "clean": "Clean",
                "malicious": "Malicious",
                "unknown": "Unknown",
                "timestamp": "Timestamp",
                "indicator": "Indicator",
                "source": "Source",
                "details": "Details",
                "line": "Line",
                "file": "File",
                "type": "Type",
                "description": "Description",
                "fix": "Fix Recommendation",
                "threat_intel": "Threat Intelligence",
                "code_analysis": "Code Analysis",
                "dependency_scan": "Dependency Scan",
                "docker_security": "Docker Security",
                "kubernetes_security": "Kubernetes Security",
                "total_issues": "Total Issues",
                "risk_distribution": "Risk Distribution",
                "security_score": "Security Score",
                "distribution": "Distribution",
                "posture_critical": "Immediate action required",
                "posture_high": "Significant security risks present",
                "posture_medium": "Some security issues present",
                "posture_low": "Good security posture",
            },
        }
        # Colour palette keyed by severity / verdict.
        self.colors = {
            "critical": "#dc3545",  # red
            "high": "#fd7e14",  # orange
            "medium": "#ffc107",  # yellow
            "low": "#28a745",  # green
            "clean": "#6c757d",  # grey
            "malicious": "#dc3545",  # red
            "unknown": "#6c757d",  # grey
        }

    @staticmethod
    def _escape_mermaid(text: Any) -> str:
        """Return *text* as a string that is safe inside a quoted Mermaid label."""
        # A raw double quote would terminate the label early; Mermaid accepts
        # the ``#quot;`` entity code in its place.
        return str(text).replace('"', "#quot;")

    def _count_severities(self, vulnerabilities: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count vulnerabilities per known severity level.

        A vulnerability without a ``severity`` key counts as ``"low"``.
        Severities outside the four known levels are ignored.
        """
        counts = {level: 0 for level in self._SEVERITY_LEVELS}
        for vuln in vulnerabilities:
            severity = vuln.get("severity", "low")
            if severity in counts:
                counts[severity] += 1
        return counts

    def generate_mermaid_chart(
        self, data: Dict[str, Any], chart_type: str = "pie", locale: str = "zh"
    ) -> str:
        """Render *data* as Mermaid source text.

        Args:
            data: Analysis results.
            chart_type: One of ``"pie"``, ``"bar"``, ``"timeline"`` or
                ``"flowchart"``; any other value produces a pie chart.
            locale: Key into ``self.labels`` (``"zh"`` or ``"en"``).

        Returns:
            The Mermaid diagram definition.

        Raises:
            KeyError: If *locale* has no label table.
        """
        labels = self.labels[locale]
        builders = {
            "pie": self._generate_pie_chart,
            "bar": self._generate_bar_chart,
            "timeline": self._generate_timeline_chart,
            "flowchart": self._generate_flowchart,
        }
        build = builders.get(chart_type, self._generate_pie_chart)
        return build(data, labels)

    def _generate_pie_chart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str:
        """Build a pie chart of the severity distribution (empty levels omitted)."""
        counts = self._count_severities(data.get("vulnerabilities", []))
        lines = ["pie title " + labels["risk_distribution"]]
        lines.extend(
            f' "{labels[severity]}" : {count}'
            for severity, count in counts.items()
            if count > 0
        )
        return "\n".join(lines) + "\n"

    def _generate_bar_chart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str:
        """Build a vulnerability-type breakdown.

        The breakdown is emitted as a ``graph TB`` tree with one leaf node
        per vulnerability type.
        """
        type_count: Dict[Any, int] = {}
        for vuln in data.get("vulnerabilities", []):
            vuln_type = vuln.get("type", "unknown")
            type_count[vuln_type] = type_count.get(vuln_type, 0) + 1

        heading = f'{labels["type"]} {labels["distribution"]}'
        lines = [
            "graph TB",
            f' A["{labels["vulnerabilities"]}"] --> B["{heading}"]',
        ]
        for index, (vuln_type, count) in enumerate(type_count.items(), start=1):
            # The type name comes from the scan data, so escape it.
            safe_type = self._escape_mermaid(vuln_type)
            lines.append(f' B --> C{index}["{safe_type}: {count}"]')
        return "\n".join(lines) + "\n"

    def _generate_timeline_chart(
        self, data: Dict[str, Any], labels: Dict[str, str]
    ) -> str:
        """Build a one-entry timeline.

        The entry shows the analysis date, the analysis type, the total
        issue count and the count per severity.
        """
        timestamp = data.get("timestamp") or datetime.now().isoformat()
        analysis_type = data.get("analysis_type", "security_analysis")
        # Only the date part of the timestamp is shown; str() tolerates a
        # non-string timestamp value.
        day = str(timestamp)[:10]
        lines = [
            "timeline",
            f' title {labels["title"]}',
            f" {day} : {labels.get(analysis_type, analysis_type)}",
        ]

        vulnerabilities = data.get("vulnerabilities", [])
        if vulnerabilities:
            lines.append(f' : {labels["total_issues"]}: {len(vulnerabilities)}')
            # Group by whatever severity the data carries, in first-seen order.
            severity_groups: Dict[Any, int] = {}
            for vuln in vulnerabilities:
                severity = vuln.get("severity", "low")
                severity_groups[severity] = severity_groups.get(severity, 0) + 1
            for severity, count in severity_groups.items():
                # Severities without a translation are shown verbatim
                # instead of raising KeyError.
                lines.append(f" : {labels.get(severity, severity)}: {count}")
        return "\n".join(lines) + "\n"

    def _generate_flowchart(self, data: Dict[str, Any], labels: Dict[str, str]) -> str:
        """Build an overview flowchart.

        The chain is title -> summary -> analysis type -> risk score ->
        risk level, followed by the per-severity counts when there are
        vulnerabilities.
        """
        analysis_type = data.get("analysis_type", "security_analysis")
        analysis_label = self._escape_mermaid(labels.get(analysis_type, analysis_type))
        risk_score = data.get("risk_score", 0)
        risk_level = self._get_risk_level(risk_score)

        lines = [
            "flowchart TD",
            f' A["{labels["title"]}"] --> B["{labels["summary"]}"]',
            f' B --> C["{analysis_label}"]',
            f' C --> D["{labels["risk_score"]}: {risk_score}"]',
            f' D --> E["{labels["severity"]}: {labels[risk_level]}"]',
        ]

        vulnerabilities = data.get("vulnerabilities", [])
        if vulnerabilities:
            lines.append(
                f' E --> F["{labels["vulnerabilities"]}: {len(vulnerabilities)}"]'
            )
            for severity, count in self._count_severities(vulnerabilities).items():
                if count > 0:
                    # Node ids are GC / GH / GM / GL, from the level's initial.
                    node_id = f"G{severity[0].upper()}"
                    lines.append(f' F --> {node_id}["{labels[severity]}: {count}"]')
        return "\n".join(lines) + "\n"

    def generate_comprehensive_report(
        self, data: Dict[str, Any], locale: str = "zh"
    ) -> Dict[str, Any]:
        """Assemble the full report dictionary for *data* in *locale*.

        Returns:
            A dict with the keys ``title``, ``timestamp``, ``locale``,
            ``summary``, ``charts``, ``detailed_findings``,
            ``recommendations`` and ``risk_assessment``.

        Raises:
            KeyError: If *locale* has no label table.
        """
        labels = self.labels[locale]
        charts = {
            "risk_distribution": self.generate_mermaid_chart(data, "pie", locale),
            "vulnerability_timeline": self.generate_mermaid_chart(
                data, "timeline", locale
            ),
            "security_overview": self.generate_mermaid_chart(
                data, "flowchart", locale
            ),
        }
        return {
            "title": labels["title"],
            "timestamp": datetime.now().isoformat(),
            "locale": locale,
            "summary": self._generate_summary(data, labels),
            "charts": charts,
            "detailed_findings": self._format_detailed_findings(data, labels),
            "recommendations": self._format_recommendations(data, labels),
            "risk_assessment": self._generate_risk_assessment(data, labels),
        }

    def _generate_summary(
        self, data: Dict[str, Any], labels: Dict[str, str]
    ) -> Dict[str, Any]:
        """Build the summary section, keyed by localized labels."""
        vulnerabilities = data.get("vulnerabilities", [])
        risk_score = data.get("risk_score", 0)
        distribution = {
            labels[severity]: count
            for severity, count in self._count_severities(vulnerabilities).items()
            if count > 0
        }
        return {
            labels["total_issues"]: len(vulnerabilities),
            labels["risk_score"]: risk_score,
            f'{labels["severity"]} {labels["distribution"]}': distribution,
            # The security score is the complement of the risk score,
            # floored at 0.
            labels["security_score"]: max(0, 100 - risk_score),
        }

    def _format_detailed_findings(
        self, data: Dict[str, Any], labels: Dict[str, str]
    ) -> List[Dict[str, Any]]:
        """Convert each vulnerability into a dict keyed by localized labels."""
        # (key in the vulnerability dict, label id used as the output key)
        optional_fields = (
            ("line", "line"),
            ("file", "file"),
            ("indicator", "indicator"),
            ("threat_details", "threat_intel"),
        )
        findings = []
        for vuln in data.get("vulnerabilities", []):
            severity = vuln.get("severity", "low")
            finding = {
                labels["type"]: vuln.get("type", "unknown"),
                labels["severity"]: labels.get(severity, severity),
                labels["description"]: vuln.get("description", ""),
            }
            for source_key, label_id in optional_fields:
                if source_key in vuln:
                    finding[labels[label_id]] = vuln[source_key]
            findings.append(finding)
        return findings

    def _format_recommendations(
        self, data: Dict[str, Any], labels: Dict[str, str]
    ) -> List[str]:
        """Return the recommendations from *data* unchanged.

        An empty list is returned when there are none. *labels* is unused.
        """
        return data.get("recommendations", [])

    def _generate_risk_assessment(
        self, data: Dict[str, Any], labels: Dict[str, str]
    ) -> Dict[str, Any]:
        """Build the risk assessment section.

        The section contains the score, the localized risk level, issue
        counts and a localized posture sentence.
        """
        risk_score = data.get("risk_score", 0)
        vulnerabilities = data.get("vulnerabilities", [])
        risk_level = self._get_risk_level(risk_score)

        critical_count = sum(
            1 for vuln in vulnerabilities if vuln.get("severity") == "critical"
        )
        high_count = sum(
            1 for vuln in vulnerabilities if vuln.get("severity") == "high"
        )
        return {
            labels["risk_score"]: risk_score,
            labels["severity"]: labels[risk_level],
            "critical_issues": critical_count,
            "high_issues": high_count,
            "total_issues": len(vulnerabilities),
            "security_posture": self._get_security_posture(risk_score, labels),
        }

    def _get_risk_level(self, risk_score: int) -> str:
        """Map a numeric risk score to a risk level.

        Scores of 80 and above are ``"critical"``, 60 and above ``"high"``,
        30 and above ``"medium"``, and anything lower is ``"low"``.
        """
        if risk_score >= 80:
            return "critical"
        if risk_score >= 60:
            return "high"
        if risk_score >= 30:
            return "medium"
        return "low"

    def _get_security_posture(self, risk_score: int, labels: Dict[str, str]) -> str:
        """Return the posture sentence for *risk_score* in the language of *labels*.

        The previous implementation detected Chinese by looking for "需要" in
        the "critical" label, which never matched, so it always answered in
        English. The sentences now live in the label tables. A label dict
        without them falls back to the English text.
        """
        posture_key = f"posture_{self._get_risk_level(risk_score)}"
        return labels.get(posture_key, self.labels["en"][posture_key])

    def export_report_json(self, report: Dict[str, Any]) -> str:
        """Serialize *report* as indented JSON, keeping non-ASCII characters."""
        return json.dumps(report, ensure_ascii=False, indent=2)

    def export_report_markdown(self, report: Dict[str, Any]) -> str:
        """Render *report* as Markdown.

        Section headings are Chinese when ``report["locale"]`` is ``"zh"``
        and English otherwise. A missing locale is treated as ``"zh"``,
        the generator's default.

        Raises:
            KeyError: If *report* has no ``title``.
        """
        is_zh = report.get("locale", "zh") == "zh"
        parts = [
            f"# {report['title']}\n\n",
            f"**{report.get('timestamp', '')}**\n\n",
            "## 摘要\n\n" if is_zh else "## Summary\n\n",
        ]

        # Summary: nested dicts become bullet lists under their key.
        for key, value in report.get("summary", {}).items():
            if isinstance(value, dict):
                parts.append(f"**{key}:**\n")
                parts.extend(f"- {k}: {v}\n" for k, v in value.items())
            else:
                parts.append(f"**{key}:** {value}\n")
        parts.append("\n")

        # Charts: each one is emitted as a fenced mermaid block.
        if "charts" in report:
            parts.append(
                "## 可视化图表\n\n" if is_zh else "## Visualization Charts\n\n"
            )
            for chart_name, chart_content in report["charts"].items():
                parts.append(f"### {chart_name}\n\n")
                parts.append("```mermaid\n")
                parts.append(chart_content)
                parts.append("\n```\n\n")

        # Detailed findings: the type is the heading, other fields follow.
        findings = report.get("detailed_findings")
        if findings:
            parts.append("## 详细发现\n\n" if is_zh else "## Detailed Findings\n\n")
            for number, finding in enumerate(findings, 1):
                heading = finding.get("类型", finding.get("Type", "Unknown"))
                parts.append(f"### {number}. {heading}\n\n")
                parts.extend(
                    f"**{key}:** {value}\n"
                    for key, value in finding.items()
                    if key not in ("类型", "Type")
                )
                parts.append("\n")

        # Recommendations
        recommendations = report.get("recommendations")
        if recommendations:
            parts.append("## 建议\n\n" if is_zh else "## Recommendations\n\n")
            parts.extend(f"- {rec}\n" for rec in recommendations)
            parts.append("\n")

        return "".join(parts)