#!/usr/bin/env python3
"""
================================================================================
AI Analysis Module for SAST MCP Server
================================================================================
This module provides AI-powered analysis capabilities for SAST scan results.
It processes TOON-formatted scan data and can interact with LLM services
for intelligent summarization, risk assessment, and remediation guidance.
FEATURES:
- LLM-based scan result analysis (stub for future implementation)
- Intelligent summarization of security findings
- Risk prioritization and assessment
- Remediation recommendations
- False positive detection
- Interactive decision support
FUTURE DEVELOPMENT:
- API key integration for LLM services (OpenAI, Anthropic, etc.)
- Custom prompt templates for different analysis types
- Multi-model support and fallback strategies
- Cost tracking and optimization
- Caching and incremental analysis
USAGE:
    from ai_analysis import analyze_scan_with_ai, is_ai_configured

    if is_ai_configured():
        summary = analyze_scan_with_ai(ai_payload, api_key="your-key")
AUTHOR: MCP-SAST-Server Contributors
LICENSE: MIT
================================================================================
"""
import json
import logging
import os
from typing import Dict, Any, Optional, List
from datetime import datetime
# Setup logger
logger = logging.getLogger(__name__)
# Configuration for AI services (to be expanded)
AI_SERVICE_ENABLED = os.environ.get("AI_SERVICE_ENABLED", "false").lower() == "true"
AI_API_KEY = os.environ.get("AI_API_KEY", "")
AI_SERVICE_URL = os.environ.get("AI_SERVICE_URL", "")
AI_MODEL = os.environ.get("AI_MODEL", "claude-3-5-sonnet-20241022")
AI_MAX_TOKENS = int(os.environ.get("AI_MAX_TOKENS", "4096"))
def is_ai_configured() -> bool:
"""
Check if AI analysis is configured and available.
Returns:
bool: True if AI service is configured with API key
"""
return AI_SERVICE_ENABLED and bool(AI_API_KEY)
def analyze_scan_with_ai(
ai_payload: Dict[str, Any],
api_key: Optional[str] = None,
custom_prompt: Optional[str] = None
) -> Dict[str, Any]:
"""
Analyze scan results using AI/LLM service.
    This is a stub; a future implementation will send the payload to an LLM
    service and return intelligent analysis of the SAST scan results.
Args:
ai_payload: AI-ready payload with TOON formatted data
api_key: Optional API key (uses env var if not provided)
custom_prompt: Optional custom analysis prompt
Returns:
Dictionary containing AI analysis results
Future Implementation:
- Send TOON data to LLM service
- Process and structure the response
- Extract key insights and recommendations
- Return structured analysis
"""
logger.info("AI analysis requested (feature stub)")
if not is_ai_configured() and not api_key:
logger.warning("AI service not configured. Set AI_SERVICE_ENABLED=true and AI_API_KEY in environment.")
return {
"success": False,
"error": "AI service not configured",
"message": "To enable AI analysis, set AI_SERVICE_ENABLED=true and provide AI_API_KEY",
"stub": True
}
# Extract metadata
metadata = ai_payload.get("metadata", {})
tool_name = metadata.get("tool_name", "unknown")
job_id = metadata.get("job_id", "unknown")
logger.info(f"AI analysis stub called for job {job_id}, tool: {tool_name}")
# Return stub response
return {
"success": True,
"job_id": job_id,
"tool_name": tool_name,
"analyzed_at": datetime.now().isoformat(),
"stub": True,
"message": "AI analysis feature is not yet implemented",
"future_capabilities": {
"summary": "High-level overview of security findings",
"critical_findings": "List of most critical vulnerabilities",
"risk_assessment": {
"overall_risk": "HIGH/MEDIUM/LOW",
"risk_factors": ["factor1", "factor2"],
"risk_score": 0.0
},
"remediation_priorities": [
{
"priority": 1,
"issue": "Description",
"remediation": "How to fix",
"effort": "LOW/MEDIUM/HIGH"
}
],
"false_positive_analysis": {
"potential_false_positives": [],
"confidence_scores": {}
},
"recommendations": [
"Recommendation 1",
"Recommendation 2"
]
},
"next_steps": [
"Configure AI_API_KEY in environment variables",
"Set AI_SERVICE_ENABLED=true",
"Optionally configure AI_SERVICE_URL for custom endpoints",
"Implement actual LLM integration in this module"
]
}
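# Minimal sketch of what the future LLM call might look like. This helper is
# NOT wired into analyze_scan_with_ai() yet. It assumes an Anthropic-style
# Messages endpoint (POST /v1/messages, "x-api-key" header, text returned in
# content[0]["text"]); verify the endpoint shape, header names, and response
# layout against the chosen provider's documentation before relying on them.
def _llm_request_sketch(prompt: str, api_key: str) -> str:
    """Hedged example: POST a prompt to an Anthropic-style Messages API."""
    import urllib.request  # stdlib only; imported locally to keep the sketch self-contained

    url = AI_SERVICE_URL or "https://api.anthropic.com/v1/messages"
    body = json.dumps({
        "model": AI_MODEL,
        "max_tokens": AI_MAX_TOKENS,
        "messages": [{"role": "user", "content": prompt}],
    }).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={
            "content-type": "application/json",
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
        },
    )
    with urllib.request.urlopen(request) as response:
        payload = json.loads(response.read().decode("utf-8"))
    # Anthropic places the generated text in content[0]["text"]; other
    # providers structure responses differently (assumption to verify).
    return payload["content"][0]["text"]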
def summarize_findings(scan_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate a basic summary of scan findings (non-AI version).
This provides a simple statistical summary without AI analysis.
Args:
scan_results: Complete scan results dictionary
Returns:
Dictionary with summary statistics
"""
try:
tool_name = scan_results.get("tool_name", "unknown")
scan_result = scan_results.get("scan_result", {})
summary = {
"tool": tool_name,
"summary_type": "statistical",
"timestamp": datetime.now().isoformat()
}
# Tool-specific summaries
if tool_name == "semgrep":
            parsed = scan_result.get("parsed_output", {})
            results = parsed.get("results", [])
            errors = parsed.get("errors", [])
# Count by severity
severity_counts = {}
for result in results:
severity = result.get("extra", {}).get("severity", "UNKNOWN")
severity_counts[severity] = severity_counts.get(severity, 0) + 1
summary["total_findings"] = len(results)
summary["by_severity"] = severity_counts
summary["errors"] = len(errors)
elif tool_name == "bandit":
results = scan_result.get("parsed_output", {}).get("results", [])
summary["total_findings"] = len(results)
elif tool_name == "trufflehog":
secrets = scan_result.get("parsed_secrets", [])
verified = sum(1 for s in secrets if s.get("Verified", False))
summary["total_secrets"] = len(secrets)
summary["verified_secrets"] = verified
return summary
except Exception as e:
logger.error(f"Error generating summary: {str(e)}")
return {"error": str(e)}
def generate_remediation_guidance(
finding: Dict[str, Any],
tool_name: str
) -> Dict[str, Any]:
"""
Generate basic remediation guidance for a specific finding.
Future: This will be enhanced with AI-powered contextual recommendations.
Args:
finding: Individual security finding
tool_name: Name of the scanning tool
Returns:
Dictionary with remediation guidance
"""
return {
"stub": True,
"message": "AI-powered remediation guidance coming soon",
"basic_guidance": {
"review": "Manually review the finding in context",
"research": "Look up CVE or CWE references for detailed information",
"test": "Verify the finding is not a false positive",
"fix": "Apply security best practices for the identified issue",
"validate": "Re-scan after applying fixes"
},
"future_features": {
"contextual_analysis": "AI will analyze the specific code context",
"similar_cases": "Reference similar vulnerabilities and their fixes",
"code_snippets": "Suggest specific code changes",
"testing_guidance": "Provide test cases to verify the fix"
}
}
def prioritize_findings(
findings: List[Dict[str, Any]],
context: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
Prioritize security findings based on severity, impact, and context.
Future: AI will provide intelligent prioritization based on:
- Business context
- Exploitability
- Asset criticality
- Historical data
Args:
findings: List of security findings
context: Optional context about the application/environment
Returns:
Prioritized list of findings
"""
logger.info(f"Prioritizing {len(findings)} findings (stub implementation)")
# Simple priority sorting by severity (placeholder)
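    # Semgrep reports ERROR/WARNING/INFO; rank them alongside conventional levels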
severity_order = {"CRITICAL": 0, "HIGH": 1, "ERROR": 1, "MEDIUM": 2, "WARNING": 2, "LOW": 3, "INFO": 4}
    def get_priority(finding: Dict[str, Any]) -> int:
        # Semgrep nests severity under "extra"; fall back to a flat "severity"
        # key so non-semgrep findings still sort sensibly
        severity = finding.get("extra", {}).get("severity") or finding.get("severity", "LOW")
        return severity_order.get(str(severity).upper(), 5)
sorted_findings = sorted(findings, key=get_priority)
return [
{
**finding,
"priority_rank": idx + 1,
"ai_enhanced": False,
"note": "Future: AI will provide intelligent prioritization"
}
for idx, finding in enumerate(sorted_findings)
]
# Configuration check when run directly (not on import)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if is_ai_configured():
        logger.info("AI analysis is configured and ready")
        logger.info(f"AI Model: {AI_MODEL}")
    else:
        logger.info("AI analysis is not configured")
        logger.info("Set AI_SERVICE_ENABLED=true and AI_API_KEY to enable")