Skip to main content
Glama
ai_analysis.py•9.78 kB
#!/usr/bin/env python3
"""
================================================================================
AI Analysis Module for SAST MCP Server
================================================================================

This module provides AI-powered analysis capabilities for SAST scan results.
It processes TOON-formatted scan data and can interact with LLM services for
intelligent summarization, risk assessment, and remediation guidance.

FEATURES:
    - LLM-based scan result analysis (stub for future implementation)
    - Intelligent summarization of security findings
    - Risk prioritization and assessment
    - Remediation recommendations
    - False positive detection
    - Interactive decision support

FUTURE DEVELOPMENT:
    - API key integration for LLM services (OpenAI, Anthropic, etc.)
    - Custom prompt templates for different analysis types
    - Multi-model support and fallback strategies
    - Cost tracking and optimization
    - Caching and incremental analysis

USAGE:
    from ai_analysis import analyze_scan_with_ai, is_ai_configured

    if is_ai_configured():
        summary = analyze_scan_with_ai(ai_payload, api_key="your-key")

AUTHOR: MCP-SAST-Server Contributors
LICENSE: MIT
================================================================================
"""

import json
import logging
import os
from collections import Counter
from typing import Dict, Any, Optional, List
from datetime import datetime

# Setup logger
logger = logging.getLogger(__name__)


def _int_from_env(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to *default*.

    An unset, blank, or non-integer value yields *default* instead of raising,
    so a malformed setting cannot break importing this module.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            "Ignoring non-integer %s=%r; using default %d", name, raw, default
        )
        return default


# Configuration for AI services (to be expanded)
AI_SERVICE_ENABLED = os.environ.get("AI_SERVICE_ENABLED", "false").lower() == "true"
AI_API_KEY = os.environ.get("AI_API_KEY", "")
AI_SERVICE_URL = os.environ.get("AI_SERVICE_URL", "")
AI_MODEL = os.environ.get("AI_MODEL", "claude-3-5-sonnet-20241022")
AI_MAX_TOKENS = _int_from_env("AI_MAX_TOKENS", 4096)

# Sort rank per severity label (lower sorts first). Labels not listed here
# get rank 5 in prioritize_findings.
_SEVERITY_ORDER = {
    "CRITICAL": 0,
    "HIGH": 1,
    "ERROR": 1,
    "MEDIUM": 2,
    "WARNING": 2,
    "LOW": 3,
    "INFO": 4,
}
_UNRANKED_SEVERITY = 5


def _extract_severity(finding: Dict[str, Any], default: str) -> str:
    """Return the severity label of *finding*, or *default* when absent.

    Reads ``finding["extra"]["severity"]`` first and falls back to
    ``finding["issue_severity"]``. A missing, null, or empty value yields
    *default*; any other value is converted with ``str()``.
    """
    extra = finding.get("extra")
    severity = extra.get("severity") if isinstance(extra, dict) else None
    if not severity:
        severity = finding.get("issue_severity")
    return str(severity) if severity else default


def is_ai_configured() -> bool:
    """
    Check if AI analysis is configured and available.

    Returns:
        bool: True if AI_SERVICE_ENABLED is true and AI_API_KEY is non-empty
    """
    return AI_SERVICE_ENABLED and bool(AI_API_KEY)


def analyze_scan_with_ai(
    ai_payload: Dict[str, Any],
    api_key: Optional[str] = None,
    custom_prompt: Optional[str] = None
) -> Dict[str, Any]:
    """
    Analyze scan results using AI/LLM service.

    This is a stub: it does not contact any service. It only validates that
    a key is available, reads the job metadata and returns a placeholder.

    Args:
        ai_payload: AI-ready payload with TOON formatted data. Only the
            "metadata" entry ("tool_name", "job_id") is read; a missing or
            null payload/metadata falls back to "unknown" values.
        api_key: Optional API key (uses env var if not provided)
        custom_prompt: Optional custom analysis prompt (currently unused)

    Returns:
        Dictionary containing AI analysis results. When neither the
        environment configuration nor ``api_key`` is available the result
        has ``"success": False``; otherwise a stub response with
        ``"success": True`` and ``"stub": True``.

    Future Implementation:
        - Send TOON data to LLM service
        - Process and structure the response
        - Extract key insights and recommendations
        - Return structured analysis
    """
    logger.info("AI analysis requested (feature stub)")

    if not is_ai_configured() and not api_key:
        logger.warning("AI service not configured. Set AI_SERVICE_ENABLED=true and AI_API_KEY in environment.")
        return {
            "success": False,
            "error": "AI service not configured",
            "message": "To enable AI analysis, set AI_SERVICE_ENABLED=true and provide AI_API_KEY",
            "stub": True
        }

    # Extract metadata; `or {}` also covers keys that are present but null.
    metadata = (ai_payload or {}).get("metadata") or {}
    tool_name = metadata.get("tool_name", "unknown")
    job_id = metadata.get("job_id", "unknown")

    logger.info("AI analysis stub called for job %s, tool: %s", job_id, tool_name)

    # Return stub response
    return {
        "success": True,
        "job_id": job_id,
        "tool_name": tool_name,
        "analyzed_at": datetime.now().isoformat(),
        "stub": True,
        "message": "AI analysis feature is not yet implemented",
        "future_capabilities": {
            "summary": "High-level overview of security findings",
            "critical_findings": "List of most critical vulnerabilities",
            "risk_assessment": {
                "overall_risk": "HIGH/MEDIUM/LOW",
                "risk_factors": ["factor1", "factor2"],
                "risk_score": 0.0
            },
            "remediation_priorities": [
                {
                    "priority": 1,
                    "issue": "Description",
                    "remediation": "How to fix",
                    "effort": "LOW/MEDIUM/HIGH"
                }
            ],
            "false_positive_analysis": {
                "potential_false_positives": [],
                "confidence_scores": {}
            },
            "recommendations": [
                "Recommendation 1",
                "Recommendation 2"
            ]
        },
        "next_steps": [
            "Configure AI_API_KEY in environment variables",
            "Set AI_SERVICE_ENABLED=true",
            "Optionally configure AI_SERVICE_URL for custom endpoints",
            "Implement actual LLM integration in this module"
        ]
    }


def summarize_findings(scan_results: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate a basic summary of scan findings (non-AI version).

    This provides a simple statistical summary without AI analysis.
    Tool-specific fields are added for "semgrep", "bandit" and "trufflehog";
    any other tool name yields only the common fields.

    Args:
        scan_results: Complete scan results dictionary with "tool_name" and
            "scan_result" entries. Null or missing nested sections are
            treated as empty.

    Returns:
        Dictionary with summary statistics, or ``{"error": <message>}`` if
        the input could not be processed (this function does not raise).
    """
    try:
        tool_name = scan_results.get("tool_name", "unknown")
        scan_result = scan_results.get("scan_result") or {}

        summary = {
            "tool": tool_name,
            "summary_type": "statistical",
            "timestamp": datetime.now().isoformat()
        }

        # Tool-specific summaries
        if tool_name == "semgrep":
            parsed = scan_result.get("parsed_output") or {}
            results = parsed.get("results") or []
            errors = parsed.get("errors") or []

            summary["total_findings"] = len(results)
            summary["by_severity"] = dict(
                Counter(_extract_severity(r, "UNKNOWN") for r in results)
            )
            summary["errors"] = len(errors)

        elif tool_name == "bandit":
            parsed = scan_result.get("parsed_output") or {}
            results = parsed.get("results") or []

            summary["total_findings"] = len(results)
            summary["by_severity"] = dict(
                Counter(_extract_severity(r, "UNKNOWN") for r in results)
            )

        elif tool_name == "trufflehog":
            secrets = scan_result.get("parsed_secrets") or []
            verified = sum(1 for s in secrets if s.get("Verified", False))

            summary["total_secrets"] = len(secrets)
            summary["verified_secrets"] = verified

        return summary

    except Exception as e:
        # Best-effort boundary: callers get an error dict instead of an
        # exception; the traceback goes to the log for diagnosis.
        logger.exception("Error generating summary: %s", e)
        return {"error": str(e)}


def generate_remediation_guidance(
    finding: Dict[str, Any],
    tool_name: str
) -> Dict[str, Any]:
    """
    Generate basic remediation guidance for a specific finding.

    Currently a stub: the arguments are not inspected and the same generic
    guidance is returned for every finding.

    Future: This will be enhanced with AI-powered contextual recommendations.

    Args:
        finding: Individual security finding (currently unused)
        tool_name: Name of the scanning tool (currently unused)

    Returns:
        Dictionary with remediation guidance
    """
    return {
        "stub": True,
        "message": "AI-powered remediation guidance coming soon",
        "basic_guidance": {
            "review": "Manually review the finding in context",
            "research": "Look up CVE or CWE references for detailed information",
            "test": "Verify the finding is not a false positive",
            "fix": "Apply security best practices for the identified issue",
            "validate": "Re-scan after applying fixes"
        },
        "future_features": {
            "contextual_analysis": "AI will analyze the specific code context",
            "similar_cases": "Reference similar vulnerabilities and their fixes",
            "code_snippets": "Suggest specific code changes",
            "testing_guidance": "Provide test cases to verify the fix"
        }
    }


def prioritize_findings(
    findings: List[Dict[str, Any]],
    context: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """
    Prioritize security findings based on severity.

    Findings are sorted by severity rank (CRITICAL first). The sort is
    stable, so findings of equal rank keep their input order. A finding
    without a severity is treated as "LOW"; an unrecognized severity label
    sorts after "INFO". Input dictionaries are not modified.

    Future: AI will provide intelligent prioritization based on:
        - Business context
        - Exploitability
        - Asset criticality
        - Historical data

    Args:
        findings: List of security findings. Severity is read from
            ``extra.severity`` or, failing that, ``issue_severity``.
            ``None`` is treated as an empty list.
        context: Optional context about the application/environment
            (currently unused)

    Returns:
        Prioritized list of findings; each is a copy of the input finding
        with "priority_rank" (1-based), "ai_enhanced" and "note" added.
    """
    findings = list(findings or [])
    logger.info("Prioritizing %d findings (stub implementation)", len(findings))

    # Simple priority sorting by severity (placeholder)
    def get_priority(finding: Dict[str, Any]) -> int:
        severity = _extract_severity(finding, "LOW").upper()
        return _SEVERITY_ORDER.get(severity, _UNRANKED_SEVERITY)

    sorted_findings = sorted(findings, key=get_priority)

    return [
        {
            **finding,
            "priority_rank": idx + 1,
            "ai_enhanced": False,
            "note": "Future: AI will provide intelligent prioritization"
        }
        for idx, finding in enumerate(sorted_findings)
    ]


# Configuration check when the module is executed as a script
if __name__ == "__main__":
    # Without a configured handler/level the INFO messages below would be
    # dropped (the root logger defaults to WARNING).
    logging.basicConfig(level=logging.INFO)
    if is_ai_configured():
        logger.info("AI analysis is configured and ready")
        logger.info("AI Model: %s", AI_MODEL)
    else:
        logger.info("AI analysis is not configured")
        logger.info("Set AI_SERVICE_ENABLED=true and AI_API_KEY to enable")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Sengtocxoen/sast-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server