"""MCP Server that bridges MCP clients to Temporal Nexus operations."""
import asyncio
import logging
import sys
from typing import Optional
from mcp.server.lowlevel import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from nexusmcp import InboundGateway
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CalculatorMCPServer:
"""MCP Server for the calculator application."""
def __init__(
self,
temporal_host: str = "localhost:7233",
namespace: str = "my-caller-namespace",
endpoint: str = "mcp-gateway",
) -> None:
self.temporal_host = temporal_host
self.namespace = namespace
self.endpoint = endpoint
self.client: Optional[Client] = None
self.server = Server("nexus-mcp-calculator")
async def connect(self) -> None:
"""Connect to Temporal server."""
logger.info(f"Connecting to Temporal at {self.temporal_host}, namespace: {self.namespace}")
self.client = await Client.connect(
self.temporal_host,
namespace=self.namespace,
data_converter=pydantic_data_converter,
)
logger.info("Successfully connected to Temporal")
async def setup_gateway(self) -> InboundGateway:
"""Set up the MCP gateway."""
if not self.client:
await self.connect()
assert self.client is not None
logger.info(f"Setting up MCP gateway for endpoint: {self.endpoint}")
# Create the MCP gateway
gateway = InboundGateway(
client=self.client,
endpoint=self.endpoint,
)
# Register the server with the gateway
gateway.register(self.server)
return gateway
async def run(self) -> None:
"""Run the MCP server."""
try:
gateway = await self.setup_gateway()
logger.info("Starting MCP server...")
logger.info("Available calculator tools will be automatically discovered from Nexus operations")
logger.info("Connect your MCP client to start using calculator tools")
# Run the gateway and stdio server
async with gateway.run():
# Set up stdio transport for MCP communication
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP server running on stdio transport")
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="nexus-mcp-calculator",
server_version="0.1.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(
tools_changed=True,
resources_changed=True,
prompts_changed=True,
),
experimental_capabilities={},
),
),
)
except Exception as e:
logger.error(f"Error in MCP server run method: {e}", exc_info=True)
raise
async def main() -> None:
"""Main entry point for running the MCP server."""
import argparse
parser = argparse.ArgumentParser(description="Run the Nexus MCP Calculator server")
parser.add_argument(
"--temporal-host",
default="localhost:7233",
help="Temporal server host (default: localhost:7233)"
)
parser.add_argument(
"--namespace",
default="my-caller-namespace",
help="Temporal namespace (default: my-caller-namespace)"
)
parser.add_argument(
"--endpoint",
default="mcp-gateway",
help="Nexus endpoint name (default: mcp-gateway)"
)
args = parser.parse_args()
server = CalculatorMCPServer(
temporal_host=args.temporal_host,
namespace=args.namespace,
endpoint=args.endpoint,
)
try:
await server.run()
except KeyboardInterrupt:
logger.info("MCP server stopped")
except Exception as e:
logger.error(f"MCP server failed: {e}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())