"""
Python Package Vulnerability Check Tool
This module provides functionality to check for known vulnerabilities in Python packages
using the OSV (Open Source Vulnerabilities) database.
"""
import json
import re
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import httpx
import mcp.types as types
async def get_package_info(package_name: str) -> Optional[Dict[str, Any]]:
    """
    Get package information from PyPI to find the latest version.

    This is a best-effort lookup: network failures, non-2xx responses and
    malformed JSON are all reported as ``None`` rather than raised.

    Args:
        package_name: Name of the Python package

    Returns:
        Dictionary containing package info, or None if the package was not
        found or the metadata could not be retrieved
    """
    # Percent-encode the name so characters such as "/", "?" or "#" cannot
    # steer the request to a different PyPI endpoint.
    url = f"https://pypi.org/pypi/{quote(package_name, safe='')}/json"
    timeout = httpx.Timeout(10.0, connect=5.0)
    try:
        # httpx does not follow redirects by default and raise_for_status()
        # treats a 3xx as an error, so a redirect to the canonical project
        # URL would otherwise be reported as "not found".
        async with httpx.AsyncClient(
            timeout=timeout, follow_redirects=True
        ) as client:
            response = await client.get(url)
            response.raise_for_status()
            payload = response.json()
    except (httpx.HTTPError, httpx.InvalidURL, ValueError):
        # httpx.HTTPError covers transport errors, timeouts and bad status
        # codes; ValueError covers undecodable / invalid JSON bodies.
        return None
    # Callers index into the result as a mapping; reject any other JSON shape.
    return payload if isinstance(payload, dict) else None
async def query_osv_vulnerabilities(
    package_name: str, version: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Query OSV database for vulnerabilities in a Python package.

    Failures are deliberately NOT converted into an empty list: an empty
    result means "OSV knows of no vulnerabilities", and reporting that after
    a failed request would be a false all-clear.

    Args:
        package_name: Name of the Python package
        version: Specific version to check (optional)

    Returns:
        List of vulnerability records (empty if OSV reports none)

    Raises:
        httpx.TimeoutException: If the request to OSV timed out
        httpx.HTTPStatusError: If OSV answered with a non-2xx status
        httpx.RequestError: For other transport-level failures
        json.JSONDecodeError: If the response body is not valid JSON
    """
    osv_query: Dict[str, Any] = {
        "package": {"name": package_name, "ecosystem": "PyPI"}
    }
    if version:
        osv_query["version"] = version

    headers = {
        "User-Agent": "MCP Package Vulnerability Checker v1.0",
        "Content-Type": "application/json",
    }
    timeout = httpx.Timeout(15.0, connect=10.0)

    vulns: List[Dict[str, Any]] = []
    async with httpx.AsyncClient(headers=headers, timeout=timeout) as client:
        while True:
            response = await client.post(
                "https://api.osv.dev/v1/query", json=osv_query
            )
            response.raise_for_status()
            data = response.json()
            vulns.extend(data.get("vulns") or [])

            # OSV paginates large result sets: repeat the same query with
            # the returned token until no further page is announced.
            next_page_token = data.get("next_page_token")
            if not next_page_token:
                break
            osv_query["page_token"] = next_page_token
    return vulns
# NOTE(review): the icon glyphs in this block were restored from mis-decoded
# text (UTF-8 read as a single-byte Thai code page). Icons whose trailing
# bytes had been lost were chosen from context -- confirm against the
# intended originals.

# OSV range-event keys that are rendered, paired with their display labels.
_RANGE_EVENT_LABELS = (
    ("introduced", "Introduced"),
    ("fixed", "Fixed"),
    ("last_affected", "Last affected"),
)

# Maximum number of summary characters shown in the package information block.
_SUMMARY_LIMIT = 100


def _canonical_project_name(name: str) -> str:
    """Return *name* in PEP 503 form: lowercase, runs of ``-_.`` become ``-``."""
    return re.sub(r"[-_.]+", "-", name).lower()


def _severity_text(vuln: Dict[str, Any]) -> Optional[str]:
    """Return a printable severity for an OSV record, or None if it has none."""
    database_specific = vuln.get("database_specific")
    if not isinstance(database_specific, dict):
        database_specific = {}
    # Severity may be a plain label (e.g. "HIGH") or a list of
    # {"type", "score"} entries; the database-specific value is preferred,
    # then the record's top-level "severity" field.
    for candidate in (database_specific.get("severity"), vuln.get("severity")):
        if isinstance(candidate, str) and candidate:
            return candidate
        if (
            isinstance(candidate, list)
            and candidate
            and isinstance(candidate[0], dict)
        ):
            first = candidate[0]
            return f"{first.get('score', 'Unknown')} ({first.get('type', 'Unknown')})"
    return None


def _affected_version_lines(vuln: Dict[str, Any], package_name: str) -> List[str]:
    """Return one bullet line per range event that applies to *package_name*."""
    wanted = _canonical_project_name(package_name)
    lines: List[str] = []
    for entry in vuln.get("affected") or []:
        entry_name = (entry.get("package") or {}).get("name") or ""
        # Compare canonical forms so "My_Pkg" and "my-pkg" are the same project.
        if _canonical_project_name(entry_name) != wanted:
            continue
        for version_range in entry.get("ranges") or []:
            for event in version_range.get("events") or []:
                for key, label in _RANGE_EVENT_LABELS:
                    if key in event:
                        lines.append(f" • {label}: {event[key]}\n")
    return lines


def _vulnerability_section(
    index: int, vuln: Dict[str, Any], package_name: str
) -> List[str]:
    """Return the report fragments describing a single OSV record."""
    parts = [
        f"🔍 **Vulnerability #{index}: {vuln.get('id', 'Unknown ID')}**\n",
        f" 📝 **Summary:** {vuln.get('summary', 'No summary available')}\n",
    ]

    severity = _severity_text(vuln)
    if severity is not None:
        parts.append(f" 🔥 **Severity:** {severity}\n")

    parts.append(f" 📅 **Published:** {vuln.get('published', 'Unknown')}\n")

    affected_lines = _affected_version_lines(vuln, package_name)
    if affected_lines:
        parts.append(" 📌 **Affected Versions:**\n")
        parts.extend(affected_lines)

    references = vuln.get("references") or []
    if references:
        parts.append(" 📚 **References:**\n")
        for ref in references[:3]:  # Limit to first 3 references
            parts.append(
                f" • [{ref.get('type', 'ADVISORY')}] {ref.get('url', '')}\n"
            )

    # Aliases (CVE, GHSA, etc.)
    aliases = vuln.get("aliases") or []
    if aliases:
        parts.append(f" 🏷️ **Aliases:** {', '.join(map(str, aliases))}\n")

    parts.append("\n")
    return parts


def format_vulnerability_report(
    vulns: List[Dict[str, Any]],
    package_name: str,
    package_info: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Format vulnerability information into a readable report.

    Missing or null metadata fields (``info``, ``version``, ``summary``,
    ``author``) fall back to placeholder text instead of raising.

    Args:
        vulns: List of vulnerability records from OSV
        package_name: Name of the package
        package_info: Package metadata from PyPI

    Returns:
        Formatted vulnerability report string
    """
    info = (package_info or {}).get("info") or {}
    latest_version = info.get("version") or "Unknown"
    # Escape the name wherever it is embedded in a URL.
    url_name = quote(package_name, safe="")

    if not vulns:
        return (
            f"🔒 **Python Package Security Report: {package_name}**\n\n"
            f"✅ **Good News!** No known vulnerabilities found for package '{package_name}'\n\n"
            f"📦 **Latest Version:** {latest_version}\n"
            f"📊 **Data Source:** OSV (Open Source Vulnerabilities Database)\n"
            f"🔗 **Package URL:** https://pypi.org/project/{url_name}/"
        )

    # Header
    parts: List[str] = [
        f"🚨 **Python Package Security Report: {package_name}**\n\n",
        f"⚠️ **Found {len(vulns)} known vulnerabilities**\n\n",
    ]

    # Package info
    if package_info:
        summary = info.get("summary") or "No description available"
        if len(summary) > _SUMMARY_LIMIT:
            # Only mark the summary as truncated when it actually was.
            summary = summary[:_SUMMARY_LIMIT] + "..."
        parts.extend(
            [
                "📦 **Package Information:**\n",
                f" • Latest Version: {latest_version}\n",
                f" • Summary: {summary}\n",
                f" • Author: {info.get('author') or 'Unknown'}\n",
                f" • PyPI: https://pypi.org/project/{url_name}/\n\n",
            ]
        )

    # Vulnerabilities
    for index, vuln in enumerate(vulns, 1):
        parts.extend(_vulnerability_section(index, vuln, package_name))

    parts.extend(
        [
            "📊 **Data Source:** OSV (Open Source Vulnerabilities Database)\n",
            f"🔗 **OSV URL:** https://osv.dev/list?q={url_name}&ecosystem=PyPI\n",
            "👉 **Recommendation:** Review affected versions and update to a secure version if available.",
        ]
    )
    return "".join(parts)
# Shape of a project name after PEP 503 normalisation: lowercase ASCII
# alphanumeric runs separated by single hyphens.
_NORMALIZED_NAME_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")


def _text_result(
    message: str,
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Wrap *message* in the single-item text content list this tool returns."""
    return [types.TextContent(type="text", text=message)]


async def check_package_vulnerabilities(
    package_name: str, version: Optional[str] = None
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Check for known vulnerabilities in a Python package.

    The package name is normalised per PEP 503 (lowercased; runs of ``-``,
    ``_`` and ``.`` collapsed to a single ``-``) and validated before any
    network request is made. Errors are never raised to the caller; they are
    returned as text content starting with ``Error:``.

    Args:
        package_name: Name of the Python package to check
        version: Specific version to check (optional, checks all versions if not provided)

    Returns:
        List of content containing vulnerability report or error messages
    """
    # Clean up package name
    package_name = re.sub(r"[-_.]+", "-", package_name.strip()).lower()
    if not package_name:
        return _text_result("Error: Package name cannot be empty.")
    if not _NORMALIZED_NAME_RE.match(package_name):
        # Reject anything that cannot be a project name before it is
        # embedded in a URL or sent to a remote service.
        return _text_result(
            f"Error: '{package_name}' is not a valid Python package name."
        )

    # Treat a blank version the same as "no version given".
    if version is not None:
        version = version.strip() or None

    try:
        # Get package info from PyPI
        package_info = await get_package_info(package_name)
        if not package_info:
            return _text_result(
                f"Error: Package '{package_name}' not found on PyPI. Please check the package name."
            )

        # Query OSV for vulnerabilities
        vulns = await query_osv_vulnerabilities(package_name, version)

        # Format the report
        return _text_result(
            format_vulnerability_report(vulns, package_name, package_info)
        )
    except httpx.TimeoutException:
        return _text_result(
            "Error: Request timed out while checking package vulnerabilities."
        )
    except httpx.HTTPStatusError as e:
        return _text_result(
            f"Error: HTTP {e.response.status_code} error while fetching vulnerability data."
        )
    except json.JSONDecodeError:
        return _text_result(
            "Error: Invalid JSON response from vulnerability database."
        )
    except Exception as e:
        # Tool boundary: report anything unexpected as text instead of raising.
        return _text_result(
            f"Error: Failed to check package vulnerabilities: {str(e)}"
        )