"""
Vulnerability Search Tool
This module provides functionality to search vulnerability databases using the
National Vulnerability Database (NVD) API with advanced filtering capabilities.
"""
import json
from datetime import datetime, timedelta
from typing import List, Optional
import httpx
import mcp.types as types
# Longest publication-date window (in days) this tool will send to the NVD API.
_NVD_MAX_RANGE_DAYS = 120

# Predefined ``date_range`` shortcuts and their length in days. "1y" and "2y"
# are recognised so they get a specific "exceeds limit" error instead of a
# generic "invalid format" one.
_PRESET_RANGE_DAYS = {"30d": 30, "90d": 90, "1y": 365, "2y": 730}

# Severity values accepted from callers.
_VALID_SEVERITIES = ("CRITICAL", "HIGH", "MEDIUM", "LOW", "NONE")

# Marker shown next to each CVE, keyed by CVSS base severity.
_SEVERITY_EMOJI = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"}
_DEFAULT_SEVERITY_EMOJI = "⚪"

# Metric keys in order of preference: CVSS 3.1 first, then 3.0, then 2.0.
_CVSS_METRIC_KEYS = ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2")

# Status codes that get a dedicated, more helpful error message.
_HTTP_ERROR_MESSAGES = {
    403: "Error: Access forbidden. The NVD API may be rate limiting requests. Please try again later.",
    503: "Error: NVD service temporarily unavailable. Please try again later.",
}

_NVD_CVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
_MAX_DISPLAYED = 20
_MAX_DESCRIPTION_CHARS = 200


def _text(
    message: str,
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Wrap ``message`` in the single-item content list this tool returns."""
    return [types.TextContent(type="text", text=message)]


def _parse_date_range(date_range: str) -> tuple[dict[str, str], Optional[str]]:
    """Translate ``date_range`` into ``pubStartDate``/``pubEndDate`` parameters.

    Returns:
        ``(params, error)``. On success ``error`` is ``None``; on failure
        ``params`` is empty and ``error`` holds the message for the caller.
    """
    preset = date_range.strip().lower()
    if preset in _PRESET_RANGE_DAYS:
        days = _PRESET_RANGE_DAYS[preset]
        if days > _NVD_MAX_RANGE_DAYS:
            return {}, (
                f"Error: Date range '{date_range}' ({days} days) exceeds the NVD API limit of 120 days. "
                "Please use '30d', '90d', or a custom range within 120 days."
            )
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
    elif "," in date_range:
        # Custom range: YYYY-MM-DD,YYYY-MM-DD
        start_str, end_str = date_range.split(",", 1)
        try:
            start_date = datetime.strptime(start_str.strip(), "%Y-%m-%d")
            end_date = datetime.strptime(end_str.strip(), "%Y-%m-%d")
        except ValueError as e:
            return {}, f"Error: Invalid date format. Use YYYY-MM-DD. Details: {e}"
        # A reversed range yields a negative day count that would slip past
        # the limit check below, so reject it explicitly.
        if start_date > end_date:
            return {}, (
                "Error: Invalid date range. The start date must not be after the end date."
            )
        days_diff = (end_date - start_date).days
        if days_diff > _NVD_MAX_RANGE_DAYS:
            return {}, (
                f"Error: Custom date range ({days_diff} days) exceeds the NVD API limit of 120 days. "
                "Please use a shorter range."
            )
    else:
        return {}, (
            "Error: Invalid date_range format. Use 'YYYY-MM-DD,YYYY-MM-DD' "
            "or predefined ranges: 30d, 90d, 1y, 2y"
        )

    # The window always covers whole days: start of first day to end of last.
    return {
        "pubStartDate": start_date.strftime("%Y-%m-%dT00:00:00.000"),
        "pubEndDate": end_date.strftime("%Y-%m-%dT23:59:59.999"),
    }, None


def _summarize_cvss(metrics: dict) -> tuple[str, str]:
    """Return ``(score text, severity marker)`` from the best available metric."""
    for key in _CVSS_METRIC_KEYS:
        entries = metrics.get(key)
        if not entries:
            continue
        metric = entries[0]  # Take first metric
        cvss_data = metric.get("cvssData") or {}
        score = cvss_data.get("baseScore")
        if score is None:
            # No usable score for this CVSS version; try the next one.
            continue
        # CVSS v3.x keeps baseSeverity inside cvssData, while v2 entries
        # carry it on the metric itself, so look in both places.
        sev = cvss_data.get("baseSeverity") or metric.get("baseSeverity") or "N/A"
        return f"{score} ({sev})", _SEVERITY_EMOJI.get(sev, _DEFAULT_SEVERITY_EMOJI)
    return "Not available", _DEFAULT_SEVERITY_EMOJI


def _format_published(published: object) -> str:
    """Render the ``published`` timestamp as ``YYYY-MM-DD`` when possible."""
    if not isinstance(published, str):
        return "Unknown"
    try:
        parsed = datetime.fromisoformat(published.replace("Z", "+00:00"))
    except ValueError:
        # Not ISO-8601: fall back to the leading date-sized prefix.
        return published[:10]
    return parsed.strftime("%Y-%m-%d")


def _format_vulnerability(index: int, vuln_data: dict) -> str:
    """Format one entry of the ``vulnerabilities`` array as a markdown stanza."""
    cve = vuln_data.get("cve") or {}
    cve_id = cve.get("id", "Unknown")

    # Use the first English description, truncated for display.
    description = "No description available"
    for desc in cve.get("descriptions") or []:
        if desc.get("lang") == "en":
            value = desc.get("value") or ""
            description = value[:_MAX_DESCRIPTION_CHARS]
            if len(value) > _MAX_DESCRIPTION_CHARS:
                description += "..."
            break

    cvss_info, severity_emoji = _summarize_cvss(cve.get("metrics") or {})
    pub_formatted = _format_published(cve.get("published", "Unknown"))

    return "".join(
        [
            f"**{index}. {severity_emoji} {cve_id}**\n",
            f" 📅 Published: {pub_formatted}\n",
            f" ⚠️ CVSS: {cvss_info}\n",
            f" 📝 Description: {description}\n",
            f" 🔗 Details: https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}\n\n",
        ]
    )


def _format_results(
    data: dict,
    keywords: Optional[str],
    severity: Optional[str],
    date_range: Optional[str],
) -> str:
    """Build the user-facing report from a decoded NVD API response."""
    total_results = data.get("totalResults", 0)
    vulnerabilities = data.get("vulnerabilities") or []

    if total_results == 0:
        search_desc = []
        if keywords:
            search_desc.append(f"keywords: '{keywords}'")
        if severity:
            search_desc.append(f"severity: {severity}")
        if date_range:
            search_desc.append(f"date range: {date_range}")
        search_text = " with " + ", ".join(search_desc) if search_desc else ""
        return f"No vulnerabilities found{search_text}."

    shown = vulnerabilities[:_MAX_DISPLAYED]  # Limit display
    parts = [
        "🔍 **Vulnerability Search Results**\n\n",
        "📊 **Search Summary:**\n",
        f" • Total Results: {total_results:,}\n",
        f" • Showing: {len(shown)} results\n",
    ]
    if keywords:
        parts.append(f" • Keywords: '{keywords}'\n")
    if severity:
        parts.append(f" • Severity Filter: {severity}\n")
    if date_range:
        parts.append(f" • Date Range: {date_range}\n")
    parts.append("\n📋 **Vulnerabilities Found:**\n\n")

    parts.extend(
        _format_vulnerability(i, vuln_data) for i, vuln_data in enumerate(shown, 1)
    )

    if total_results > len(shown):
        parts.append(
            f"📄 **Note:** Showing first {len(shown)} of {total_results:,} total results.\n"
        )
        parts.append(
            "💡 **Tip:** Use more specific keywords or severity filters to narrow results.\n\n"
        )
    parts.append("📚 **Data Source:** National Vulnerability Database (NVD)\n")
    parts.append(
        f"🕒 **Search performed:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
    )
    return "".join(parts)


async def search_vulnerabilities(
    keywords: Optional[str] = None,
    severity: Optional[str] = None,
    date_range: Optional[str] = None,
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Search vulnerability databases with advanced filtering.

    Validates the filters, queries the NVD CVE API 2.0 for the first page of
    matches (at most 20) and renders them as a markdown report. Failures are
    reported as text content rather than raised.

    Args:
        keywords: Keywords to search for in CVE descriptions (optional).
            Blank/whitespace-only input is treated as "no keywords".
        severity: Filter by severity level: CRITICAL, HIGH, MEDIUM, LOW, NONE
            (optional, case-insensitive). NONE is accepted but sends no
            severity filter to the API, so results are not narrowed by it.
        date_range: Publication date filter in format "YYYY-MM-DD,YYYY-MM-DD"
            or a predefined range such as "30d" or "90d" (optional).
            Note: NVD API has a 120-day maximum limit for date ranges, so the
            predefined "1y" and "2y" ranges are recognised but rejected.

    Returns:
        A single-item list of text content holding either the formatted
        search results or an error message.
    """
    parsed_params: dict[str, str] = {}

    # Parse date range
    if date_range:
        date_params, error = _parse_date_range(date_range)
        if error is not None:
            return _text(error)
        parsed_params.update(date_params)

    # Parse severity filter
    if severity:
        severity = severity.strip().upper()
        if severity not in _VALID_SEVERITIES:
            return _text(
                f"Error: Invalid severity '{severity}'. Valid options: {', '.join(_VALID_SEVERITIES)}"
            )
        # No cvssV3Severity value is sent for NONE, so it applies no
        # server-side filter.
        if severity != "NONE":
            parsed_params["cvssV3Severity"] = severity

    # Add keywords if provided; skip blank input rather than send an empty
    # keywordSearch parameter.
    if keywords and keywords.strip():
        parsed_params["keywordSearch"] = keywords.strip()

    # Only the first page is requested; it is a reasonable limit for display.
    parsed_params["resultsPerPage"] = str(_MAX_DISPLAYED)
    parsed_params["startIndex"] = "0"

    headers = {
        "User-Agent": "MCP Vulnerability Search Tool v1.0",
        "Accept": "application/json",
    }

    # The broad handlers below are intentional: this is the tool boundary and
    # every failure must come back to the caller as text content.
    try:
        timeout = httpx.Timeout(30.0, connect=15.0)  # Longer timeout for search
        async with httpx.AsyncClient(
            follow_redirects=True, headers=headers, timeout=timeout
        ) as client:
            response = await client.get(_NVD_CVE_URL, params=parsed_params)
            response.raise_for_status()
            data = response.json()
        return _text(_format_results(data, keywords, severity, date_range))
    except httpx.TimeoutException:
        return _text(
            "Error: Request timed out while searching vulnerability database. "
            "Try using more specific search criteria."
        )
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        return _text(
            _HTTP_ERROR_MESSAGES.get(
                status,
                f"Error: HTTP {status} error while searching vulnerabilities.",
            )
        )
    except json.JSONDecodeError:
        return _text("Error: Invalid JSON response from NVD API.")
    except Exception as e:
        return _text(f"Error: Failed to search vulnerabilities: {str(e)}")