Skip to main content
Glama
tunnel-manager.jsโ€ข14.8 kB
/** * SSH Tunnel Manager * Manages SSH port forwarding and SOCKS proxy tunnels */ import { v4 as uuidv4 } from 'uuid'; import net from 'net'; import { logger } from './logger.js'; // Map to store active tunnels const tunnels = new Map(); // Tunnel types export const TUNNEL_TYPES = { LOCAL: 'local', // Local port forwarding (access remote service locally) REMOTE: 'remote', // Remote port forwarding (expose local service remotely) DYNAMIC: 'dynamic' // SOCKS proxy }; // Tunnel states export const TUNNEL_STATES = { CONNECTING: 'connecting', ACTIVE: 'active', RECONNECTING: 'reconnecting', FAILED: 'failed', CLOSED: 'closed' }; class SSHTunnel { constructor(id, serverName, ssh, config) { this.id = id; this.serverName = serverName; this.ssh = ssh; this.type = config.type; this.config = config; this.state = TUNNEL_STATES.CONNECTING; this.createdAt = new Date(); this.lastActivity = new Date(); this.connections = new Set(); this.server = null; this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; this.stats = { bytesTransferred: 0, connectionsTotal: 0, connectionsActive: 0, errors: 0 }; } /** * Start the tunnel */ async start() { try { switch (this.type) { case TUNNEL_TYPES.LOCAL: await this.startLocalForwarding(); break; case TUNNEL_TYPES.REMOTE: await this.startRemoteForwarding(); break; case TUNNEL_TYPES.DYNAMIC: await this.startDynamicForwarding(); break; default: throw new Error(`Unknown tunnel type: ${this.type}`); } this.state = TUNNEL_STATES.ACTIVE; this.lastActivity = new Date(); logger.info(`SSH tunnel ${this.id} started`, { type: this.type, server: this.serverName, local: `${this.config.localHost}:${this.config.localPort}`, remote: this.type !== TUNNEL_TYPES.DYNAMIC ? `${this.config.remoteHost}:${this.config.remotePort}` : 'SOCKS' }); } catch (error) { this.state = TUNNEL_STATES.FAILED; logger.error(`Failed to start tunnel ${this.id}`, { error: error.message }); throw error; } } /** * Start local port forwarding */ async startLocalForwarding() { const { localHost, localPort, remoteHost, remotePort } = this.config; // Create local server this.server = net.createServer(async (localSocket) => { this.stats.connectionsTotal++; this.stats.connectionsActive++; this.connections.add(localSocket); this.lastActivity = new Date(); logger.debug(`New connection to tunnel ${this.id}`, { from: localSocket.remoteAddress }); try { // Forward to remote via SSH const stream = await this.ssh.forwardOut( localSocket.remoteAddress || '127.0.0.1', localSocket.remotePort || 0, remoteHost, remotePort ); // Pipe data between local and remote localSocket.pipe(stream).pipe(localSocket); // Track data transfer localSocket.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); stream.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); // Handle disconnection const cleanup = () => { this.stats.connectionsActive--; this.connections.delete(localSocket); localSocket.destroy(); stream.destroy(); }; localSocket.on('close', cleanup); localSocket.on('error', cleanup); stream.on('close', cleanup); stream.on('error', cleanup); } catch (error) { this.stats.errors++; logger.error('Tunnel forwarding error', { tunnel: this.id, error: error.message }); localSocket.destroy(); } }); // Start listening await new Promise((resolve, reject) => { this.server.listen(localPort, localHost, (err) => { if (err) reject(err); else resolve(); }); }); logger.info('Local forwarding established', { local: `${localHost}:${localPort}`, remote: `${remoteHost}:${remotePort}` }); } /** * Start remote port forwarding */ async startRemoteForwarding() { const { localHost, localPort, remoteHost, remotePort } = this.config; // Request remote forwarding from SSH server await new Promise((resolve, reject) => { this.ssh.forwardIn(remoteHost, remotePort, (err) => { if (err) reject(err); else resolve(); }); }); // Handle incoming connections from remote this.ssh.on('tcp connection', (info, accept, reject) => { if (info.destPort !== remotePort) return; this.stats.connectionsTotal++; this.stats.connectionsActive++; this.lastActivity = new Date(); const remoteSocket = accept(); // Connect to local service const localSocket = net.connect(localPort, localHost, () => { // Pipe data between remote and local remoteSocket.pipe(localSocket).pipe(remoteSocket); // Track data transfer remoteSocket.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); localSocket.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); }); // Handle errors and cleanup const cleanup = () => { this.stats.connectionsActive--; remoteSocket.destroy(); localSocket.destroy(); }; localSocket.on('error', (err) => { this.stats.errors++; logger.error('Remote forwarding error', { tunnel: this.id, error: err.message }); cleanup(); }); remoteSocket.on('close', cleanup); localSocket.on('close', cleanup); }); logger.info('Remote forwarding established', { local: `${localHost}:${localPort}`, remote: `${remoteHost}:${remotePort}` }); } /** * Start dynamic port forwarding (SOCKS proxy) */ async startDynamicForwarding() { const { localHost, localPort } = this.config; // Create SOCKS server this.server = net.createServer(async (localSocket) => { this.stats.connectionsTotal++; this.stats.connectionsActive++; this.connections.add(localSocket); this.lastActivity = new Date(); let targetHost = null; let targetPort = null; let stream = null; // Simple SOCKS5 implementation (basic) localSocket.once('data', async (chunk) => { // Parse SOCKS request (simplified) if (chunk[0] === 0x05) { // SOCKS5 // Send auth method response localSocket.write(Buffer.from([0x05, 0x00])); localSocket.once('data', async (chunk2) => { // Parse connection request if (chunk2[0] === 0x05 && chunk2[1] === 0x01) { // CONNECT const addrType = chunk2[3]; let offset = 4; if (addrType === 0x01) { // IPv4 targetHost = `${chunk2[4]}.${chunk2[5]}.${chunk2[6]}.${chunk2[7]}`; offset = 8; } else if (addrType === 0x03) { // Domain const domainLen = chunk2[4]; targetHost = chunk2.slice(5, 5 + domainLen).toString(); offset = 5 + domainLen; } targetPort = (chunk2[offset] << 8) | chunk2[offset + 1]; try { // Create SSH forwarding stream stream = await this.ssh.forwardOut( '127.0.0.1', 0, targetHost, targetPort ); // Send success response const response = Buffer.from([ 0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, // Bind address (0.0.0.0) 0, 0 // Bind port ]); localSocket.write(response); // Pipe data localSocket.pipe(stream).pipe(localSocket); // Track data localSocket.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); stream.on('data', (chunk) => { this.stats.bytesTransferred += chunk.length; this.lastActivity = new Date(); }); } catch (error) { // Send error response const response = Buffer.from([ 0x05, 0x01, 0x00, 0x01, 0, 0, 0, 0, 0, 0 ]); localSocket.write(response); localSocket.destroy(); this.stats.errors++; } } }); } else { // Not SOCKS5, close connection localSocket.destroy(); } }); // Cleanup on disconnect localSocket.on('close', () => { this.stats.connectionsActive--; this.connections.delete(localSocket); if (stream) stream.destroy(); }); localSocket.on('error', () => { this.stats.errors++; this.stats.connectionsActive--; this.connections.delete(localSocket); if (stream) stream.destroy(); }); }); // Start listening await new Promise((resolve, reject) => { this.server.listen(localPort, localHost, (err) => { if (err) reject(err); else resolve(); }); }); logger.info('SOCKS proxy established', { local: `${localHost}:${localPort}` }); } /** * Get tunnel information */ getInfo() { return { id: this.id, server: this.serverName, type: this.type, state: this.state, config: { localHost: this.config.localHost, localPort: this.config.localPort, remoteHost: this.config.remoteHost, remotePort: this.config.remotePort }, stats: this.stats, created: this.createdAt, lastActivity: this.lastActivity, activeConnections: this.connections.size }; } /** * Close the tunnel */ close() { logger.info(`Closing tunnel ${this.id}`); this.state = TUNNEL_STATES.CLOSED; // Close all active connections for (const conn of this.connections) { conn.destroy(); } this.connections.clear(); // Close server if (this.server) { this.server.close(); this.server = null; } // Cancel remote forwarding if needed if (this.type === TUNNEL_TYPES.REMOTE) { this.ssh.unforwardIn(this.config.remoteHost, this.config.remotePort); } tunnels.delete(this.id); } /** * Reconnect tunnel */ async reconnect() { if (this.reconnectAttempts >= this.maxReconnectAttempts) { logger.error(`Max reconnect attempts reached for tunnel ${this.id}`); this.state = TUNNEL_STATES.FAILED; return false; } this.reconnectAttempts++; this.state = TUNNEL_STATES.RECONNECTING; logger.info(`Reconnecting tunnel ${this.id}`, { attempt: this.reconnectAttempts }); try { await this.start(); this.reconnectAttempts = 0; return true; } catch (error) { logger.error(`Reconnect failed for tunnel ${this.id}`, { error: error.message }); // Retry with exponential backoff const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000); setTimeout(() => this.reconnect(), delay); return false; } } } /** * Create a new SSH tunnel */ export async function createTunnel(serverName, ssh, config) { const tunnelId = `tunnel_${Date.now()}_${uuidv4().substring(0, 8)}`; // Validate config if (!config.type || !Object.values(TUNNEL_TYPES).includes(config.type)) { throw new Error(`Invalid tunnel type: ${config.type}`); } // Set defaults config.localHost = config.localHost || '127.0.0.1'; if (config.type !== TUNNEL_TYPES.DYNAMIC) { if (!config.remoteHost || !config.remotePort) { throw new Error('Remote host and port required for port forwarding'); } } if (!config.localPort) { throw new Error('Local port required'); } const tunnel = new SSHTunnel(tunnelId, serverName, ssh, config); tunnels.set(tunnelId, tunnel); try { await tunnel.start(); logger.info('SSH tunnel created', { id: tunnelId, type: config.type, server: serverName }); return tunnel; } catch (error) { tunnels.delete(tunnelId); throw error; } } /** * Get an existing tunnel */ export function getTunnel(tunnelId) { const tunnel = tunnels.get(tunnelId); if (!tunnel) { throw new Error(`Tunnel ${tunnelId} not found`); } return tunnel; } /** * List all active tunnels */ export function listTunnels(serverName = null) { const activeTunnels = []; for (const [id, tunnel] of tunnels.entries()) { if (tunnel.state !== TUNNEL_STATES.CLOSED) { if (!serverName || tunnel.serverName === serverName) { activeTunnels.push(tunnel.getInfo()); } } } return activeTunnels; } /** * Close a tunnel */ export function closeTunnel(tunnelId) { const tunnel = tunnels.get(tunnelId); if (!tunnel) { throw new Error(`Tunnel ${tunnelId} not found`); } tunnel.close(); return true; } /** * Close all tunnels for a server */ export function closeServerTunnels(serverName) { let closedCount = 0; for (const [id, tunnel] of tunnels.entries()) { if (tunnel.serverName === serverName) { tunnel.close(); closedCount++; } } return closedCount; } /** * Monitor tunnel health */ export function monitorTunnels() { const now = Date.now(); const healthTimeout = 60 * 1000; // 1 minute for (const [id, tunnel] of tunnels.entries()) { if (tunnel.state === TUNNEL_STATES.ACTIVE) { const idle = now - tunnel.lastActivity.getTime(); // Check if tunnel is still healthy if (idle > healthTimeout && tunnel.connections.size === 0) { logger.debug(`Tunnel ${id} idle for ${idle}ms`); } // Auto-reconnect failed tunnels if (tunnel.state === TUNNEL_STATES.FAILED) { tunnel.reconnect(); } } } } // Monitor tunnels periodically setInterval(monitorTunnels, 30 * 1000); // Every 30 seconds export default { createTunnel, getTunnel, listTunnels, closeTunnel, closeServerTunnels, TUNNEL_TYPES, TUNNEL_STATES };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bvisible/mcp-ssh-manager'

If you have feedback or need assistance with the MCP directory API, please join our Discord server