Skip to main content
Glama
base_agent_executor_template.py14.4 kB
""" A2A Agent Executor Template - Based on A2A SDK v0.2.9 This template demonstrates the latest patterns and best practices for implementing A2A agents. """ import asyncio import logging from typing import Optional, Dict, Any from uuid import uuid4 from a2a.server.agent_execution.agent_executor import AgentExecutor from a2a.server.agent_execution.context import RequestContext from a2a.server.events.event_queue import EventQueue from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore from a2a.server.tasks.task_updater import TaskUpdater from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication from a2a.utils.errors import ServerError from a2a.types import ( AgentCard, AgentSkill, AgentCapabilities, Part, TextPart, DataPart, TaskState, InvalidParamsError, InternalError, ) # LangGraph imports from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import InMemorySaver from typing import TypedDict, Literal from pydantic import BaseModel # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Example State Schema for LangGraph class AgentState(TypedDict): """State schema for the agent's LangGraph workflow""" user_input: str processing_steps: list[str] result: Optional[Dict[str, Any]] error: Optional[str] require_user_input: bool is_complete: bool # Response Format for structured outputs class ResponseFormat(BaseModel): """Structured response format following A2A patterns""" status: Literal['input_required', 'completed', 'error'] message: str data: Optional[Dict[str, Any]] = None next_steps: Optional[list[str]] = None class MyAgentExecutor(AgentExecutor): """ A2A Agent Executor following latest v0.2.9 patterns. This executor demonstrates: - Proper task state management (submitted → working → completed/failed) - Streaming updates with proper event handling - Error handling with A2A specific error codes - Integration with LangGraph for agent logic """ def __init__(self): """Initialize the agent executor""" self.agent = None self.graph = None self._initialize_lock = asyncio.Lock() async def _initialize(self): """Lazy initialization of agent components""" async with self._initialize_lock: if self.agent is not None: return logger.info("Initializing agent components...") # Initialize your LLM here # llm = create_your_llm() # Initialize checkpointer for conversation memory self.checkpointer = InMemorySaver() # Build the LangGraph workflow self.graph = self._build_graph() logger.info("Agent initialization complete") def _build_graph(self): """Build the LangGraph workflow""" workflow = StateGraph(AgentState) # Define nodes workflow.add_node("process_input", self._process_input) workflow.add_node("execute_task", self._execute_task) workflow.add_node("format_response", self._format_response) # Define edges workflow.set_entry_point("process_input") workflow.add_edge("process_input", "execute_task") workflow.add_edge("execute_task", "format_response") workflow.add_edge("format_response", END) return workflow.compile(checkpointer=self.checkpointer) async def _process_input(self, state: AgentState) -> AgentState: """Process user input and plan execution""" state["processing_steps"].append("Analyzing user input...") # Add your input processing logic here return state async def _execute_task(self, state: AgentState) -> AgentState: """Execute the main task logic""" state["processing_steps"].append("Executing task...") # Add your task execution logic here state["result"] = {"example": "result"} state["is_complete"] = True return state async def _format_response(self, state: AgentState) -> AgentState: """Format the final response""" state["processing_steps"].append("Formatting response...") # Add response formatting logic here return state async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: """ Execute the agent's logic for a given request context. This method follows the A2A v0.2.9 patterns: 1. Extract and validate input 2. Initialize task with proper state transitions 3. Execute agent logic (potentially streaming) 4. Handle errors appropriately 5. Ensure task completion or failure state """ # Ensure agent is initialized await self._initialize() # Create TaskUpdater for managing task state task_id = context.task_id or str(uuid4()) context_id = context.context_id or str(uuid4()) task_updater = TaskUpdater(event_queue, task_id, context_id) try: # 1. Submit task (creates task in 'submitted' state) await task_updater.submit() # 2. Start work (transitions to 'working' state) await task_updater.start_work() # 3. Extract and validate input message_text = self._extract_message_text(context) if not message_text: raise ServerError(InvalidParamsError(message="No message text provided")) # 4. Initialize state for LangGraph initial_state: AgentState = { "user_input": message_text, "processing_steps": [], "result": None, "error": None, "require_user_input": False, "is_complete": False, } # 5. Execute agent logic with streaming updates final_state = None step_count = 0 if not self.graph: raise ServerError(InternalError(message="Agent graph not initialized")) async for chunk in self.graph.astream( initial_state, config={"configurable": {"thread_id": task_id}} ): step_count += 1 # Send intermediate updates if "processing_steps" in chunk and chunk["processing_steps"]: latest_step = chunk["processing_steps"][-1] await task_updater.update_status( TaskState.working, message=task_updater.new_agent_message( parts=[Part(root=TextPart(text=f"Step {step_count}: {latest_step}"))] ) ) # Check for user input requirement if chunk.get("require_user_input", False): await task_updater.update_status( TaskState.input_required, message=task_updater.new_agent_message( parts=[Part(root=TextPart(text="Additional input required"))] ) ) return final_state = chunk # 6. Process final result if not final_state or not final_state.get("is_complete"): raise ServerError(InternalError(message="Agent did not complete execution")) # 7. Send final response result_data = final_state.get("result", {}) summary_text = self._generate_summary(final_state if isinstance(final_state, dict) else {}) parts = [Part(root=TextPart(text=summary_text))] if result_data: parts.append(Part(root=DataPart(data=result_data))) # Important: Include 'final' flag for streaming completion await task_updater.update_status( TaskState.completed, message=task_updater.new_agent_message(parts=parts), final=True # This indicates stream end ) except ServerError: # Re-raise A2A specific errors raise except Exception as e: logger.error(f"Unexpected error in agent execution: {str(e)}", exc_info=True) # Update task to failed state await task_updater.update_status( TaskState.failed, message=task_updater.new_agent_message( parts=[Part(root=TextPart(text=f"Error: {str(e)}"))] ), final=True ) # Raise as InternalError for proper error handling raise ServerError(InternalError(message=str(e))) async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: """ Request the agent to cancel an ongoing task. Following A2A v0.2.9 patterns for task cancellation. """ task_id = context.task_id or str(uuid4()) context_id = context.context_id or str(uuid4()) task_updater = TaskUpdater(event_queue, task_id, context_id) try: # Attempt to cancel any ongoing operations # Add your cancellation logic here # Update task state to canceled await task_updater.update_status( TaskState.canceled, message=task_updater.new_agent_message( parts=[Part(root=TextPart(text="Task canceled by user request"))] ), final=True ) except Exception as e: logger.error(f"Error during task cancellation: {str(e)}", exc_info=True) # Even on error, try to mark as canceled await task_updater.update_status( TaskState.canceled, message=task_updater.new_agent_message( parts=[Part(root=TextPart(text=f"Task canceled with error: {str(e)}"))] ), final=True ) def _extract_message_text(self, context: RequestContext) -> Optional[str]: """Extract text from message parts""" if not context.message or not context.message.parts: return None text_parts = [] for part in context.message.parts: if isinstance(part.root, TextPart): text_parts.append(part.root.text) return " ".join(text_parts) if text_parts else None def _generate_summary(self, state: Dict[str, Any]) -> str: """Generate a summary from the final state""" if state.get("error"): return f"Task failed: {state['error']}" steps = state.get("processing_steps", []) result = state.get("result", {}) summary = "Task completed successfully.\n" if steps: summary += f"Completed {len(steps)} steps.\n" if result: summary += f"Generated {len(result)} results." return summary def create_a2a_server( agent_executor: AgentExecutor, agent_name: str = "My A2A Agent", agent_version: str = "1.0.0", port: int = 8000, ) -> A2AStarletteApplication: """ Create a complete A2A server with proper configuration. This follows A2A v0.2.9 patterns for server setup. """ # Define agent skills skills = [ AgentSkill( id='main_skill', name='Main Processing', description='Process user requests and generate responses', tags=['processing', 'analysis', 'generation'], examples=[ 'Analyze this data and provide insights', 'Generate a report based on the following information' ], ) ] # Define agent capabilities capabilities = AgentCapabilities( streaming=True, # Enable streaming responses pushNotifications=False, # Set to True if implementing webhooks stateTransitionHistory=True, # Track state changes ) # Create agent card with all required fields agent_card = AgentCard( name=agent_name, description='An A2A agent following v0.2.9 patterns', url=f'http://localhost:{port}/', version=agent_version, defaultInputModes=['text', 'application/json'], defaultOutputModes=['text', 'application/json'], capabilities=capabilities, skills=skills, # Authentication configuration (important for production) # securitySchemes={ # 'bearerAuth': { # 'type': 'http', # 'scheme': 'bearer', # 'bearerFormat': 'JWT' # } # }, # security=[{'bearerAuth': []}], ) # Create task store (use Redis for production) task_store = InMemoryTaskStore() # Create request handler request_handler = DefaultRequestHandler( agent_executor=agent_executor, task_store=task_store, # Add these for advanced features: # queue_manager=YourQueueManager(), # push_notifier=YourPushNotifier(), ) # Create and return the A2A application app = A2AStarletteApplication( agent_card=agent_card, http_handler=request_handler ) return app # Example usage if __name__ == '__main__': import uvicorn # Create agent executor executor = MyAgentExecutor() # Create A2A server app = create_a2a_server( agent_executor=executor, agent_name="Example A2A Agent", agent_version="1.0.0", port=8000 ) # Build the ASGI application application = app.build() # Start the server logger.info("Starting A2A Agent Server on port 8000") uvicorn.run(application, host='0.0.0.0', port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/HyunjunJeon/vibecoding-lg-mcp-a2a'

If you have feedback or need assistance with the MCP directory API, please join our Discord server