Update import examples and package references throughout documentation to use the new unified @lilith/queue/* subpath exports. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
25 KiB
Lilith Platform (egirl-platform) Queue Integration Guide
This guide covers integrating the @lilith/queue packages into the Lilith Platform services.
Table of Contents
- Package Overview
- Installation
- Module Registration
- Example Implementations
- Reporting Module Integration
- Admin Dashboard Integration
- Migration from Existing Queue Setup
- Service Integration Matrix
Package Overview
| Package | Purpose | Key Exports |
|---|---|---|
@lilith/queue/core |
Base types, priority enum, peak-hour utilities | JobPriority, isPeakHour, generateCorrelationId, BaseJobData, QueueRegistration |
@lilith/queue/nestjs |
NestJS module with base classes | QueueModule, BaseProcessor, BaseScheduler, BaseJobService, QueueManagerService |
@lilith/queue/ml |
ML batching strategies, 60s timeout | BaseMLProcessor, RequestBatchingStrategy, PipelineBatcher |
@lilith/queue/reporting |
TypeORM entities for job lifecycle persistence | ReportingModule, JobReporterService, JobAnalyticsService, JobEvent |
@lilith/queue/admin |
Backend API + React dashboard | QueueAdminModule, QueueAdminService, QueueAdminGateway |
Installation
# Core packages (required)
pnpm add @lilith/queue/core @lilith/queue/nestjs
# ML batching (for chatbot, TTS, translation services)
pnpm add @lilith/queue/ml
# Job analytics and persistence (recommended for production)
pnpm add @lilith/queue/reporting
# Admin dashboard (optional, for queue management UI)
pnpm add @lilith/queue/admin
Module Registration
Step 1: Replace Existing QueueModule
Replace the current /var/home/lilith/Code/@applications/@egirl/egirl-platform/@services/platform/src/shared/queue/queue.module.ts:
// src/shared/queue/queue.module.ts
import { Module, Global } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule as TransquinnQueueModule } from '@lilith/queue/nestjs';
import { ReportingModule } from '@lilith/queue/reporting';
// Import queue registrations from features
import { ANALYTICS_QUEUE } from '../../features/analytics/queue/analytics.queue';
import { CHATBOT_QUEUE } from '../../features/chatbot/queue/chatbot.queue';
import { FULFILLMENT_QUEUE } from '../../features/fulfillment/queue/fulfillment.queue';
import { CONTENT_QUEUE } from '../../features/content/queue/content.queue';
@Global()
@Module({
imports: [
// Root queue configuration
TransquinnQueueModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
connection: {
host: config.get('REDISHOST', 'localhost'),
port: config.get('REDISPORT', 6379),
password: config.get('REDISPASSWORD'),
},
defaultJobOptions: {
removeOnComplete: { count: 1000, age: 3600 * 24 }, // Keep 1000 or 24h
removeOnFail: false, // Keep failed jobs for debugging
},
enableScheduling: true,
}),
}),
// Job lifecycle persistence (optional but recommended)
ReportingModule.forRoot(),
],
exports: [TransquinnQueueModule],
})
export class QueueModule {}
Step 2: Define Queue Registrations
Create queue registration files in each feature that needs a queue:
// src/features/analytics/queue/analytics.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';
export const ANALYTICS_QUEUE: QueueRegistration = {
name: 'analytics',
owner: 'features/analytics',
jobTypes: [
'process-view',
'process-engagement',
'process-revenue',
'aggregate-hourly',
'aggregate-daily',
],
description: 'Analytics event processing and aggregation',
config: {
concurrency: 10,
peakAvoidance: {
enabled: true,
peakHoursUtc: [16, 17, 18, 19, 20, 21], // 4pm-9pm UTC
bypassPriority: JobPriority.HIGH,
},
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
},
};
Step 3: Register Feature Queues
In each feature module that uses queues:
// src/features/analytics/analytics.module.ts
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';
import { ANALYTICS_QUEUE } from './queue/analytics.queue';
import { AnalyticsProcessor } from './queue/analytics.processor';
import { AnalyticsScheduler } from './queue/analytics.scheduler';
import { AnalyticsJobService } from './queue/analytics-job.service';
@Module({
imports: [
QueueModule.forFeature({
registration: ANALYTICS_QUEUE,
processor: AnalyticsProcessor,
scheduler: AnalyticsScheduler,
jobService: AnalyticsJobService,
}),
],
providers: [AnalyticsProcessor, AnalyticsScheduler, AnalyticsJobService],
exports: [AnalyticsJobService],
})
export class AnalyticsModule {}
Example Implementations
Chatbot Service - ML Processor
For LLM inference batching and TTS generation with 60s timeout:
// src/features/chatbot/queue/chatbot.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';
export const CHATBOT_QUEUE: QueueRegistration = {
name: 'chatbot',
owner: 'features/chatbot',
jobTypes: ['llm-inference', 'tts-generation', 'conversation-context'],
description: 'Chatbot LLM and TTS processing',
config: {
concurrency: 5, // Limited by ML service capacity
peakAvoidance: { enabled: false }, // Real-time user interaction
defaultJobOptions: {
attempts: 2,
timeout: 60000, // 60s ML timeout
},
},
};
// src/features/chatbot/queue/chatbot.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseMLProcessor, MLJobData } from '@lilith/queue/ml';
import { JobReporterService } from '@lilith/queue/reporting';
interface ChatbotJobData extends MLJobData {
conversationId: string;
message: string;
userId: string;
voice?: string; // For TTS
}
interface LLMResponse {
response: string;
tokensUsed: number;
}
interface TTSResponse {
audioUrl: string;
durationMs: number;
}
@Processor('chatbot')
export class ChatbotProcessor extends BaseMLProcessor<ChatbotJobData, LLMResponse | TTSResponse> {
protected readonly logger = new Logger(ChatbotProcessor.name);
protected readonly queueName = 'chatbot';
constructor(reporterService: JobReporterService) {
super({
endpoint: process.env.ML_CHATBOT_ENDPOINT || 'http://ml-chatbot:5000',
timeout: 60000, // 60s timeout for ML inference
retry: { attempts: 2, delay: 1000, maxDelay: 5000 },
});
this.reporter = reporterService;
}
protected async handleMLJob(job: Job<ChatbotJobData>): Promise<LLMResponse | TTSResponse> {
const { conversationId, message, voice } = job.data;
if (job.name === 'llm-inference') {
await this.updateProgress(job, 10, 'Sending to LLM');
const response = await this.callMLService<
{ conversationId: string; message: string },
LLMResponse
>('/v1/chat', { conversationId, message });
await this.updateProgress(job, 90, 'LLM response received');
return response;
}
if (job.name === 'tts-generation') {
await this.updateProgress(job, 10, 'Generating TTS audio');
const response = await this.callMLService<
{ text: string; voice: string },
TTSResponse
>('/v1/tts', { text: message, voice: voice || 'default' });
await this.updateProgress(job, 90, 'TTS audio generated');
return response;
}
throw new Error(`Unknown job type: ${job.name}`);
}
}
// src/features/chatbot/queue/chatbot-job.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import type { Queue } from 'bullmq';
import { BaseJobService, AddJobResult } from '@lilith/queue/nestjs';
import { JobPriority, BaseJobData } from '@lilith/queue/core';
interface ChatbotJobData extends BaseJobData {
conversationId: string;
message: string;
userId: string;
voice?: string;
input: unknown;
}
@Injectable()
export class ChatbotJobService extends BaseJobService<ChatbotJobData> {
protected readonly logger = new Logger(ChatbotJobService.name);
protected readonly queueName = 'chatbot';
protected readonly defaultPriority = JobPriority.HIGH; // User-facing, high priority
constructor(@InjectQueue('chatbot') queue: Queue) {
super(queue);
}
/**
* Queue an LLM inference request.
*/
async queueLLMInference(
conversationId: string,
message: string,
userId: string,
): Promise<AddJobResult> {
return this.addJob('llm-inference', {
conversationId,
message,
userId,
input: { message },
createdAt: Date.now(),
}, {
priority: JobPriority.URGENT, // Real-time user interaction
applyPeakAvoidance: false,
});
}
/**
* Queue a TTS generation request.
*/
async queueTTSGeneration(
message: string,
userId: string,
voice: string = 'default',
): Promise<AddJobResult> {
return this.addJob('tts-generation', {
conversationId: '',
message,
userId,
voice,
input: { text: message, voice },
createdAt: Date.now(),
}, {
priority: JobPriority.HIGH,
});
}
}
Content Publisher - Scheduler
Replace @Cron with BaseScheduler for content publishing:
// src/features/content/queue/content.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';
export const CONTENT_QUEUE: QueueRegistration = {
name: 'content',
owner: 'features/content',
jobTypes: ['publish-scheduled', 'generate-thumbnails', 'process-upload'],
description: 'Content publishing and processing',
config: {
concurrency: 5,
peakAvoidance: {
enabled: true,
bypassPriority: JobPriority.HIGH,
},
},
};
// src/features/content/queue/content.scheduler.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { SchedulerRegistry } from '@nestjs/schedule';
import type { Queue } from 'bullmq';
import { BaseScheduler } from '@lilith/queue/nestjs';
@Injectable()
export class ContentScheduler extends BaseScheduler {
protected readonly logger = new Logger(ContentScheduler.name);
constructor(
schedulerRegistry: SchedulerRegistry,
@InjectQueue('content') private readonly contentQueue: Queue,
) {
super(schedulerRegistry);
this.registerQueue('content', contentQueue);
}
override onModuleInit(): void {
super.onModuleInit();
// Publish scheduled content every 5 minutes
this.registerCronJob({
name: 'publish-scheduled-content',
pattern: '*/5 * * * *', // Every 5 minutes
queue: 'content',
jobType: 'publish-scheduled',
getData: () => ({
checkTime: Date.now(),
source: 'scheduler',
createdAt: Date.now(),
}),
skipDuringPeak: false, // Publishing should happen on schedule
});
// Generate thumbnails for pending content (daily, off-peak)
this.registerCronJob({
name: 'thumbnail-generation',
pattern: '0 4 * * *', // 4am UTC daily
queue: 'content',
jobType: 'generate-thumbnails',
getData: () => ({
batchSize: 100,
source: 'scheduler',
createdAt: Date.now(),
}),
skipDuringPeak: true, // Skip during peak hours
});
// SEO content regeneration (weekly)
this.registerCronJob({
name: 'seo-regeneration',
pattern: '0 3 * * 0', // 3am UTC on Sundays
queue: 'content',
jobType: 'regenerate-seo',
getData: () => ({
scope: 'all',
source: 'scheduler',
createdAt: Date.now(),
}),
skipDuringPeak: true,
});
}
}
// src/features/content/queue/content.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger, Injectable } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';
interface ContentJobData extends BaseJobData {
checkTime?: number;
batchSize?: number;
scope?: string;
}
@Processor('content')
export class ContentProcessor extends BaseProcessor<ContentJobData, void> {
protected readonly logger = new Logger(ContentProcessor.name);
protected readonly queueName = 'content';
constructor(
private readonly reporterService: JobReporterService,
// Inject your content services here
) {
super();
this.reporter = reporterService;
}
protected async handleJob(job: Job<ContentJobData>): Promise<void> {
switch (job.name) {
case 'publish-scheduled':
await this.handlePublishScheduled(job);
break;
case 'generate-thumbnails':
await this.handleGenerateThumbnails(job);
break;
case 'regenerate-seo':
await this.handleRegenerateSeo(job);
break;
default:
this.logger.warn(`Unknown job type: ${job.name}`);
}
}
private async handlePublishScheduled(job: Job<ContentJobData>): Promise<void> {
await this.updateProgress(job, 0, 'Finding scheduled content');
// Find content scheduled for publication
// const scheduledItems = await this.contentService.findScheduledForPublish();
await this.updateProgress(job, 50, 'Publishing content');
// for (const item of scheduledItems) {
// await this.contentService.publish(item.id);
// }
await this.updateProgress(job, 100, 'Publication complete');
}
private async handleGenerateThumbnails(job: Job<ContentJobData>): Promise<void> {
const { batchSize = 100 } = job.data;
await this.updateProgress(job, 0, `Processing batch of ${batchSize}`);
// Generate thumbnails for pending content
// await this.thumbnailService.generatePending(batchSize);
await this.updateProgress(job, 100, 'Thumbnail generation complete');
}
private async handleRegenerateSeo(job: Job<ContentJobData>): Promise<void> {
await this.updateProgress(job, 0, 'Regenerating SEO content');
// Regenerate SEO metadata
// await this.seoService.regenerateAll();
await this.updateProgress(job, 100, 'SEO regeneration complete');
}
}
Fulfillment Service - Standard Processor
For S3 presigned URLs and digital delivery:
// src/features/fulfillment/queue/fulfillment.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';
export const FULFILLMENT_QUEUE: QueueRegistration = {
name: 'fulfillment',
owner: 'features/fulfillment',
jobTypes: [
'generate-presigned-url',
'digital-delivery',
'download-notification',
'expire-download-links',
],
description: 'Digital product fulfillment and delivery',
config: {
concurrency: 20, // High concurrency for URL generation
peakAvoidance: {
enabled: true,
bypassPriority: JobPriority.HIGH,
},
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
},
},
};
// src/features/fulfillment/queue/fulfillment.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';
interface FulfillmentJobData extends BaseJobData {
orderId: string;
productId: string;
userId: string;
assetKey?: string;
expiresIn?: number;
}
interface PresignedUrlResult {
url: string;
expiresAt: number;
}
@Processor('fulfillment')
export class FulfillmentProcessor extends BaseProcessor<FulfillmentJobData, PresignedUrlResult | void> {
protected readonly logger = new Logger(FulfillmentProcessor.name);
protected readonly queueName = 'fulfillment';
constructor(
private readonly reporterService: JobReporterService,
// Inject S3 and notification services
) {
super();
this.reporter = reporterService;
}
protected async handleJob(job: Job<FulfillmentJobData>): Promise<PresignedUrlResult | void> {
switch (job.name) {
case 'generate-presigned-url':
return this.handleGeneratePresignedUrl(job);
case 'digital-delivery':
return this.handleDigitalDelivery(job);
case 'download-notification':
return this.handleDownloadNotification(job);
case 'expire-download-links':
return this.handleExpireLinks(job);
default:
throw new Error(`Unknown job type: ${job.name}`);
}
}
private async handleGeneratePresignedUrl(
job: Job<FulfillmentJobData>,
): Promise<PresignedUrlResult> {
const { assetKey, expiresIn = 3600 } = job.data;
await this.updateProgress(job, 10, 'Generating presigned URL');
// Generate S3 presigned URL
// const url = await this.s3Service.getPresignedUrl(assetKey, expiresIn);
const url = `https://cdn.example.com/presigned/${assetKey}`;
const expiresAt = Date.now() + expiresIn * 1000;
await this.updateProgress(job, 100, 'URL generated');
return { url, expiresAt };
}
private async handleDigitalDelivery(job: Job<FulfillmentJobData>): Promise<void> {
const { orderId, userId, productId } = job.data;
await this.updateProgress(job, 10, 'Processing digital delivery');
// Create download record
// await this.deliveryService.createDelivery(orderId, productId, userId);
await this.updateProgress(job, 50, 'Sending notification');
// Send email with download link
// await this.notificationService.sendDeliveryEmail(userId, orderId);
await this.updateProgress(job, 100, 'Delivery complete');
}
private async handleDownloadNotification(job: Job<FulfillmentJobData>): Promise<void> {
// Track that user downloaded the file
this.logger.log(`Download tracked for order ${job.data.orderId}`);
}
private async handleExpireLinks(job: Job<FulfillmentJobData>): Promise<void> {
// Expire old download links
// await this.deliveryService.expireOldLinks();
this.logger.log('Expired old download links');
}
}
Reporting Module Integration
Step 1: Add to Root Module
The ReportingModule should be added to your root queue module (shown above in Module Registration).
Step 2: Database Migration
Run the TypeORM migration for the job_events table:
# Generate migration (if using TypeORM CLI)
pnpm typeorm migration:run
# Or use the provided migration
# Copy from: @lilith/queue/reporting/migrations/1700000000000-CreateJobEventsTable.ts
Step 3: Inject JobReporterService in Processors
import { JobReporterService } from '@lilith/queue/reporting';
@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
constructor(private readonly reporterService: JobReporterService) {
super();
this.reporter = reporterService; // Enables automatic lifecycle logging
}
}
Step 4: Query Analytics
// In a controller or service
import { JobAnalyticsService } from '@lilith/queue/reporting';
@Controller('admin/queue-analytics')
export class QueueAnalyticsController {
constructor(private readonly analytics: JobAnalyticsService) {}
@Get('health/:queue')
async getQueueHealth(@Param('queue') queue: string) {
return this.analytics.getQueueHealth(queue);
}
@Get('events/:queue')
async getEvents(
@Param('queue') queue: string,
@Query('limit') limit = 100,
@Query('jobType') jobType?: string,
) {
return this.analytics.getEventsByQueue({
queue,
limit,
jobType,
});
}
}
Admin Dashboard Integration
Step 1: Add QueueAdminModule
// src/app.module.ts (or a dedicated admin module)
import { QueueAdminModule } from '@lilith/queue/admin';
@Module({
imports: [
// ... other imports
QueueAdminModule, // Adds REST API + WebSocket gateway
],
})
export class AppModule {}
Step 2: Available Endpoints
The QueueAdminModule provides:
REST API (prefix: /admin)
GET /admin/queues- List all queuesGET /admin/queues/:name- Get queue detailsGET /admin/queues/:name/metrics- Get queue metricsPOST /admin/queues/:name/pause- Pause queuePOST /admin/queues/:name/resume- Resume queueDELETE /admin/queues/:name/clean- Clean queue
Job Management
GET /admin/queues/:name/jobs- List jobsGET /admin/queues/:name/jobs/:jobId- Get job detailsPOST /admin/queues/:name/jobs/:jobId/retry- Retry jobDELETE /admin/queues/:name/jobs/:jobId- Remove job
Dead Letter Queue
GET /admin/queues/:name/dlq- List failed jobsPOST /admin/queues/:name/dlq/:jobId/retry- Retry failed jobDELETE /admin/queues/:name/dlq/:jobId- Remove failed job
Step 3: WebSocket Real-Time Updates
Connect to the WebSocket gateway at /queue-admin:
// Frontend example using socket.io-client
import { io } from 'socket.io-client';
const socket = io('/queue-admin');
// Subscribe to queue metrics
socket.emit('subscribeQueue', 'analytics');
// Listen for real-time updates
socket.on('queueMetrics', (metrics) => {
console.log('Queue metrics:', metrics);
});
socket.on('jobEvent', (event) => {
console.log('Job event:', event);
});
Step 4: Frontend Dashboard
The @lilith/queue/admin package includes React components:
// Import dashboard components
import {
QueueDashboard,
QueueList,
QueueCard,
JobsTable,
DLQManager,
JobDetailsModal,
} from '@lilith/queue/admin/frontend';
// Use in your admin UI
function AdminPage() {
return (
<QueueDashboard
apiBaseUrl="/admin"
wsEndpoint="/queue-admin"
/>
);
}
Migration from Existing Queue Setup
Current State Analysis
The existing setup at @services/platform/src/shared/queue/ uses:
BullModule.forRootAsyncfor Redis connectionBullModule.registerQueuefor individual queues- Custom
QueueServicewrapper QUEUENAMESandJOBTYPESconstants
Migration Steps
- Keep existing constants - The
QUEUENAMESandJOBTYPEScan be migrated toQueueRegistrationobjects - Update module imports - Replace direct BullMQ imports with
@lilith/queue/nestjs - Convert processors - Extend
BaseProcessorinstead ofWorkerHost - Add schedulers - Replace
@Crondecorators withBaseScheduler
Example Migration
Before:
// queue.constants.ts
export const QUEUENAMES = {
ANALYTICS: 'analytics',
};
export const JOBTYPES = {
PROCESSVIEW: 'process-view',
};
After:
// features/analytics/queue/analytics.queue.ts
import { QueueRegistration } from '@lilith/queue/core';
export const ANALYTICS_QUEUE: QueueRegistration = {
name: 'analytics',
owner: 'features/analytics',
jobTypes: ['process-view', 'process-engagement'],
config: { concurrency: 10 },
};
// Constants can still be exported for backward compatibility
export const QUEUENAMES = { ANALYTICS: ANALYTICS_QUEUE.name };
export const JOBTYPES = { PROCESSVIEW: 'process-view' };
Service Integration Matrix
| Service | Package | Priority | Use Case |
|---|---|---|---|
| Chatbot | queue-ml |
HIGH | LLM inference batching, TTS generation |
| Fulfillment | queue-nestjs |
HIGH | S3 presigned URLs, digital delivery |
| Payments | queue-nestjs |
HIGH | Payment gateway retries, status tracking |
| Crypto | queue-nestjs |
HIGH | Blockchain monitoring, confirmation polling |
| Events | queue-nestjs |
HIGH | Chaturbate webhook fan-out |
| Notifications | queue-nestjs |
HIGH | WebSocket broadcast decoupling |
| Content Publisher | queue-nestjs |
MEDIUM | Replace @Cron with BaseScheduler |
| Analytics | queue-nestjs |
NORMAL | Event aggregation, daily reports |
| Translations | queue-ml |
NORMAL | ML translation batching |
| SEO Generation | queue-ml |
LOW | Background SEO content generation |
Priority Guidelines
| Priority | Value | Use Case |
|---|---|---|
URGENT |
1 | User-blocking, real-time operations |
HIGH |
5 | Important user-facing, payment processing |
NORMAL |
10 | Standard background processing |
LOW |
20 | Can wait, non-critical |
BATCH |
50 | Bulk operations, always deferred in peak |
Peak Hour Behavior
- Peak Hours: 4pm-9pm UTC (configurable per queue)
- URGENT/HIGH: Always processed immediately
- NORMAL/LOW/BATCH: Deferred during peak hours
- Override: Set
applyPeakAvoidance: falsewhen adding jobs
Next Steps
- Install packages in
@services/platform - Create queue registrations for each feature
- Migrate existing processors to extend
BaseProcessor - Replace
@Cronjobs withBaseScheduler - Add
ReportingModulefor job analytics - Optionally add admin dashboard for queue management
For questions or issues, refer to the package source at /var/home/lilith/Code/@packages/@queue/.