Skip to main content
Glama

Cyber Sentinel MCP Server

by jx888-max
cicd_integration.py17.3 kB
""" CI/CD Pipeline Integration Module Provides security analysis for CI/CD workflows """ import asyncio import hashlib import json import logging import re from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional import yaml logger = logging.getLogger(__name__) class CICDAnalyzer: """CI/CD安全分析器""" def __init__(self, threat_aggregator): self.threat_aggregator = threat_aggregator async def scan_project_dependencies( self, project_files: Dict[str, str] ) -> Dict[str, Any]: """扫描项目依赖包的安全漏洞""" results = { "scan_type": "dependency_scan", "timestamp": datetime.now().isoformat(), "vulnerabilities": [], "recommendations": [], "risk_score": 0, "files_analyzed": [], } # 分析不同类型的依赖文件 dependency_files = { "package.json": self._analyze_npm_dependencies, "requirements.txt": self._analyze_python_dependencies, "pom.xml": self._analyze_maven_dependencies, "Cargo.toml": self._analyze_rust_dependencies, "go.mod": self._analyze_go_dependencies, "composer.json": self._analyze_php_dependencies, } for filename, content in project_files.items(): if filename in dependency_files: try: file_results = await dependency_files[filename](content) results["files_analyzed"].append(filename) results["vulnerabilities"].extend( file_results.get("vulnerabilities", []) ) results["recommendations"].extend( file_results.get("recommendations", []) ) except Exception as e: logger.error(f"Error analyzing {filename}: {e}") results["vulnerabilities"].append( { "type": "analysis_error", "file": filename, "message": f"Failed to analyze: {str(e)}", } ) # 计算风险评分 results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"]) return results async def _analyze_npm_dependencies( self, package_json_content: str ) -> Dict[str, Any]: """分析NPM依赖""" try: package_data = json.loads(package_json_content) vulnerabilities = [] recommendations = [] # 检查dependencies和devDependencies all_deps = {} all_deps.update(package_data.get("dependencies", {})) all_deps.update(package_data.get("devDependencies", {})) for package_name, version in all_deps.items(): # 检查已知的高风险包 if package_name in self._get_known_malicious_npm_packages(): vulnerabilities.append( { "type": "malicious_package", "package": package_name, "version": version, "severity": "critical", "description": f"Package {package_name} is known to be malicious", } ) # 检查过时的版本 if self._is_outdated_version(package_name, version): vulnerabilities.append( { "type": "outdated_dependency", "package": package_name, "version": version, "severity": "medium", "description": f"Package {package_name}@{version} is outdated", } ) if vulnerabilities: recommendations.append("运行 'npm audit' 检查已知漏洞") recommendations.append("考虑使用 'npm update' 更新依赖包") return { "vulnerabilities": vulnerabilities, "recommendations": recommendations, } except json.JSONDecodeError: return { "vulnerabilities": [ {"type": "parse_error", "message": "Invalid package.json format"} ], "recommendations": [], } async def _analyze_python_dependencies( self, requirements_content: str ) -> Dict[str, Any]: """分析Python依赖""" vulnerabilities = [] recommendations = [] lines = requirements_content.strip().split("\n") for line in lines: line = line.strip() if line and not line.startswith("#"): # 解析包名和版本 package_match = re.match(r"^([a-zA-Z0-9\-_]+)([>=<!=]+.*)?", line) if package_match: package_name = package_match.group(1) version_spec = package_match.group(2) or "" # 检查已知的高风险包 if package_name in self._get_known_malicious_python_packages(): vulnerabilities.append( { "type": "malicious_package", "package": package_name, "version": version_spec, "severity": "critical", "description": f"Package {package_name} is known to be malicious", } ) # 检查是否固定版本 if not version_spec or "==" not in version_spec: vulnerabilities.append( { "type": "unpinned_dependency", "package": package_name, "severity": "low", "description": f"Package {package_name} version is not pinned", } ) if vulnerabilities: recommendations.append("使用 'pip-audit' 检查已知漏洞") recommendations.append("固定所有依赖包的版本号") return {"vulnerabilities": vulnerabilities, "recommendations": recommendations} async def analyze_docker_image(self, dockerfile_content: str) -> Dict[str, Any]: """分析Dockerfile的安全性""" results = { "scan_type": "docker_security", "timestamp": datetime.now().isoformat(), "vulnerabilities": [], "recommendations": [], "risk_score": 0, } lines = dockerfile_content.strip().split("\n") for i, line in enumerate(lines, 1): line = line.strip() # 检查基础镜像 if line.upper().startswith("FROM"): base_image = line.split()[1] if len(line.split()) > 1 else "" if ":latest" in base_image or ":" not in base_image: results["vulnerabilities"].append( { "type": "insecure_base_image", "line": i, "severity": "medium", "description": "使用了 'latest' 标签或未指定版本的基础镜像", "recommendation": "使用具体的版本标签", } ) # 检查基础镜像是否来自可信源 if not any( trusted in base_image for trusted in [ "ubuntu", "alpine", "debian", "centos", "node", "python", ] ): results["vulnerabilities"].append( { "type": "untrusted_base_image", "line": i, "severity": "high", "description": f"基础镜像 {base_image} 可能来自不可信源", } ) # 检查以root用户运行 if line.upper().startswith("USER") and "root" in line: results["vulnerabilities"].append( { "type": "root_user", "line": i, "severity": "high", "description": "容器以root用户运行", "recommendation": "创建并使用非特权用户", } ) # 检查敏感信息泄露 if any( keyword in line.upper() for keyword in ["PASSWORD", "SECRET", "KEY", "TOKEN"] ): results["vulnerabilities"].append( { "type": "secrets_exposure", "line": i, "severity": "critical", "description": "可能包含敏感信息", "recommendation": "使用环境变量或secrets管理", } ) # 生成建议 if not any("USER" in line.upper() for line in lines): results["recommendations"].append("添加非root用户运行容器") results["recommendations"].extend( ["使用多阶段构建减少攻击面", "定期更新基础镜像", "使用镜像扫描工具检查漏洞"] ) results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"]) return results async def scan_kubernetes_config(self, yaml_content: str) -> Dict[str, Any]: """扫描Kubernetes配置的安全问题""" results = { "scan_type": "kubernetes_security", "timestamp": datetime.now().isoformat(), "vulnerabilities": [], "recommendations": [], "risk_score": 0, } try: # 解析YAML文档 docs = list(yaml.safe_load_all(yaml_content)) for doc in docs: if not doc: continue kind = doc.get("kind", "") metadata = doc.get("metadata", {}) spec = doc.get("spec", {}) # 检查Pod安全上下文 if kind in ["Pod", "Deployment", "StatefulSet", "DaemonSet"]: self._check_pod_security_context(doc, results) # 检查Service配置 if kind == "Service": self._check_service_security(doc, results) # 检查网络策略 if kind == "NetworkPolicy": self._check_network_policy(doc, results) # 检查RBAC if kind in ["Role", "ClusterRole", "RoleBinding", "ClusterRoleBinding"]: self._check_rbac_security(doc, results) except yaml.YAMLError as e: results["vulnerabilities"].append( { "type": "yaml_parse_error", "severity": "high", "description": f"YAML解析错误: {str(e)}", } ) results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"]) return results def _check_pod_security_context(self, doc: Dict, results: Dict): """检查Pod安全上下文""" spec = doc.get("spec", {}) template_spec = ( spec.get("template", {}).get("spec", {}) if "template" in spec else spec ) security_context = template_spec.get("securityContext", {}) # 检查是否以root运行 if security_context.get("runAsUser") == 0: results["vulnerabilities"].append( { "type": "pod_runs_as_root", "severity": "high", "description": "Pod配置为以root用户运行", "resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}", } ) # 检查特权模式 containers = template_spec.get("containers", []) for container in containers: container_security = container.get("securityContext", {}) if container_security.get("privileged"): results["vulnerabilities"].append( { "type": "privileged_container", "severity": "critical", "description": f"容器 {container.get('name')} 运行在特权模式", "resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}", } ) def _check_service_security(self, doc: Dict, results: Dict): """检查Service安全配置""" spec = doc.get("spec", {}) # 检查NodePort服务 if spec.get("type") == "NodePort": results["vulnerabilities"].append( { "type": "nodeport_service", "severity": "medium", "description": "使用NodePort类型的Service可能暴露不必要的端口", "resource": f"Service/{doc.get('metadata', {}).get('name')}", } ) # 检查LoadBalancer服务 if spec.get("type") == "LoadBalancer": results["vulnerabilities"].append( { "type": "loadbalancer_service", "severity": "medium", "description": "LoadBalancer服务直接暴露到互联网", "resource": f"Service/{doc.get('metadata', {}).get('name')}", } ) def _check_network_policy(self, doc: Dict, results: Dict): """检查网络策略""" spec = doc.get("spec", {}) # 检查是否有入站规则 if not spec.get("ingress"): results["recommendations"].append("考虑添加具体的入站网络规则") # 检查是否有出站规则 if not spec.get("egress"): results["recommendations"].append("考虑添加具体的出站网络规则") def _check_rbac_security(self, doc: Dict, results: Dict): """检查RBAC安全配置""" rules = doc.get("rules", []) for rule in rules: # 检查过于宽泛的权限 if "*" in rule.get("verbs", []): results["vulnerabilities"].append( { "type": "overprivileged_rbac", "severity": "high", "description": "RBAC规则包含过于宽泛的权限 (*)", "resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}", } ) if "*" in rule.get("resources", []): results["vulnerabilities"].append( { "type": "overprivileged_rbac", "severity": "high", "description": "RBAC规则允许访问所有资源 (*)", "resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}", } ) def _get_known_malicious_npm_packages(self) -> List[str]: """获取已知恶意NPM包列表""" return [ "event-stream", "flatmap-stream", "getcookies", "http-fetch", "node-sqlite", "nodesass", "crossenv", "cross-env.js", ] def _get_known_malicious_python_packages(self) -> List[str]: """获取已知恶意Python包列表""" return [ "urllib3", "requests", "colorama", "numpy", "tensorflow", "torch", "pandas", "matplotlib", "scipy", "scikit-learn", ] def _is_outdated_version(self, package_name: str, version: str) -> bool: """检查包版本是否过时(简化实现)""" # 这里可以集成真实的包版本检查API # 目前返回False作为占位符 return False def _calculate_risk_score(self, vulnerabilities: List[Dict]) -> int: """计算风险评分""" score = 0 severity_weights = {"critical": 10, "high": 7, "medium": 4, "low": 1} for vuln in vulnerabilities: severity = vuln.get("severity", "low") score += severity_weights.get(severity, 1) return min(score, 100) # 最高100分

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jx888-max/cyber-sentinel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server