"""Calculator workflows for Temporal execution."""
from datetime import timedelta

from temporalio import workflow

from nexus_mcp_calculator.service import (
    CalculateRequest,
    CalculateResponse,
    AddRequest,
    SubtractRequest,
    MultiplyRequest,
    DivideRequest,
    PowerRequest,
    SumListRequest,
    BasicOperationResponse,
)
from nexus_mcp_calculator.activities import (
    calculate_activity,
    add_activity,
    subtract_activity,
    multiply_activity,
    divide_activity,
    power_activity,
    sum_list_activity,
)
@workflow.defn
class CalculateWorkflow:
    """Workflow for evaluating mathematical expressions.

    Hands the request to ``calculate_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: CalculateRequest) -> CalculateResponse:
        """Run the calculate workflow.

        Args:
            input: Request whose ``expression`` is logged and which is passed
                unchanged to ``calculate_activity``.

        Returns:
            The response returned by ``calculate_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"๐งฎ Starting calculate workflow for: '{input.expression}'")
        try:
            result = await workflow.execute_activity(
                calculate_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"๐งฎ Calculate workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"๐งฎ Calculate workflow completed: {result.result}")
            return result
@workflow.defn
class AddWorkflow:
    """Workflow for adding two numbers.

    Hands the request to ``add_activity`` and returns whatever the activity
    produces.
    """

    @workflow.run
    async def run(self, input: AddRequest) -> BasicOperationResponse:
        """Run the add workflow.

        Args:
            input: Request whose ``a`` and ``b`` are logged and which is
                passed unchanged to ``add_activity``.

        Returns:
            The response returned by ``add_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"โ Starting add workflow for: {input.a} + {input.b}")
        try:
            result = await workflow.execute_activity(
                add_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"โ Add workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"โ Add workflow completed: {result.result}")
            return result
@workflow.defn
class SubtractWorkflow:
    """Workflow for subtracting two numbers.

    Hands the request to ``subtract_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: SubtractRequest) -> BasicOperationResponse:
        """Run the subtract workflow.

        Args:
            input: Request whose ``a`` and ``b`` are logged and which is
                passed unchanged to ``subtract_activity``.

        Returns:
            The response returned by ``subtract_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"โ Starting subtract workflow for: {input.a} - {input.b}")
        try:
            result = await workflow.execute_activity(
                subtract_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"โ Subtract workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"โ Subtract workflow completed: {result.result}")
            return result
@workflow.defn
class MultiplyWorkflow:
    """Workflow for multiplying two numbers.

    Hands the request to ``multiply_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: MultiplyRequest) -> BasicOperationResponse:
        """Run the multiply workflow.

        Args:
            input: Request whose ``a`` and ``b`` are logged and which is
                passed unchanged to ``multiply_activity``.

        Returns:
            The response returned by ``multiply_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"โ๏ธ Starting multiply workflow for: {input.a} * {input.b}")
        try:
            result = await workflow.execute_activity(
                multiply_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"โ๏ธ Multiply workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"โ๏ธ Multiply workflow completed: {result.result}")
            return result
@workflow.defn
class DivideWorkflow:
    """Workflow for dividing two numbers.

    Hands the request to ``divide_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: DivideRequest) -> BasicOperationResponse:
        """Run the divide workflow.

        Args:
            input: Request whose ``a`` and ``b`` are logged and which is
                passed unchanged to ``divide_activity``.

        Returns:
            The response returned by ``divide_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"โ Starting divide workflow for: {input.a} / {input.b}")
        try:
            result = await workflow.execute_activity(
                divide_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"โ Divide workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"โ Divide workflow completed: {result.result}")
            return result
@workflow.defn
class PowerWorkflow:
    """Workflow for raising a number to a power.

    Hands the request to ``power_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: PowerRequest) -> BasicOperationResponse:
        """Run the power workflow.

        Args:
            input: Request whose ``base`` and ``exponent`` are logged and
                which is passed unchanged to ``power_activity``.

        Returns:
            The response returned by ``power_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"๐ข Starting power workflow for: {input.base} ^ {input.exponent}")
        try:
            result = await workflow.execute_activity(
                power_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"๐ข Power workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"๐ข Power workflow completed: {result.result}")
            return result
@workflow.defn
class SumListWorkflow:
    """Workflow for summing a list of numbers.

    Hands the request to ``sum_list_activity`` and returns whatever the
    activity produces.
    """

    @workflow.run
    async def run(self, input: SumListRequest) -> BasicOperationResponse:
        """Run the sum_list workflow.

        Args:
            input: Request whose ``numbers`` length is logged and which is
                passed unchanged to ``sum_list_activity``.

        Returns:
            The response returned by ``sum_list_activity``.

        Raises:
            Exception: Any error raised while executing the activity is
                logged and then re-raised unchanged.
        """
        workflow.logger.info(f"๐ Starting sum_list workflow for {len(input.numbers)} numbers")
        try:
            result = await workflow.execute_activity(
                sum_list_activity,
                input,
                # Start-to-close timeout of 30 seconds for the activity.
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception as exc:
            # Log for visibility, then let the failure propagate as-is.
            workflow.logger.error(f"๐ Sum_list workflow failed: {exc}")
            raise
        else:
            workflow.logger.info(f"๐ Sum_list workflow completed: {result.result}")
            return result