"""
CI/CD Pipeline Integration Module
Provides security analysis for CI/CD workflows
"""
import asyncio
import hashlib
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
logger = logging.getLogger(__name__)
class CICDAnalyzer:
    """Security analyzer for CI/CD artifacts."""

    def __init__(self, threat_aggregator):
        """Store the collaborator used by the analyzer.

        Args:
            threat_aggregator: Object kept on the instance as
                ``self.threat_aggregator``; it is only stored here.
        """
        # Kept as-is; nothing is validated or called at construction time.
        self.threat_aggregator = threat_aggregator
async def scan_project_dependencies(
    self, project_files: Dict[str, str]
) -> Dict[str, Any]:
    """Scan recognised dependency manifests for security findings.

    Args:
        project_files: Mapping of file name (or path) to file content. An
            entry is analysed when its base name is a known manifest, so
            ``"backend/requirements.txt"`` is handled like
            ``"requirements.txt"``.

    Returns:
        A report dict with the keys ``scan_type``, ``timestamp``,
        ``vulnerabilities``, ``recommendations``, ``risk_score`` and
        ``files_analyzed`` (the original keys of the analysed entries).
    """
    results = {
        "scan_type": "dependency_scan",
        "timestamp": datetime.now().isoformat(),
        "vulnerabilities": [],
        "recommendations": [],
        "risk_score": 0,
        "files_analyzed": [],
    }

    # Manifest base name -> name of the analyzer method. The method is looked
    # up lazily inside the ``try`` below so that an analyzer that is not
    # available only produces an ``analysis_error`` for its own file instead
    # of aborting the whole scan.
    analyzer_names = {
        "package.json": "_analyze_npm_dependencies",
        "requirements.txt": "_analyze_python_dependencies",
        "pom.xml": "_analyze_maven_dependencies",
        "Cargo.toml": "_analyze_rust_dependencies",
        "go.mod": "_analyze_go_dependencies",
        "composer.json": "_analyze_php_dependencies",
    }

    for filename, content in project_files.items():
        # Accept both bare names and paths with "/" or "\\" separators.
        base_name = re.split(r"[\\/]", filename)[-1]
        analyzer_name = analyzer_names.get(base_name)
        if analyzer_name is None:
            continue

        try:
            analyzer = getattr(self, analyzer_name)
            file_results = await analyzer(content)
            results["files_analyzed"].append(filename)
            results["vulnerabilities"].extend(
                file_results.get("vulnerabilities", [])
            )
            results["recommendations"].extend(
                file_results.get("recommendations", [])
            )
        except Exception as e:
            # Best effort: one broken manifest must not stop the others.
            logger.error("Error analyzing %s: %s", filename, e)
            results["vulnerabilities"].append(
                {
                    "type": "analysis_error",
                    "file": filename,
                    "message": f"Failed to analyze: {str(e)}",
                }
            )

    results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"])
    return results
async def _analyze_npm_dependencies(
self, package_json_content: str
) -> Dict[str, Any]:
"""分析NPM依赖"""
try:
package_data = json.loads(package_json_content)
vulnerabilities = []
recommendations = []
# 检查dependencies和devDependencies
all_deps = {}
all_deps.update(package_data.get("dependencies", {}))
all_deps.update(package_data.get("devDependencies", {}))
for package_name, version in all_deps.items():
# 检查已知的高风险包
if package_name in self._get_known_malicious_npm_packages():
vulnerabilities.append(
{
"type": "malicious_package",
"package": package_name,
"version": version,
"severity": "critical",
"description": f"Package {package_name} is known to be malicious",
}
)
# 检查过时的版本
if self._is_outdated_version(package_name, version):
vulnerabilities.append(
{
"type": "outdated_dependency",
"package": package_name,
"version": version,
"severity": "medium",
"description": f"Package {package_name}@{version} is outdated",
}
)
if vulnerabilities:
recommendations.append("运行 'npm audit' 检查已知漏洞")
recommendations.append("考虑使用 'npm update' 更新依赖包")
return {
"vulnerabilities": vulnerabilities,
"recommendations": recommendations,
}
except json.JSONDecodeError:
return {
"vulnerabilities": [
{"type": "parse_error", "message": "Invalid package.json format"}
],
"recommendations": [],
}
async def _analyze_python_dependencies(
self, requirements_content: str
) -> Dict[str, Any]:
"""分析Python依赖"""
vulnerabilities = []
recommendations = []
lines = requirements_content.strip().split("\n")
for line in lines:
line = line.strip()
if line and not line.startswith("#"):
# 解析包名和版本
package_match = re.match(r"^([a-zA-Z0-9\-_]+)([>=<!=]+.*)?", line)
if package_match:
package_name = package_match.group(1)
version_spec = package_match.group(2) or ""
# 检查已知的高风险包
if package_name in self._get_known_malicious_python_packages():
vulnerabilities.append(
{
"type": "malicious_package",
"package": package_name,
"version": version_spec,
"severity": "critical",
"description": f"Package {package_name} is known to be malicious",
}
)
# 检查是否固定版本
if not version_spec or "==" not in version_spec:
vulnerabilities.append(
{
"type": "unpinned_dependency",
"package": package_name,
"severity": "low",
"description": f"Package {package_name} version is not pinned",
}
)
if vulnerabilities:
recommendations.append("使用 'pip-audit' 检查已知漏洞")
recommendations.append("固定所有依赖包的版本号")
return {"vulnerabilities": vulnerabilities, "recommendations": recommendations}
async def analyze_docker_image(self, dockerfile_content: str) -> Dict[str, Any]:
    """Analyse a Dockerfile for common security problems.

    Checks performed per line:
      * ``FROM``: missing or ``latest`` tag, and base images whose reference
        does not contain one of a short list of well-known image names.
        ``scratch`` and references to earlier build stages are skipped.
      * ``USER``: running as ``root`` (by name or UID ``0``).
      * any line: presence of a secret-looking keyword.

    Args:
        dockerfile_content: Raw text of the Dockerfile.

    Returns:
        A report dict with ``scan_type``, ``timestamp``, ``vulnerabilities``,
        ``recommendations`` and ``risk_score``.
    """
    results = {
        "scan_type": "docker_security",
        "timestamp": datetime.now().isoformat(),
        "vulnerabilities": [],
        "recommendations": [],
        "risk_score": 0,
    }

    # NOTE(review): this is a substring match on the image reference, so
    # e.g. "someone/python-x" also counts as trusted. Kept as in the original.
    trusted_names = ("ubuntu", "alpine", "debian", "centos", "node", "python")
    secret_keywords = ("PASSWORD", "SECRET", "KEY", "TOKEN")

    stage_aliases = set()  # names introduced by "FROM ... AS <name>"
    has_user_instruction = False

    lines = dockerfile_content.strip().split("\n")
    for i, line in enumerate(lines, 1):
        line = line.strip()
        tokens = line.split()
        # Compare the instruction word itself, not a prefix or substring.
        instruction = tokens[0].upper() if tokens else ""

        if instruction == "FROM":
            # Flags such as --platform=... precede the image reference.
            args = [tok for tok in tokens[1:] if not tok.startswith("--")]
            base_image = args[0] if args else ""
            is_stage_reference = base_image.lower() in stage_aliases
            if len(args) >= 3 and args[1].upper() == "AS":
                stage_aliases.add(args[2].lower())

            if base_image != "scratch" and not is_stage_reference:
                # Look for the tag in the last path component only, so that
                # a registry port ("host:5000/img") is not read as a tag.
                image_name = base_image.rsplit("/", 1)[-1]
                if ":latest" in image_name or ":" not in image_name:
                    results["vulnerabilities"].append(
                        {
                            "type": "insecure_base_image",
                            "line": i,
                            "severity": "medium",
                            "description": "使用了 'latest' 标签或未指定版本的基础镜像",
                            "recommendation": "使用具体的版本标签",
                        }
                    )
                if not any(trusted in base_image for trusted in trusted_names):
                    results["vulnerabilities"].append(
                        {
                            "type": "untrusted_base_image",
                            "line": i,
                            "severity": "high",
                            "description": f"基础镜像 {base_image} 可能来自不可信源",
                        }
                    )

        if instruction == "USER":
            has_user_instruction = True
            # "USER name[:group]" or "USER uid[:gid]"; only the user part
            # matters, and it must equal root/0 (not merely contain "root").
            user = tokens[1].split(":", 1)[0] if len(tokens) > 1 else ""
            if user in ("root", "0"):
                results["vulnerabilities"].append(
                    {
                        "type": "root_user",
                        "line": i,
                        "severity": "high",
                        "description": "容器以root用户运行",
                        "recommendation": "创建并使用非特权用户",
                    }
                )

        upper_line = line.upper()
        if any(keyword in upper_line for keyword in secret_keywords):
            results["vulnerabilities"].append(
                {
                    "type": "secrets_exposure",
                    "line": i,
                    "severity": "critical",
                    "description": "可能包含敏感信息",
                    "recommendation": "使用环境变量或secrets管理",
                }
            )

    # Recommend a non-root user only when no USER instruction exists at all
    # (a line such as "RUN useradd ..." does not count as one).
    if not has_user_instruction:
        results["recommendations"].append("添加非root用户运行容器")
    results["recommendations"].extend(
        ["使用多阶段构建减少攻击面", "定期更新基础镜像", "使用镜像扫描工具检查漏洞"]
    )

    results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"])
    return results
async def scan_kubernetes_config(self, yaml_content: str) -> Dict[str, Any]:
    """Scan Kubernetes manifests for security problems.

    The text may contain several YAML documents. Each mapping document is
    dispatched on its ``kind`` to the matching ``_check_*`` helper; empty
    documents and documents that are not mappings are skipped.

    Args:
        yaml_content: Raw YAML text, possibly multi-document.

    Returns:
        A report dict with ``scan_type``, ``timestamp``, ``vulnerabilities``,
        ``recommendations`` and ``risk_score``. A YAML syntax error is
        reported as a ``yaml_parse_error`` finding instead of being raised.
    """
    results = {
        "scan_type": "kubernetes_security",
        "timestamp": datetime.now().isoformat(),
        "vulnerabilities": [],
        "recommendations": [],
        "risk_score": 0,
    }

    workload_kinds = ("Pod", "Deployment", "StatefulSet", "DaemonSet")
    rbac_kinds = ("Role", "ClusterRole", "RoleBinding", "ClusterRoleBinding")

    # Only parsing can raise YAMLError, so only parsing sits in the try.
    # safe_load_all is used, which does not construct arbitrary objects.
    try:
        docs = list(yaml.safe_load_all(yaml_content))
    except yaml.YAMLError as e:
        results["vulnerabilities"].append(
            {
                "type": "yaml_parse_error",
                "severity": "high",
                "description": f"YAML解析错误: {str(e)}",
            }
        )
        docs = []

    for doc in docs:
        # A document may be empty (None) or a bare scalar/list; only
        # mappings can describe a Kubernetes resource.
        if not doc or not isinstance(doc, dict):
            continue

        kind = doc.get("kind", "")

        if kind in workload_kinds:
            self._check_pod_security_context(doc, results)
        if kind == "Service":
            self._check_service_security(doc, results)
        if kind == "NetworkPolicy":
            self._check_network_policy(doc, results)
        if kind in rbac_kinds:
            self._check_rbac_security(doc, results)

    results["risk_score"] = self._calculate_risk_score(results["vulnerabilities"])
    return results
def _check_pod_security_context(self, doc: Dict, results: Dict):
"""检查Pod安全上下文"""
spec = doc.get("spec", {})
template_spec = (
spec.get("template", {}).get("spec", {}) if "template" in spec else spec
)
security_context = template_spec.get("securityContext", {})
# 检查是否以root运行
if security_context.get("runAsUser") == 0:
results["vulnerabilities"].append(
{
"type": "pod_runs_as_root",
"severity": "high",
"description": "Pod配置为以root用户运行",
"resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}",
}
)
# 检查特权模式
containers = template_spec.get("containers", [])
for container in containers:
container_security = container.get("securityContext", {})
if container_security.get("privileged"):
results["vulnerabilities"].append(
{
"type": "privileged_container",
"severity": "critical",
"description": f"容器 {container.get('name')} 运行在特权模式",
"resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}",
}
)
def _check_service_security(self, doc: Dict, results: Dict):
"""检查Service安全配置"""
spec = doc.get("spec", {})
# 检查NodePort服务
if spec.get("type") == "NodePort":
results["vulnerabilities"].append(
{
"type": "nodeport_service",
"severity": "medium",
"description": "使用NodePort类型的Service可能暴露不必要的端口",
"resource": f"Service/{doc.get('metadata', {}).get('name')}",
}
)
# 检查LoadBalancer服务
if spec.get("type") == "LoadBalancer":
results["vulnerabilities"].append(
{
"type": "loadbalancer_service",
"severity": "medium",
"description": "LoadBalancer服务直接暴露到互联网",
"resource": f"Service/{doc.get('metadata', {}).get('name')}",
}
)
def _check_network_policy(self, doc: Dict, results: Dict):
"""检查网络策略"""
spec = doc.get("spec", {})
# 检查是否有入站规则
if not spec.get("ingress"):
results["recommendations"].append("考虑添加具体的入站网络规则")
# 检查是否有出站规则
if not spec.get("egress"):
results["recommendations"].append("考虑添加具体的出站网络规则")
def _check_rbac_security(self, doc: Dict, results: Dict):
"""检查RBAC安全配置"""
rules = doc.get("rules", [])
for rule in rules:
# 检查过于宽泛的权限
if "*" in rule.get("verbs", []):
results["vulnerabilities"].append(
{
"type": "overprivileged_rbac",
"severity": "high",
"description": "RBAC规则包含过于宽泛的权限 (*)",
"resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}",
}
)
if "*" in rule.get("resources", []):
results["vulnerabilities"].append(
{
"type": "overprivileged_rbac",
"severity": "high",
"description": "RBAC规则允许访问所有资源 (*)",
"resource": f"{doc.get('kind')}/{doc.get('metadata', {}).get('name')}",
}
)
def _get_known_malicious_npm_packages(self) -> List[str]:
"""获取已知恶意NPM包列表"""
return [
"event-stream",
"flatmap-stream",
"getcookies",
"http-fetch",
"node-sqlite",
"nodesass",
"crossenv",
"cross-env.js",
]
def _get_known_malicious_python_packages(self) -> List[str]:
"""获取已知恶意Python包列表"""
return [
"urllib3",
"requests",
"colorama",
"numpy",
"tensorflow",
"torch",
"pandas",
"matplotlib",
"scipy",
"scikit-learn",
]
def _is_outdated_version(self, package_name: str, version: str) -> bool:
"""检查包版本是否过时(简化实现)"""
# 这里可以集成真实的包版本检查API
# 目前返回False作为占位符
return False
def _calculate_risk_score(self, vulnerabilities: List[Dict]) -> int:
"""计算风险评分"""
score = 0
severity_weights = {"critical": 10, "high": 7, "medium": 4, "low": 1}
for vuln in vulnerabilities:
severity = vuln.get("severity", "low")
score += severity_weights.get(severity, 1)
return min(score, 100) # 最高100分