Skip to main content
Glama

MCP Scheduler

by RadiumGu
main.py9.5 kB
""" Entry point for MCP Scheduler. """ import asyncio import argparse import logging import os import signal import sys import traceback import threading import json from mcp_scheduler.config import Config from mcp_scheduler.persistence import Database from mcp_scheduler.executor import Executor from mcp_scheduler.scheduler import Scheduler from mcp_scheduler.server import SchedulerServer from mcp_scheduler.utils import setup_logging # For well-known endpoint from mcp_scheduler.well_known import setup_well_known import aiohttp # Import our custom JSON parser utilities try: from mcp_scheduler.json_parser import patch_fastmcp_parser, install_stdio_wrapper except ImportError: # Define dummies if the module isn't available def patch_fastmcp_parser(): return False def install_stdio_wrapper(): return False # Global variables for cleanup scheduler = None server = None scheduler_task = None def log_to_stderr(message): """Log messages to stderr instead of stdout to avoid interfering with stdio transport.""" print(message, file=sys.stderr, flush=True) def run_scheduler_in_thread(scheduler_instance): """Run the scheduler in a separate thread.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def scheduler_loop(): await scheduler_instance.start() try: while True: await asyncio.sleep(1) except asyncio.CancelledError: await scheduler_instance.stop() loop.run_until_complete(scheduler_loop()) loop.close() def handle_sigterm(signum, frame): """Handle SIGTERM gracefully""" log_to_stderr("Received SIGTERM signal. Shutting down...") if scheduler: try: # Create a new event loop for clean shutdown loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(scheduler.stop()) loop.close() except Exception as e: log_to_stderr(f"Error during shutdown: {e}") sys.exit(0) # Custom input wrapper to handle malformed JSON class SafeJsonStdin: def __init__(self, original_stdin): self.original_stdin = original_stdin def readline(self): line = self.original_stdin.readline() if not line: return line # Only try to fix lines that look like JSON if line.strip().startswith('{') or line.strip().startswith('['): try: # Parse the JSON to ensure it's valid json_obj = json.loads(line) return line except json.JSONDecodeError as e: log_to_stderr(f"Fixing malformed JSON input: {e}") # Try to extract the ID if present try: import re id_match = re.search(r'"id"\s*:\s*(\d+)', line) if id_match: id_val = id_match.group(1) return f'{{"jsonrpc":"2.0","id":{id_val},"error":{{"code":-32700,"message":"Parse error"}}}}\n' # Default response for malformed JSON return '{"jsonrpc":"2.0","id":null,"error":{"code":-32700,"message":"Parse error"}}\n' except Exception as ex: log_to_stderr(f"Error creating error response: {ex}") # Ultimate fallback return '{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}\n' return line def __getattr__(self, name): return getattr(self.original_stdin, name) def start_well_known_server(): app = aiohttp.web.Application() setup_well_known(app) # Use the same address/port as the MCP server if possible from mcp_scheduler.config import Config config = Config() aiohttp.web.run_app(app, host=config.server_address, port=config.server_port + 1) def main(): """Main entry point.""" parser = argparse.ArgumentParser(description="MCP Scheduler Server") parser.add_argument( "--address", default=None, help="Server address (default: localhost or from config/env)" ) parser.add_argument( "--port", type=int, default=None, help="Server port (default: 8080 or from config/env)" ) parser.add_argument( "--transport", choices=["sse", "stdio"], default=None, help="Transport mode (sse or stdio) (default: sse or from config/env)" ) parser.add_argument( "--log-level", default=None, help="Logging level (default: INFO or from config/env)" ) parser.add_argument( "--log-file", default=None, help="Log file path (default: None or from config/env)" ) parser.add_argument( "--db-path", default=None, help="SQLite database path (default: scheduler.db or from config/env)" ) parser.add_argument( "--config", default=None, help="Path to JSON configuration file" ) parser.add_argument( "--ai-model", default=None, help="AI model to use for AI tasks (default: gpt-4o or from config/env)" ) parser.add_argument( "--version", action="store_true", help="Show version and exit" ) parser.add_argument( "--debug", action="store_true", help="Enable debug mode with full traceback" ) parser.add_argument( "--fix-json", action="store_true", help="Enable JSON fixing for malformed messages" ) args = parser.parse_args() # Set environment variables for config if args.config: os.environ["MCP_SCHEDULER_CONFIG_FILE"] = args.config if args.address: os.environ["MCP_SCHEDULER_ADDRESS"] = args.address if args.port: os.environ["MCP_SCHEDULER_PORT"] = str(args.port) if args.transport: os.environ["MCP_SCHEDULER_TRANSPORT"] = args.transport if args.log_level: os.environ["MCP_SCHEDULER_LOG_LEVEL"] = args.log_level if args.log_file: os.environ["MCP_SCHEDULER_LOG_FILE"] = args.log_file if args.db_path: os.environ["MCP_SCHEDULER_DB_PATH"] = args.db_path if args.ai_model: os.environ["MCP_SCHEDULER_AI_MODEL"] = args.ai_model # Enable debug mode debug_mode = args.debug # Register signal handlers signal.signal(signal.SIGTERM, handle_sigterm) try: # Install JSON fixing patches if requested or always in debug mode if args.fix_json or debug_mode: # Try to patch the FastMCP parser directly if not patch_fastmcp_parser(): # If direct patching fails, use stdin wrapper as fallback if not install_stdio_wrapper(): # Last resort: replace stdin with our custom wrapper log_to_stderr("Installing basic JSON fix wrapper") sys.stdin = SafeJsonStdin(sys.stdin) # Load configuration config = Config() # Show version and exit if requested if args.version: log_to_stderr(f"{config.server_name} version {config.server_version}") sys.exit(0) # Configure logging - ensure it goes to a file or stderr, not stdout if not config.log_file and config.transport == "stdio": # Force a log file when using stdio transport if none was specified config.log_file = "mcp_scheduler.log" setup_logging(config.log_level, config.log_file) # Initialize components database = Database(config.db_path) executor = Executor(config.openai_api_key, config.ai_model) executor.execution_timeout = config.execution_timeout global scheduler scheduler = Scheduler(database, executor) global server server = SchedulerServer(scheduler, config) # Start the scheduler in a separate thread scheduler_thread = threading.Thread( target=run_scheduler_in_thread, args=(scheduler,), daemon=True # Make thread a daemon so it exits when the main thread exits ) scheduler_thread.start() log_to_stderr(f"Scheduler started in background thread") # Si el transporte es SSE, lanza el servidor well-known en un thread aparte if config.transport == "sse": log_to_stderr(f"Starting well-known server on port {config.server_port + 1}") threading.Thread(target=start_well_known_server, daemon=True).start() # Start the MCP server (this will block with stdio transport) log_to_stderr(f"Starting MCP server with {config.transport} transport") server.start() except KeyboardInterrupt: log_to_stderr("Interrupted by user") sys.exit(0) except json.JSONDecodeError as e: log_to_stderr(f"JSON parsing error: {e}") if debug_mode: traceback.print_exc(file=sys.stderr) sys.exit(1) except Exception as e: log_to_stderr(f"Error during initialization: {e}") if debug_mode: traceback.print_exc(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/RadiumGu/Q-scheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server