@lilith/queue
Job queue ecosystem for NestJS with BullMQ: core types, NestJS integration, ML batching, reporting, and an admin dashboard.
Features
- Core Types: Shared types, constants, and utilities
- NestJS Integration: Base classes for processors, schedulers, and services
- ML Batching: Request and pipeline batching strategies for ML workloads
- Reporting: Queue metrics and historical reporting
- Bull Adapter: BullMQ adapter with NestJS module support
- Admin Dashboard: React-based queue management UI
Installation
pnpm add @lilith/queue bullmq ioredis
Peer Dependencies (as needed)
# For NestJS integration
pnpm add @nestjs/common @nestjs/core @nestjs/bullmq
# For scheduling
pnpm add @nestjs/schedule cron
# For admin dashboard
pnpm add @nestjs/websockets @nestjs/platform-socket.io socket.io
# For reporting with TypeORM
pnpm add @nestjs/typeorm typeorm
Quick Start
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';

@Module({
  imports: [
    QueueModule.forRoot({
      connection: { host: 'localhost', port: 6379 },
      enableScheduling: true,
    }),
  ],
})
export class AppModule {}
Subpath Exports
Core (@lilith/queue/core)
Types, constants, and utilities:
import {
  JobPriority,
  isPeakHour,
  shouldDeferJob,
  generateCorrelationId,
  DEFAULT_PRIORITY,
  DEFAULT_RETRY_ATTEMPTS,
} from '@lilith/queue/core';

// Check peak hours. `options` is the job options object being built;
// `calculateDelay` is not among the imports above and is assumed to be an application helper.
if (isPeakHour()) {
  options.delay = calculateDelay();
}

// Generate tracking ID
const correlationId = generateCorrelationId('analytics');
NestJS (@lilith/queue/nestjs)
NestJS integration with base classes:
import { Injectable } from '@nestjs/common';
import { Processor } from '@nestjs/bullmq';
import { Cron } from '@nestjs/schedule';
import { Job } from 'bullmq';
import {
  QueueModule,
  BaseProcessor,
  BaseScheduler,
  BaseJobService,
  QueueManagerService,
} from '@lilith/queue/nestjs';

// AnalyticsJob and AnalyticsEvent are application-defined types.

// Processor with lifecycle hooks
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJob> {
  async process(job: Job<AnalyticsJob>) {
    // Process job
  }

  onCompleted(job: Job<AnalyticsJob>) {
    this.logger.log(`Job ${job.id} completed`);
  }
}

// Scheduler with peak-hour awareness
@Injectable()
export class ReportScheduler extends BaseScheduler {
  @Cron('0 */6 * * *') // Every 6 hours, on the hour
  async generateReports() {
    if (this.shouldDeferDuringPeak()) {
      return this.deferJob(/* ... */);
    }
    // Generate reports
  }
}

// Type-safe job service
@Injectable()
export class AnalyticsJobService extends BaseJobService<AnalyticsJob> {
  queueName = 'analytics';

  async trackEvent(event: AnalyticsEvent) {
    return this.addJob('track', { event });
  }
}
ML (@lilith/queue/ml)
Batching strategies for ML workloads:
import {
  RequestBatchingStrategy,
  PipelineBatchingStrategy,
  BaseMLProcessor,
} from '@lilith/queue/ml';

// Request batching - collect multiple requests
// (`model` is your own inference client)
const batching = new RequestBatchingStrategy({
  maxBatchSize: 32,
  maxWaitMs: 100,
  onBatch: async (items) => {
    return await model.batchInference(items);
  },
});

// Pipeline batching - stage-based processing
const pipeline = new PipelineBatchingStrategy({
  stages: ['preprocess', 'inference', 'postprocess'],
  batchSizes: { inference: 16 },
});
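To make the `maxBatchSize` / `maxWaitMs` idea concrete, here is a minimal, self-contained sketch of size-or-time request batching. It is not the implementation of `RequestBatchingStrategy`; the class name `SimpleRequestBatcher` and its constructor signature are hypothetical and exist only for illustration.

```typescript
// Hypothetical sketch of size-or-time request batching.
// This is NOT the implementation of RequestBatchingStrategy.
type BatchHandler<T, R> = (items: T[]) => Promise<R[]>;

interface PendingRequest<T, R> {
  item: T;
  resolve: (result: R) => void;
  reject: (error: unknown) => void;
}

class SimpleRequestBatcher<T, R> {
  private pending: PendingRequest<T, R>[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly maxBatchSize: number;
  private readonly maxWaitMs: number;
  private readonly onBatch: BatchHandler<T, R>;

  constructor(maxBatchSize: number, maxWaitMs: number, onBatch: BatchHandler<T, R>) {
    this.maxBatchSize = maxBatchSize;
    this.maxWaitMs = maxWaitMs;
    this.onBatch = onBatch;
  }

  // Queue one item; the returned promise settles once its batch has been processed.
  add(item: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      this.pending.push({ item, resolve, reject });
      if (this.pending.length >= this.maxBatchSize) {
        this.flush(); // batch is full: send immediately
      } else if (this.timer === undefined) {
        // first item of a new batch: start the wait timer
        this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
      }
    });
  }

  // Send everything collected so far as a single batch.
  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    const batch = this.pending;
    this.pending = [];
    if (batch.length === 0) return;
    this.onBatch(batch.map((p) => p.item)).then(
      (results) => batch.forEach((p, i) => p.resolve(results[i] as R)),
      (error) => batch.forEach((p) => p.reject(error)),
    );
  }
}

// Usage: two requests fill a batch of size 2, so the handler runs exactly once.
const doubler = new SimpleRequestBatcher<number, number>(2, 100, async (items) =>
  items.map((x) => x * 2),
);
const doubled = Promise.all([doubler.add(1), doubler.add(2)]); // resolves to [2, 4]
```

The trade-off is the usual one: a larger `maxBatchSize` improves throughput per inference call, while `maxWaitMs` caps the extra latency a single request can pay while waiting for the batch to fill.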
Reporting (@lilith/queue/reporting)
Queue metrics and reporting:
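import {
  ReportingModule,
  QueueReportingService,
  QueueMetricsEntity,
} from '@lilith/queue/reporting';

// Get historical metrics
// (`reportingService` is assumed to be an injected QueueReportingService;
// `startDate` and `endDate` are supplied by the caller)
const metrics = await reportingService.getMetrics({
  queueName: 'analytics',
  from: startDate,
  to: endDate,
});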
Bull Adapter (@lilith/queue/bull-adapter)
BullMQ adapter with NestJS module:
import { QueueModule, BaseQueueService } from '@lilith/queue/bull-adapter';
// Add jobs
await queueService.addJob<EmailPayload>('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
});
// Get metrics
const metrics = await queueService.getMetrics();
// Queue control
await queueService.pause();
await queueService.resume();
await queueService.drain();
Admin Dashboard (@lilith/queue/admin)
React-based admin UI:
// Backend - NestJS gateway
import { QueueGateway } from '@lilith/queue/admin/backend';

// Frontend - React hooks
import { useQueues, useQueueMetrics } from '@lilith/queue/admin/frontend';

// `QueueCard` is your own presentational component.
function QueueDashboard() {
  const { queues, isLoading } = useQueues();
  const metrics = useQueueMetrics('main');

  return (
    <div>
      {queues.map(queue => (
        <QueueCard key={queue.name} queue={queue} />
      ))}
    </div>
  );
}
Types
JobPriority
enum JobPriority {
  CRITICAL = 1,
  HIGH = 2,
  NORMAL = 3,
  LOW = 4,
  BACKGROUND = 5,
}
BaseJobData
interface BaseJobData<T = unknown> {
  type: string;
  data: T;
  correlationId?: string;
  timestamp?: number;
}
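As an illustration of how a payload of this shape might be assembled, the sketch below builds a `BaseJobData` object with a correlation ID and timestamp filled in. The helpers `makeCorrelationId` and `buildJobData` and the ID format are assumptions made for this example; the format that `generateCorrelationId` actually produces is not documented here.

```typescript
// Same shape as the BaseJobData interface above, repeated so the sketch is self-contained.
interface BaseJobData<T = unknown> {
  type: string;
  data: T;
  correlationId?: string;
  timestamp?: number;
}

// Hypothetical ID format: "<prefix>-<time in base 36>-<random suffix>".
function makeCorrelationId(prefix: string): string {
  const time = Date.now().toString(36);
  const random = Math.random().toString(36).slice(2, 10);
  return `${prefix}-${time}-${random}`;
}

// Hypothetical helper: wrap a payload with tracking metadata.
function buildJobData<T>(type: string, data: T, prefix: string): BaseJobData<T> {
  return {
    type,
    data,
    correlationId: makeCorrelationId(prefix),
    timestamp: Date.now(),
  };
}

const trackJob = buildJobData('track', { event: 'page_view' }, 'analytics');
```

Keeping `correlationId` and `timestamp` optional in the interface lets producers omit them, while a helper like this one can fill them in consistently at enqueue time.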
QueueRegistration
interface QueueRegistration {
  name: string;
  defaultJobOptions?: JobOptions;
  limiter?: RateLimiterOptions;
}
QueueMetrics
interface QueueMetrics {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
  timestamp: number;
}
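A snapshot of this shape is easy to turn into derived figures. The following sketch computes a backlog and a failure rate from one snapshot; `summarizeMetrics` and `QueueHealth` are hypothetical names, and the sketch assumes each counter means what its name suggests (for example, that `completed` and `failed` count finished jobs).

```typescript
// Same shape as the QueueMetrics interface above, repeated so the sketch is self-contained.
interface QueueMetrics {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
  timestamp: number;
}

// Hypothetical derived view of a single snapshot.
interface QueueHealth {
  backlog: number; // jobs not yet started (waiting + delayed)
  failureRate: number; // failed / (completed + failed), or 0 if nothing has finished
}

function summarizeMetrics(m: QueueMetrics): QueueHealth {
  const finished = m.completed + m.failed;
  return {
    backlog: m.waiting + m.delayed,
    failureRate: finished === 0 ? 0 : m.failed / finished,
  };
}

const health = summarizeMetrics({
  name: 'analytics',
  waiting: 12,
  active: 3,
  completed: 90,
  failed: 10,
  delayed: 5,
  paused: 0,
  timestamp: Date.now(),
});
// health.backlog is 17 and health.failureRate is 0.1
```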
DX Mode
Enable developer experience (DX) mode for testing:
import { enableDxMode, isDxMode } from '@lilith/queue/core';
// Enable DX mode (skips Redis, uses in-memory)
enableDxMode();
if (isDxMode()) {
  console.log('Running in DX mode');
}
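This README does not describe how the in-memory mode works internally. As a rough picture of what "skipping Redis" can look like in a unit test, here is a hypothetical in-memory FIFO queue; `InMemoryQueue` and `MemoryJob` are illustrative names and are not exports of `@lilith/queue`.

```typescript
// Hypothetical in-memory stand-in for a Redis-backed queue.
// It is NOT the library's DX-mode implementation.
interface MemoryJob<T> {
  id: number;
  name: string;
  data: T;
}

class InMemoryQueue<T> {
  private jobs: MemoryJob<T>[] = [];
  private nextId = 1;

  // Append a job to the end of the queue and return it.
  add(name: string, data: T): MemoryJob<T> {
    const job: MemoryJob<T> = { id: this.nextId++, name, data };
    this.jobs.push(job);
    return job;
  }

  // Run the handler for every queued job in FIFO order; return how many ran.
  drain(handler: (job: MemoryJob<T>) => void): number {
    const batch = this.jobs;
    this.jobs = [];
    batch.forEach(handler);
    return batch.length;
  }

  get size(): number {
    return this.jobs.length;
  }
}

// Usage: enqueue two jobs, then process them synchronously in a test.
const memoryQueue = new InMemoryQueue<{ to: string }>();
memoryQueue.add('send-email', { to: 'a@example.com' });
memoryQueue.add('send-email', { to: 'b@example.com' });
const recipients: string[] = [];
const processedCount = memoryQueue.drain((job) => recipients.push(job.data.to));
```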
Peak Hours
Built-in peak hour awareness for job deferral:
import { isPeakHour, shouldDeferJob, PEAK_HOURS } from '@lilith/queue/core';
// Check if current time is peak hours
if (isPeakHour()) {
  // Defer non-critical jobs
}

// Check if specific job should be deferred
// (`jobPriority` and `options` come from the job being enqueued)
if (shouldDeferJob(jobPriority)) {
  options.delay = 60 * 60 * 1000; // Delay 1 hour (in milliseconds)
}
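The deferral rule itself is not spelled out in this README. One plausible reading, sketched below, defers only low-priority work while the clock is inside a peak window and delays it until the window ends. The 09:00 to 17:00 window, the "LOW and below" threshold, and all function names here are assumptions; the library's `PEAK_HOURS` and `shouldDeferJob` may behave differently.

```typescript
// Hypothetical values: the library's actual PEAK_HOURS and deferral rules may differ.
const ASSUMED_PEAK_HOURS = { start: 9, end: 17 }; // local hours, end exclusive

// Priority values copied from the JobPriority enum (lower number = more urgent).
const PRIORITY = { CRITICAL: 1, HIGH: 2, NORMAL: 3, LOW: 4, BACKGROUND: 5 } as const;

function isPeakHourSketch(now: Date = new Date()): boolean {
  const hour = now.getHours();
  return hour >= ASSUMED_PEAK_HOURS.start && hour < ASSUMED_PEAK_HOURS.end;
}

// Assumed rule: during peak hours, defer LOW and BACKGROUND jobs only.
function shouldDeferJobSketch(priority: number, now: Date = new Date()): boolean {
  return isPeakHourSketch(now) && priority >= PRIORITY.LOW;
}

// Milliseconds until the assumed peak window closes (0 outside the window).
function msUntilPeakEnds(now: Date = new Date()): number {
  if (!isPeakHourSketch(now)) return 0;
  const end = new Date(now);
  end.setHours(ASSUMED_PEAK_HOURS.end, 0, 0, 0);
  return end.getTime() - now.getTime();
}

// Usage: at 10:30 local time a BACKGROUND job is deferred, a CRITICAL job is not.
const midMorning = new Date(2024, 0, 15, 10, 30);
const deferBackground = shouldDeferJobSketch(PRIORITY.BACKGROUND, midMorning); // true
const deferCritical = shouldDeferJobSketch(PRIORITY.CRITICAL, midMorning); // false
```

Delaying until the window closes, rather than by a fixed hour, avoids re-checking a deferred job while peak hours are still in effect.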
License
MIT