"""
Exploit Availability Checker Tool
This module provides functionality to check for the availability of public exploits
for a given CVE across multiple databases and sources.
"""
import re
from datetime import datetime
from typing import List
import httpx
import mcp.types as types
async def get_exploit_availability(
cve_id: str,
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Check for the availability of public exploits for a CVE.
Searches multiple databases and sources including ExploitDB, Metasploit, and others.
Args:
cve_id: CVE identifier in format CVE-YYYY-NNNN
Returns:
List of content containing exploit availability information or error messages
"""
# Clean up CVE ID format
cve_id = cve_id.upper().strip()
if not cve_id.startswith("CVE-"):
cve_id = f"CVE-{cve_id}"
# Validate CVE ID format (CVE-YYYY-NNNN)
if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id):
return [
types.TextContent(
type="text",
text=f"Error: Invalid CVE ID format. Expected format: CVE-YYYY-NNNN (e.g., CVE-2021-44228). Got: {cve_id}",
)
]
headers = {
"User-Agent": "MCP Exploit Availability Checker v1.0",
"Accept": "application/json",
}
exploit_sources = {}
try:
timeout = httpx.Timeout(15.0, connect=10.0)
async with httpx.AsyncClient(
follow_redirects=True, headers=headers, timeout=timeout
) as client:
# Check 1: NIST NVD for CVE references that might indicate exploits
nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}"
try:
nvd_response = await client.get(nvd_url)
if nvd_response.status_code == 200:
nvd_data = nvd_response.json()
if nvd_data.get("totalResults", 0) > 0:
cve_item = nvd_data["vulnerabilities"][0]["cve"]
# Check references for exploit indicators
exploit_indicators = []
references = cve_item.get("references", [])
for ref in references:
url = ref.get("url", "").lower()
tags = [tag.lower() for tag in ref.get("tags", [])]
# Look for exploit-related keywords
if (
"exploit" in tags
or "exploit" in url
or "poc" in url
or "proof" in url
or "github.com" in url
or "packetstorm" in url
or "metasploit" in url
or "rapid7" in url
):
exploit_indicators.append(
{
"url": ref.get("url", ""),
"source": ref.get("source", ""),
"tags": ref.get("tags", []),
}
)
if exploit_indicators:
exploit_sources["NVD_References"] = {
"status": "POTENTIAL_EXPLOITS_FOUND",
"count": len(exploit_indicators),
"details": exploit_indicators[:5], # Limit to first 5
}
else:
exploit_sources["NVD_References"] = {
"status": "NO_EXPLOIT_INDICATORS",
"count": 0,
"details": [],
}
except Exception as e:
exploit_sources["NVD_References"] = {
"status": "ERROR",
"error": str(e),
}
# Check 2: Search CVE Mitre for additional exploit references
try:
mitre_url = f"https://cve.mitre.org/cgi-bin/cvekey.cgi?keyword={cve_id}"
# Note: This is a basic check since MITRE doesn't have a simple API
mitre_response = await client.get(mitre_url)
if mitre_response.status_code == 200:
content = mitre_response.text.lower()
exploit_keywords = [
"exploit",
"proof of concept",
"poc",
"metasploit",
"weaponized",
]
found_keywords = [kw for kw in exploit_keywords if kw in content]
if found_keywords:
exploit_sources["MITRE_Page"] = {
"status": "EXPLOIT_KEYWORDS_FOUND",
"keywords": found_keywords,
}
else:
exploit_sources["MITRE_Page"] = {
"status": "NO_EXPLOIT_KEYWORDS",
"keywords": [],
}
except Exception as e:
exploit_sources["MITRE_Page"] = {
"status": "ERROR",
"error": str(e),
}
# Check 3: Search GitHub for potential PoCs (indirect check)
try:
# Note: This would require GitHub API for full search
# For now, we'll provide guidance on manual checking
exploit_sources["GitHub_Search"] = {
"status": "MANUAL_CHECK_RECOMMENDED",
"search_url": f"https://github.com/search?q={cve_id}+exploit&type=repositories",
"search_url_poc": f"https://github.com/search?q={cve_id}+poc&type=repositories",
}
except Exception as e:
exploit_sources["GitHub_Search"] = {
"status": "ERROR",
"error": str(e),
}
# Check 4: ExploitDB search guidance (since they don't have a public API)
exploit_sources["ExploitDB"] = {
"status": "MANUAL_CHECK_RECOMMENDED",
"search_url": f"https://www.exploit-db.com/search?cve={cve_id}",
"description": "Check ExploitDB manually for verified exploits",
}
# Check 5: Metasploit modules guidance
exploit_sources["Metasploit"] = {
"status": "MANUAL_CHECK_RECOMMENDED",
"search_guidance": f"Search for '{cve_id}' in Metasploit Framework",
"command": f"msfconsole -q -x 'search cve:{cve_id}; exit'",
}
except Exception as e:
return [
types.TextContent(
type="text",
text=f"Error: Failed to check exploit availability: {str(e)}",
)
]
# Analyze results and determine overall risk
exploit_found = False
potential_exploits = False
for source, data in exploit_sources.items():
if isinstance(data, dict):
status = data.get("status", "")
if "FOUND" in status or "POTENTIAL" in status:
if "POTENTIAL" in status:
potential_exploits = True
else:
exploit_found = True
# Format the response
result = f"๐ก๏ธ **Exploit Availability Report: {cve_id}**\n\n"
# Overall assessment
if exploit_found:
risk_level = "๐ด HIGH RISK"
risk_desc = "Public exploits appear to be available"
elif potential_exploits:
risk_level = "๐ MEDIUM RISK"
risk_desc = "Potential exploit indicators found"
else:
risk_level = "๐ข LOW RISK"
risk_desc = "No obvious public exploits found"
result += f"โ ๏ธ **Risk Assessment:** {risk_level}\n"
result += f"๐ **Assessment:** {risk_desc}\n\n"
result += "๐ **Detailed Source Analysis:**\n\n"
# NVD References Analysis
nvd_data = exploit_sources.get("NVD_References", {})
if nvd_data.get("status") == "POTENTIAL_EXPLOITS_FOUND":
result += f"**๐ NVD References:** โ
Found {nvd_data.get('count', 0)} potential exploit references\n"
for detail in nvd_data.get("details", [])[:3]: # Show first 3
result += f" โข {detail.get('url', '')}\n"
if detail.get("tags"):
result += f" Tags: {', '.join(detail.get('tags', []))}\n"
elif nvd_data.get("status") == "NO_EXPLOIT_INDICATORS":
result += "**๐ NVD References:** โช No exploit indicators in references\n"
elif nvd_data.get("status") == "ERROR":
result += f"**๐ NVD References:** โ Error checking ({nvd_data.get('error', 'Unknown')})\n"
# MITRE Analysis
mitre_data = exploit_sources.get("MITRE_Page", {})
if mitre_data.get("status") == "EXPLOIT_KEYWORDS_FOUND":
result += f"**๐๏ธ MITRE Page:** โ ๏ธ Exploit-related keywords found: {', '.join(mitre_data.get('keywords', []))}\n"
elif mitre_data.get("status") == "NO_EXPLOIT_KEYWORDS":
result += "**๐๏ธ MITRE Page:** โช No exploit keywords detected\n"
result += "\n๐ **Manual Verification Recommended:**\n\n"
# GitHub Search
github_data = exploit_sources.get("GitHub_Search", {})
result += "**๐ GitHub Search:**\n"
result += f" โข Repository search: {github_data.get('search_url', '')}\n"
result += f" โข PoC search: {github_data.get('search_url_poc', '')}\n\n"
# ExploitDB
exploitdb_data = exploit_sources.get("ExploitDB", {})
result += "**๐ฅ ExploitDB:**\n"
result += f" โข Search URL: {exploitdb_data.get('search_url', '')}\n"
result += f" โข Description: {exploitdb_data.get('description', '')}\n\n"
# Metasploit
metasploit_data = exploit_sources.get("Metasploit", {})
result += "**๐ฏ Metasploit Framework:**\n"
result += f" โข Command: `{metasploit_data.get('command', '')}`\n"
result += f" โข Guidance: {metasploit_data.get('search_guidance', '')}\n\n"
result += "๐ก **Additional Resources:**\n"
result += f" โข VulnCheck: https://vulncheck.com/search?q={cve_id}\n"
result += f" โข PacketStorm: https://packetstormsecurity.com/search/?q={cve_id}\n"
result += " โข SecLists: Check security mailing lists and advisories\n\n"
result += "๐จ **Security Recommendations:**\n"
if risk_level.startswith("๐ด"):
result += " โข ๐จ **URGENT:** Treat as actively exploited\n"
result += " โข ๐ก๏ธ Implement immediate mitigations\n"
result += " โข ๐ Enhanced monitoring for attack attempts\n"
result += " โข โก Emergency patching procedures\n"
elif risk_level.startswith("๐ "):
result += " โข โ ๏ธ **HIGH PRIORITY:** Assume exploitable\n"
result += " โข ๐ Accelerated patching timeline\n"
result += " โข ๐ Monitor for new exploit releases\n"
result += " โข ๐ Additional security controls\n"
else:
result += " โข ๐
**STANDARD:** Follow normal patching schedule\n"
result += " โข ๐ Continue monitoring threat landscape\n"
result += " โข ๐ Periodic re-evaluation recommended\n"
result += (
f"\n๐ **Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
return [types.TextContent(type="text", text=result)]