Skip to main content
Glama

KatCoder MySQL MCP Server

by berthojoris
database.ts (15.9 kB)
import * as mysql from "mysql2/promise";
import { DatabaseConfig } from "./server.js";
import * as winston from "winston";

/**
 * Module-wide logger.
 *
 * The level is "info", so the `logger.debug(...)` calls in this file are not
 * emitted unless the level is lowered.
 */
const logger = winston.createLogger({
  level: "info",
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json(),
  ),
  transports: [
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.simple(),
      ),
    }),
  ],
});

/**
 * Thin wrapper around a mysql2 connection pool that adds retry handling,
 * transaction helpers, bulk insert support and human-readable error messages.
 */
export class DatabaseManager {
  private pool: mysql.Pool;
  private config: DatabaseConfig;
  private connectionLimit: number;
  private maxRetries: number;
  private retryDelay: number;

  /**
   * Creates the pool from `config`, falling back to defaults for every
   * optional tuning value, and wires up the pool event logging.
   *
   * @param config Connection and pool settings.
   */
  constructor(config: DatabaseConfig) {
    this.config = config;
    this.connectionLimit = config.connectionLimit || 20;
    // Clamp to at least one attempt: a negative value would otherwise make the
    // retry loops run zero times and the operation would never be attempted.
    this.maxRetries = Math.max(1, config.maxRetries || 3);
    this.retryDelay = config.retryDelay || 1000;

    // NOTE(review): the object is cast to PoolOptions, which suggests some keys
    // (likely `acquireTimeout` / `timeout`) are not part of the mysql2 typings.
    // Confirm the installed mysql2 version actually honours them.
    this.pool = mysql.createPool({
      host: config.host,
      port: config.port,
      user: config.user,
      password: config.password,
      database: config.database,
      connectionLimit: this.connectionLimit,
      acquireTimeout: config.acquireTimeout || 120000,
      timeout: config.timeout || 120000,
      connectTimeout: config.connectTimeout || 30000,
      waitForConnections: true,
      queueLimit: config.queueLimit !== undefined ? config.queueLimit : 0,
      enableKeepAlive: true,
      keepAliveInitialDelay: 0,
      multipleStatements: false, // Security: prevent SQL injection
      supportBigNumbers: true,
      bigNumberStrings: true,
      dateStrings: true,
      trace: false, // Security: disable stack traces in production
      ssl: this.getSSLOptions(),
      // Additional settings for AI agents
      idleTimeout: 60000, // 60 seconds idle timeout
      maxIdle: 10, // Maximum idle connections
    } as mysql.PoolOptions);

    this.setupConnectionErrorHandling();
  }

  /**
   * Returns the SSL options passed to the pool.
   *
   * Currently a placeholder that always returns `undefined` (no SSL options).
   */
  private getSSLOptions() {
    // For production, you might want to add SSL options
    // This is a basic implementation - adjust based on your security requirements
    return undefined;
  }

  /** Registers logging handlers for pool and per-connection lifecycle events. */
  private setupConnectionErrorHandling(): void {
    this.pool.on("connection", (connection) => {
      logger.debug("New database connection established");

      connection.on("error", (err) => {
        logger.error("Database connection error:", err);
      });

      connection.on("close", () => {
        logger.debug("Database connection closed");
      });
    });

    this.pool.on("acquire", () => {
      logger.debug("Connection acquired from pool");
    });

    this.pool.on("release", () => {
      logger.debug("Connection released to pool");
    });
  }

  /**
   * Checks connectivity by acquiring a connection and pinging the server.
   *
   * Makes up to three attempts, waiting `1000ms * attempt` between them.
   *
   * @returns `true` once a ping succeeds, `false` if every attempt fails.
   */
  async testConnection(): Promise<boolean> {
    const maxAttempts = 3;
    let lastError: any;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        const connection = await this.pool.getConnection();
        try {
          await connection.ping();
        } finally {
          // Always hand the connection back, even when the ping fails;
          // otherwise every failed attempt leaks one pooled connection.
          connection.release();
        }
        logger.info(
          `Database connection test successful (attempt ${attempt}/${maxAttempts})`,
        );
        return true;
      } catch (error: any) {
        lastError = error;
        logger.warn(
          `Database connection test failed (attempt ${attempt}/${maxAttempts}):`,
          {
            error: error?.message,
            code: error?.code,
          },
        );
        if (attempt < maxAttempts) {
          await this.sleep(1000 * attempt);
        }
      }
    }

    logger.error(
      "Database connection test failed after 3 attempts:",
      lastError,
    );
    return false;
  }

  /**
   * Pre-opens up to five connections (bounded by the connection limit) so the
   * first real queries do not pay the connection setup cost.
   *
   * Failures are logged and swallowed; warm-up is best effort.
   */
  async warmupPool(): Promise<void> {
    const warmupCount = Math.min(5, this.connectionLimit);
    logger.info(
      `Warming up connection pool with ${warmupCount} connections...`,
    );

    const connections: mysql.PoolConnection[] = [];
    try {
      for (let i = 0; i < warmupCount; i++) {
        const conn = await this.pool.getConnection();
        connections.push(conn);
      }
      logger.info(`Connection pool warmed up with ${warmupCount} connections`);
    } catch (error) {
      logger.warn("Failed to warm up connection pool:", error);
    } finally {
      // Release all connections back to the pool
      connections.forEach((conn) => conn.release());
    }
  }

  /** Resolves after `ms` milliseconds. */
  private async sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Decides whether a failed operation is worth retrying.
   *
   * @param error The value caught from a failed driver call.
   * @returns `true` for known transient error codes or for messages that
   *   mention a timeout or a lost connection; `false` otherwise (including
   *   when `error` is null or undefined).
   */
  private isRetryableError(error: any): boolean {
    if (error == null) {
      return false;
    }

    const retryableErrorCodes = [
      "ECONNREFUSED",
      "ETIMEDOUT",
      "ENOTFOUND",
      "ECONNRESET",
      "EPIPE",
      "ER_LOCK_WAIT_TIMEOUT",
      "ER_LOCK_DEADLOCK",
      "PROTOCOL_CONNECTION_LOST",
      "PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR",
    ];

    return Boolean(
      retryableErrorCodes.includes(error.code) ||
        error.errno === "ETIMEDOUT" ||
        error.message?.includes("timeout") ||
        error.message?.includes("Connection lost"),
    );
  }

  /**
   * Executes a single prepared statement on the pool, retrying transient
   * failures up to `maxRetries` times.
   *
   * @param sql SQL text with `?` placeholders.
   * @param params Values bound to the placeholders.
   * @returns The rows (or result header) returned by the driver.
   * @throws Error A formatted error once retries are exhausted or the failure
   *   is not retryable.
   */
  async query(sql: string, params?: any[]): Promise<any> {
    let lastError: any;

    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        logger.debug(
          `Executing query (attempt ${attempt}/${this.maxRetries}): ${sql}`,
          { params },
        );
        const [rows] = await this.pool.execute(sql, params);

        if (attempt > 1) {
          logger.info(
            `Query succeeded on attempt ${attempt}/${this.maxRetries}`,
          );
        }
        return rows;
      } catch (error: any) {
        lastError = error;

        if (attempt < this.maxRetries && this.isRetryableError(error)) {
          // Linear backoff: the wait grows by one retryDelay per attempt.
          const delay = this.retryDelay * attempt;
          logger.warn(
            `Query failed (attempt ${attempt}/${this.maxRetries}), retrying in ${delay}ms...`,
            {
              error: error?.message,
              code: error?.code,
            },
          );
          await this.sleep(delay);
        } else {
          logger.error(
            `Query execution failed after ${attempt} attempts: ${sql}`,
            {
              error,
              params,
              finalAttempt: attempt === this.maxRetries,
            },
          );
          break;
        }
      }
    }

    throw this.formatDatabaseError(lastError);
  }

  /**
   * Runs the given statements inside one transaction, retrying the whole
   * transaction on transient failures.
   *
   * @param queries Statements to execute in order.
   * @returns One driver result per statement, in the same order.
   * @throws Error A formatted error once retries are exhausted or the failure
   *   is not retryable.
   */
  async transaction(
    queries: Array<{ sql: string; params?: any[] }>,
  ): Promise<any[]> {
    let lastError: any;

    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      let connection: mysql.PoolConnection | undefined;

      try {
        // Acquire inside the try so that connection failures take the same
        // retry and error-formatting path as statement failures.
        connection = await this.pool.getConnection();
        await connection.beginTransaction();
        logger.info(
          `Transaction started (attempt ${attempt}/${this.maxRetries})`,
        );

        const results: any[] = [];
        for (const { sql, params } of queries) {
          logger.debug(`Transaction query: ${sql}`, { params });
          const [result] = await connection.execute(sql, params);
          results.push(result);
        }

        await connection.commit();
        logger.info("Transaction committed successfully");

        if (attempt > 1) {
          logger.info(
            `Transaction succeeded on attempt ${attempt}/${this.maxRetries}`,
          );
        }
        return results;
      } catch (error: any) {
        lastError = error;

        if (connection) {
          try {
            await connection.rollback();
            logger.warn("Transaction rolled back");
          } catch (rollbackError) {
            logger.error("Failed to rollback transaction:", rollbackError);
          }
        }
      } finally {
        if (connection) {
          connection.release();
          logger.debug("Transaction connection released");
        }
      }

      // The connection is already back in the pool here, so the backoff
      // below does not hold a pooled connection while waiting.
      if (attempt < this.maxRetries && this.isRetryableError(lastError)) {
        const delay = this.retryDelay * attempt;
        logger.warn(
          `Transaction failed (attempt ${attempt}/${this.maxRetries}), retrying in ${delay}ms...`,
          {
            error: lastError?.message,
            code: lastError?.code,
          },
        );
        await this.sleep(delay);
      } else {
        logger.error(`Transaction failed after ${attempt} attempts`, {
          error: lastError,
          finalAttempt: attempt === this.maxRetries,
        });
        break;
      }
    }

    throw this.formatDatabaseError(lastError);
  }

  /**
   * Runs described schema operations on one connection inside a transaction.
   * No retries are performed.
   *
   * NOTE(review): MySQL DDL statements (CREATE/ALTER/DROP ...) cause an
   * implicit commit, so the rollback here cannot undo DDL that already ran.
   * Confirm callers do not rely on atomic schema changes.
   *
   * @param operations Statements to execute in order, each with a description
   *   used for logging and echoed back in the result.
   * @returns One `{ description, result, affectedRows }` entry per operation.
   * @throws Error A formatted error if acquisition or any operation fails.
   */
  async schemaTransaction(
    operations: Array<{ sql: string; params?: any[]; description: string }>,
  ): Promise<any[]> {
    let connection: mysql.PoolConnection | undefined;

    try {
      connection = await this.pool.getConnection();
      await connection.beginTransaction();
      logger.info(
        `Schema transaction started with ${operations.length} operations`,
      );

      const results: any[] = [];
      for (const { sql, params, description } of operations) {
        logger.info(`Executing schema operation: ${description}`);
        logger.debug(`Schema SQL: ${sql}`, { params });

        const [result] = await connection.execute(sql, params);
        results.push({
          description,
          result,
          affectedRows: (result as any).affectedRows || 0,
        });
      }

      await connection.commit();
      logger.info("Schema transaction committed successfully");
      return results;
    } catch (error) {
      if (connection) {
        try {
          await connection.rollback();
        } catch (rollbackError) {
          // Keep the original failure as the reported error; a rollback
          // failure must not replace it.
          logger.error(
            "Failed to rollback schema transaction:",
            rollbackError,
          );
        }
      }
      logger.error("Schema transaction rolled back due to error:", error);
      throw this.formatDatabaseError(error);
    } finally {
      if (connection) {
        connection.release();
        logger.debug("Schema transaction connection released");
      }
    }
  }

  /**
   * Hands out a raw pooled connection. The caller is responsible for
   * releasing it (see `releaseConnection`).
   */
  async getConnection(): Promise<mysql.PoolConnection> {
    return await this.pool.getConnection();
  }

  /** Returns a connection obtained from `getConnection` to the pool. */
  async releaseConnection(connection: mysql.PoolConnection): Promise<void> {
    connection.release();
  }

  /**
   * Wraps an identifier in backticks after removing any backticks it
   * contains, so it cannot terminate the quoted identifier early.
   */
  private quoteIdentifier(identifier: string): string {
    const sanitized = identifier.replace(/`/g, "");
    return `\`${sanitized}\``;
  }

  /**
   * Inserts many records with a single multi-row `INSERT ... VALUES`.
   *
   * Every record must be an object with exactly the same set of keys as the
   * first record; those keys become the column list.
   *
   * @param table Target table name.
   * @param data Non-empty array of records to insert.
   * @returns `{ affectedRows, insertId, recordCount, message }`.
   * @throws Error A formatted error if validation fails or the insert fails.
   */
  async bulkInsert(table: string, data: any[]): Promise<any> {
    try {
      if (!data || !Array.isArray(data) || data.length === 0) {
        throw new Error("Data must be a non-empty array");
      }

      const firstRecord = data[0];
      if (!firstRecord || typeof firstRecord !== "object") {
        throw new Error("Each record must be a valid object");
      }

      const columns = Object.keys(firstRecord);
      if (columns.length === 0) {
        throw new Error("Records must contain at least one column");
      }

      // Validate all records have the same columns
      for (let i = 1; i < data.length; i++) {
        const record = data[i];
        if (!record || typeof record !== "object") {
          throw new Error(`Record at index ${i} is not a valid object`);
        }

        const recordColumns = Object.keys(record);
        if (
          recordColumns.length !== columns.length ||
          !columns.every((col) => recordColumns.includes(col))
        ) {
          throw new Error(
            `Record at index ${i} has different structure than the first record`,
          );
        }
      }

      if (typeof table !== "string" || table.replace(/`/g, "").trim() === "") {
        throw new Error("Table name must be a non-empty string");
      }

      // Identifiers cannot be bound as parameters, so both the table and the
      // column names are sanitized and quoted the same way.
      const tableName = this.quoteIdentifier(table);
      const columnNames = columns
        .map((col) => this.quoteIdentifier(col))
        .join(", ");

      const placeholders = columns.map(() => "?").join(", ");
      const valuesPlaceholders = data.map(() => `(${placeholders})`).join(", ");
      const sql = `INSERT INTO ${tableName} (${columnNames}) VALUES ${valuesPlaceholders}`;

      // Flatten all values in row-major order to line up with the placeholders
      const allValues: any[] = [];
      for (const record of data) {
        for (const column of columns) {
          allValues.push(record[column]);
        }
      }

      logger.debug(`Executing bulk insert: ${sql}`, {
        table,
        recordCount: data.length,
        columnCount: columns.length,
      });

      const [result] = await this.pool.execute(sql, allValues);

      return {
        affectedRows: (result as any).affectedRows,
        insertId: (result as any).insertId,
        recordCount: data.length,
        message: `Successfully inserted ${data.length} records into ${table}`,
      };
    } catch (error) {
      logger.error(`Bulk insert failed for table ${table}:`, error);
      throw this.formatDatabaseError(error);
    }
  }

  /**
   * Closes the pool.
   *
   * @throws Rethrows whatever `pool.end()` rejects with, after logging it.
   */
  async close(): Promise<void> {
    try {
      await this.pool.end();
      logger.info("Database pool closed successfully");
    } catch (error) {
      logger.error("Error closing database pool:", error);
      throw error;
    }
  }

  /**
   * Converts a driver or network error into an `Error` with a clear,
   * actionable message.
   *
   * @param error The value caught from a failed operation; may be null or
   *   undefined.
   * @returns A new `Error`; unknown codes fall back to
   *   `Database error: <message> (Error code: <code>)`.
   */
  private formatDatabaseError(error: any): Error {
    const code = error?.code;
    const { host, port, user, database } = this.config;

    if (code === "ER_ACCESS_DENIED_ERROR") {
      return new Error(
        `Database access denied. Check your MySQL credentials (user: ${user}, host: ${host}:${port}). Verify the user has proper permissions.`,
      );
    } else if (code === "ER_BAD_DB_ERROR") {
      return new Error(
        `Database '${database}' does not exist on ${host}:${port}. Create the database or verify the connection string.`,
      );
    } else if (code === "ECONNREFUSED") {
      return new Error(
        `Connection refused to MySQL server at ${host}:${port}. Ensure MySQL is running and accessible at this address.`,
      );
    } else if (code === "ETIMEDOUT" || error?.errno === "ETIMEDOUT") {
      return new Error(
        `Connection timeout to MySQL server at ${host}:${port}. Check network connectivity and firewall settings.`,
      );
    } else if (code === "ER_NO_SUCH_TABLE") {
      return new Error(
        `Table does not exist in database '${database}'. Use the 'list' tool to see available tables.`,
      );
    } else if (code === "ER_DUP_ENTRY") {
      const match = error.message?.match(
        /Duplicate entry '([^']+)' for key '([^']+)'/,
      );
      if (match) {
        return new Error(
          `Duplicate entry '${match[1]}' for key '${match[2]}'. A record with this unique value already exists.`,
        );
      }
      return new Error(
        "Duplicate entry. A record with this unique value already exists.",
      );
    } else if (code === "ER_PARSE_ERROR") {
      return new Error(
        `SQL syntax error: ${error.message}. Check your SQL syntax.`,
      );
    } else if (code === "ER_BAD_FIELD_ERROR") {
      return new Error(
        `Unknown column in query: ${error.message}. Use 'describe_table' to see available columns.`,
      );
    } else if (code === "ER_LOCK_WAIT_TIMEOUT") {
      return new Error(
        "Lock wait timeout exceeded. The table is locked by another transaction. Operation was retried but still failed.",
      );
    } else if (code === "ER_LOCK_DEADLOCK") {
      return new Error(
        "Deadlock detected. The transaction was rolled back. You can retry the operation.",
      );
    } else if (code === "PROTOCOL_CONNECTION_LOST") {
      return new Error(
        "MySQL connection was lost. This may be due to network issues or MySQL server restart. The operation has been retried.",
      );
    } else if (code === "ENOTFOUND") {
      return new Error(
        `MySQL host '${host}' not found. Check the hostname in your connection string.`,
      );
    }

    // Include error code for debugging
    const errorCode = code ? ` (Error code: ${code})` : "";
    const message = error == null ? String(error) : error.message;
    return new Error(`Database error: ${message}${errorCode}`);
  }

  /**
   * Reports a simplified pool status.
   *
   * @returns The configured connection limit as `connections`;
   *   `busyConnections` is always 0 because it is not tracked here.
   */
  getPoolStatus(): { connections: number; busyConnections: number } {
    return {
      connections: this.connectionLimit,
      busyConnections: 0, // Simplified for now - MySQL2 doesn't expose this directly
    };
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/berthojoris/katcoder-mysql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.