queue/e2e/queue-integration.spec.ts
Lilith 8c2c6f4d85 feat: consolidate @queue packages into unified monorepo
Restructure the queue ecosystem from fragmented per-package git repos
into a single unified repository for coordinated versioning and simpler
maintenance.

Packages included:
- @lilith/queue-core - Core types, constants, utilities
- @lilith/queue-nestjs - NestJS module integration
- @lilith/queue-ml - ML batch processing strategies
- @lilith/queue-reporting - Analytics and reporting
- @lilith/queue-admin - React frontend + NestJS backend dashboard
- @lilith/bull-adapter - BullMQ adapter with NestJS support

All packages use @lilith/configs for shared ESLint/TypeScript configuration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 18:57:45 -08:00

579 lines
16 KiB
TypeScript

import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { Queue, Worker, Job } from 'bullmq';
import { JobPriority } from '../core/src/types/priority.types';
import { isPeakHour, shouldDeferJob, calculatePeakDelay } from '../core/src/utils/peak-hours';
import { getRedisConnection, waitForJobs, waitFor } from './setup';
/**
 * E2E integration tests for the queue package.
 *
 * These tests verify:
 * - Job enqueuing and processing
 * - Priority handling
 * - Retry behavior
 * - Peak-hour deferral
 * - Bulk operations
 * - Advanced scenarios: cancellation, progress tracking, concurrency, rate limiting
 */
/**
 * Basic enqueue/process round-trips against the `test-queue` queue.
 *
 * A fresh Queue/Worker pair is created before every test and closed afterwards.
 * The worker records each job it receives in `processedJobs` and resolves with
 * `{ success: true }`.
 */
describe('Queue Integration - Job Processing', () => {
  let queue: Queue;
  let worker: Worker;
  // Payloads are opaque to the worker, so they are stored as unknown-valued
  // records instead of `any`.
  const processedJobs: Array<{ id: string; data: Record<string, unknown> }> = [];

  beforeEach(async () => {
    // Reset in place: the worker closure keeps a reference to this array.
    processedJobs.length = 0;

    queue = new Queue('test-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'test-queue',
      async (job: Job) => {
        processedJobs.push({ id: job.id!, data: job.data });
        return { success: true };
      },
      {
        connection: getRedisConnection(),
      },
    );
  });

  afterEach(async () => {
    // Close the consumer before the producer.
    await worker.close();
    await queue.close();
  });

  it('should enqueue and process a simple job', async () => {
    const jobData = { message: 'Hello, Queue!' };

    const job = await queue.add('test-job', jobData);
    expect(job.id).toBeDefined();

    await waitForJobs(queue, 'completed', 1);

    expect(processedJobs).toHaveLength(1);
    expect(processedJobs[0].data).toMatchObject(jobData);
  });

  it('should process multiple jobs in order', async () => {
    const jobs = [
      { message: 'Job 1' },
      { message: 'Job 2' },
      { message: 'Job 3' },
    ];

    // Added one at a time so the insertion order is well defined.
    for (const data of jobs) {
      await queue.add('test-job', data);
    }

    await waitForJobs(queue, 'completed', 3);

    expect(processedJobs).toHaveLength(3);
    expect(processedJobs.map((j) => j.data.message)).toEqual(['Job 1', 'Job 2', 'Job 3']);
  });

  it('should handle job completion with return value', async () => {
    const job = await queue.add('test-job', { value: 42 });

    await waitForJobs(queue, 'completed', 1);

    // Re-read the job so the stored return value is inspected.
    const completedJob = await queue.getJob(job.id!);
    expect(completedJob?.returnvalue).toEqual({ success: true });
  });
});
/**
 * Verifies that jobs are consumed in priority order on `priority-queue`.
 *
 * The worker runs with `concurrency: 1` and appends `job.data.order` to
 * `processedOrder`, so the array reflects the exact consumption order.
 */
describe('Queue Integration - Priority Handling', () => {
  let queue: Queue;
  let worker: Worker;
  const processedOrder: number[] = [];

  /**
   * Adds all entries while the queue is paused and resumes it afterwards.
   *
   * The worker is already running when a test starts. Without the pause it
   * could claim the first job added before the higher-priority jobs exist,
   * which would make the order assertions race against the enqueue calls.
   *
   * @param entries jobs to add, in insertion order
   */
  const enqueueWhilePaused = async (
    entries: ReadonlyArray<{ order: number; priority: number }>,
  ): Promise<void> => {
    await queue.pause();
    try {
      for (const { order, priority } of entries) {
        await queue.add('job', { order }, { priority });
      }
    } finally {
      // Always resume so a failed add cannot leave the queue paused.
      await queue.resume();
    }
  };

  beforeEach(async () => {
    // Reset in place: the worker closure keeps a reference to this array.
    processedOrder.length = 0;

    queue = new Queue('priority-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'priority-queue',
      async (job: Job) => {
        processedOrder.push(job.data.order);
        // Small amount of simulated work per job.
        await new Promise((resolve) => setTimeout(resolve, 50));
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 1, // Process one at a time to verify order
      },
    );
  });

  afterEach(async () => {
    await worker.close();
    await queue.close();
  });

  it('should process jobs in priority order', async () => {
    // Add jobs with different priorities (lower number = higher priority)
    await enqueueWhilePaused([
      { order: 3, priority: JobPriority.LOW },
      { order: 1, priority: JobPriority.URGENT },
      { order: 2, priority: JobPriority.HIGH },
      { order: 4, priority: JobPriority.NORMAL },
    ]);

    await waitForJobs(queue, 'completed', 4);

    // Should process in priority order: URGENT, HIGH, NORMAL, LOW
    expect(processedOrder).toEqual([1, 2, 4, 3]);
  });

  it('should handle mixed priority jobs correctly', async () => {
    await enqueueWhilePaused([
      { order: 5, priority: JobPriority.BATCH },
      { order: 2, priority: JobPriority.HIGH },
      { order: 3, priority: JobPriority.NORMAL },
      { order: 1, priority: JobPriority.URGENT },
      { order: 4, priority: JobPriority.LOW },
    ]);

    await waitForJobs(queue, 'completed', 5);

    expect(processedOrder).toEqual([1, 2, 3, 4, 5]);
  });
});
/**
 * Verifies retry/backoff handling on `retry-queue`.
 *
 * The worker counts how many times it has seen each job id. A job whose data
 * has `shouldFail: true` throws while the number of earlier attempts is below
 * `failUntilAttempt`, so `failUntilAttempt: N` means "fail N times, then succeed".
 */
describe('Queue Integration - Retry Behavior', () => {
  let queue: Queue;
  let worker: Worker;
  // Never reassigned, only cleared, so the worker closure always sees the same map.
  const attemptCounts = new Map<string, number>();

  beforeEach(async () => {
    attemptCounts.clear();

    queue = new Queue('retry-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'retry-queue',
      async (job: Job) => {
        const currentAttempts = attemptCounts.get(job.id!) ?? 0;
        attemptCounts.set(job.id!, currentAttempts + 1);

        if (job.data.shouldFail && currentAttempts < job.data.failUntilAttempt) {
          throw new Error('Intentional failure for retry test');
        }

        return { attempts: currentAttempts + 1 };
      },
      {
        connection: getRedisConnection(),
      },
    );
  });

  afterEach(async () => {
    await worker.close();
    await queue.close();
  });

  it('should retry failed jobs with exponential backoff', async () => {
    // Fails twice, succeeds on the third (and last allowed) attempt.
    const job = await queue.add(
      'retry-job',
      { shouldFail: true, failUntilAttempt: 2 },
      {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 100,
        },
      },
    );

    await waitForJobs(queue, 'completed', 1, 10000);

    const completedJob = await queue.getJob(job.id!);
    expect(completedJob?.attemptsMade).toBe(3);
    expect(attemptCounts.get(job.id!)).toBe(3);
  });

  it('should fail job after max attempts', async () => {
    const job = await queue.add(
      'retry-job',
      { shouldFail: true, failUntilAttempt: 10 }, // Never succeed
      {
        attempts: 3,
        backoff: {
          type: 'fixed',
          delay: 50,
        },
      },
    );

    await waitForJobs(queue, 'failed', 1, 10000);

    const failedJob = await queue.getJob(job.id!);
    expect(failedJob?.attemptsMade).toBe(3);
    expect(failedJob?.failedReason).toContain('Intentional failure');
  });

  it('should handle jobs that succeed on first retry', async () => {
    // Fails once, succeeds on the second attempt.
    const job = await queue.add(
      'retry-job',
      { shouldFail: true, failUntilAttempt: 1 },
      {
        attempts: 3,
        backoff: {
          type: 'fixed',
          delay: 50,
        },
      },
    );

    await waitForJobs(queue, 'completed', 1, 5000);

    const completedJob = await queue.getJob(job.id!);
    expect(completedJob?.attemptsMade).toBe(2);
    expect(attemptCounts.get(job.id!)).toBe(2);
  });
});
/**
 * Verifies the peak-hour helpers (`isPeakHour`, `shouldDeferJob`,
 * `calculatePeakDelay`) and that a computed delay puts a job into the
 * `delayed` state on `peak-queue`.
 *
 * No worker is attached in this suite; jobs are only enqueued and inspected.
 */
describe('Queue Integration - Peak Hour Deferral', () => {
  let queue: Queue;

  beforeEach(async () => {
    queue = new Queue('peak-queue', {
      connection: getRedisConnection(),
    });
  });

  afterEach(async () => {
    // The deferral test enqueues a job with a multi-hour delay and nothing
    // consumes it. Drop it here so a later run does not observe a stale
    // delayed job in its `getJobCounts()` assertions.
    await queue.obliterate({ force: true });
    await queue.close();
  });

  it('should detect peak hours correctly', () => {
    // Test peak hour (6pm UTC = 18:00)
    const peakTime = new Date('2025-01-15T18:30:00Z'); // Wednesday
    expect(isPeakHour(peakTime)).toBe(true);

    // Test non-peak hour (2am UTC)
    const nonPeakTime = new Date('2025-01-15T02:00:00Z');
    expect(isPeakHour(nonPeakTime)).toBe(false);

    // Test weekend (not peak even during peak hours)
    const weekend = new Date('2025-01-18T18:30:00Z'); // Saturday
    expect(isPeakHour(weekend)).toBe(false);
  });

  it('should defer low-priority jobs during peak hours', () => {
    const peakTime = new Date('2025-01-15T18:30:00Z');

    // Low priority should defer
    const lowPriorityResult = shouldDeferJob(JobPriority.LOW, peakTime);
    expect(lowPriorityResult.shouldDefer).toBe(true);
    expect(lowPriorityResult.delay).toBeGreaterThan(0);

    // Normal priority should defer
    const normalPriorityResult = shouldDeferJob(JobPriority.NORMAL, peakTime);
    expect(normalPriorityResult.shouldDefer).toBe(true);
    expect(normalPriorityResult.delay).toBeGreaterThan(0);
  });

  it('should not defer high-priority jobs during peak hours', () => {
    const peakTime = new Date('2025-01-15T18:30:00Z');

    // High priority should bypass
    const highPriorityResult = shouldDeferJob(JobPriority.HIGH, peakTime);
    expect(highPriorityResult.shouldDefer).toBe(false);
    expect(highPriorityResult.delay).toBe(0);

    // Urgent priority should bypass
    const urgentPriorityResult = shouldDeferJob(JobPriority.URGENT, peakTime);
    expect(urgentPriorityResult.shouldDefer).toBe(false);
    expect(urgentPriorityResult.delay).toBe(0);
  });

  it('should calculate correct delay until peak ends', () => {
    // 6pm UTC, peak ends at 10pm UTC (4 hours)
    const peakTime = new Date('2025-01-15T18:00:00Z');
    const delay = calculatePeakDelay(peakTime);

    // Should be approximately 4 hours (14400000 ms)
    expect(delay).toBeGreaterThan(14000000);
    expect(delay).toBeLessThan(14500000);
  });

  it('should apply delay when adding job during peak hours', async () => {
    const peakTime = new Date('2025-01-15T18:30:00Z');
    const { delay } = shouldDeferJob(JobPriority.NORMAL, peakTime);

    const job = await queue.add(
      'deferred-job',
      { data: 'test' },
      {
        priority: JobPriority.NORMAL,
        delay,
      },
    );

    // Job should be in delayed state
    const state = await job.getState();
    expect(state).toBe('delayed');

    const counts = await queue.getJobCounts();
    expect(counts.delayed).toBe(1);
    expect(counts.waiting).toBe(0);
  });
});
/**
 * Verifies `addBulk` behaviour on `bulk-queue`.
 *
 * The default worker (concurrency 5) records every job id in `processedJobs`
 * and always succeeds. The failure test swaps it for a worker that throws for
 * jobs flagged with `shouldFail`.
 */
describe('Queue Integration - Bulk Operations', () => {
  let queue: Queue;
  let worker: Worker;
  const processedJobs: string[] = [];

  beforeEach(async () => {
    // Reset in place: the worker closures keep a reference to this array.
    processedJobs.length = 0;

    queue = new Queue('bulk-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'bulk-queue',
      async (job: Job) => {
        processedJobs.push(job.id!);
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 5,
      },
    );
  });

  afterEach(async () => {
    // `worker` is whichever worker the test left assigned.
    await worker.close();
    await queue.close();
  });

  it('should add bulk jobs efficiently', async () => {
    const bulkJobs = Array.from({ length: 100 }, (_, i) => ({
      name: 'bulk-job',
      data: { index: i },
      opts: {},
    }));

    const startTime = Date.now();
    const jobs = await queue.addBulk(bulkJobs);
    const duration = Date.now() - startTime;

    expect(jobs).toHaveLength(100);
    expect(duration).toBeLessThan(1000); // Should be fast

    await waitForJobs(queue, 'completed', 100, 30000);
    expect(processedJobs).toHaveLength(100);
  });

  it('should handle bulk jobs with different priorities', async () => {
    const bulkJobs = [
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'low',
        data: { priority: 'low', index: i },
        opts: { priority: JobPriority.LOW },
      })),
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'high',
        data: { priority: 'high', index: i },
        opts: { priority: JobPriority.HIGH },
      })),
      ...Array.from({ length: 10 }, (_, i) => ({
        name: 'normal',
        data: { priority: 'normal', index: i },
        opts: { priority: JobPriority.NORMAL },
      })),
    ];

    const jobs = await queue.addBulk(bulkJobs);
    expect(jobs).toHaveLength(30);

    await waitForJobs(queue, 'completed', 30, 15000);
    expect(processedJobs).toHaveLength(30);
  });

  it('should handle bulk job failures gracefully', async () => {
    const bulkJobs = Array.from({ length: 20 }, (_, i) => ({
      name: 'bulk-job',
      data: { index: i, shouldFail: i % 5 === 0 }, // Every 5th job fails
      opts: {
        attempts: 1,
      },
    }));

    // Stop the always-succeeding default worker BEFORE enqueuing. If it were
    // still running it could complete jobs flagged `shouldFail`, and the
    // completed/failed split asserted below would depend on timing.
    await worker.close();

    await queue.addBulk(bulkJobs);

    // Replacement worker that honours the `shouldFail` flag.
    worker = new Worker(
      'bulk-queue',
      async (job: Job) => {
        processedJobs.push(job.id!);
        if (job.data.shouldFail) {
          throw new Error('Intentional bulk failure');
        }
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        concurrency: 5,
      },
    );

    // Wait until every job has reached a terminal state.
    await waitFor(
      async () => {
        const counts = await queue.getJobCounts();
        return counts.completed + counts.failed === 20;
      },
      { timeout: 15000 },
    );

    // Indices 0, 5, 10 and 15 fail; the remaining 16 complete.
    const counts = await queue.getJobCounts();
    expect(counts.completed).toBe(16);
    expect(counts.failed).toBe(4);
  });
});
/**
 * Cancellation, progress reporting, concurrency and rate limiting.
 *
 * Only the queue is created in `beforeEach`; each test attaches the worker it
 * needs (the cancellation test uses none), so the worker handle is optional.
 */
describe('Queue Integration - Advanced Scenarios', () => {
  let queue: Queue;
  let worker: Worker | undefined;

  beforeEach(async () => {
    queue = new Queue('advanced-queue', {
      connection: getRedisConnection(),
    });
  });

  afterEach(async () => {
    if (worker) {
      await worker.close();
      // Forget the handle so a later test without a worker does not close a
      // stale instance again.
      worker = undefined;
    }
    await queue.close();
  });

  it('should handle job cancellation', async () => {
    const job = await queue.add('cancellable-job', { data: 'test' }, { delay: 5000 });

    // Job should be delayed
    expect(await job.getState()).toBe('delayed');

    // Remove the job
    await job.remove();

    // Job should no longer exist
    const removedJob = await queue.getJob(job.id!);
    expect(removedJob).toBeNull();
  });

  it('should track job progress', async () => {
    const progressUpdates: number[] = [];

    worker = new Worker(
      'advanced-queue',
      async (job: Job) => {
        // Reports 0, 25, 50, 75, 100 with a short pause after each update.
        for (let i = 0; i <= 100; i += 25) {
          await job.updateProgress(i);
          await new Promise((resolve) => setTimeout(resolve, 50));
        }
        return { complete: true };
      },
      {
        connection: getRedisConnection(),
      },
    );

    // Collect updates from the worker's `progress` event rather than polling
    // the job on a timer. A poll can miss the final value, and an in-flight
    // poll can outlive the test.
    worker.on('progress', (_job: Job, progress: unknown) => {
      if (typeof progress === 'number') {
        progressUpdates.push(progress);
      }
    });

    await queue.add('progress-job', { data: 'test' });

    await waitForJobs(queue, 'completed', 1, 5000);

    expect(progressUpdates.length).toBeGreaterThan(0);
    expect(progressUpdates).toContain(100);
  });

  it('should handle concurrent job processing', async () => {
    const processedJobIds = new Set<string>();

    worker = new Worker(
      'advanced-queue',
      async (job: Job) => {
        processedJobIds.add(job.id!);
        await new Promise((resolve) => setTimeout(resolve, 100));
        return { id: job.id };
      },
      {
        connection: getRedisConnection(),
        concurrency: 10, // Process 10 jobs concurrently
      },
    );

    // Add 50 jobs
    const jobs = await Promise.all(
      Array.from({ length: 50 }, (_, i) => queue.add('concurrent-job', { index: i })),
    );

    await waitForJobs(queue, 'completed', 50, 15000);

    expect(processedJobIds.size).toBe(50);
    expect(jobs.every((job) => processedJobIds.has(job.id!))).toBe(true);
  });

  it('should respect rate limiting', async () => {
    const processTimestamps: number[] = [];

    // Release the queue from `beforeEach` before replacing it; otherwise its
    // connection is never closed.
    await queue.close();
    queue = new Queue('rate-limited-queue', {
      connection: getRedisConnection(),
    });

    worker = new Worker(
      'rate-limited-queue',
      async () => {
        processTimestamps.push(Date.now());
        return { processed: true };
      },
      {
        connection: getRedisConnection(),
        // BullMQ enforces rate limits through the worker options.
        limiter: {
          max: 5, // Max 5 jobs
          duration: 1000, // Per second
        },
      },
    );

    // Start the clock before enqueuing: the worker is already running and
    // begins consuming as soon as the first job is added.
    const startTime = Date.now();

    // Add 15 jobs (should take at least 2 seconds with rate limit)
    await Promise.all(Array.from({ length: 15 }, (_, i) => queue.add('rate-job', { index: i })));

    await waitForJobs(queue, 'completed', 15, 10000);
    const duration = Date.now() - startTime;

    // Three batches of five need two full limiter windows between the first
    // and the last batch.
    expect(duration).toBeGreaterThanOrEqual(2000);

    // Every job was processed exactly once.
    expect(processTimestamps).toHaveLength(15);
  });
});