Skip to main content
Glama
redis.ts13.4 kB
import Redis from 'ioredis'; import { config } from '../config/config'; import { logger } from '../utils/logger'; // Redis client instances export let redis: Redis; export let redisSubscriber: Redis; export let redisPublisher: Redis; /** * Initialize Redis connections */ export const initializeRedis = async (): Promise<void> => { try { // Main Redis client redis = new Redis(config.redis.url, { password: config.redis.password, db: config.redis.db, connectTimeout: config.redis.connectTimeout, commandTimeout: config.redis.commandTimeout, maxRetriesPerRequest: 3, lazyConnect: true, keepAlive: 30000, keyPrefix: 'mcp:', enableOfflineQueue: false, maxLoadingTimeout: 5000, enableReadyCheck: true, } as any); // Subscriber client (separate connection for pub/sub) redisSubscriber = new Redis(config.redis.url, { password: config.redis.password, db: config.redis.db, connectTimeout: config.redis.connectTimeout, lazyConnect: true, enableOfflineQueue: false, keyPrefix: 'mcp:', }); // Publisher client (separate connection for pub/sub) redisPublisher = new Redis(config.redis.url, { password: config.redis.password, db: config.redis.db, connectTimeout: config.redis.connectTimeout, lazyConnect: true, enableOfflineQueue: false, keyPrefix: 'mcp:', }); // Set up event listeners for main client redis.on('connect', () => { logger.info('Redis connected'); }); redis.on('ready', () => { logger.info('Redis ready for commands'); }); redis.on('error', (error) => { logger.error('Redis error', { error }); }); redis.on('close', () => { logger.warn('Redis connection closed'); }); redis.on('reconnecting', (delay) => { logger.info('Redis reconnecting', { delay }); }); redis.on('end', () => { logger.warn('Redis connection ended'); }); // Set up retry strategy redis.on('retryDelayOnFailure', (error, delay) => { logger.warn('Redis retry delay', { error: error.message, delay }); return Math.min(delay * 2, config.redis.maxRetryDelay); }); // Connect all clients await Promise.all([ redis.connect(), redisSubscriber.connect(), redisPublisher.connect(), ]); // Test connection with a simple command await redis.ping(); logger.info('Redis initialized successfully', { url: config.redis.url.replace(/\/\/.*@/, '//***@'), // Hide credentials in logs db: config.redis.db, connectTimeout: config.redis.connectTimeout, }); // Set up health monitoring setupRedisHealthCheck(); } catch (error) { logger.error('Failed to initialize Redis', { error }); throw new Error(`Redis initialization failed: ${error.message}`); } }; /** * Gracefully disconnect Redis connections */ export const disconnectRedis = async (): Promise<void> => { try { await Promise.all([ redis?.quit(), redisSubscriber?.quit(), redisPublisher?.quit(), ]); logger.info('Redis connections closed successfully'); } catch (error) { logger.error('Error closing Redis connections', { error }); } }; /** * Get Redis health status */ export const getRedisHealth = async (): Promise<{ status: 'healthy' | 'unhealthy'; latency: number; memory: any; connections: number; error?: string; }> => { try { const startTime = Date.now(); await redis.ping(); const endTime = Date.now(); // Get memory info const memoryInfo = await redis.memory('USAGE', 'test-key'); // Get connection info const info = await redis.info('clients'); const connections = parseInt(info.split('\r\n') .find(line => line.startsWith('connected_clients:')) ?.split(':')[1] || '0', 10); return { status: 'healthy', latency: endTime - startTime, memory: memoryInfo, connections, }; } catch (error) { logger.error('Redis health check failed', { error }); return { status: 'unhealthy', latency: -1, memory: null, connections: -1, error: error.message, }; } }; /** * Redis cache wrapper with automatic serialization */ export class RedisCache { private static defaultTTL = 3600; // 1 hour /** * Set a key-value pair with optional TTL */ static async set(key: string, value: any, ttl: number = this.defaultTTL): Promise<void> { try { const serialized = typeof value === 'string' ? value : JSON.stringify(value); await redis.setex(key, ttl, serialized); } catch (error) { logger.error('Redis set error', { error, key }); throw error; } } /** * Get a value by key with automatic deserialization */ static async get<T = any>(key: string): Promise<T | null> { try { const value = await redis.get(key); if (value === null) return null; try { return JSON.parse(value); } catch { return value as T; } } catch (error) { logger.error('Redis get error', { error, key }); throw error; } } /** * Delete a key */ static async del(key: string): Promise<number> { try { return await redis.del(key); } catch (error) { logger.error('Redis del error', { error, key }); throw error; } } /** * Check if key exists */ static async exists(key: string): Promise<boolean> { try { const result = await redis.exists(key); return result === 1; } catch (error) { logger.error('Redis exists error', { error, key }); throw error; } } /** * Set with expiration time */ static async expire(key: string, ttl: number): Promise<boolean> { try { const result = await redis.expire(key, ttl); return result === 1; } catch (error) { logger.error('Redis expire error', { error, key, ttl }); throw error; } } /** * Get remaining TTL */ static async ttl(key: string): Promise<number> { try { return await redis.ttl(key); } catch (error) { logger.error('Redis ttl error', { error, key }); throw error; } } /** * Increment counter */ static async incr(key: string, ttl?: number): Promise<number> { try { const result = await redis.incr(key); if (ttl && result === 1) { await redis.expire(key, ttl); } return result; } catch (error) { logger.error('Redis incr error', { error, key }); throw error; } } /** * Decrement counter */ static async decr(key: string): Promise<number> { try { return await redis.decr(key); } catch (error) { logger.error('Redis decr error', { error, key }); throw error; } } /** * Get multiple keys */ static async mget<T = any>(keys: string[]): Promise<(T | null)[]> { try { const values = await redis.mget(...keys); return values.map(value => { if (value === null) return null; try { return JSON.parse(value); } catch { return value as T; } }); } catch (error) { logger.error('Redis mget error', { error, keys }); throw error; } } /** * Set multiple key-value pairs */ static async mset(keyValues: Record<string, any>, ttl?: number): Promise<void> { try { const pipeline = redis.pipeline(); Object.entries(keyValues).forEach(([key, value]) => { const serialized = typeof value === 'string' ? value : JSON.stringify(value); if (ttl) { pipeline.setex(key, ttl, serialized); } else { pipeline.set(key, serialized); } }); await pipeline.exec(); } catch (error) { logger.error('Redis mset error', { error, keyCount: Object.keys(keyValues).length }); throw error; } } /** * Get keys by pattern */ static async keys(pattern: string): Promise<string[]> { try { return await redis.keys(pattern); } catch (error) { logger.error('Redis keys error', { error, pattern }); throw error; } } /** * Scan keys by pattern (memory efficient) */ static async scan(pattern: string, count: number = 100): Promise<string[]> { try { const keys: string[] = []; const stream = redis.scanStream({ match: pattern, count, }); return new Promise((resolve, reject) => { stream.on('data', (batch: string[]) => { keys.push(...batch); }); stream.on('end', () => { resolve(keys); }); stream.on('error', (error) => { reject(error); }); }); } catch (error) { logger.error('Redis scan error', { error, pattern }); throw error; } } } /** * Redis pub/sub utilities */ export class RedisPubSub { /** * Publish message to channel */ static async publish(channel: string, message: any): Promise<number> { try { const serialized = typeof message === 'string' ? message : JSON.stringify(message); return await redisPublisher.publish(channel, serialized); } catch (error) { logger.error('Redis publish error', { error, channel }); throw error; } } /** * Subscribe to channel */ static async subscribe(channel: string, callback: (message: any) => void): Promise<void> { try { await redisSubscriber.subscribe(channel); redisSubscriber.on('message', (receivedChannel: string, message: string) => { if (receivedChannel === channel) { try { const parsed = JSON.parse(message); callback(parsed); } catch { callback(message); } } }); } catch (error) { logger.error('Redis subscribe error', { error, channel }); throw error; } } /** * Unsubscribe from channel */ static async unsubscribe(channel: string): Promise<void> { try { await redisSubscriber.unsubscribe(channel); } catch (error) { logger.error('Redis unsubscribe error', { error, channel }); throw error; } } /** * Pattern subscribe */ static async psubscribe(pattern: string, callback: (channel: string, message: any) => void): Promise<void> { try { await redisSubscriber.psubscribe(pattern); redisSubscriber.on('pmessage', (receivedPattern: string, channel: string, message: string) => { if (receivedPattern === pattern) { try { const parsed = JSON.parse(message); callback(channel, parsed); } catch { callback(channel, message); } } }); } catch (error) { logger.error('Redis psubscribe error', { error, pattern }); throw error; } } } /** * Redis distributed lock implementation */ export class RedisLock { private key: string; private value: string; private ttl: number; constructor(key: string, ttl: number = 30) { this.key = `lock:${key}`; this.value = `${Date.now()}-${Math.random()}`; this.ttl = ttl; } /** * Acquire lock */ async acquire(): Promise<boolean> { try { const result = await redis.set(this.key, this.value, 'EX', this.ttl, 'NX'); return result === 'OK'; } catch (error) { logger.error('Redis lock acquire error', { error, key: this.key }); return false; } } /** * Release lock */ async release(): Promise<boolean> { try { const script = ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end `; const result = await redis.eval(script, 1, this.key, this.value); return result === 1; } catch (error) { logger.error('Redis lock release error', { error, key: this.key }); return false; } } /** * Extend lock TTL */ async extend(newTtl: number = this.ttl): Promise<boolean> { try { const script = ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("EXPIRE", KEYS[1], ARGV[2]) else return 0 end `; const result = await redis.eval(script, 1, this.key, this.value, newTtl); return result === 1; } catch (error) { logger.error('Redis lock extend error', { error, key: this.key }); return false; } } } /** * Setup Redis health check monitoring */ const setupRedisHealthCheck = (): void => { setInterval(async () => { try { const health = await getRedisHealth(); if (health.status === 'unhealthy') { logger.error('Redis health check failed', health); } else if (health.latency > 1000) { logger.warn('Redis high latency detected', { latency: health.latency }); } } catch (error) { logger.error('Redis health check error', { error }); } }, 30000); // Check every 30 seconds }; // Redis clients and classes are already exported above // Graceful shutdown handling process.on('SIGTERM', async () => { logger.info('Received SIGTERM, closing Redis connections'); await disconnectRedis(); }); process.on('SIGINT', async () => { logger.info('Received SIGINT, closing Redis connections'); await disconnectRedis(); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/perfecxion-ai/secure-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server