Skip to main content
Glama
server.py7.81 kB
"""FastMCP server for background job management."""

import logging
from typing import Optional

from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from pydantic import Field

from .config import load_config
from .models import ExecuteOutput, KillOutput, ListOutput, ProcessOutput, StatusOutput
from .service import JobManager

logger = logging.getLogger(__name__)

# Global job manager instance, created lazily by get_job_manager().
_job_manager: Optional[JobManager] = None


def get_job_manager() -> JobManager:
    """Return the process-wide JobManager, creating it on first use.

    The first call loads the configuration via ``load_config()`` and builds
    the manager from it; later calls return the same instance.
    """
    global _job_manager
    if _job_manager is None:
        config = load_config()
        _job_manager = JobManager(config)
        logger.info("Initialized JobManager")
    return _job_manager


# Initialize FastMCP server
mcp = FastMCP("mcp-background-job")

# NOTE: the docstrings of the @mcp.tool() functions below are kept verbatim on
# purpose -- they are part of what the server presents for each tool, so they
# are treated like runtime strings rather than free-form comments.


@mcp.tool()
async def list_jobs() -> ListOutput:
    """List all background jobs with their status.

    Returns a list of all background jobs, including their job ID, status,
    command, and start time. Jobs are sorted by start time (newest first).
    """
    try:
        job_manager = get_job_manager()
        jobs = await job_manager.list_jobs()
        return ListOutput(jobs=jobs)
    except Exception as e:
        # logger.exception keeps the traceback; the client only sees ToolError.
        logger.exception("Error listing jobs: %s", e)
        raise ToolError(f"Failed to list jobs: {str(e)}") from e


@mcp.tool()
async def get_job_status(
    job_id: str = Field(..., description="Job ID to check"),
) -> StatusOutput:
    """Get the current status of a background job.

    Args:
        job_id: The UUID of the job to check

    Returns:
        The current status of the job (running, completed, failed, or killed)
    """
    try:
        job_manager = get_job_manager()
        job_status = await job_manager.get_job_status(job_id)
        return StatusOutput(status=job_status)
    except KeyError as e:
        raise ToolError(f"Job {job_id} not found") from e
    except Exception as e:
        logger.exception("Error getting job status for %s: %s", job_id, e)
        raise ToolError(f"Failed to get job status: {str(e)}") from e


@mcp.tool()
async def get_job_output(
    job_id: str = Field(..., description="Job ID to get output from"),
) -> ProcessOutput:
    """Get the complete stdout and stderr output of a job.

    Args:
        job_id: The UUID of the job to get output from

    Returns:
        ProcessOutput containing the complete stdout and stderr content
    """
    try:
        job_manager = get_job_manager()
        return await job_manager.get_job_output(job_id)
    except KeyError as e:
        raise ToolError(f"Job {job_id} not found") from e
    except Exception as e:
        logger.exception("Error getting job output for %s: %s", job_id, e)
        raise ToolError(f"Failed to get job output: {str(e)}") from e


@mcp.tool()
async def tail_job_output(
    job_id: str = Field(..., description="Job ID to tail"),
    lines: int = Field(50, description="Number of lines to return", ge=1, le=1000),
) -> ProcessOutput:
    """Get the last N lines of stdout and stderr from a job.

    Args:
        job_id: The UUID of the job to tail
        lines: Number of lines to return (1-1000, default 50)

    Returns:
        ProcessOutput containing the last N lines of stdout and stderr
    """
    try:
        job_manager = get_job_manager()
        return await job_manager.tail_job_output(job_id, lines)
    except KeyError as e:
        raise ToolError(f"Job {job_id} not found") from e
    except ValueError as e:
        raise ToolError(f"Invalid parameter: {str(e)}") from e
    except Exception as e:
        logger.exception("Error tailing job output for %s: %s", job_id, e)
        raise ToolError(f"Failed to tail job output: {str(e)}") from e


@mcp.tool()
async def execute_command(
    command: str = Field(..., description="Shell command to execute"),
) -> ExecuteOutput:
    """Execute a command as a background job and return job ID.

    Args:
        command: Shell command to execute in the background

    Returns:
        ExecuteOutput containing the job ID (UUID) of the started job
    """
    try:
        job_manager = get_job_manager()
        job_id = await job_manager.execute_command(command)
        return ExecuteOutput(job_id=job_id)
    except ValueError as e:
        raise ToolError(f"Invalid command: {str(e)}") from e
    except RuntimeError as e:
        # The two RuntimeError cases are told apart by message text only.
        if "Maximum concurrent jobs limit" in str(e):
            raise ToolError(f"Job limit reached: {str(e)}") from e
        raise ToolError(f"Failed to start job: {str(e)}") from e
    except Exception as e:
        logger.exception("Error executing command '%s': %s", command, e)
        raise ToolError(f"Failed to execute command: {str(e)}") from e


@mcp.tool()
async def interact_with_job(
    job_id: str = Field(..., description="Job ID to interact with"),
    input: str = Field(..., description="Input to send to the job's stdin"),
) -> ProcessOutput:
    """Send input to a job's stdin and return any immediate output.

    Args:
        job_id: The UUID of the job to interact with
        input: Text to send to the job's stdin

    Returns:
        ProcessOutput containing any immediate stdout/stderr output after sending input
    """
    # `input` shadows the builtin, but it is the tool's public parameter name
    # and therefore has to stay as-is.
    try:
        job_manager = get_job_manager()
        return await job_manager.interact_with_job(job_id, input)
    except KeyError as e:
        raise ToolError(f"Job {job_id} not found") from e
    except RuntimeError as e:
        if "not running" in str(e):
            raise ToolError(
                f"Job {job_id} is not running and cannot accept input"
            ) from e
        raise ToolError(f"Failed to interact with job: {str(e)}") from e
    except Exception as e:
        logger.exception("Error interacting with job %s: %s", job_id, e)
        raise ToolError(f"Failed to interact with job: {str(e)}") from e


@mcp.tool()
async def kill_job(
    job_id: str = Field(..., description="Job ID to kill"),
) -> KillOutput:
    """Kill a running background job.

    Args:
        job_id: The UUID of the job to terminate

    Returns:
        KillOutput indicating the result of the kill operation
    """
    # NOTE(review): unlike the other job tools there is no KeyError branch
    # here; presumably JobManager.kill_job reports an unknown job through its
    # return value rather than by raising -- confirm against the service.
    try:
        job_manager = get_job_manager()
        kill_result = await job_manager.kill_job(job_id)
        return KillOutput(status=kill_result)
    except Exception as e:
        logger.exception("Error killing job %s: %s", job_id, e)
        raise ToolError(f"Failed to kill job: {str(e)}") from e


async def cleanup_on_shutdown():
    """Shut down the global JobManager if one was ever created.

    Does nothing when get_job_manager() has not been called yet, so a server
    that never handled a job-related request exits without building a manager.
    """
    # Identity check, matching get_job_manager(); truthiness could be misled
    # by a manager type that defines __bool__ or __len__.
    if _job_manager is not None:
        logger.info("Shutting down JobManager...")
        await _job_manager.shutdown()
        logger.info("JobManager shutdown complete")


def main():
    """Main entry point for the MCP server.

    Configures logging to stderr, installs SIGINT/SIGTERM handlers that shut
    the JobManager down before exiting, and then runs the FastMCP server.
    Exits with status 0 after a signal-triggered shutdown and status 1 when
    the server stops because of an unexpected exception.
    """
    import asyncio
    import signal
    import sys

    # Set up logging to stderr (required for stdio transport)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        stream=sys.stderr,
    )

    logger.info("Starting MCP Background Job Server")

    # Set up graceful shutdown
    async def shutdown_handler():
        await cleanup_on_shutdown()
        sys.exit(0)

    # Strong references to in-flight shutdown tasks: the event loop itself only
    # keeps weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    pending_shutdowns = set()

    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running (signal arrived before the server
            # started or after it stopped), so no task can be scheduled.
            # Raise KeyboardInterrupt instead so the handler around mcp.run()
            # below performs the cleanup.
            raise KeyboardInterrupt from None

        def schedule_shutdown():
            task = loop.create_task(shutdown_handler())
            pending_shutdowns.add(task)
            task.add_done_callback(pending_shutdowns.discard)

        # A signal.signal() handler interrupts the main thread at an arbitrary
        # point, so hand the work to the loop rather than touching it directly.
        loop.call_soon_threadsafe(schedule_shutdown)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Run the FastMCP server
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt, shutting down...")
        asyncio.run(cleanup_on_shutdown())
    except Exception as e:
        logger.exception("Server error: %s", e)
        asyncio.run(cleanup_on_shutdown())
        sys.exit(1)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.