# main.py • 4.03 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-07-12T09:50:08+00:00
import argparse
import json
import os
from typing import *
from typing import Optional
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import APIKeyHeader, BaseSecurity
from models import (
ArrayOfBatch,
Batch,
BatchPostRequest,
DbGetResponse,
DomainsDomainCheckGetResponse,
DomainsDomainRankGetResponse,
Format,
)
# Proxy application object for the "Bulk WHOIS API". The endpoint stubs in
# this module register themselves on it through its HTTP-verb decorators.
app = MCPProxy(
    title="Bulk WHOIS API",
    version="1.0",
    description="Domain API (WHOIS, Check, Batch)",
    servers=[
        {"url": "http://localhost:5000"},
        {"url": "https://apispot.io/api"},
    ],
)
# GET /batch -- list the caller's batches.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/batch",
    tags=["batch_record_management"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Get your batches """,
)
def get_batches():
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /batch -- create a batch from the request body.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.post(
    "/batch",
    tags=["batch_record_management"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Create batch. Batch is then being processed until all provided items have been completed. At any time it can be `get` to provide current status with results optionally. """,
)
def create_batch(body: BatchPostRequest):
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /batch/{id} -- delete the batch identified by the `id` path segment.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.delete(
    "/batch/{id}",
    tags=["batch_record_management", "database_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Delete batch """,
)
def delete_batch(id: str):
    # The parameter name must match the `{id}` placeholder in the route, which
    # is why it shadows the builtin.
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /batch/{id} -- fetch the batch identified by the `id` path segment.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/batch/{id}",
    tags=["database_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Get batch """,
)
def get_batch(id: str):
    # The parameter name must match the `{id}` placeholder in the route, which
    # is why it shadows the builtin.
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /db -- query the domain database with the `query` string.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/db",
    tags=["domain_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Query domain database """,
)
def query_db(query: str):
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /domains/{domain}/check -- availability check for `domain`.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/domains/{domain}/check",
    tags=["domain_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Check domain availability """,
)
def check_domain(domain: str):
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /domains/{domain}/rank -- rank (authority) lookup for `domain`.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/domains/{domain}/rank",
    tags=["domain_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" Check domain rank (authority). """,
)
def domain_rank(domain: str):
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /domains/{domain}/whois -- WHOIS lookup for `domain`.
# `format` is optional and defaults to None; its allowed values are defined by
# the imported `Format` model.
# Declares an API-key header named X-API-KEY as its security scheme.
@app.get(
    "/domains/{domain}/whois",
    tags=["domain_query_operations"],
    security=[APIKeyHeader(name="X-API-KEY")],
    description=""" WHOIS query for a domain """,
)
def whois(domain: str, format: Optional[Format] = None):
    # Placeholder body: per the message below, MCPProxy is expected to patch
    # this function, so reaching the raise means the patching did not happen.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
def _load_mcp_settings(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Return the keyword arguments for ``app.get_mcp`` read from *environ*.

    The ``MCP_SETTINGS`` variable is expected to hold a JSON object. A
    missing, empty or whitespace-only value yields an empty dict, so an
    exported-but-blank variable does not crash the server at start-up.

    Args:
        environ: Mapping of environment variable names to their values.

    Returns:
        The decoded JSON object, or ``{}`` when no settings were supplied.

    Raises:
        json.JSONDecodeError: If the value is not valid JSON.
        TypeError: If the JSON document is not an object.
    """
    raw = environ.get("MCP_SETTINGS", "")
    if not raw.strip():
        return {}
    settings = json.loads(raw)
    if not isinstance(settings, dict):
        # Fail here with a readable message instead of letting the later
        # ``**`` expansion raise a generic "must be a mapping" TypeError.
        raise TypeError(
            "MCP_SETTINGS must be a JSON object, got "
            f"{type(settings).__name__}"
        )
    return settings


if __name__ == "__main__":
    # Command line: a single positional argument selecting the transport.
    parser = argparse.ArgumentParser(description="MCP Server")
    parser.add_argument(
        "transport",
        choices=["stdio", "sse", "streamable-http"],
        help="Transport mode (stdio, sse or streamable-http)",
    )
    args = parser.parse_args()

    # Optional configuration sources; when both are set, CONFIG_PATH is
    # loaded first and CONFIG second.
    if "CONFIG_PATH" in os.environ:
        app.load_configuration(os.environ["CONFIG_PATH"])

    if "CONFIG" in os.environ:
        app.load_configuration_from_string(os.environ["CONFIG"])

    # Security parameters are parsed from the environment only when the
    # SECURITY variable is present.
    if "SECURITY" in os.environ:
        security_params = BaseSecurity.parse_security_parameters_from_env(
            os.environ,
        )
        app.set_security_params(security_params)

    mcp_settings = _load_mcp_settings(os.environ)
    app.get_mcp(**mcp_settings).run(transport=args.transport)