
@lilith/queue

Job queue ecosystem for NestJS with BullMQ: core types, NestJS integration, ML batching, reporting, and admin dashboard.

Features

  • Core Types: Shared types, constants, and utilities
  • NestJS Integration: Base classes for processors, schedulers, and services
  • ML Batching: Request and pipeline batching strategies for ML workloads
  • Reporting: Queue metrics and historical reporting
  • Bull Adapter: BullMQ adapter with NestJS module support
  • Admin Dashboard: React-based queue management UI

Installation

pnpm add @lilith/queue bullmq ioredis

Peer Dependencies (as needed)

# For NestJS integration
pnpm add @nestjs/common @nestjs/core @nestjs/bullmq

# For scheduling
pnpm add @nestjs/schedule cron

# For admin dashboard
pnpm add @nestjs/websockets @nestjs/platform-socket.io socket.io

# For reporting with TypeORM
pnpm add @nestjs/typeorm typeorm

Quick Start

import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';

@Module({
  imports: [
    QueueModule.forRoot({
      connection: { host: 'localhost', port: 6379 },
      enableScheduling: true,
    }),
  ],
})
export class AppModule {}

Subpath Exports

Core (@lilith/queue/core)

Types, constants, and utilities:

import {
  JobPriority,
  isPeakHour,
  shouldDeferJob,
  generateCorrelationId,
  DEFAULT_PRIORITY,
  DEFAULT_RETRY_ATTEMPTS,
} from '@lilith/queue/core';

// Defer work while peak hours are in effect
const options: { delay?: number } = {};
if (isPeakHour()) {
  options.delay = 60 * 60 * 1000; // delay by one hour
}

// Generate tracking ID
const correlationId = generateCorrelationId('analytics');

NestJS (@lilith/queue/nestjs)

NestJS integration with base classes:

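import { Injectable } from '@nestjs/common';
import { Processor } from '@nestjs/bullmq';
import { Cron } from '@nestjs/schedule';
import type { Job } from 'bullmq';
import {
  QueueModule,
  BaseProcessor,
  BaseScheduler,
  BaseJobService,
  QueueManagerService,
} from '@lilith/queue/nestjs';

// AnalyticsJob and AnalyticsEvent are application-defined types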

// Processor with lifecycle hooks
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJob> {
  async process(job: Job<AnalyticsJob>) {
    // Process job
  }

  onCompleted(job: Job<AnalyticsJob>) {
    this.logger.log(`Job ${job.id} completed`);
  }
}

// Scheduler with peak-hour awareness
@Injectable()
export class ReportScheduler extends BaseScheduler {
  @Cron('0 */6 * * *')
  async generateReports() {
    if (this.shouldDeferDuringPeak()) {
      return this.deferJob(/* ... */);
    }
    // Generate reports
  }
}

// Type-safe job service
@Injectable()
export class AnalyticsJobService extends BaseJobService<AnalyticsJob> {
  queueName = 'analytics';

  async trackEvent(event: AnalyticsEvent) {
    return this.addJob('track', { event });
  }
}

ML (@lilith/queue/ml)

Batching strategies for ML workloads:

import {
  RequestBatchingStrategy,
  PipelineBatchingStrategy,
  BaseMLProcessor,
} from '@lilith/queue/ml';

// Request batching - collect multiple requests
const batching = new RequestBatchingStrategy({
  maxBatchSize: 32,
  maxWaitMs: 100,
  onBatch: async (items) => {
    return await model.batchInference(items);
  },
});

// Pipeline batching - stage-based processing
const pipeline = new PipelineBatchingStrategy({
  stages: ['preprocess', 'inference', 'postprocess'],
  batchSizes: { inference: 16 },
});
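
To make the idea behind request batching concrete, here is a minimal self-contained sketch of a micro-batcher. It is not the implementation of RequestBatchingStrategy: the MicroBatcher class and its add method are hypothetical, and only the option names maxBatchSize, maxWaitMs, and onBatch mirror the example above. It assumes onBatch returns one result per input item, in the same order.

```typescript
// Hypothetical stand-in for illustration only; not the library's RequestBatchingStrategy.
interface MicroBatcherOptions<I, O> {
  maxBatchSize: number; // flush as soon as this many items are pending
  maxWaitMs: number; // otherwise flush this long after the first pending item
  onBatch: (items: I[]) => Promise<O[]>; // assumed: one result per item, in order
}

interface Pending<I, O> {
  item: I;
  resolve: (value: O) => void;
  reject: (reason: unknown) => void;
}

class MicroBatcher<I, O> {
  private pending: Pending<I, O>[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;

  constructor(private readonly options: MicroBatcherOptions<I, O>) {}

  // Queue one item; the promise settles once the batch containing it has run.
  add(item: I): Promise<O> {
    return new Promise<O>((resolve, reject) => {
      this.pending.push({ item, resolve, reject });
      if (this.pending.length >= this.options.maxBatchSize) {
        void this.flush();
      } else if (this.timer === undefined) {
        this.timer = setTimeout(() => void this.flush(), this.options.maxWaitMs);
      }
    });
  }

  // Run the pending items as one batch and hand each caller its own result.
  private async flush(): Promise<void> {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    const batch = this.pending;
    this.pending = [];
    if (batch.length === 0) return;
    try {
      const results = await this.options.onBatch(batch.map((p) => p.item));
      batch.forEach((p, i) => p.resolve(results[i]));
    } catch (error) {
      batch.forEach((p) => p.reject(error));
    }
  }
}
```

A batch is flushed either when it is full or when the wait timer fires, whichever comes first, so a lone request never waits longer than maxWaitMs.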

Reporting (@lilith/queue/reporting)

Queue metrics and reporting:

import {
  ReportingModule,
  QueueReportingService,
  QueueMetricsEntity,
} from '@lilith/queue/reporting';

// Get historical metrics (reportingService is assumed to be an injected QueueReportingService)
const metrics = await reportingService.getMetrics({
  queueName: 'analytics',
  from: startDate,
  to: endDate,
});

Bull Adapter (@lilith/queue/bull-adapter)

BullMQ adapter with NestJS module:

import { QueueModule, BaseQueueService } from '@lilith/queue/bull-adapter';

// Add jobs (queueService is assumed to be an injected service built on BaseQueueService)
await queueService.addJob<EmailPayload>('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
});

// Get metrics
const metrics = await queueService.getMetrics();

// Queue control
await queueService.pause();
await queueService.resume();
await queueService.drain();

Admin Dashboard (@lilith/queue/admin)

React-based admin UI:

// Backend - NestJS gateway
import { QueueGateway } from '@lilith/queue/admin/backend';

// Frontend - React hooks
import { useQueues, useQueueMetrics } from '@lilith/queue/admin/frontend';

function QueueDashboard() {
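  const { queues, isLoading } = useQueues();
  const metrics = useQueueMetrics('main');

  // QueueCard is an application-defined component
  if (isLoading) return <div>Loading queues...</div>;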

  return (
    <div>
      {queues.map(queue => (
        <QueueCard key={queue.name} queue={queue} />
      ))}
    </div>
  );
}

Types

JobPriority

enum JobPriority {
  CRITICAL = 1,
  HIGH = 2,
  NORMAL = 3,
  LOW = 4,
  BACKGROUND = 5,
}
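
The numeric values follow BullMQ's convention that a smaller priority number is served first. The sketch below illustrates that ordering with a local copy of the enum. The PendingJob shape and the byPriority helper are hypothetical and exist only for this example.

```typescript
// Local copy of the enum above so the sketch is self-contained.
enum JobPriority {
  CRITICAL = 1,
  HIGH = 2,
  NORMAL = 3,
  LOW = 4,
  BACKGROUND = 5,
}

// Hypothetical job shape used only for this illustration.
interface PendingJob {
  name: string;
  priority: JobPriority;
}

// Return the jobs in the order a priority queue would serve them: lowest value first.
function byPriority(jobs: PendingJob[]): PendingJob[] {
  return [...jobs].sort((a, b) => a.priority - b.priority);
}

const order = byPriority([
  { name: 'cleanup', priority: JobPriority.BACKGROUND },
  { name: 'payment', priority: JobPriority.CRITICAL },
  { name: 'email', priority: JobPriority.NORMAL },
]).map((job) => job.name);
```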

BaseJobData

interface BaseJobData<T = unknown> {
  type: string;
  data: T;
  correlationId?: string;
  timestamp?: number;
}
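
As a sketch of how a payload of this shape might be assembled, the helper below fills in the optional fields at creation time. Both createJobData and makeCorrelationId are hypothetical names; the package's own generateCorrelationId may produce a different ID format.

```typescript
// Local copy of the interface above so the sketch is self-contained.
interface BaseJobData<T = unknown> {
  type: string;
  data: T;
  correlationId?: string;
  timestamp?: number;
}

// Hypothetical ID format: "<prefix>-<epoch ms>-<random suffix>".
function makeCorrelationId(prefix: string): string {
  const random = Math.random().toString(36).slice(2, 10);
  return `${prefix}-${Date.now()}-${random}`;
}

// Wrap a payload with its type, a correlation ID, and a creation timestamp.
function createJobData<T>(type: string, data: T, prefix: string = type): BaseJobData<T> {
  return {
    type,
    data,
    correlationId: makeCorrelationId(prefix),
    timestamp: Date.now(),
  };
}

const trackJob = createJobData('track', { userId: 42 }, 'analytics');
```

Stamping the correlation ID when the job is created, rather than when it is processed, lets the same ID appear in producer and worker logs.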

QueueRegistration

interface QueueRegistration {
  name: string;
  defaultJobOptions?: JobOptions;
  limiter?: RateLimiterOptions;
}
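
A registration might look like the sketch below. The JobOptions and RateLimiterOptions declarations are local stand-ins written for this example, on the assumption that they resemble BullMQ's job options (attempts, backoff) and rate limiter (max jobs per duration in milliseconds). The real types come from the package and may differ.

```typescript
// Local stand-ins so the sketch compiles on its own; assumed shapes, not the package's types.
interface JobOptions {
  attempts?: number;
  backoff?: { type: 'fixed' | 'exponential'; delay: number };
  priority?: number;
}

interface RateLimiterOptions {
  max: number; // maximum number of jobs processed...
  duration: number; // ...per this many milliseconds
}

interface QueueRegistration {
  name: string;
  defaultJobOptions?: JobOptions;
  limiter?: RateLimiterOptions;
}

const analyticsQueue: QueueRegistration = {
  name: 'analytics',
  // Retry up to 3 times, with exponential backoff starting at 1 second
  defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
  // Process at most 100 jobs per minute
  limiter: { max: 100, duration: 60000 },
};
```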

QueueMetrics

interface QueueMetrics {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
  timestamp: number;
}
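
A snapshot of this shape can be reduced to a couple of health indicators. The sketch below is one possible summary, not part of the package: summarizeMetrics is a hypothetical helper, and treating waiting plus delayed as the backlog is an assumption.

```typescript
// Local copy of the interface above so the sketch is self-contained.
interface QueueMetrics {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
  timestamp: number;
}

interface QueueSummary {
  backlog: number; // jobs that have not started yet
  failureRate: number; // share of finished jobs that failed, from 0 to 1
}

// Hypothetical helper: derive backlog and failure rate from one snapshot.
function summarizeMetrics(metrics: QueueMetrics): QueueSummary {
  const finished = metrics.completed + metrics.failed;
  return {
    backlog: metrics.waiting + metrics.delayed,
    failureRate: finished === 0 ? 0 : metrics.failed / finished,
  };
}

const summary = summarizeMetrics({
  name: 'analytics',
  waiting: 5,
  active: 2,
  completed: 90,
  failed: 10,
  delayed: 3,
  paused: 0,
  timestamp: Date.now(),
});
// summary.backlog is 8 and summary.failureRate is 0.1
```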

DX Mode

DX (developer experience) mode lets tests run without Redis by using an in-memory queue:

import { enableDxMode, isDxMode } from '@lilith/queue/core';

// Enable DX mode (skips Redis, uses in-memory)
enableDxMode();

if (isDxMode()) {
  console.log('Running in DX mode');
}

Peak Hours

Built-in peak hour awareness for job deferral:

import { isPeakHour, shouldDeferJob, PEAK_HOURS } from '@lilith/queue/core';

// Check if current time is peak hours
if (isPeakHour()) {
  // Defer non-critical jobs
}

// Check if specific job should be deferred
if (shouldDeferJob(jobPriority)) {
  options.delay = 3600000; // Delay 1 hour
}
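
The deferral logic can be pictured with the self-contained sketch below. Everything in it is an assumption made for illustration: the 09:00 to 17:00 local-time window, the rule that only LOW and BACKGROUND jobs are deferred, and the helper names isPeakHourAt, shouldDeferAt, and delayUntilPeakEnds. The package's actual PEAK_HOURS value and deferral threshold may differ.

```typescript
// Local copy of the enum from the Types section so the sketch is self-contained.
enum JobPriority {
  CRITICAL = 1,
  HIGH = 2,
  NORMAL = 3,
  LOW = 4,
  BACKGROUND = 5,
}

// Assumed peak window in local time; not the package's PEAK_HOURS.
const ASSUMED_PEAK = { startHour: 9, endHour: 17 };

// True when the given moment falls inside the assumed peak window.
function isPeakHourAt(date: Date, peak = ASSUMED_PEAK): boolean {
  const hour = date.getHours();
  return hour >= peak.startHour && hour < peak.endHour;
}

// Assumption: only LOW and BACKGROUND jobs are deferred during peak hours.
function shouldDeferAt(priority: JobPriority, date: Date): boolean {
  return isPeakHourAt(date) && priority >= JobPriority.LOW;
}

// Milliseconds from the given moment until the peak window closes (0 outside it).
function delayUntilPeakEnds(date: Date, peak = ASSUMED_PEAK): number {
  if (!isPeakHourAt(date, peak)) return 0;
  const end = new Date(date);
  end.setHours(peak.endHour, 0, 0, 0);
  return end.getTime() - date.getTime();
}

const midMorning = new Date(2026, 0, 5, 10, 30, 0, 0);
const deferLow = shouldDeferAt(JobPriority.LOW, midMorning);
const deferCritical = shouldDeferAt(JobPriority.CRITICAL, midMorning);
```

Computing the delay from the end of the window, instead of using a fixed hour, avoids releasing a deferred job while peak hours are still in effect.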

License

MIT
