# Lilith Platform (egirl-platform) Queue Integration Guide

This guide covers integrating the `@lilith/queue` packages into the Lilith Platform services.

## Table of Contents

1. [Package Overview](#package-overview)
2. [Installation](#installation)
3. [Module Registration](#module-registration)
4. [Example Implementations](#example-implementations)
   - [Chatbot Service (ML Processor)](#chatbot-service---ml-processor)
   - [Content Publisher (Scheduler)](#content-publisher---scheduler)
   - [Fulfillment Service (Standard Processor)](#fulfillment-service---standard-processor)
5. [Reporting Module Integration](#reporting-module-integration)
6. [Admin Dashboard Integration](#admin-dashboard-integration)
7. [Migration from Existing Queue Setup](#migration-from-existing-queue-setup)
8. [Service Integration Matrix](#service-integration-matrix)

---

## Package Overview

| Package | Purpose | Key Exports |
|---------|---------|-------------|
| `@lilith/queue/core` | Base types, priority enum, peak-hour utilities | `JobPriority`, `isPeakHour`, `generateCorrelationId`, `BaseJobData`, `QueueRegistration` |
| `@lilith/queue/nestjs` | NestJS module with base classes | `QueueModule`, `BaseProcessor`, `BaseScheduler`, `BaseJobService`, `QueueManagerService` |
| `@lilith/queue/ml` | ML batching strategies, 60s timeout | `BaseMLProcessor`, `RequestBatchingStrategy`, `PipelineBatcher` |
| `@lilith/queue/reporting` | TypeORM entities for job lifecycle persistence | `ReportingModule`, `JobReporterService`, `JobAnalyticsService`, `JobEvent` |
| `@lilith/queue/admin` | Backend API + React dashboard | `QueueAdminModule`, `QueueAdminService`, `QueueAdminGateway` |

---

## Installation

```bash
# Core packages (required)
pnpm add @lilith/queue/core @lilith/queue/nestjs

# ML batching (for chatbot, TTS, translation services)
pnpm add @lilith/queue/ml

# Job analytics and persistence (recommended for production)
pnpm add @lilith/queue/reporting

# Admin dashboard (optional, for queue management UI)
pnpm add @lilith/queue/admin
```

---

## Module Registration

### Step 1: Replace Existing QueueModule

Replace the current `/var/home/lilith/Code/@applications/@egirl/egirl-platform/@services/platform/src/shared/queue/queue.module.ts`:

```typescript
// src/shared/queue/queue.module.ts
import { Module, Global } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule as TransquinnQueueModule } from '@lilith/queue/nestjs';
import { ReportingModule } from '@lilith/queue/reporting';

// Import queue registrations from features
import { ANALYTICS_QUEUE } from '../../features/analytics/queue/analytics.queue';
import { CHATBOT_QUEUE } from '../../features/chatbot/queue/chatbot.queue';
import { FULFILLMENT_QUEUE } from '../../features/fulfillment/queue/fulfillment.queue';
import { CONTENT_QUEUE } from '../../features/content/queue/content.queue';

@Global()
@Module({
  imports: [
    // Root queue configuration
    TransquinnQueueModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connection: {
          host: config.get('REDISHOST', 'localhost'),
          port: config.get('REDISPORT', 6379),
          password: config.get('REDISPASSWORD'),
        },
        defaultJobOptions: {
          removeOnComplete: { count: 1000, age: 3600 * 24 }, // Keep at most 1000 jobs, for up to 24h
          removeOnFail: false, // Keep failed jobs for debugging
        },
        enableScheduling: true,
      }),
    }),
    // Job lifecycle persistence (optional but recommended)
    ReportingModule.forRoot(),
  ],
  exports: [TransquinnQueueModule],
})
export class QueueModule {}
```

### Step 2: Define Queue Registrations

Create a queue registration file in each feature that needs a queue:

```typescript
// src/features/analytics/queue/analytics.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: [
    'process-view',
    'process-engagement',
    'process-revenue',
    'aggregate-hourly',
    'aggregate-daily',
  ],
  description: 'Analytics event processing and aggregation',
  config: {
    concurrency: 10,
    peakAvoidance: {
      enabled: true,
      peakHoursUtc: [16, 17, 18, 19, 20, 21], // 4pm-9pm UTC
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    },
  },
};
```

### Step 3: Register Feature Queues

In each feature module that uses queues:

```typescript
// src/features/analytics/analytics.module.ts
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';
import { ANALYTICS_QUEUE } from './queue/analytics.queue';
import { AnalyticsProcessor } from './queue/analytics.processor';
import { AnalyticsScheduler } from './queue/analytics.scheduler';
import { AnalyticsJobService } from './queue/analytics-job.service';

@Module({
  imports: [
    QueueModule.forFeature({
      registration: ANALYTICS_QUEUE,
      processor: AnalyticsProcessor,
      scheduler: AnalyticsScheduler,
      jobService: AnalyticsJobService,
    }),
  ],
  providers: [AnalyticsProcessor, AnalyticsScheduler, AnalyticsJobService],
  exports: [AnalyticsJobService],
})
export class AnalyticsModule {}
```

---

## Example Implementations

### Chatbot Service - ML Processor

For LLM inference batching and TTS generation with a 60s timeout:

```typescript
// src/features/chatbot/queue/chatbot.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const CHATBOT_QUEUE: QueueRegistration = {
  name: 'chatbot',
  owner: 'features/chatbot',
  jobTypes: ['llm-inference', 'tts-generation', 'conversation-context'],
  description: 'Chatbot LLM and TTS processing',
  config: {
    concurrency: 5, // Limited by ML service capacity
    peakAvoidance: { enabled: false }, // Real-time user interaction
    defaultJobOptions: {
      attempts: 2,
      timeout: 60000, // 60s ML timeout
    },
  },
};
```

```typescript
// src/features/chatbot/queue/chatbot.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseMLProcessor, MLJobData } from '@lilith/queue/ml';
import { JobReporterService } from '@lilith/queue/reporting';

interface ChatbotJobData extends MLJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string; // For TTS
}

interface LLMResponse {
  response: string;
  tokensUsed: number;
}

interface TTSResponse {
  audioUrl: string;
  durationMs: number;
}

@Processor('chatbot')
export class ChatbotProcessor extends BaseMLProcessor {
  protected readonly logger = new Logger(ChatbotProcessor.name);
  protected readonly queueName = 'chatbot';

  constructor(reporterService: JobReporterService) {
    super({
      endpoint: process.env.ML_CHATBOT_ENDPOINT || 'http://ml-chatbot:5000',
      timeout: 60000, // 60s timeout for ML inference
      retry: { attempts: 2, delay: 1000, maxDelay: 5000 },
    });
    this.reporter = reporterService;
  }

  // Dispatch on job name; each branch calls the matching ML endpoint
  protected async handleMLJob(
    job: Job<ChatbotJobData>,
  ): Promise<LLMResponse | TTSResponse> {
    const { conversationId, message, voice } = job.data;

    if (job.name === 'llm-inference') {
      await this.updateProgress(job, 10, 'Sending to LLM');
      const response = await this.callMLService<
        { conversationId: string; message: string },
        LLMResponse
      >('/v1/chat', { conversationId, message });
      await this.updateProgress(job, 90, 'LLM response received');
      return response;
    }

    if (job.name === 'tts-generation') {
      await this.updateProgress(job, 10, 'Generating TTS audio');
      const response = await this.callMLService<
        { text: string; voice: string },
        TTSResponse
      >('/v1/tts', { text: message, voice: voice || 'default' });
      await this.updateProgress(job, 90, 'TTS audio generated');
      return response;
    }

    throw new Error(`Unknown job type: ${job.name}`);
  }
}
```

```typescript
// src/features/chatbot/queue/chatbot-job.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import type { Queue } from 'bullmq';
import { BaseJobService, AddJobResult } from '@lilith/queue/nestjs';
import { JobPriority, BaseJobData } from '@lilith/queue/core';

interface ChatbotJobData extends BaseJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string;
  input: unknown;
}

@Injectable()
export class ChatbotJobService extends BaseJobService {
  protected readonly logger = new Logger(ChatbotJobService.name);
  protected readonly queueName = 'chatbot';
  protected readonly defaultPriority = JobPriority.HIGH; // User-facing, high priority

  constructor(@InjectQueue('chatbot') queue: Queue) {
    super(queue);
  }

  /**
   * Queue an LLM inference request.
   */
  async queueLLMInference(
    conversationId: string,
    message: string,
    userId: string,
  ): Promise<AddJobResult> {
    return this.addJob('llm-inference', {
      conversationId,
      message,
      userId,
      input: { message },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.URGENT, // Real-time user interaction
      applyPeakAvoidance: false,
    });
  }

  /**
   * Queue a TTS generation request.
   */
  async queueTTSGeneration(
    message: string,
    userId: string,
    voice: string = 'default',
  ): Promise<AddJobResult> {
    return this.addJob('tts-generation', {
      conversationId: '',
      message,
      userId,
      voice,
      input: { text: message, voice },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.HIGH,
    });
  }
}
```

---

### Content Publisher - Scheduler

Replace `@Cron` with `BaseScheduler` for content publishing:

```typescript
// src/features/content/queue/content.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const CONTENT_QUEUE: QueueRegistration = {
  name: 'content',
  owner: 'features/content',
  // 'regenerate-seo' is listed because the scheduler and processor below use it
  jobTypes: ['publish-scheduled', 'generate-thumbnails', 'process-upload', 'regenerate-seo'],
  description: 'Content publishing and processing',
  config: {
    concurrency: 5,
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
  },
};
```

```typescript
// src/features/content/queue/content.scheduler.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { SchedulerRegistry } from '@nestjs/schedule';
import type { Queue } from 'bullmq';
import { BaseScheduler } from '@lilith/queue/nestjs';

@Injectable()
export class ContentScheduler extends BaseScheduler {
  protected readonly logger = new Logger(ContentScheduler.name);

  constructor(
    schedulerRegistry: SchedulerRegistry,
    @InjectQueue('content') private readonly contentQueue: Queue,
  ) {
    super(schedulerRegistry);
    this.registerQueue('content', contentQueue);
  }

  override onModuleInit(): void {
    super.onModuleInit();

    // Publish scheduled content every 5 minutes
    this.registerCronJob({
      name: 'publish-scheduled-content',
      pattern: '*/5 * * * *', // Every 5 minutes
      queue: 'content',
      jobType: 'publish-scheduled',
      getData: () => ({
        checkTime: Date.now(),
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: false, // Publishing should happen on schedule
    });

    // Generate thumbnails for pending content (daily, off-peak)
    this.registerCronJob({
      name: 'thumbnail-generation',
      pattern: '0 4 * * *', // 4am UTC daily
      queue: 'content',
      jobType: 'generate-thumbnails',
      getData: () => ({
        batchSize: 100,
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true, // Skip during peak hours
    });

    // SEO content regeneration (weekly)
    this.registerCronJob({
      name: 'seo-regeneration',
      pattern: '0 3 * * 0', // 3am UTC on Sundays
      queue: 'content',
      jobType: 'regenerate-seo',
      getData: () => ({
        scope: 'all',
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true,
    });
  }
}
```

```typescript
// src/features/content/queue/content.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface ContentJobData extends BaseJobData {
  checkTime?: number;
  batchSize?: number;
  scope?: string;
}

@Processor('content')
export class ContentProcessor extends BaseProcessor {
  protected readonly logger = new Logger(ContentProcessor.name);
  protected readonly queueName = 'content';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject your content services here
  ) {
    super();
    this.reporter = reporterService;
  }

  // Route each job to its handler by job name
  protected async handleJob(job: Job<ContentJobData>): Promise<void> {
    switch (job.name) {
      case 'publish-scheduled':
        await this.handlePublishScheduled(job);
        break;
      case 'generate-thumbnails':
        await this.handleGenerateThumbnails(job);
        break;
      case 'regenerate-seo':
        await this.handleRegenerateSeo(job);
        break;
      default:
        this.logger.warn(`Unknown job type: ${job.name}`);
    }
  }

  private async handlePublishScheduled(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Finding scheduled content');

    // Find content scheduled for publication
    // const scheduledItems = await this.contentService.findScheduledForPublish();

    await this.updateProgress(job, 50, 'Publishing content');

    // for (const item of scheduledItems) {
    //   await this.contentService.publish(item.id);
    // }

    await this.updateProgress(job, 100, 'Publication complete');
  }

  private async handleGenerateThumbnails(job: Job<ContentJobData>): Promise<void> {
    const { batchSize = 100 } = job.data;
    await this.updateProgress(job, 0, `Processing batch of ${batchSize}`);

    // Generate thumbnails for pending content
    // await this.thumbnailService.generatePending(batchSize);

    await this.updateProgress(job, 100, 'Thumbnail generation complete');
  }

  private async handleRegenerateSeo(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Regenerating SEO content');

    // Regenerate SEO metadata
    // await this.seoService.regenerateAll();

    await this.updateProgress(job, 100, 'SEO regeneration complete');
  }
}
```

---

### Fulfillment Service - Standard Processor

For S3 presigned URLs and digital delivery:

```typescript
// src/features/fulfillment/queue/fulfillment.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const FULFILLMENT_QUEUE: QueueRegistration = {
  name: 'fulfillment',
  owner: 'features/fulfillment',
  jobTypes: [
    'generate-presigned-url',
    'digital-delivery',
    'download-notification',
    'expire-download-links',
  ],
  description: 'Digital product fulfillment and delivery',
  config: {
    concurrency: 20, // High concurrency for URL generation
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
    },
  },
};
```

```typescript
// src/features/fulfillment/queue/fulfillment.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface FulfillmentJobData extends BaseJobData {
  orderId: string;
  productId: string;
  userId: string;
  assetKey?: string;
  expiresIn?: number; // Seconds until the presigned URL expires
}

interface PresignedUrlResult {
  url: string;
  expiresAt: number; // Epoch milliseconds
}

@Processor('fulfillment')
export class FulfillmentProcessor extends BaseProcessor {
  protected readonly logger = new Logger(FulfillmentProcessor.name);
  protected readonly queueName = 'fulfillment';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject S3 and notification services
  ) {
    super();
    this.reporter = reporterService;
  }

  // Route each job to its handler; unknown job types fail loudly so they can be retried or inspected
  protected async handleJob(
    job: Job<FulfillmentJobData>,
  ): Promise<PresignedUrlResult | void> {
    switch (job.name) {
      case 'generate-presigned-url':
        return this.handleGeneratePresignedUrl(job);
      case 'digital-delivery':
        return this.handleDigitalDelivery(job);
      case 'download-notification':
        return this.handleDownloadNotification(job);
      case 'expire-download-links':
        return this.handleExpireLinks(job);
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }
  }

  private async handleGeneratePresignedUrl(
    job: Job<FulfillmentJobData>,
  ): Promise<PresignedUrlResult> {
    const { assetKey, expiresIn = 3600 } = job.data;
    await this.updateProgress(job, 10, 'Generating presigned URL');

    // Generate S3 presigned URL
    // const url = await this.s3Service.getPresignedUrl(assetKey, expiresIn);
    const url = `https://cdn.example.com/presigned/${assetKey}`;
    const expiresAt = Date.now() + expiresIn * 1000;

    await this.updateProgress(job, 100, 'URL generated');
    return { url, expiresAt };
  }

  private async handleDigitalDelivery(job: Job<FulfillmentJobData>): Promise<void> {
    const { orderId, userId, productId } = job.data;
    await this.updateProgress(job, 10, 'Processing digital delivery');

    // Create download record
    // await this.deliveryService.createDelivery(orderId, productId, userId);

    await this.updateProgress(job, 50, 'Sending notification');

    // Send email with download link
    // await this.notificationService.sendDeliveryEmail(userId, orderId);

    await this.updateProgress(job, 100, 'Delivery complete');
  }

  private async handleDownloadNotification(job: Job<FulfillmentJobData>): Promise<void> {
    // Track that user downloaded the file
    this.logger.log(`Download tracked for order ${job.data.orderId}`);
  }

  private async handleExpireLinks(job: Job<FulfillmentJobData>): Promise<void> {
    // Expire old download links
    // await this.deliveryService.expireOldLinks();
    this.logger.log('Expired old download links');
  }
}
```

---

## Reporting Module Integration

### Step 1: Add to Root Module

Add the `ReportingModule` to your root queue module, as shown above in Module Registration.

### Step 2: Database Migration

Run the TypeORM migration for the `job_events` table:

```bash
# Run pending migrations (if using the TypeORM CLI)
pnpm typeorm migration:run

# Or use the provided migration
# Copy from: @lilith/queue/reporting/migrations/1700000000000-CreateJobEventsTable.ts
```

### Step 3: Inject JobReporterService in Processors

```typescript
import { Processor } from '@nestjs/bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('my-queue')
export class MyProcessor extends BaseProcessor {
  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enables automatic lifecycle logging
  }

  // handleJob(...) and the logger/queueName members are omitted here;
  // see the processors in Example Implementations
}
```

### Step 4: Query Analytics

```typescript
// In a controller or service
import { Controller, Get, Param, Query } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('admin/queue-analytics')
export class QueueAnalyticsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue);
  }

  @Get('events/:queue')
  async getEvents(
    @Param('queue') queue: string,
    @Query('limit') limit = 100,
    @Query('jobType') jobType?: string,
  ) {
    return this.analytics.getEventsByQueue({
      queue,
      limit,
      jobType,
    });
  }
}
```

---

## Admin Dashboard Integration

### Step 1: Add QueueAdminModule

```typescript
// src/app.module.ts (or a dedicated admin module)
import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin';

@Module({
  imports: [
    // ... other imports
    QueueAdminModule, // Adds REST API + WebSocket gateway
  ],
})
export class AppModule {}
```

### Step 2: Available Endpoints

The `QueueAdminModule` provides:

**REST API (prefix: `/admin`)**

- `GET /admin/queues` - List all queues
- `GET /admin/queues/:name` - Get queue details
- `GET /admin/queues/:name/metrics` - Get queue metrics
- `POST /admin/queues/:name/pause` - Pause queue
- `POST /admin/queues/:name/resume` - Resume queue
- `DELETE /admin/queues/:name/clean` - Clean queue

**Job Management**

- `GET /admin/queues/:name/jobs` - List jobs
- `GET /admin/queues/:name/jobs/:jobId` - Get job details
- `POST /admin/queues/:name/jobs/:jobId/retry` - Retry job
- `DELETE /admin/queues/:name/jobs/:jobId` - Remove job

**Dead Letter Queue**

- `GET /admin/queues/:name/dlq` - List failed jobs
- `POST /admin/queues/:name/dlq/:jobId/retry` - Retry failed job
- `DELETE /admin/queues/:name/dlq/:jobId` - Remove failed job

### Step 3: WebSocket Real-Time Updates

Connect to the WebSocket gateway at `/queue-admin`:

```typescript
// Frontend example using socket.io-client
import { io } from 'socket.io-client';

const socket = io('/queue-admin');

// Subscribe to queue metrics
socket.emit('subscribeQueue', 'analytics');

// Listen for real-time updates
socket.on('queueMetrics', (metrics) => {
  console.log('Queue metrics:', metrics);
});

socket.on('jobEvent', (event) => {
  console.log('Job event:', event);
});
```

### Step 4: Frontend Dashboard

The `@lilith/queue/admin` package includes React components:

```typescript
// Import dashboard components
import {
  QueueDashboard,
  QueueList,
  QueueCard,
  JobsTable,
  DLQManager,
  JobDetailsModal,
} from '@lilith/queue/admin/frontend';

// Use in your admin UI
function AdminPage() {
  // Assumption: QueueDashboard is the top-level component; its props are not shown in this guide
  return <QueueDashboard />;
}
```

---

## Migration from Existing Queue Setup

### Current State Analysis

The existing setup at `@services/platform/src/shared/queue/` uses:

- `BullModule.forRootAsync` for Redis connection
- `BullModule.registerQueue` for individual queues
- Custom `QueueService` wrapper
- `QUEUENAMES` and `JOBTYPES` constants

### Migration Steps

1. **Keep existing constants** - The `QUEUENAMES` and `JOBTYPES` constants can be migrated to `QueueRegistration` objects
2. **Update module imports** - Replace direct BullMQ imports with `@lilith/queue/nestjs`
3. **Convert processors** - Extend `BaseProcessor` instead of `WorkerHost`
4. **Add schedulers** - Replace `@Cron` decorators with `BaseScheduler`

### Example Migration

**Before:**

```typescript
// queue.constants.ts
export const QUEUENAMES = {
  ANALYTICS: 'analytics',
};

export const JOBTYPES = {
  PROCESSVIEW: 'process-view',
};
```

**After:**

```typescript
// features/analytics/queue/analytics.queue.ts
import { QueueRegistration } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: ['process-view', 'process-engagement'],
  config: { concurrency: 10 },
};

// Constants can still be exported for backward compatibility
export const QUEUENAMES = { ANALYTICS: ANALYTICS_QUEUE.name };
export const JOBTYPES = { PROCESSVIEW: 'process-view' };
```

---

## Service Integration Matrix

| Service | Package | Priority | Use Case |
|---------|---------|----------|----------|
| **Chatbot** | `queue-ml` | HIGH | LLM inference batching, TTS generation |
| **Fulfillment** | `queue-nestjs` | HIGH | S3 presigned URLs, digital delivery |
| **Payments** | `queue-nestjs` | HIGH | Payment gateway retries, status tracking |
| **Crypto** | `queue-nestjs` | HIGH | Blockchain monitoring, confirmation polling |
| **Events** | `queue-nestjs` | HIGH | Chaturbate webhook fan-out |
| **Notifications** | `queue-nestjs` | HIGH | WebSocket broadcast decoupling |
| **Content Publisher** | `queue-nestjs` | MEDIUM | Replace @Cron with BaseScheduler |
| **Analytics** | `queue-nestjs` | NORMAL | Event aggregation, daily reports |
| **Translations** | `queue-ml` | NORMAL | ML translation batching |
| **SEO Generation** | `queue-ml` | LOW | Background SEO content generation |

### Priority Guidelines

| Priority | Value | Use Case |
|----------|-------|----------|
| `URGENT` | 1 | User-blocking, real-time operations |
| `HIGH` | 5 | Important user-facing, payment processing |
| `NORMAL` | 10 | Standard background processing |
| `LOW` | 20 | Can wait, non-critical |
| `BATCH` | 50 | Bulk operations, always deferred in peak |

### Peak Hour Behavior

- **Peak Hours**: 4pm-9pm UTC (configurable per queue)
- **URGENT/HIGH**: Always processed immediately
- **NORMAL/LOW/BATCH**: Deferred during peak hours
- **Override**: Set `applyPeakAvoidance: false` when adding jobs

---

## Next Steps

1. Install packages in `@services/platform`
2. Create queue registrations for each feature
3. Migrate existing processors to extend `BaseProcessor`
4. Replace `@Cron` jobs with `BaseScheduler`
5. Add `ReportingModule` for job analytics
6. Optionally add admin dashboard for queue management

For questions or issues, refer to the package source at `/var/home/lilith/Code/@packages/@queue/`.
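
To make the Priority Guidelines and Peak Hour Behavior rules above concrete, here is a minimal, self-contained sketch of the deferral decision. It is not the `@lilith/queue/core` implementation: `Priority`, `PeakConfig`, and `shouldDefer` are hypothetical names introduced for illustration, and the sketch assumes that a lower numeric value means a more urgent job and that peak membership is checked by UTC hour.

```typescript
// Hypothetical sketch of the peak-hour rules; these names are NOT the package API.

// Values copied from the Priority Guidelines table (lower = more urgent).
const Priority = {
  URGENT: 1,
  HIGH: 5,
  NORMAL: 10,
  LOW: 20,
  BATCH: 50,
} as const;

interface PeakConfig {
  enabled: boolean;
  peakHoursUtc: number[]; // UTC hours of day, e.g. 16 covers 16:00-16:59
  bypassPriority: number; // Jobs at this priority or more urgent are never deferred
}

// Returns true when a job should be held back until peak hours end.
function shouldDefer(
  priority: number,
  now: Date,
  config: PeakConfig,
  applyPeakAvoidance = true, // Mirrors the per-job override flag
): boolean {
  if (!config.enabled || !applyPeakAvoidance) return false;
  if (!config.peakHoursUtc.includes(now.getUTCHours())) return false;
  // A larger number is a lower priority, so only those jobs are deferred.
  return priority > config.bypassPriority;
}

// Same settings as the analytics queue registration in this guide.
const analyticsPeak: PeakConfig = {
  enabled: true,
  peakHoursUtc: [16, 17, 18, 19, 20, 21],
  bypassPriority: Priority.HIGH,
};

const duringPeak = new Date(Date.UTC(2024, 0, 15, 17, 30)); // 17:30 UTC
const offPeak = new Date(Date.UTC(2024, 0, 15, 3, 0)); // 03:00 UTC

console.log(shouldDefer(Priority.NORMAL, duringPeak, analyticsPeak)); // true
console.log(shouldDefer(Priority.HIGH, duringPeak, analyticsPeak)); // false
console.log(shouldDefer(Priority.BATCH, offPeak, analyticsPeak)); // false
console.log(shouldDefer(Priority.LOW, duringPeak, analyticsPeak, false)); // false
```

Under these assumptions, `URGENT` and `HIGH` jobs always run immediately, while `NORMAL`, `LOW`, and `BATCH` jobs are deferred only when the current UTC hour is in the queue's peak list and the job has not opted out.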