Skip to main content
Glama
test_workflow.py • 6.41 kB
"""Tests for workflow engine.""" import pytest from pathlib import Path from delegation_mcp.workflow import ( WorkflowDefinition, WorkflowStep, WorkflowContext, WorkflowEngine, ) from delegation_mcp.config import OrchestratorConfig from delegation_mcp.orchestrator import OrchestratorRegistry @pytest.fixture def simple_workflow(): """Create a simple test workflow.""" return WorkflowDefinition( name="Test Workflow", description="Simple test workflow", steps=[ WorkflowStep( id="step1", agent="claude", task="Analyze {{ code_path }}", output="analysis", description="First step", ), WorkflowStep( id="step2", agent="gemini", task="Review: {{ analysis }}", output="review", condition="{{ analysis | length > 0 }}", description="Second step", ), ], ) @pytest.fixture def test_registry(): """Create test registry with mock commands.""" reg = OrchestratorRegistry() reg.register( OrchestratorConfig( name="claude", command="echo", args=["[CLAUDE]"], enabled=True, ) ) reg.register( OrchestratorConfig( name="gemini", command="echo", args=["[GEMINI]"], enabled=True, ) ) return reg @pytest.fixture def workflow_engine(test_registry): """Create workflow engine.""" return WorkflowEngine(test_registry) def test_workflow_context(): """Test workflow context variable management.""" context = WorkflowContext() # Set and get variables context.set("foo", "bar") assert context.get("foo") == "bar" assert context.get("missing") is None assert context.get("missing", "default") == "default" def test_context_interpolation(): """Test variable interpolation in templates.""" context = WorkflowContext() context.set("name", "World") context.set("count", 42) result = context.interpolate("Hello {{ name }}! Count: {{ count }}") assert result == "Hello World! 
Count: 42" def test_context_condition_evaluation(): """Test condition evaluation.""" context = WorkflowContext() # Test truthy/falsy context.set("exists", "value") assert context.evaluate_condition("{{ exists }}") is True context.set("empty", "") assert context.evaluate_condition("{{ empty }}") is False # Test length conditions context.set("items", [1, 2, 3]) assert context.evaluate_condition("{{ items | length > 0 }}") is True assert context.evaluate_condition("{{ items | length > 5 }}") is False context.set("text", "hello") assert context.evaluate_condition("{{ text | length > 0 }}") is True def test_workflow_serialization(simple_workflow, tmp_path): """Test workflow save/load.""" workflow_file = tmp_path / "test.yaml" # Save simple_workflow.to_yaml(workflow_file) assert workflow_file.exists() # Load loaded = WorkflowDefinition.from_yaml(workflow_file) assert loaded.name == simple_workflow.name assert len(loaded.steps) == len(simple_workflow.steps) assert loaded.steps[0].id == "step1" @pytest.mark.asyncio async def test_workflow_execution(workflow_engine, simple_workflow): """Test basic workflow execution.""" result = await workflow_engine.execute( simple_workflow, initial_context={"code_path": "test.py"} ) assert result is not None assert result.workflow_name == "Test Workflow" assert result.steps_completed == 2 assert result.total_steps == 2 assert result.success is True assert "analysis" in result.outputs assert "review" in result.outputs @pytest.mark.asyncio async def test_workflow_conditional_skip(workflow_engine): """Test conditional step skipping.""" workflow = WorkflowDefinition( name="Conditional Test", steps=[ WorkflowStep( id="always", agent="claude", task="Always run", output="result1", ), WorkflowStep( id="never", agent="gemini", task="Never run", output="result2", condition="{{ missing_var }}", ), ], ) result = await workflow_engine.execute(workflow) # Only first step should execute assert result.steps_completed == 1 assert "result1" in 
result.outputs assert "result2" not in result.outputs @pytest.mark.asyncio async def test_workflow_error_handling(test_registry): """Test workflow error handling.""" # Register a failing command test_registry.register( OrchestratorConfig( name="failing", command="false", # Command that always fails args=[], enabled=True, ) ) engine = WorkflowEngine(test_registry) workflow = WorkflowDefinition( name="Error Test", steps=[ WorkflowStep( id="fail", agent="failing", task="This will fail", output="result", ), WorkflowStep( id="never_reached", agent="claude", task="Should not execute", output="result2", ), ], ) result = await engine.execute(workflow) # Should stop on first error assert result.success is False assert result.steps_completed == 0 # Failed step doesn't count as completed assert len(result.errors) > 0 @pytest.mark.asyncio async def test_load_actual_workflows(workflow_engine): """Test loading actual workflow files.""" workflows_dir = Path("workflows") if not workflows_dir.exists(): pytest.skip("Workflows directory not found") workflows = workflow_engine.list_workflows(workflows_dir) # Should load at least some workflows assert len(workflows) > 0 # Check first workflow is valid workflow = workflows[0] assert workflow.name assert len(workflow.steps) > 0 assert all(step.agent for step in workflow.steps) assert all(step.task for step in workflow.steps)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.