whois_query.py•6.22 kB
import asyncio
import contextlib
import logging
import time
from typing import Annotated, Any
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from pydantic import Field
from whois_mcp.cache import TTLCache
from whois_mcp.config import (
RIPE_WHOIS_PORT,
RIPE_WHOIS_SERVER,
SUPPORT_RIPE,
WHOIS_CONNECT_TIMEOUT_SECONDS,
WHOIS_READ_TIMEOUT_SECONDS,
)
# Public surface of this module: only the tool registration hook.
__all__ = ["register"]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared cache for WHOIS results, configured with a 300-second (5-minute) TTL
# and max_items=1000.
_whois_cache: TTLCache[str, Any] = TTLCache(ttl_seconds=300.0, max_items=1000)
def _get_whois_config() -> tuple[str, int]:
    """Return the ``(server, port)`` pair of the WHOIS service to query.

    Returns:
        ``(RIPE_WHOIS_SERVER, RIPE_WHOIS_PORT)`` when ``SUPPORT_RIPE`` is truthy.

    Raises:
        RuntimeError: If ``SUPPORT_RIPE`` is falsy; RIPE is the only registry
            this function knows about, so there is nothing to fall back to.
    """
    if not SUPPORT_RIPE:
        # No other RIR is supported yet; this guard is the place to extend
        # once additional registries are added.
        raise RuntimeError(
            "No RIR support enabled. Set SUPPORT_RIPE=true to enable RIPE NCC queries."
        )
    return RIPE_WHOIS_SERVER, RIPE_WHOIS_PORT
# --- Tool metadata ----------------------------------------------------------
# These strings are handed to the MCP layer: TOOL_NAME and TOOL_DESCRIPTION are
# passed to ``mcp.tool(...)`` in ``register``; QUERY_DESCRIPTION and
# FLAGS_DESCRIPTION are attached to the tool's parameters via ``Field``.

# Name under which the tool is registered.
TOOL_NAME = "ripe_whois_query"

# Tool-level description, including guidance on when NOT to use this tool.
TOOL_DESCRIPTION = (
    "Perform raw WHOIS queries against the RIPE NCC database to get complete object information in RPSL format. "
    "This tool is specifically for the RIPE RIR (Europe/Middle East/Central Asia region). "
    "Use ONLY when you need full object details or administrative data from RIPE. "
    "DO NOT use for contact information - use ripe_contact_card for abuse, NOC, admin, or tech contacts. "
    "DO NOT use for route validation - use ripe_validate_route_object for checking if route objects exist. "
    "DO NOT use for AS-SET expansion - use ripe_expand_as_set for getting ASN lists. "
    "This returns raw RIPE database records with all attributes for detailed analysis."
)

# Description of the ``query`` parameter.
QUERY_DESCRIPTION = (
    "The domain name, IP address, ASN, or other identifier to query via RIPE WHOIS. "
    "Examples: 'example.com', '192.0.2.1', 'AS64496', 'RIPE-NCC-HM-MNT'. "
    "Returns complete object details from the RIPE NCC database."
)

# Description of the optional ``flags`` parameter.
FLAGS_DESCRIPTION = (
    "Optional WHOIS flags to modify the query behavior. Common flags: "
    "['-B'] for brief output (less verbose), ['-r'] for raw output (no filtering), "
    "['-T', 'person'] to limit object types. Use empty list [] or null for default query."
)
async def _fetch_whois_response(server: str, port: int, line: str) -> str:
    """Send one WHOIS request line to ``server:port`` and return the decoded reply.

    Args:
        server: Host name of the WHOIS server.
        port: TCP port of the WHOIS server.
        line: Complete request line, including its trailing CRLF.

    Returns:
        Everything the server sent before closing the connection, decoded as
        UTF-8 with undecodable bytes replaced.

    Raises:
        TimeoutError / asyncio.TimeoutError: If connecting exceeds
            ``WHOIS_CONNECT_TIMEOUT_SECONDS`` or a single read exceeds
            ``WHOIS_READ_TIMEOUT_SECONDS``.
        OSError: On connection-level failures.
    """
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(server, port),
        WHOIS_CONNECT_TIMEOUT_SECONDS,
    )
    try:
        writer.write(line.encode("utf-8"))
        await writer.drain()

        # Read until the server closes the connection (empty read == EOF).
        chunks: list[bytes] = []
        while True:
            chunk = await asyncio.wait_for(
                reader.read(65536), WHOIS_READ_TIMEOUT_SECONDS
            )
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # Always release the socket, including when a read times out or the
        # connection breaks mid-transfer; closing is best-effort.
        writer.close()
        with contextlib.suppress(Exception):
            await writer.wait_closed()

    return b"".join(chunks).decode("utf-8", errors="replace")


async def _whois_request(
    query: Annotated[str, Field(description=QUERY_DESCRIPTION)],
    flags: Annotated[
        list[str] | None,
        Field(default=None, description=FLAGS_DESCRIPTION),
    ] = None,
    *,
    ctx: Context[ServerSession, None],
) -> dict[str, Any]:
    """Execute a WHOIS request and return the result in a structured format.

    Args:
        query: Identifier to look up (see ``QUERY_DESCRIPTION``).
        flags: Optional WHOIS flags placed before the query on the request
            line; ``None`` and ``[]`` both mean "no flags".
        ctx: MCP request context used to report progress and errors.

    Returns:
        On success ``{"ok": True, "data": {"rpsl", "server", "latency_ms"}}``.
        On failure ``{"ok": False, "error": <code>, "detail": <text>}`` where
        ``<code>`` is one of ``service_disabled``, ``invalid_query``,
        ``timeout_error``, ``network_error`` or ``whois_error``.
        Only successful results are stored in the module cache.
    """
    # Check if RIPE support is enabled
    if not SUPPORT_RIPE:
        error_msg = "WHOIS queries are currently disabled (SUPPORT_RIPE=false)"
        logger.warning(error_msg)
        await ctx.error(error_msg)
        return {
            "ok": False,
            "error": "service_disabled",
            "detail": "RIPE WHOIS support is disabled. Set SUPPORT_RIPE=true to enable.",
        }

    flag_list = flags or []

    # The request is a single CRLF-terminated line. An embedded CR or LF in
    # caller-supplied text would smuggle extra lines to the server, so refuse
    # such input instead of forwarding it.
    if any("\r" in part or "\n" in part for part in (query, *flag_list)):
        error_msg = "WHOIS query rejected: query and flags must not contain CR or LF characters"
        logger.warning(error_msg)
        await ctx.error(error_msg)
        return {
            "ok": False,
            "error": "invalid_query",
            "detail": "Query and flags must not contain carriage return or line feed characters",
        }

    # Create cache key from query and flags
    cache_key = f"{query}|{','.join(flag_list)}"

    # Log the incoming request
    await ctx.info(
        f"Starting WHOIS query for '{query}'"
        + (f" with flags {flags}" if flags else "")
    )

    # Check cache first
    cached_result = _whois_cache.get(cache_key)
    if cached_result is not None:
        logger.info(f"WHOIS query for '{query}' served from cache")
        await ctx.info(f"WHOIS query for '{query}' served from cache")
        return cached_result

    line = (" ".join(flag_list) + " " + query).strip() + "\r\n"
    start = time.perf_counter()

    # Get appropriate WHOIS server configuration
    whois_server, whois_port = _get_whois_config()

    try:
        rpsl = await _fetch_whois_response(whois_server, whois_port, line)
        latency_ms = int((time.perf_counter() - start) * 1000)
        logger.info(f"WHOIS query for '{query}' completed in {latency_ms}ms")

        # Log successful completion via MCP context
        await ctx.info(
            f"WHOIS query for '{query}' completed successfully in {latency_ms}ms (server: {whois_server})"
        )

        result = {
            "ok": True,
            "data": {"rpsl": rpsl, "server": whois_server, "latency_ms": latency_ms},
        }

        # Cache the successful result
        _whois_cache.set(cache_key, result)
        return result
    # asyncio.TimeoutError only became an alias of the builtin TimeoutError in
    # Python 3.11; name both so older interpreters take this branch as well.
    except (TimeoutError, asyncio.TimeoutError):
        error_msg = f"WHOIS query for '{query}' timed out"
        logger.error(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {
            "ok": False,
            "error": "timeout_error",
            "detail": "Connection or read timeout",
        }
    except (ConnectionError, OSError) as e:
        error_msg = f"Network error for WHOIS query '{query}': {str(e)}"
        logger.error(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {
            "ok": False,
            "error": "network_error",
            "detail": f"Network connection failed: {str(e)}",
        }
    except Exception as e:
        # Tool boundary: report any other failure as a structured error
        # instead of letting it propagate to the MCP layer.
        error_msg = f"WHOIS query for '{query}' failed: {str(e)}"
        logger.error(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {"ok": False, "error": "whois_error", "detail": str(e)}
def register(mcp: FastMCP) -> None:
    """Register the WHOIS query handler as a tool on ``mcp``.

    The handler ``_whois_request`` is exposed under ``TOOL_NAME`` with
    ``TOOL_DESCRIPTION`` as its description.
    """
    as_tool = mcp.tool(name=TOOL_NAME, description=TOOL_DESCRIPTION)
    as_tool(_whois_request)