"""Temporal worker for running calculator service handlers."""
import asyncio
import logging
import sys
from typing import Optional
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import Worker
from nexus_mcp_calculator.service_handler import mcp_service_handler, CalculatorHandler
from nexus_mcp_calculator.workflows import (
CalculateWorkflow,
AddWorkflow,
SubtractWorkflow,
MultiplyWorkflow,
DivideWorkflow,
PowerWorkflow,
SumListWorkflow,
)
from nexus_mcp_calculator.activities import (
calculate_activity,
add_activity,
subtract_activity,
multiply_activity,
divide_activity,
power_activity,
sum_list_activity,
)
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CalculatorWorker:
"""Worker class for running the calculator service."""
def __init__(
self,
temporal_host: str = "localhost:7233",
namespace: str = "my-handler-namespace",
task_queue: str = "mcp",
) -> None:
self.temporal_host = temporal_host
self.namespace = namespace
self.task_queue = task_queue
self.client: Optional[Client] = None
async def connect(self) -> None:
"""Connect to Temporal server."""
logger.info(f"Connecting to Temporal at {self.temporal_host}, namespace: {self.namespace}")
self.client = await Client.connect(
self.temporal_host,
namespace=self.namespace,
data_converter=pydantic_data_converter,
)
logger.info("Successfully connected to Temporal")
async def run(self) -> None:
"""Run the worker with calculator service handlers."""
if not self.client:
await self.connect()
assert self.client is not None
logger.info(f"Starting worker on task queue: {self.task_queue}")
# Create the worker with our service handlers, workflows, and activities
async with Worker(
self.client,
task_queue=self.task_queue,
nexus_service_handlers=[CalculatorHandler(), mcp_service_handler],
workflows=[
CalculateWorkflow,
AddWorkflow,
SubtractWorkflow,
MultiplyWorkflow,
DivideWorkflow,
PowerWorkflow,
SumListWorkflow,
],
activities=[
calculate_activity,
add_activity,
subtract_activity,
multiply_activity,
divide_activity,
power_activity,
sum_list_activity,
],
):
logger.info("Worker started successfully. Waiting for tasks...")
logger.info("Registered 7 workflows and 7 activities for calculator operations:")
logger.info(" - calculate: Evaluate mathematical expressions (workflow + activity)")
logger.info(" - add: Add two numbers (workflow + activity)")
logger.info(" - subtract: Subtract two numbers (workflow + activity)")
logger.info(" - multiply: Multiply two numbers (workflow + activity)")
logger.info(" - divide: Divide two numbers (workflow + activity)")
logger.info(" - power: Raise a number to a power (workflow + activity)")
logger.info(" - sum_list: Sum a list of numbers (workflow + activity)")
logger.info("All operations now visible in Temporal UI as workflow executions!")
logger.info("Press Ctrl+C to stop the worker")
try:
# Keep the worker running
await asyncio.Event().wait()
except KeyboardInterrupt:
logger.info("Received interrupt signal, shutting down worker...")
async def main() -> None:
"""Main entry point for running the calculator worker."""
import argparse
parser = argparse.ArgumentParser(description="Run the Nexus MCP Calculator worker")
parser.add_argument(
"--temporal-host",
default="localhost:7233",
help="Temporal server host (default: localhost:7233)"
)
parser.add_argument(
"--namespace",
default="my-handler-namespace",
help="Temporal namespace (default: my-handler-namespace)"
)
parser.add_argument(
"--task-queue",
default="mcp",
help="Task queue name (default: mcp)"
)
args = parser.parse_args()
worker = CalculatorWorker(
temporal_host=args.temporal_host,
namespace=args.namespace,
task_queue=args.task_queue,
)
try:
await worker.run()
except KeyboardInterrupt:
logger.info("Worker stopped")
except Exception as e:
logger.error(f"Worker failed: {e}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())