Skip to main content
Glama
hl7-client-pool.ts9.73 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { ILogger } from '@medplum/core'; import { normalizeErrorString } from '@medplum/core'; import { DEFAULT_ENCODING } from '@medplum/hl7'; import type { ChannelStats } from './channel-stats-tracker'; import { calculateRttStats } from './channel-stats-tracker'; import { CLIENT_RELEASE_COUNTDOWN_MS } from './constants'; import { EnhancedHl7Client } from './enhanced-hl7-client'; import type { HeartbeatEmitter } from './types'; export interface Hl7ClientPoolOptions { host: string; port: number; encoding?: string; keepAlive: boolean; maxClients: number; log: ILogger; heartbeatEmitter: HeartbeatEmitter; } /** * Manages a pool of HL7 clients for a single remote host. * * In keepAlive mode, connections are reused up to maxClients limit. * In non-keepAlive mode, tracks outstanding connections and enforces the limit. */ export class Hl7ClientPool { readonly host: string; readonly port: number; readonly encoding?: string; private readonly keepAlive: boolean; private maxClients: number; private readonly log: ILogger; private readonly clients: EnhancedHl7Client[] = []; private readonly lastUsedTimestamps = new WeakMap<EnhancedHl7Client, number>(); // WeakMap allows entries to be GC'd once key gets GC'd private closingPromise: Promise<void> | undefined; private nextClientIdx: number = 0; private readonly heartbeatEmitter: HeartbeatEmitter; private trackingStats = false; private gcListener: (() => void) | undefined; constructor(options: Hl7ClientPoolOptions) { this.host = options.host; this.port = options.port; this.encoding = options.encoding; this.keepAlive = options.keepAlive; this.maxClients = options.maxClients; this.log = options.log; this.heartbeatEmitter = options.heartbeatEmitter; this.startAutoClientGc(); } /** * Gets a client for sending a message. * In keepAlive mode, reuses or creates a connection up to the limit. * In non-keepAlive mode, creates a temporary connection if under the limit. * * @returns Promise that resolves with an HL7 client */ getClient(): EnhancedHl7Client { if (this.closingPromise) { throw new Error('Cannot get new client, pool is closed'); } return this.getNextClient(); } private closeAndRemoveClient(client: EnhancedHl7Client): void { this.log.info( `Closing client for remote 'mllp://${client.host}:${client.port}?encoding=${client.encoding ?? DEFAULT_ENCODING}' and removing it from the pool...` ); this.removeClient(client); client.close().catch((err: Error) => { this.log.error('Error while closing and removing client', err); }); } /** * Releases a client back to the pool. * In keepAlive mode, marks the client as available. * In non-keepAlive mode, removes the client from tracking. * * @param client - The client to release. * @param forceClose - Optional boolean on whether to force the client to close its connect and be removed from the pool. Defaults to `false`. */ releaseClient(client: EnhancedHl7Client, forceClose = false): void { // If forcing the connection closed // Or if keepAlive is off and connection is undefined // We should close the client and remove it from the pool immediately if (forceClose || (!this.keepAlive && client.connection === undefined)) { this.closeAndRemoveClient(client); return; } // We should track the last used time for non-keepAlive clients if (!this.keepAlive) { this.lastUsedTimestamps.set(client, Date.now()); } } /** * Runs the Hl7Client garbage collection, when not in `keepAlive` mode. * * Clients that have not been used in `CLIENT_RELEASE_COUNTDOWN_MS` milliseconds (10 secs) are closed. */ runClientGc(): void { if (this.keepAlive) { return; } for (const client of this.clients) { // If the last time the client was used was more than CLIENT_RELEASE_COUNTDOWN_MS milliseconds ago, // and there are no pending messages, call closeAndRemoveClient if ( (this.lastUsedTimestamps.get(client) as number) + CLIENT_RELEASE_COUNTDOWN_MS <= Date.now() && !client.connection?.getPendingMessageCount() ) { this.closeAndRemoveClient(client); } } } /** * Starts the automatic Hl7Client garbage collection, when not in `keepAlive` mode. * * Clients that have not been used in `CLIENT_RELEASE_COUNTDOWN_MS` milliseconds (10 secs) are closed automatically. */ startAutoClientGc(): void { if (this.gcListener || this.keepAlive) { return; } const gcListener = (): void => { this.runClientGc(); }; this.heartbeatEmitter.addEventListener('heartbeat', gcListener); this.gcListener = gcListener; } /** * Stops the automatic Hl7Client garbage collection. * * No-ops when GC is not active or if the pool is in `keepAlive` mode. */ stopAutoClientGc(): void { if (!this.gcListener) { return; } this.heartbeatEmitter.removeEventListener('heartbeat', this.gcListener); this.gcListener = undefined; } /** * Removes a client from the pool when it closes or errors. * * @param client - The client to remove */ removeClient(client: EnhancedHl7Client): void { const clientIdx = this.clients.indexOf(client); if (clientIdx === -1) { return; } this.clients.splice(clientIdx, 1); } /** * Closes all clients in the pool. */ async closeAll(): Promise<void> { // If we are already closing the pool, return the existing closing promise if (this.closingPromise) { await this.closingPromise; return; } this.stopAutoClientGc(); const closePromises = this.clients.map((client) => client.close()); this.closingPromise = new Promise<void>((resolve, reject) => { Promise.all(closePromises) .then(() => resolve()) .catch(reject); }); // Remove any clients that didn't get cleaned up by close listener // This is especially for when this method is called in the same tick as a client is created for (const client of this.clients) { this.removeClient(client); } // We wait for the closing promise to resolve await this.closingPromise; } /** * Gets the number of clients currently in the pool. * @returns the number of clients in the pool. */ size(): number { return this.clients.length; } /** * Gets all clients in the pool. * @returns An array of the raw `EnhancedHl7Client`s. */ getClients(): EnhancedHl7Client[] { return this.clients; } /** * Gets a client for keepAlive mode. * Reuses an available client or creates a new one up to the limit. * @returns An `EnhancedHl7Client`. */ private getNextClient(): EnhancedHl7Client { // If we're under the limit, create a new client if (this.clients.length < this.maxClients) { const client = this.createAndTrackClient(); return client; } // If we can't create a new client, try to get the next one // We use a naive round-robin strategy for getting the next client const client = this.clients[this.nextClientIdx]; this.nextClientIdx = (this.nextClientIdx + 1) % this.clients.length; this.lastUsedTimestamps.set(client, Date.now()); return client; } /** * Creates a new client and adds it to the pool. * @returns a new `EnhancedHl7Client`. */ private createAndTrackClient(): EnhancedHl7Client { const client = new EnhancedHl7Client({ host: this.host, port: this.port, encoding: this.encoding, keepAlive: this.keepAlive, log: this.log, }); // If GC is running, we should add the current timestamp as last used for this client if (this.gcListener) { this.lastUsedTimestamps.set(client, Date.now()); } if (this.trackingStats) { client.startTrackingStats({ heartbeatEmitter: this.heartbeatEmitter }); } this.clients.push(client); // Set up event listeners client.addEventListener('close', () => { this.removeClient(client); if (this.keepAlive) { this.log.info(`Persistent connection to remote 'mllp://${this.host}:${this.port}' closed`); } }); client.addEventListener('error', (event) => { this.closeAndRemoveClient(client); if (this.keepAlive) { this.log.error( `Persistent connection to remote 'mllp://${this.host}:${this.port}' encountered error: '${normalizeErrorString(event.error)}' - Closing connection...` ); } }); return client; } setMaxClients(maxClients: number): void { this.maxClients = maxClients; } getMaxClients(): number { return this.maxClients; } startTrackingStats(): void { this.trackingStats = true; for (const client of this.clients) { client.startTrackingStats({ heartbeatEmitter: this.heartbeatEmitter }); } } stopTrackingStats(): void { this.trackingStats = false; for (const client of this.clients) { client.stopTrackingStats(); } } isTrackingStats(): boolean { return this.trackingStats; } getPoolStats(): ChannelStats | undefined { if (!this.trackingStats) { return undefined; } const allRttSamples = this.clients.flatMap((client) => client.stats?.getRttSamples() ?? []); let totalPendingCount = 0; for (const client of this.clients) { totalPendingCount += client.stats?.getPendingCount() ?? 0; } return { rtt: calculateRttStats(allRttSamples, totalPendingCount) }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server