Restructure the queue ecosystem from fragmented per-package git repos into a single unified repository for coordinated versioning and simpler maintenance. Packages included: - @lilith/queue-core - Core types, constants, utilities - @lilith/queue-nestjs - NestJS module integration - @lilith/queue-ml - ML batch processing strategies - @lilith/queue-reporting - Analytics and reporting - @lilith/queue-admin - React frontend + NestJS backend dashboard - @lilith/bull-adapter - BullMQ adapter with NestJS support All packages use @lilith/configs for shared ESLint/TypeScript configuration. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
579 lines
16 KiB
TypeScript
579 lines
16 KiB
TypeScript
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
|
import { Queue, Worker, Job } from 'bullmq';
|
|
import { JobPriority } from '../core/src/types/priority.types';
|
|
import { isPeakHour, shouldDeferJob, calculatePeakDelay } from '../core/src/utils/peak-hours';
|
|
import { getRedisConnection, waitForJobs, waitFor } from './setup';
|
|
|
|
/**
 * E2E Integration Tests for Queue Package
 *
 * These tests verify:
 * - Job enqueuing and processing
 * - Priority handling
 * - Retry behavior
 * - Peak-hour deferral
 * - Bulk operations
 * - Advanced scenarios (cancellation, progress tracking, concurrency, rate limiting)
 */
|
|
|
|
/**
 * Round-trip checks on a plain queue: jobs are added to `test-queue`, a worker
 * records every job it receives, and each test inspects what was recorded.
 */
describe('Queue Integration - Job Processing', () => {
  const queueName = 'test-queue';

  let queue: Queue;
  let worker: Worker;
  const processedJobs: Array<{ id: string; data: any }> = [];

  // Processor shared by all tests: remember the job, then report success.
  const recordJob = async (job: Job) => {
    processedJobs.push({ id: job.id!, data: job.data });
    return { success: true };
  };

  beforeEach(async () => {
    // Empty the shared array in place so results from a previous test are gone.
    processedJobs.splice(0, processedJobs.length);

    queue = new Queue(queueName, { connection: getRedisConnection() });
    worker = new Worker(queueName, recordJob, { connection: getRedisConnection() });
  });

  afterEach(async () => {
    // Worker first, then the queue it was consuming from.
    await worker.close();
    await queue.close();
  });

  it('should enqueue and process a simple job', async () => {
    const payload = { message: 'Hello, Queue!' };

    const enqueued = await queue.add('test-job', payload);
    expect(enqueued.id).toBeDefined();

    await waitForJobs(queue, 'completed', 1);

    expect(processedJobs).toHaveLength(1);
    const [first] = processedJobs;
    expect(first.data).toMatchObject(payload);
  });

  it('should process multiple jobs in order', async () => {
    const messages = ['Job 1', 'Job 2', 'Job 3'];

    // Enqueue one at a time so the insertion order is well defined.
    for (let i = 0; i < messages.length; i += 1) {
      await queue.add('test-job', { message: messages[i] });
    }

    await waitForJobs(queue, 'completed', 3);

    expect(processedJobs).toHaveLength(3);
    const seenMessages = processedJobs.map((entry) => entry.data.message);
    expect(seenMessages).toEqual(['Job 1', 'Job 2', 'Job 3']);
  });

  it('should handle job completion with return value', async () => {
    const enqueued = await queue.add('test-job', { value: 42 });

    await waitForJobs(queue, 'completed', 1);

    // Re-read the job from the queue to see the value the processor returned.
    const reloaded = await queue.getJob(enqueued.id!);
    expect(reloaded?.returnvalue).toEqual({ success: true });
  });
});
|
|
|
|
/**
 * Verifies that a single-concurrency worker drains `priority-queue` in
 * priority order rather than insertion order.
 *
 * The worker is started in `beforeEach`, i.e. before any job exists. To keep it
 * from claiming the first job added (whatever its priority) while the rest are
 * still being enqueued, each test pauses the queue, enqueues everything, and
 * only then resumes processing.
 */
describe('Queue Integration - Priority Handling', () => {
  let queue: Queue;
  let worker: Worker;
  const processedOrder: number[] = [];

  beforeEach(async () => {
    processedOrder.length = 0;

    queue = new Queue('priority-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'priority-queue',
      async (job: Job) => {
        processedOrder.push(job.data.order);
        // Small pause so jobs do not complete instantaneously.
        await new Promise((resolve) => setTimeout(resolve, 50));
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 1, // Process one at a time to verify order
      },
    );
  });

  afterEach(async () => {
    await worker.close();
    await queue.close();
  });

  it('should process jobs in priority order', async () => {
    // Hold processing until every job is enqueued; otherwise the running worker
    // may pick up the first (LOW) job before higher-priority jobs are added.
    await queue.pause();

    // Add jobs with different priorities (lower number = higher priority)
    await queue.add('job', { order: 3 }, { priority: JobPriority.LOW });
    await queue.add('job', { order: 1 }, { priority: JobPriority.URGENT });
    await queue.add('job', { order: 2 }, { priority: JobPriority.HIGH });
    await queue.add('job', { order: 4 }, { priority: JobPriority.NORMAL });

    await queue.resume();

    await waitForJobs(queue, 'completed', 4);

    // Should process in priority order: URGENT, HIGH, NORMAL, LOW
    expect(processedOrder).toEqual([1, 2, 4, 3]);
  });

  it('should handle mixed priority jobs correctly', async () => {
    // Same reasoning as above: enqueue the full set before the worker may start.
    await queue.pause();

    await queue.add('job', { order: 5 }, { priority: JobPriority.BATCH });
    await queue.add('job', { order: 2 }, { priority: JobPriority.HIGH });
    await queue.add('job', { order: 3 }, { priority: JobPriority.NORMAL });
    await queue.add('job', { order: 1 }, { priority: JobPriority.URGENT });
    await queue.add('job', { order: 4 }, { priority: JobPriority.LOW });

    await queue.resume();

    await waitForJobs(queue, 'completed', 5);

    expect(processedOrder).toEqual([1, 2, 3, 4, 5]);
  });
});
|
|
|
|
/**
 * Exercises retry handling on `retry-queue`. The worker counts how many times
 * it has seen each job id and throws while that count is still below the
 * job's `failUntilAttempt`, so each test controls how many runs fail.
 */
describe('Queue Integration - Retry Behavior', () => {
  const queueName = 'retry-queue';

  let queue: Queue;
  let worker: Worker;
  const attemptCounts = new Map<string, number>();

  // Processor: bump the per-job run counter, then fail or succeed based on the job data.
  const flakyProcessor = async (job: Job) => {
    const jobId = job.id!;
    const priorRuns = attemptCounts.get(jobId) ?? 0;
    const thisRun = priorRuns + 1;
    attemptCounts.set(jobId, thisRun);

    const { shouldFail, failUntilAttempt } = job.data;
    const mustFail = shouldFail && priorRuns < failUntilAttempt;
    if (mustFail) {
      throw new Error('Intentional failure for retry test');
    }

    return { attempts: thisRun };
  };

  // Enqueues a failing job allowed three attempts with the given backoff settings.
  const addRetryJob = (
    failUntilAttempt: number,
    backoffType: 'exponential' | 'fixed',
    backoffDelay: number,
  ) =>
    queue.add(
      'retry-job',
      { shouldFail: true, failUntilAttempt },
      { attempts: 3, backoff: { type: backoffType, delay: backoffDelay } },
    );

  beforeEach(async () => {
    attemptCounts.clear();

    queue = new Queue(queueName, { connection: getRedisConnection() });
    worker = new Worker(queueName, flakyProcessor, { connection: getRedisConnection() });
  });

  afterEach(async () => {
    await worker.close();
    await queue.close();
  });

  it('should retry failed jobs with exponential backoff', async () => {
    // Fails on the first two runs, succeeds on the third.
    const enqueued = await addRetryJob(2, 'exponential', 100);

    await waitForJobs(queue, 'completed', 1, 10000);

    const reloaded = await queue.getJob(enqueued.id!);
    expect(reloaded?.attemptsMade).toBe(3);
    expect(attemptCounts.get(enqueued.id!)).toBe(3);
  });

  it('should fail job after max attempts', async () => {
    // Threshold of 10 is above the 3 allowed attempts, so the job never succeeds.
    const enqueued = await addRetryJob(10, 'fixed', 50);

    await waitForJobs(queue, 'failed', 1, 10000);

    const reloaded = await queue.getJob(enqueued.id!);
    expect(reloaded?.attemptsMade).toBe(3);
    expect(reloaded?.failedReason).toContain('Intentional failure');
  });

  it('should handle jobs that succeed on first retry', async () => {
    // Fails once, succeeds on the second run.
    const enqueued = await addRetryJob(1, 'fixed', 50);

    await waitForJobs(queue, 'completed', 1, 5000);

    const reloaded = await queue.getJob(enqueued.id!);
    expect(reloaded?.attemptsMade).toBe(2);
    expect(attemptCounts.get(enqueued.id!)).toBe(2);
  });
});
|
|
|
|
/**
 * Checks the peak-hour helpers (`isPeakHour`, `shouldDeferJob`,
 * `calculatePeakDelay`) against fixed timestamps, and confirms that a job
 * enqueued on `peak-queue` with the computed delay lands in the delayed state.
 */
describe('Queue Integration - Peak Hour Deferral', () => {
  let queue: Queue;

  // Wednesday 18:30 UTC; the tests below expect this instant to count as peak.
  const wednesdayPeak = () => new Date('2025-01-15T18:30:00Z');

  beforeEach(async () => {
    queue = new Queue('peak-queue', { connection: getRedisConnection() });
  });

  afterEach(async () => {
    await queue.close();
  });

  it('should detect peak hours correctly', () => {
    const cases: Array<[Date, boolean]> = [
      // Peak hour (6pm UTC = 18:00) on a Wednesday
      [wednesdayPeak(), true],
      // Non-peak hour (2am UTC) on the same day
      [new Date('2025-01-15T02:00:00Z'), false],
      // Saturday: expected not to be peak even at the peak clock time
      [new Date('2025-01-18T18:30:00Z'), false],
    ];

    for (const [instant, expected] of cases) {
      expect(isPeakHour(instant)).toBe(expected);
    }
  });

  it('should defer low-priority jobs during peak hours', () => {
    const peakTime = wednesdayPeak();

    // Both LOW and NORMAL are expected to be deferred with a positive delay.
    for (const priority of [JobPriority.LOW, JobPriority.NORMAL]) {
      const outcome = shouldDeferJob(priority, peakTime);
      expect(outcome.shouldDefer).toBe(true);
      expect(outcome.delay).toBeGreaterThan(0);
    }
  });

  it('should not defer high-priority jobs during peak hours', () => {
    const peakTime = wednesdayPeak();

    // HIGH and URGENT are expected to bypass deferral entirely.
    for (const priority of [JobPriority.HIGH, JobPriority.URGENT]) {
      const outcome = shouldDeferJob(priority, peakTime);
      expect(outcome.shouldDefer).toBe(false);
      expect(outcome.delay).toBe(0);
    }
  });

  it('should calculate correct delay until peak ends', () => {
    // 6pm UTC, peak ends at 10pm UTC (4 hours)
    const sixPmUtc = new Date('2025-01-15T18:00:00Z');

    const delayMs = calculatePeakDelay(sixPmUtc);

    // Should be approximately 4 hours (14400000 ms)
    expect(delayMs).toBeGreaterThan(14000000);
    expect(delayMs).toBeLessThan(14500000);
  });

  it('should apply delay when adding job during peak hours', async () => {
    const deferral = shouldDeferJob(JobPriority.NORMAL, wednesdayPeak());

    const enqueued = await queue.add(
      'deferred-job',
      { data: 'test' },
      { priority: JobPriority.NORMAL, delay: deferral.delay },
    );

    // Job should be in delayed state
    expect(await enqueued.getState()).toBe('delayed');

    const { delayed, waiting } = await queue.getJobCounts();
    expect(delayed).toBe(1);
    expect(waiting).toBe(0);
  });
});
|
|
|
|
/**
 * Covers `Queue.addBulk` on `bulk-queue`: a large uniform batch, a batch with
 * mixed priorities, and a batch in which some jobs are made to fail.
 *
 * `beforeEach` starts a worker (concurrency 5) that records each job id and
 * always succeeds. The failure test swaps that worker for one that throws on
 * flagged jobs.
 */
describe('Queue Integration - Bulk Operations', () => {
  let queue: Queue;
  let worker: Worker;
  const processedJobs: string[] = [];

  beforeEach(async () => {
    processedJobs.length = 0;

    queue = new Queue('bulk-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'bulk-queue',
      async (job: Job) => {
        processedJobs.push(job.id!);
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 5,
      },
    );
  });

  afterEach(async () => {
    await worker.close();
    await queue.close();
  });

  it('should add bulk jobs efficiently', async () => {
    const bulkJobs = Array.from({ length: 100 }, (_, i) => ({
      name: 'bulk-job',
      data: { index: i },
      opts: {},
    }));

    // Time only the enqueue call, not the processing that follows.
    const startTime = Date.now();
    const jobs = await queue.addBulk(bulkJobs);
    const duration = Date.now() - startTime;

    expect(jobs).toHaveLength(100);
    expect(duration).toBeLessThan(1000); // Should be fast

    await waitForJobs(queue, 'completed', 100, 30000);

    expect(processedJobs).toHaveLength(100);
  });

  it('should handle bulk jobs with different priorities', async () => {
    const bulkJobs = [
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'low',
        data: { priority: 'low', index: i },
        opts: { priority: JobPriority.LOW },
      })),
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'high',
        data: { priority: 'high', index: i },
        opts: { priority: JobPriority.HIGH },
      })),
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'normal',
        data: { priority: 'normal', index: i },
        opts: { priority: JobPriority.NORMAL },
      })),
    ];

    const jobs = await queue.addBulk(bulkJobs);
    expect(jobs).toHaveLength(30);

    await waitForJobs(queue, 'completed', 30, 15000);

    expect(processedJobs).toHaveLength(30);
  });

  it('should handle bulk job failures gracefully', async () => {
    const bulkJobs = Array.from({ length: 20 }, (_, i) => ({
      name: 'bulk-job',
      data: { index: i, shouldFail: i % 5 === 0 }, // Every 5th job fails
      opts: {
        attempts: 1, // No retries: a thrown error marks the job failed immediately
      },
    }));

    // Swap in the failure-aware worker BEFORE enqueuing. If the jobs were added
    // first, the always-succeeding worker from beforeEach could complete some of
    // the jobs flagged `shouldFail` before it is closed, skewing the counts.
    await worker.close();
    worker = new Worker(
      'bulk-queue',
      async (job: Job) => {
        processedJobs.push(job.id!);

        if (job.data.shouldFail) {
          throw new Error('Intentional bulk failure');
        }

        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 5,
      },
    );

    await queue.addBulk(bulkJobs);

    // Wait until every job has reached a terminal state.
    await waitFor(
      async () => {
        const counts = await queue.getJobCounts();
        return counts.completed + counts.failed === 20;
      },
      { timeout: 15000 },
    );

    // Indices 0, 5, 10 and 15 fail; the remaining 16 complete.
    const counts = await queue.getJobCounts();
    expect(counts.completed).toBe(16);
    expect(counts.failed).toBe(4);
  });
});
|
|
|
|
/**
 * Less common queue behaviours: removing a delayed job, progress reporting,
 * concurrent processing, and worker-side rate limiting.
 *
 * `beforeEach` only creates the queue (`advanced-queue`); each test that needs
 * a worker creates its own and stores it in `worker` so `afterEach` can close it.
 */
describe('Queue Integration - Advanced Scenarios', () => {
  let queue: Queue;
  // Undefined for tests that never start a worker (e.g. job cancellation).
  let worker: Worker | undefined;

  beforeEach(async () => {
    queue = new Queue('advanced-queue', {
      connection: getRedisConnection(),
    });
  });

  afterEach(async () => {
    if (worker) {
      await worker.close();
      // Reset so a later worker-less test does not close the same worker again.
      worker = undefined;
    }

    await queue.close();
  });

  it('should handle job cancellation', async () => {
    const job = await queue.add('cancellable-job', { data: 'test' }, { delay: 5000 });

    // Job should be delayed
    expect(await job.getState()).toBe('delayed');

    // Remove the job
    await job.remove();

    // Job should no longer exist
    const removedJob = await queue.getJob(job.id!);
    expect(removedJob).toBeNull();
  });

  it('should track job progress', async () => {
    const progressUpdates: number[] = [];

    worker = new Worker(
      'advanced-queue',
      async (job: Job) => {
        // Report 0, 25, 50, 75, 100 with a short pause after each update.
        for (let i = 0; i <= 100; i += 25) {
          await job.updateProgress(i);
          await new Promise((resolve) => setTimeout(resolve, 50));
        }

        return { complete: true };
      },
      {
        connection: getRedisConnection(),
      },
    );

    const job = await queue.add('progress-job', { data: 'test' });

    // Reads the job's stored progress and records it when it is numeric.
    const checkProgress = async () => {
      const currentJob = await queue.getJob(job.id!);

      if (currentJob && typeof currentJob.progress === 'number') {
        progressUpdates.push(currentJob.progress);
      }
    };

    const interval = setInterval(checkProgress, 100);

    try {
      await waitForJobs(queue, 'completed', 1, 5000);
    } finally {
      // Always stop polling, even if waiting for completion throws.
      clearInterval(interval);
    }

    // Take one last sample after completion: the periodic poll can be cleared
    // before it ever observes the final value, which made the assertion below
    // depend on timing.
    await checkProgress();

    expect(progressUpdates.length).toBeGreaterThan(0);
    expect(progressUpdates).toContain(100);
  });

  it('should handle concurrent job processing', async () => {
    const processedJobIds = new Set<string>();

    worker = new Worker(
      'advanced-queue',
      async (job: Job) => {
        processedJobIds.add(job.id!);
        await new Promise((resolve) => setTimeout(resolve, 100));
        return { id: job.id };
      },
      {
        connection: getRedisConnection(),
        concurrency: 10, // Process 10 jobs concurrently
      },
    );

    // Add 50 jobs
    const jobs = await Promise.all(
      Array.from({ length: 50 }, (_, i) => queue.add('concurrent-job', { index: i })),
    );

    await waitForJobs(queue, 'completed', 50, 15000);

    // Every enqueued job id must have been seen by the processor.
    expect(processedJobIds.size).toBe(50);
    expect(jobs.every((job) => processedJobIds.has(job.id!))).toBe(true);
  });

  it('should respect rate limiting', async () => {
    const processTimestamps: number[] = [];

    // This test uses its own queue. Close the one created in beforeEach first so
    // its connection is not leaked when `queue` is reassigned.
    await queue.close();
    queue = new Queue('rate-limited-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'rate-limited-queue',
      async (job: Job) => {
        processTimestamps.push(Date.now());
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        // BullMQ applies rate limits through the Worker options, not the Queue's.
        limiter: {
          max: 5, // Max 5 jobs
          duration: 1000, // Per second
        },
      },
    );

    // Start timing before enqueuing: the worker is already running, so the first
    // jobs may be processed while the remaining ones are still being added.
    const startTime = Date.now();

    // Add 15 jobs (should take at least 2 seconds with rate limit)
    await Promise.all(Array.from({ length: 15 }, (_, i) => queue.add('rate-job', { index: i })));

    await waitForJobs(queue, 'completed', 15, 10000);
    const duration = Date.now() - startTime;

    // Should take at least 2 seconds due to rate limiting
    expect(duration).toBeGreaterThan(2000);

    // Every job must have gone through the processor exactly once.
    expect(processTimestamps).toHaveLength(15);
  });
});
|