| | | |
|---|---|---|
| .. | ||
| .githooks | ||
| src | ||
| .gitignore | ||
| package.json | ||
| README.md | ||
| tsconfig.json | ||
| tsup.config.ts | ||
@lilith/queue/bull-adapter
Bull/BullMQ queue adapter with NestJS module support. Provides a unified interface for queue operations with full TypeScript support.
Features
- Works with both Bull and BullMQ
- NestJS module with `forRoot()` and `forRootAsync()` configuration
- Generic typed job methods
- Queue metrics and introspection
- Configurable retry/backoff strategies
- Bulk job operations
- Job lifecycle management (pause, resume, drain, clean)
Installation
pnpm add @lilith/queue bullmq ioredis
Quick Start
NestJS Module Configuration
Static Configuration
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/bull-adapter';
@Module({
imports: [
QueueModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
defaultQueueName: 'main',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
}),
],
})
export class AppModule {}
Async Configuration with ConfigService
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { QueueModule } from '@lilith/queue/bull-adapter';
@Module({
imports: [
ConfigModule.forRoot(),
QueueModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
connection: {
host: config.get('REDIS_HOST', 'localhost'),
port: config.get('REDIS_PORT', 6379),
password: config.get('REDIS_PASSWORD'),
},
defaultQueueName: config.get('QUEUE_NAME', 'default'),
}),
}),
],
})
export class AppModule {}
Using the Queue Service
import { Injectable } from '@nestjs/common';
import { BaseQueueService, JobOptions } from '@lilith/queue/bull-adapter';
interface EmailPayload {
to: string;
subject: string;
body: string;
}
@Injectable()
export class EmailService {
constructor(private readonly queueService: BaseQueueService) {}
async sendEmail(email: EmailPayload): Promise<string> {
const result = await this.queueService.addJob<EmailPayload>(
'send-email',
email,
{ priority: 1 }
);
return result.id;
}
async sendBulkEmails(emails: EmailPayload[]): Promise<void> {
await this.queueService.addBulk(
emails.map(email => ({
type: 'send-email',
data: email,
}))
);
}
async getQueueStatus() {
return this.queueService.getMetrics();
}
}
API Reference
BaseQueueService
addJob<T>(type, data, options?, queueName?)
Add a single job to the queue.
const result = await queueService.addJob<MyPayload>(
'job-type',
{ key: 'value' },
{
attempts: 5,
delay: 5000,
priority: 1,
}
);
console.log(result.id); // Job ID
console.log(result.queue); // Queue name
addBulk<T>(jobs, queueName?)
Add multiple jobs in a single batch operation.
const results = await queueService.addBulk([
{ type: 'email', data: { to: 'user1@example.com' } },
{ type: 'email', data: { to: 'user2@example.com' } },
{ type: 'sms', data: { phone: '+1234567890' } },
]);
getMetrics(queueName?)
Get queue metrics and job counts.
const metrics = await queueService.getMetrics();
console.log(metrics);
// {
// name: 'main',
// waiting: 10,
// active: 2,
// completed: 1500,
// failed: 5,
// delayed: 3,
// paused: 0,
// timestamp: 1703001234567
// }
getJobStatus(jobId, queueName?)
Get detailed status of a specific job.
const status = await queueService.getJobStatus('job-123');
console.log(status);
// {
// id: 'job-123',
// name: 'send-email',
// state: 'completed',
// progress: 100,
// data: { ... },
// attemptsMade: 1,
// timestamp: 1703001234567,
// finishedOn: 1703001234890
// }
waitForJob<R>(jobId, queueName?, timeout?)
Wait for a job to complete and get its result.
try {
const result = await queueService.waitForJob<MyResult>(
'job-123',
undefined,
30000 // 30 second timeout
);
console.log('Job completed:', result);
} catch (error) {
console.error('Job failed:', error.message);
}
Queue Control Methods
// Pause queue processing
await queueService.pause();
// Resume queue processing
await queueService.resume();
// Remove all jobs from queue
await queueService.drain();
// Clean old completed jobs (older than 1 hour, max 1000)
const cleaned = await queueService.clean(3600000, 1000, 'completed');
// Remove a specific job
await queueService.removeJob('job-123');
// Retry a failed job
await queueService.retryJob('job-123');
Job Options
| Option | Type | Default | Description |
|---|---|---|---|
| `attempts` | `number` | `3` | Number of retry attempts |
| `backoff` | `{ type, delay }` | `{ type: 'exponential', delay: 1000 }` | Retry backoff strategy |
| `delay` | `number` | `undefined` | Delay before processing (ms) |
| `priority` | `number` | `undefined` | Job priority (lower number = higher priority) |
| `removeOnComplete` | `boolean \| number` | `true` | Remove job on completion |
| `removeOnFail` | `boolean \| number` | `false` | Remove job on failure |
| `timeout` | `number` | `undefined` | Job timeout (ms) |
| `jobId` | `string` | `undefined` | Custom job ID |
| `repeat` | `object` | `undefined` | Repeat job configuration |
Backoff Strategies
Exponential Backoff (Default)
{
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s...
}
}
Fixed Backoff
{
backoff: {
type: 'fixed',
delay: 5000, // Always wait 5s between retries
}
}
Repeat Jobs
Cron Pattern
await queueService.addJob('daily-report', { type: 'summary' }, {
repeat: {
pattern: '0 9 * * *', // Every day at 9 AM
},
});
Fixed Interval
await queueService.addJob('health-check', {}, {
repeat: {
every: 60000, // Every minute
limit: 100, // Stop after 100 executions
},
});
Processing Jobs (Worker)
Create a worker to process jobs:
import { Worker, Job } from 'bullmq';
import { JobData } from '@lilith/queue/bull-adapter';
const worker = new Worker(
'main',
async (job: Job<JobData<MyPayload>>) => {
const { type, data, correlationId } = job.data;
switch (type) {
case 'send-email':
return await sendEmail(data);
case 'generate-report':
return await generateReport(data);
default:
throw new Error(`Unknown job type: ${type}`);
}
},
{
connection: { host: 'localhost', port: 6379 },
concurrency: 5,
}
);
worker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
worker.on('failed', (job, error) => {
console.error(`Job ${job?.id} failed:`, error);
});
Constants
Import default values:
import {
DEFAULT_RETRY_ATTEMPTS, // 3
DEFAULT_BACKOFF_TYPE, // 'exponential'
DEFAULT_BACKOFF_DELAY, // 1000
DEFAULT_JOB_TIMEOUT, // 300000 (5 minutes)
DEFAULT_CONCURRENCY, // 1
DEFAULT_REDIS_CONFIG, // { host: 'localhost', port: 6379 }
} from '@lilith/queue/bull-adapter';
TypeScript Support
Full TypeScript support with generics:
interface OrderPayload {
orderId: string;
items: string[];
total: number;
}
interface OrderResult {
success: boolean;
trackingNumber: string;
}
// Typed job creation
await queueService.addJob<OrderPayload>('process-order', {
orderId: 'ORD-123',
items: ['item-1', 'item-2'],
total: 99.99,
});
// Typed result waiting
const result = await queueService.waitForJob<OrderResult>('job-id');
console.log(result.trackingNumber);