
MCP Server for Apache Airflow

by tomnagengast
http-server.ts (7.57 kB)
#!/usr/bin/env node
import { createServer } from "node:http";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import { AirflowClient } from "./airflow-client.js";

// Parse command line arguments
const args = process.argv.slice(2);
const port = parseInt(args.find(arg => arg.startsWith('--port='))?.split('=')[1] || '3000');

const server = new McpServer({
  name: "mcp-server-airflow",
  version: "1.0.0",
});

// Initialize Airflow client
const airflowClient = new AirflowClient();

// Register all tools (same as in index.ts)
server.tool(
  "airflow_list_dags",
  "List all DAGs in Airflow",
  {
    limit: z.number().default(100).optional().describe("Maximum number of DAGs to return"),
    offset: z.number().default(0).optional().describe("Number of DAGs to skip"),
    order_by: z.string().default("dag_id").optional().describe("Field to order by"),
  },
  async (args, _extra) => {
    return await airflowClient.listDags(args);
  }
);

server.tool(
  "airflow_get_dag",
  "Get details of a specific DAG",
  {
    dag_id: z.string().describe("The ID of the DAG"),
  },
  async (args, _extra) => {
    return await airflowClient.getDag(args.dag_id);
  }
);

server.tool(
  "airflow_trigger_dag",
  "Trigger a DAG run",
  {
    dag_id: z.string().describe("The ID of the DAG to trigger"),
    dag_run_id: z.string().optional().describe("Custom run ID (optional)"),
    conf: z.record(z.any()).optional().describe("Configuration parameters for the DAG run"),
  },
  async (args, _extra) => {
    return await airflowClient.triggerDag(args.dag_id, args.dag_run_id, args.conf);
  }
);

server.tool(
  "airflow_list_dag_runs",
  "List DAG runs for a specific DAG",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    limit: z.number().default(25).optional().describe("Maximum number of runs to return"),
    offset: z.number().default(0).optional().describe("Number of runs to skip"),
  },
  async (args, _extra) => {
    return await airflowClient.listDagRuns(args.dag_id, args);
  }
);

server.tool(
  "airflow_get_dag_run",
  "Get details of a specific DAG run",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
  },
  async (args, _extra) => {
    return await airflowClient.getDagRun(args.dag_id, args.dag_run_id);
  }
);

server.tool(
  "airflow_list_task_instances",
  "List task instances for a DAG run",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
  },
  async (args, _extra) => {
    return await airflowClient.listTaskInstances(args.dag_id, args.dag_run_id);
  }
);

server.tool(
  "airflow_get_task_instance",
  "Get details of a specific task instance",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
    task_id: z.string().describe("The ID of the task"),
  },
  async (args, _extra) => {
    return await airflowClient.getTaskInstance(args.dag_id, args.dag_run_id, args.task_id);
  }
);

server.tool(
  "airflow_pause_dag",
  "Pause a DAG",
  {
    dag_id: z.string().describe("The ID of the DAG to pause"),
  },
  async (args, _extra) => {
    return await airflowClient.pauseDag(args.dag_id);
  }
);

server.tool(
  "airflow_unpause_dag",
  "Unpause a DAG",
  {
    dag_id: z.string().describe("The ID of the DAG to unpause"),
  },
  async (args, _extra) => {
    return await airflowClient.unpauseDag(args.dag_id);
  }
);

server.tool(
  "airflow_get_task_logs",
  "Get logs for a specific task instance",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
    task_id: z.string().describe("The ID of the task"),
    task_try_number: z.number().default(1).optional().describe("The try number of the task (default: 1)"),
    full_content: z.boolean().default(true).optional().describe("Whether to get full log content (default: true)"),
  },
  async (args, _extra) => {
    return await airflowClient.getTaskLogs(
      args.dag_id,
      args.dag_run_id,
      args.task_id,
      args.task_try_number,
      args.full_content
    );
  }
);

server.tool(
  "airflow_get_dag_run_logs",
  "Get logs for all tasks in a DAG run",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
    limit: z.number().default(10).optional().describe("Maximum number of tasks to show logs for (default: 10)"),
  },
  async (args, _extra) => {
    return await airflowClient.getDagRunLogs(args.dag_id, args.dag_run_id, args.limit);
  }
);

server.tool(
  "airflow_tail_dag_run",
  "Tail/monitor a DAG run showing recent activity and logs",
  {
    dag_id: z.string().describe("The ID of the DAG"),
    dag_run_id: z.string().describe("The ID of the DAG run"),
    max_lines: z.number().default(50).optional().describe("Maximum number of log lines to show per task (default: 50)"),
  },
  async (args, _extra) => {
    return await airflowClient.tailDagRun(args.dag_id, args.dag_run_id, args.max_lines);
  }
);

async function main() {
  // Create the HTTP transport with session support
  const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: () => crypto.randomUUID(),
  });

  await server.connect(transport);

  // Create HTTP server
  const httpServer = createServer(async (req, res) => {
    // Enable CORS
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
    res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization');

    if (req.method === 'OPTIONS') {
      res.writeHead(200);
      res.end();
      return;
    }

    // Handle health check endpoint
    if (req.url === '/health') {
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ status: 'healthy', service: 'mcp-server-airflow' }));
      return;
    }

    // Parse request body for POST requests
    let body: any = undefined;
    if (req.method === 'POST') {
      const chunks: Buffer[] = [];
      req.on('data', (chunk) => chunks.push(chunk));
      await new Promise((resolve) => req.on('end', resolve));
      const bodyText = Buffer.concat(chunks).toString();
      try {
        body = bodyText ? JSON.parse(bodyText) : undefined;
      } catch (error) {
        res.writeHead(400, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: 'Invalid JSON' }));
        return;
      }
    }

    // Handle MCP requests
    try {
      await transport.handleRequest(req, res, body);
    } catch (error) {
      console.error('Error handling request:', error);
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: 'Internal server error' }));
      }
    }
  });

  httpServer.listen(port, () => {
    console.error(`MCP Server for Airflow started on HTTP port ${port}`);
    console.error(`Health check: http://localhost:${port}/health`);
    console.error(`MCP endpoint: http://localhost:${port}/`);
  });

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.error('Shutting down HTTP server...');
    httpServer.close(() => {
      process.exit(0);
    });
  });
}

main().catch((error) => {
  console.error("HTTP server failed to start:", error);
  process.exit(1);
});
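For context, a minimal client-side sketch (not part of this repository) of how a program could connect to the server above over the Streamable HTTP transport and invoke one of the registered tools follows; the port (3000), the DAG id "example_dag", and the client name are illustrative assumptions.

// example-client.ts: connect to the HTTP MCP server and trigger a DAG run.
// Assumes the server above is running locally with --port=3000.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

async function run() {
  const client = new Client({ name: "airflow-mcp-example-client", version: "1.0.0" });
  const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/"));
  await client.connect(transport);

  // List the tools the server registered (airflow_list_dags, airflow_trigger_dag, ...)
  const tools = await client.listTools();
  console.log(tools.tools.map((t) => t.name));

  // Call one of them: trigger a DAG run with a small config payload
  const result = await client.callTool({
    name: "airflow_trigger_dag",
    arguments: { dag_id: "example_dag", conf: { triggered_by: "mcp" } },
  });
  console.log(result);

  await client.close();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

Before connecting, the /health endpoint exposed by the server can be used to verify it is up (for example, curl http://localhost:3000/health).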


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tomnagengast/mcp-server-airflow'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.