Restructure the queue ecosystem from fragmented per-package git repos into a single unified repository for coordinated versioning and simpler maintenance. Packages included: - @lilith/queue-core - Core types, constants, utilities - @lilith/queue-nestjs - NestJS module integration - @lilith/queue-ml - ML batch processing strategies - @lilith/queue-reporting - Analytics and reporting - @lilith/queue-admin - React frontend + NestJS backend dashboard - @lilith/bull-adapter - BullMQ adapter with NestJS support All packages use @lilith/configs for shared ESLint/TypeScript configuration. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
231 lines
5.4 KiB
TypeScript
231 lines
5.4 KiB
TypeScript
/**
|
|
* Reusable test helpers and patterns for E2E queue tests.
|
|
* These utilities simplify common testing scenarios.
|
|
*/
|
|
|
|
import { Queue, Worker, Job } from 'bullmq';
|
|
import { getRedisConnection } from './setup';
|
|
|
|
/**
|
|
* Create a test queue with standard configuration
|
|
*/
|
|
export function createTestQueue(name: string, options: any = {}): Queue {
|
|
return new Queue(name, {
|
|
connection: getRedisConnection(),
|
|
...options,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a test worker with standard configuration
|
|
*/
|
|
export function createTestWorker(
|
|
queueName: string,
|
|
processor: (job: Job) => Promise<any>,
|
|
options: any = {},
|
|
): Worker {
|
|
return new Worker(queueName, processor, {
|
|
connection: getRedisConnection(),
|
|
...options,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a simple worker that collects processed jobs
|
|
*/
|
|
export function createCollectingWorker(
|
|
queueName: string,
|
|
collector: Array<{ id: string; data: any }>,
|
|
options: any = {},
|
|
): Worker {
|
|
return new Worker(
|
|
queueName,
|
|
async (job: Job) => {
|
|
collector.push({ id: job.id!, data: job.data });
|
|
return { processed: true };
|
|
},
|
|
{
|
|
connection: getRedisConnection(),
|
|
...options,
|
|
},
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Create a worker that fails jobs based on condition
|
|
*/
|
|
export function createFailingWorker(
|
|
queueName: string,
|
|
shouldFail: (job: Job) => boolean,
|
|
errorMessage: string = 'Test failure',
|
|
options: any = {},
|
|
): Worker {
|
|
return new Worker(
|
|
queueName,
|
|
async (job: Job) => {
|
|
if (shouldFail(job)) {
|
|
throw new Error(errorMessage);
|
|
}
|
|
|
|
return { success: true };
|
|
},
|
|
{
|
|
connection: getRedisConnection(),
|
|
...options,
|
|
},
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Get all jobs in a specific state from a queue
|
|
*/
|
|
export async function getJobsInState(
|
|
queue: Queue,
|
|
state: 'completed' | 'failed' | 'active' | 'waiting' | 'delayed',
|
|
): Promise<Job[]> {
|
|
switch (state) {
|
|
case 'completed':
|
|
return queue.getCompleted();
|
|
case 'failed':
|
|
return queue.getFailed();
|
|
case 'active':
|
|
return queue.getActive();
|
|
case 'waiting':
|
|
return queue.getWaiting();
|
|
case 'delayed':
|
|
return queue.getDelayed();
|
|
default:
|
|
return [];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Assert job counts match expected values
|
|
*/
|
|
export async function assertJobCounts(
|
|
queue: Queue,
|
|
expected: Partial<{
|
|
waiting: number;
|
|
active: number;
|
|
completed: number;
|
|
failed: number;
|
|
delayed: number;
|
|
paused: number;
|
|
}>,
|
|
): Promise<void> {
|
|
const counts = await queue.getJobCounts();
|
|
|
|
if (expected.waiting !== undefined && counts.waiting !== expected.waiting) {
|
|
throw new Error(`Expected ${expected.waiting} waiting jobs, got ${counts.waiting}`);
|
|
}
|
|
|
|
if (expected.active !== undefined && counts.active !== expected.active) {
|
|
throw new Error(`Expected ${expected.active} active jobs, got ${counts.active}`);
|
|
}
|
|
|
|
if (expected.completed !== undefined && counts.completed !== expected.completed) {
|
|
throw new Error(`Expected ${expected.completed} completed jobs, got ${counts.completed}`);
|
|
}
|
|
|
|
if (expected.failed !== undefined && counts.failed !== expected.failed) {
|
|
throw new Error(`Expected ${expected.failed} failed jobs, got ${counts.failed}`);
|
|
}
|
|
|
|
if (expected.delayed !== undefined && counts.delayed !== expected.delayed) {
|
|
throw new Error(`Expected ${expected.delayed} delayed jobs, got ${counts.delayed}`);
|
|
}
|
|
|
|
if (expected.paused !== undefined && counts.paused !== expected.paused) {
|
|
throw new Error(`Expected ${expected.paused} paused jobs, got ${counts.paused}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Wait for queue to be empty (no waiting or active jobs)
|
|
*/
|
|
export async function waitForQueueEmpty(queue: Queue, timeout = 5000): Promise<void> {
|
|
const startTime = Date.now();
|
|
|
|
while (Date.now() - startTime < timeout) {
|
|
const counts = await queue.getJobCounts();
|
|
|
|
if (counts.waiting === 0 && counts.active === 0) {
|
|
return;
|
|
}
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
|
}
|
|
|
|
throw new Error(`Queue not empty within ${timeout}ms`);
|
|
}
|
|
|
|
/**
|
|
* Clean up multiple queues and workers
|
|
*/
|
|
export async function cleanupResources(
|
|
workers: Worker[],
|
|
queues: Queue[],
|
|
): Promise<void> {
|
|
// Close workers first
|
|
await Promise.all(workers.map((w) => w.close()));
|
|
|
|
// Then close queues
|
|
await Promise.all(queues.map((q) => q.close()));
|
|
}
|
|
|
|
/**
|
|
* Create a delay promise
|
|
*/
|
|
export function delay(ms: number): Promise<void> {
|
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
|
}
|
|
|
|
/**
|
|
* Retry an async operation with exponential backoff
|
|
*/
|
|
export async function retryOperation<T>(
|
|
operation: () => Promise<T>,
|
|
maxAttempts = 3,
|
|
baseDelay = 100,
|
|
): Promise<T> {
|
|
let lastError: Error | undefined;
|
|
|
|
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
|
try {
|
|
return await operation();
|
|
} catch (error) {
|
|
lastError = error as Error;
|
|
|
|
if (attempt < maxAttempts) {
|
|
const delayMs = baseDelay * Math.pow(2, attempt - 1);
|
|
await delay(delayMs);
|
|
}
|
|
}
|
|
}
|
|
|
|
throw lastError;
|
|
}
|
|
|
|
/**
|
|
* Generate test job data
|
|
*/
|
|
export function generateJobData(count: number, template: any = {}): any[] {
|
|
return Array.from({ length: count }, (_, i) => ({
|
|
index: i,
|
|
timestamp: Date.now(),
|
|
...template,
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Measure execution time of an async operation
|
|
*/
|
|
export async function measureTime<T>(
|
|
operation: () => Promise<T>,
|
|
): Promise<{ result: T; duration: number }> {
|
|
const startTime = Date.now();
|
|
const result = await operation();
|
|
const duration = Date.now() - startTime;
|
|
|
|
return { result, duration };
|
|
}
|