import { createLogger } from './logger'; import type { BatchedEvent } from './types'; export class BatchQueue { private queue: BatchedEvent[] = []; private batchSize: number; private batchInterval: number; private flushCallback: (events: BatchedEvent[]) => Promise; private intervalId?: ReturnType; private log; constructor( batchSize: number, batchInterval: number, flushCallback: (events: BatchedEvent[]) => Promise, debugLogging = false, ) { this.batchSize = batchSize; this.batchInterval = batchInterval; this.flushCallback = flushCallback; this.log = createLogger(debugLogging); this.startInterval(); } add(event: BatchedEvent): void { this.queue.push(event); this.log.debug(`Event queued: ${event.type} (${this.queue.length} pending)`); if (this.queue.length >= this.batchSize) { this.flush(); } } async flush(): Promise { if (this.queue.length === 0) { return; } const eventsToFlush = [...this.queue]; this.queue = []; this.log.debug(`Flushing batch: ${eventsToFlush.length} events`); try { await this.flushCallback(eventsToFlush); this.log.debug('Batch flushed successfully'); } catch (error) { this.log.error('Batch flush failed:', error); // Re-queue failed events this.queue.unshift(...eventsToFlush); } } private startInterval(): void { this.intervalId = setInterval(() => { this.flush(); }, this.batchInterval); } destroy(): void { if (this.intervalId) { clearInterval(this.intervalId); } this.flush(); } }