# whois_query.py • 6.49 kB
import asyncio
import contextlib
import logging
import time
from typing import Annotated, Any
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from pydantic import Field
from whois_mcp.cache import TTLCache
from whois_mcp.config import (
ARIN_WHOIS_PORT,
ARIN_WHOIS_SERVER,
SUPPORT_ARIN,
WHOIS_CONNECT_TIMEOUT_SECONDS,
WHOIS_READ_TIMEOUT_SECONDS,
)
# Only the registration hook is part of this module's public surface.
__all__ = ["register"]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# WHOIS results are memoised for five minutes (300 seconds), capped at
# 1000 entries.
_CACHE_MAX_ITEMS = 1000
_CACHE_TTL_SECONDS = 300.0
_whois_cache: TTLCache[str, Any] = TTLCache(
    max_items=_CACHE_MAX_ITEMS,
    ttl_seconds=_CACHE_TTL_SECONDS,
)
def _get_whois_config() -> tuple[str, int]:
    """Return the ``(server, port)`` pair of the WHOIS endpoint to query.

    ARIN is the only RIR handled here, so its configured endpoint is returned
    whenever ``SUPPORT_ARIN`` is truthy.

    Raises:
        RuntimeError: If ``SUPPORT_ARIN`` is falsy; no other RIR is supported
            yet.
    """
    if not SUPPORT_ARIN:
        # No fallback RIR exists yet; this guard is where future RIRs would
        # be selected once they are added.
        raise RuntimeError(
            "No RIR support enabled. Set SUPPORT_ARIN=true to enable ARIN queries."
        )
    return ARIN_WHOIS_SERVER, ARIN_WHOIS_PORT
# Tool metadata constants.
# Each description is assembled from whole-sentence fragments joined by a
# single space, so the fragments themselves carry no trailing whitespace.
TOOL_NAME = "arin_whois_query"

TOOL_DESCRIPTION = " ".join(
    (
        "Perform raw WHOIS queries against the ARIN database to get complete object information in RPSL format.",
        "This tool is specifically for the ARIN RIR (North America region - United States, Canada, parts of Caribbean).",
        "Use ONLY when you need full object details or administrative data from ARIN.",
        "DO NOT use for contact information - use arin_contact_card for abuse, NOC, admin, or tech contacts.",
        "DO NOT use for route validation - use arin_validate_route_object for checking if route objects exist.",
        "DO NOT use for AS-SET expansion - use arin_expand_as_set for getting ASN lists.",
        "This returns raw ARIN database records with all attributes for detailed analysis.",
    )
)

QUERY_DESCRIPTION = " ".join(
    (
        "The domain name, IP address, ASN, or other identifier to query via ARIN WHOIS.",
        "Examples: 'example.com', '8.8.8.8', 'AS15169', 'GOOGLE'.",
        "Returns complete object details from the ARIN database.",
    )
)

FLAGS_DESCRIPTION = " ".join(
    (
        "Optional WHOIS flags to modify the query behavior. Common ARIN flags:",
        "['+'] for full details, ['-'] for brief output. ARIN uses different flags than RIPE.",
        "Use empty list [] or null for default query.",
    )
)
# asyncio.wait_for raises asyncio.TimeoutError, which is only an alias of the
# builtin TimeoutError on Python 3.11+. Catching both keeps timeout handling
# identical on 3.10.
_TIMEOUT_ERRORS = (TimeoutError, asyncio.TimeoutError)


async def _read_whois_response(
    reader: asyncio.StreamReader, read_timeout: float
) -> tuple[bytes, bool]:
    """Read from *reader* until EOF or until a single read times out.

    Returns:
        A ``(data, timed_out)`` pair: the bytes received so far, and whether
        reading stopped because of a timeout rather than a clean EOF.
    """
    chunks: list[bytes] = []
    while True:
        try:
            chunk = await asyncio.wait_for(reader.read(8192), read_timeout)
        except _TIMEOUT_ERRORS:
            return b"".join(chunks), True
        if not chunk:
            return b"".join(chunks), False
        chunks.append(chunk)


async def _whois_request(
    query: Annotated[str, Field(description=QUERY_DESCRIPTION)],
    flags: Annotated[
        list[str] | None,
        Field(default=None, description=FLAGS_DESCRIPTION),
    ] = None,
    *,
    ctx: Context[ServerSession, None],
) -> dict[str, Any]:
    """Execute a WHOIS request and return the result in a structured format.

    Args:
        query: Identifier to look up; sent to the server as a single line.
        flags: Optional WHOIS flags, placed before the query on the wire.
        ctx: Request context used to report info/error messages.

    Returns:
        On success ``{"ok": True, "data": {"rpsl", "server", "latency_ms"}}``.
        On failure ``{"ok": False, "error": <code>, "detail": <text>}`` where
        the code is one of ``service_disabled``, ``invalid_query``,
        ``timeout_error``, ``network_error`` or ``internal_error``.
    """
    # Check if ARIN support is enabled
    if not SUPPORT_ARIN:
        error_msg = "WHOIS queries are currently disabled (SUPPORT_ARIN=false)"
        logger.warning(error_msg)
        await ctx.error(error_msg)
        return {
            "ok": False,
            "error": "service_disabled",
            "detail": "ARIN WHOIS support is disabled. Set SUPPORT_ARIN=true to enable.",
        }

    flag_list = flags or []
    wire_line = " ".join([*flag_list, query]).strip()

    # The request is written to the socket as one CRLF-terminated line, so an
    # embedded line break in caller-supplied text would inject extra protocol
    # lines. A blank query has nothing to look up.
    if not query.strip() or "\r" in wire_line or "\n" in wire_line:
        error_msg = (
            "ARIN WHOIS query rejected: query must be non-empty "
            "and must not contain line breaks"
        )
        logger.warning(error_msg)
        await ctx.error(error_msg)
        return {
            "ok": False,
            "error": "invalid_query",
            "detail": "Query must be non-empty and must not contain line breaks.",
        }

    # Create cache key from query and flags
    cache_key = f"arin:{query}|{','.join(flag_list)}"

    # Log the incoming request
    await ctx.info(
        f"ARIN WHOIS query requested: '{query}'"
        + (f" with flags: {flags}" if flags else "")
    )

    # Check cache first
    cached_result = _whois_cache.get(cache_key)
    if cached_result is not None:
        logger.info("ARIN WHOIS query for '%s' served from cache", query)
        await ctx.info(f"ARIN WHOIS query for '{query}' served from cache")
        return cached_result

    payload = (wire_line + "\r\n").encode("utf-8")
    start = time.perf_counter()

    # Get appropriate WHOIS server configuration
    whois_server, whois_port = _get_whois_config()

    writer: asyncio.StreamWriter | None = None
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(whois_server, whois_port),
            WHOIS_CONNECT_TIMEOUT_SECONDS,
        )
        writer.write(payload)
        await writer.drain()

        raw, timed_out = await _read_whois_response(
            reader, WHOIS_READ_TIMEOUT_SECONDS
        )
        if timed_out and not raw:
            # Nothing arrived at all: report a timeout instead of returning
            # an empty "successful" record.
            raise TimeoutError("no data received before read timeout")

        rpsl = raw.decode("utf-8", errors="replace")
        latency_ms = int((time.perf_counter() - start) * 1000)
        logger.info(
            "ARIN WHOIS query for '%s' completed in %dms", query, latency_ms
        )

        # Log successful completion via MCP context
        await ctx.info(
            f"ARIN WHOIS query for '{query}' completed successfully in {latency_ms}ms (server: {whois_server})"
        )

        result = {
            "ok": True,
            "data": {"rpsl": rpsl, "server": whois_server, "latency_ms": latency_ms},
        }

        if timed_out:
            # Best-effort: hand back what was received, but do not cache a
            # response that may be truncated.
            logger.warning(
                "ARIN WHOIS query for '%s' hit a read timeout; result not cached",
                query,
            )
        else:
            # Cache the successful result
            _whois_cache.set(cache_key, result)
        return result
    except _TIMEOUT_ERRORS:
        error_msg = f"ARIN WHOIS query for '{query}' timed out"
        logger.error(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {
            "ok": False,
            "error": "timeout_error",
            "detail": "Connection or read timeout",
        }
    except (ConnectionError, OSError) as e:
        error_msg = f"Network error for ARIN WHOIS query '{query}': {e}"
        logger.error(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {
            "ok": False,
            "error": "network_error",
            "detail": f"Network connection failed: {e}",
        }
    except Exception as e:
        error_msg = f"Unexpected error for ARIN WHOIS query '{query}': {e}"
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(error_msg)
        await ctx.error(f"{error_msg} (server: {whois_server})")
        return {
            "ok": False,
            "error": "internal_error",
            "detail": f"Internal server error: {e}",
        }
    finally:
        # Close the connection on every path, not only on success, so a
        # failure after connecting does not leak the socket.
        if writer is not None:
            with contextlib.suppress(Exception):
                writer.close()
                await writer.wait_closed()
def register(mcp: FastMCP) -> None:
    """Attach the ARIN WHOIS query handler to *mcp* as a tool.

    The handler is registered under ``TOOL_NAME`` with ``TOOL_DESCRIPTION``.
    """
    as_tool = mcp.tool(name=TOOL_NAME, description=TOOL_DESCRIPTION)
    as_tool(_whois_request)