Skip to main content
Glama
http.ts10.3 kB
import { createServer } from "http"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { parse } from "url"; import { randomUUID } from "node:crypto"; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; import { createServerInstance } from "../server/server.js"; import { ServerConfig } from "../server/middleware.js"; import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { IncomingMessage } from "node:http"; // Session management - persistent storage by session ID const transports: Record<string, StreamableHTTPServerTransport> = {}; const servers: Record<string, Server> = {}; const sessionClientInfo: Record<string, { name?: string; userAgent?: string }> = {}; /** * Get information about active sessions (for debugging/monitoring) */ function getSessionInfo() { return { activeSessions: Object.keys(transports).length, sessionIds: Object.keys(transports), clientInfo: { ...sessionClientInfo }, }; } /** * Parse request body as JSON */ function parseRequestBody(req: IncomingMessage): Promise<any> { return new Promise((resolve, reject) => { let body = ""; req.on("data", (chunk) => { body += chunk.toString(); }); req.on("end", () => { try { if (body) { resolve(JSON.parse(body)); } else { resolve({}); } } catch (error) { console.error("Error parsing request body:", error); reject(new Error("Invalid JSON in request body")); } }); req.on("error", (error) => { reject(error); }); }); } /** * Clean up a specific session by ID */ export function cleanupSession(sessionId: string): boolean { if (!transports[sessionId]) { return false; } const transport = transports[sessionId]; delete transports[sessionId]; delete servers[sessionId]; delete sessionClientInfo[sessionId]; // Close transport if it has a close method if (transport && typeof transport.close === "function") { transport.close(); } return true; } /** * Start the server with HTTP-based transports (streamable-http) */ export async function startHttpServer(config: ServerConfig): Promise<void> { try { // Get initial port from config const initialPort = config.port; // Keep track of which port we end up using let actualPort = initialPort; const httpServer = createServer(async (req, res) => { const url = parse(req.url || "").pathname; // Set CORS headers for all responses res.setHeader("Access-Control-Allow-Origin", "*"); res.setHeader("Access-Control-Allow-Methods", "GET,POST,OPTIONS,DELETE"); res.setHeader( "Access-Control-Allow-Headers", "Content-Type, MCP-Session-Id, mcp-session-id, MCP-Protocol-Version" ); res.setHeader("Access-Control-Expose-Headers", "MCP-Session-Id"); // Handle preflight OPTIONS requests if (req.method === "OPTIONS") { res.writeHead(200); res.end(); return; } try { if (url === "/mcp") { if (req.method === "POST") { // Parse request body for POST requests let requestBody = {}; try { requestBody = await parseRequestBody(req); } catch (error) { res.writeHead(400, { "Content-Type": "application/json" }); res.end( JSON.stringify({ jsonrpc: "2.0", error: { code: -32700, message: "Parse error", }, id: null, }) ); return; } // Check for existing session ID const sessionId = req.headers["mcp-session-id"] as | string | undefined; let transport: StreamableHTTPServerTransport; if (sessionId && transports[sessionId]) { // Reuse existing transport and server transport = transports[sessionId]; } else if (!sessionId && isInitializeRequest(requestBody)) { // New initialization request - create new session try { const newSessionId = randomUUID(); transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => newSessionId, }); // Create new server instance const server = createServerInstance(config); // Store session data transports[newSessionId] = transport; servers[newSessionId] = server; sessionClientInfo[newSessionId] = { name: (requestBody as any)?.params?.clientInfo?.name, userAgent: req.headers["user-agent"], }; // Clean up session when transport closes transport.onclose = () => { delete transports[newSessionId]; delete servers[newSessionId]; delete sessionClientInfo[newSessionId]; }; // Connect server to transport await server.connect(transport); } catch (error) { console.error("Error initializing session:", error); res.writeHead(500, { "Content-Type": "application/json" }); res.end( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: `Failed to initialize server session: ${ error instanceof Error ? error.message : String(error) }`, }, id: null, }) ); return; } } else { // Invalid request - missing session ID or not an initialize request res.writeHead(400, { "Content-Type": "application/json" }); res.end( JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: sessionId ? "Session not found" : "Bad Request: No valid session ID provided", }, id: null, }) ); return; } // Handle the request await transport.handleRequest(req, res, requestBody); } else if (req.method === "DELETE") { // Handle session termination const sessionId = req.headers["mcp-session-id"] as | string | undefined; if (!sessionId) { res.writeHead(400, { "Content-Type": "text/plain" }); res.end("Missing session ID"); return; } const success = cleanupSession(sessionId); if (!success) { res.writeHead(404, { "Content-Type": "text/plain" }); res.end("Session not found"); return; } res.writeHead(200, { "Content-Type": "text/plain" }); res.end("Session terminated"); } else { res.writeHead(405, { "Content-Type": "text/plain" }); res.end("Method not allowed"); } } else if (url === "/ping") { res.writeHead(200, { "Content-Type": "text/plain" }); res.end("pong"); } else if (url === "/sessions" && req.method === "GET") { // Return session information for monitoring res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify(getSessionInfo(), null, 2)); } else { res.writeHead(404); res.end("Not found"); } } catch (error) { console.error("Error handling request:", error); if (!res.headersSent) { res.writeHead(500); res.end("Internal Server Error"); } } }); // Function to find available port and start server const findAvailablePort = async ( startPort: number, maxAttempts = 10 ): Promise<number> => { for (let port = startPort; port < startPort + maxAttempts; port++) { try { await new Promise<void>((resolve, reject) => { const testServer = createServer(); testServer.once("error", (err: NodeJS.ErrnoException) => { if (err.code === "EADDRINUSE") { reject(err); } else { reject(err); } }); testServer.listen(port, () => { testServer.close(() => resolve()); }); }); return port; // Port is available } catch (err: any) { if (err.code === "EADDRINUSE") { console.warn(`Port ${port} is in use, trying port ${port + 1}...`); continue; } else { throw err; } } } throw new Error( `No available ports found in range ${startPort}-${startPort + maxAttempts - 1}` ); }; // Find available port first, then start the server once const availablePort = await findAvailablePort(initialPort); actualPort = availablePort; await new Promise<void>((resolve, reject) => { httpServer.once("error", reject); httpServer.listen(availablePort, () => { console.error( `Docfork MCP Server running on ${config.transport.toUpperCase()}:` ); console.error(` • HTTP endpoint: http://localhost:${actualPort}/mcp`); console.error(` • Health check: http://localhost:${actualPort}/ping`); console.error( ` • Session info: http://localhost:${actualPort}/sessions` ); resolve(); }); }); } catch (error) { console.error("Failed to start HTTP server:", error); throw new Error( `Failed to start HTTP server: ${ error instanceof Error ? error.message : String(error) }` ); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/docfork/docfork-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server