queue/docs/LILITH_PLATFORM_INTEGRATION.md
Lilith f9eb7750c8 📝 Update documentation to reflect @lilith/queue package structure
2025-12-30 20:28:34 -08:00

Lilith Platform (egirl-platform) Queue Integration Guide

This guide covers integrating the @lilith/queue packages into the Lilith Platform services.

Table of Contents

  1. Package Overview
  2. Installation
  3. Module Registration
  4. Example Implementations
  5. Reporting Module Integration
  6. Admin Dashboard Integration
  7. Migration from Existing Queue Setup
  8. Service Integration Matrix

Package Overview

| Package | Purpose | Key Exports |
|---|---|---|
| @lilith/queue/core | Base types, priority enum, peak-hour utilities | JobPriority, isPeakHour, generateCorrelationId, BaseJobData, QueueRegistration |
| @lilith/queue/nestjs | NestJS module with base classes | QueueModule, BaseProcessor, BaseScheduler, BaseJobService, QueueManagerService |
| @lilith/queue/ml | ML batching strategies, 60s timeout | BaseMLProcessor, RequestBatchingStrategy, PipelineBatcher |
| @lilith/queue/reporting | TypeORM entities for job lifecycle persistence | ReportingModule, JobReporterService, JobAnalyticsService, JobEvent |
| @lilith/queue/admin | Backend API + React dashboard | QueueAdminModule, QueueAdminService, QueueAdminGateway |

Installation

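# Install the unified package once (assuming it is published as @lilith/queue).
# The modules below are subpath exports of that package, not separate installs.
pnpm add @lilith/queue

# Subpaths you will import from:
#   @lilith/queue/core, @lilith/queue/nestjs   core modules (required)
#   @lilith/queue/ml          ML batching (for chatbot, TTS, translation services)
#   @lilith/queue/reporting   job analytics and persistence (recommended for production)
#   @lilith/queue/admin       admin dashboard (optional, for queue management UI)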

Module Registration

Step 1: Replace Existing QueueModule

Replace the contents of the current /var/home/lilith/Code/@applications/@egirl/egirl-platform/@services/platform/src/shared/queue/queue.module.ts with the following:

// src/shared/queue/queue.module.ts
import { Module, Global } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule as TransquinnQueueModule } from '@lilith/queue/nestjs';
import { ReportingModule } from '@lilith/queue/reporting';

// Import queue registrations from features
import { ANALYTICS_QUEUE } from '../../features/analytics/queue/analytics.queue';
import { CHATBOT_QUEUE } from '../../features/chatbot/queue/chatbot.queue';
import { FULFILLMENT_QUEUE } from '../../features/fulfillment/queue/fulfillment.queue';
import { CONTENT_QUEUE } from '../../features/content/queue/content.queue';

@Global()
@Module({
  imports: [
    // Root queue configuration
    TransquinnQueueModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connection: {
          host: config.get('REDISHOST', 'localhost'),
          port: config.get('REDISPORT', 6379),
          password: config.get('REDISPASSWORD'),
        },
        defaultJobOptions: {
          removeOnComplete: { count: 1000, age: 3600 * 24 }, // Keep at most 1000 completed jobs, none older than 24h
          removeOnFail: false, // Keep failed jobs for debugging
        },
        enableScheduling: true,
      }),
    }),

    // Job lifecycle persistence (optional but recommended)
    ReportingModule.forRoot(),
  ],
  exports: [TransquinnQueueModule],
})
export class QueueModule {}

Step 2: Define Queue Registrations

Create queue registration files in each feature that needs a queue:

// src/features/analytics/queue/analytics.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: [
    'process-view',
    'process-engagement',
    'process-revenue',
    'aggregate-hourly',
    'aggregate-daily',
  ],
  description: 'Analytics event processing and aggregation',
  config: {
    concurrency: 10,
    peakAvoidance: {
      enabled: true,
      peakHoursUtc: [16, 17, 18, 19, 20, 21], // 4pm-9pm UTC
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    },
  },
};
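
The attempts and backoff options above control how long a failed job waits before each retry. Assuming the package forwards these options to BullMQ unchanged (this guide does not state that explicitly), BullMQ's built-in exponential strategy waits delay * 2^(attemptsMade - 1) milliseconds. The sketch below computes the resulting schedule; retryDelays is a hypothetical helper used only for illustration and is not part of @lilith/queue.

```typescript
// Sketch only: mirrors BullMQ's documented exponential backoff formula.
// `retryDelays` is a hypothetical helper, not an export of @lilith/queue.
interface BackoffOptions {
  type: 'exponential' | 'fixed';
  delay: number; // base delay in milliseconds
}

function retryDelays(attempts: number, backoff: BackoffOptions): number[] {
  const delays: number[] = [];
  // `attempts` includes the first try, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(
      backoff.type === 'exponential'
        ? backoff.delay * 2 ** (attemptsMade - 1)
        : backoff.delay,
    );
  }
  return delays;
}

// The analytics queue settings from the registration above.
console.log(retryDelays(3, { type: 'exponential', delay: 1000 })); // [ 1000, 2000 ]
```

With attempts: 3 the job is tried once and retried at most twice, so a persistently failing analytics job is given up on roughly three seconds after its first failure, plus processing time.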

Step 3: Register Feature Queues

In each feature module that uses queues:

// src/features/analytics/analytics.module.ts
import { Module } from '@nestjs/common';
import { QueueModule } from '@lilith/queue/nestjs';
import { ANALYTICS_QUEUE } from './queue/analytics.queue';
import { AnalyticsProcessor } from './queue/analytics.processor';
import { AnalyticsScheduler } from './queue/analytics.scheduler';
import { AnalyticsJobService } from './queue/analytics-job.service';

@Module({
  imports: [
    QueueModule.forFeature({
      registration: ANALYTICS_QUEUE,
      processor: AnalyticsProcessor,
      scheduler: AnalyticsScheduler,
      jobService: AnalyticsJobService,
    }),
  ],
  providers: [AnalyticsProcessor, AnalyticsScheduler, AnalyticsJobService],
  exports: [AnalyticsJobService],
})
export class AnalyticsModule {}

Example Implementations

Chatbot Service - ML Processor

For LLM inference batching and TTS generation with a 60s timeout:

// src/features/chatbot/queue/chatbot.queue.ts
import { QueueRegistration } from '@lilith/queue/core';

export const CHATBOT_QUEUE: QueueRegistration = {
  name: 'chatbot',
  owner: 'features/chatbot',
  jobTypes: ['llm-inference', 'tts-generation', 'conversation-context'],
  description: 'Chatbot LLM and TTS processing',
  config: {
    concurrency: 5, // Limited by ML service capacity
    peakAvoidance: { enabled: false }, // Real-time user interaction
    defaultJobOptions: {
      attempts: 2,
      timeout: 60000, // 60s ML timeout
    },
  },
};

// src/features/chatbot/queue/chatbot.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseMLProcessor, MLJobData } from '@lilith/queue/ml';
import { JobReporterService } from '@lilith/queue/reporting';

interface ChatbotJobData extends MLJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string; // For TTS
}

interface LLMResponse {
  response: string;
  tokensUsed: number;
}

interface TTSResponse {
  audioUrl: string;
  durationMs: number;
}

@Processor('chatbot')
export class ChatbotProcessor extends BaseMLProcessor<ChatbotJobData, LLMResponse | TTSResponse> {
  protected readonly logger = new Logger(ChatbotProcessor.name);
  protected readonly queueName = 'chatbot';

  constructor(reporterService: JobReporterService) {
    super({
      endpoint: process.env.ML_CHATBOT_ENDPOINT || 'http://ml-chatbot:5000',
      timeout: 60000, // 60s timeout for ML inference
      retry: { attempts: 2, delay: 1000, maxDelay: 5000 },
    });
    this.reporter = reporterService;
  }

  protected async handleMLJob(job: Job<ChatbotJobData>): Promise<LLMResponse | TTSResponse> {
    const { conversationId, message, voice } = job.data;

    if (job.name === 'llm-inference') {
      await this.updateProgress(job, 10, 'Sending to LLM');

      const response = await this.callMLService<
        { conversationId: string; message: string },
        LLMResponse
      >('/v1/chat', { conversationId, message });

      await this.updateProgress(job, 90, 'LLM response received');

      return response;
    }

    if (job.name === 'tts-generation') {
      await this.updateProgress(job, 10, 'Generating TTS audio');

      const response = await this.callMLService<
        { text: string; voice: string },
        TTSResponse
      >('/v1/tts', { text: message, voice: voice || 'default' });

      await this.updateProgress(job, 90, 'TTS audio generated');

      return response;
    }

    throw new Error(`Unknown job type: ${job.name}`);
  }
}

// src/features/chatbot/queue/chatbot-job.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import type { Queue } from 'bullmq';
import { BaseJobService, AddJobResult } from '@lilith/queue/nestjs';
import { JobPriority, BaseJobData } from '@lilith/queue/core';

interface ChatbotJobData extends BaseJobData {
  conversationId: string;
  message: string;
  userId: string;
  voice?: string;
  input: unknown;
}

@Injectable()
export class ChatbotJobService extends BaseJobService<ChatbotJobData> {
  protected readonly logger = new Logger(ChatbotJobService.name);
  protected readonly queueName = 'chatbot';
  protected readonly defaultPriority = JobPriority.HIGH; // User-facing, high priority

  constructor(@InjectQueue('chatbot') queue: Queue) {
    super(queue);
  }

  /**
   * Queue an LLM inference request.
   */
  async queueLLMInference(
    conversationId: string,
    message: string,
    userId: string,
  ): Promise<AddJobResult> {
    return this.addJob('llm-inference', {
      conversationId,
      message,
      userId,
      input: { message },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.URGENT, // Real-time user interaction
      applyPeakAvoidance: false,
    });
  }

  /**
   * Queue a TTS generation request.
   */
  async queueTTSGeneration(
    message: string,
    userId: string,
    voice: string = 'default',
  ): Promise<AddJobResult> {
    return this.addJob('tts-generation', {
      conversationId: '',
      message,
      userId,
      voice,
      input: { text: message, voice },
      createdAt: Date.now(),
    }, {
      priority: JobPriority.HIGH,
    });
  }
}

Content Publisher - Scheduler

Replace @Cron with BaseScheduler for content publishing:

// src/features/content/queue/content.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const CONTENT_QUEUE: QueueRegistration = {
  name: 'content',
  owner: 'features/content',
  jobTypes: ['publish-scheduled', 'generate-thumbnails', 'process-upload', 'regenerate-seo'],
  description: 'Content publishing and processing',
  config: {
    concurrency: 5,
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
  },
};

// src/features/content/queue/content.scheduler.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { SchedulerRegistry } from '@nestjs/schedule';
import type { Queue } from 'bullmq';
import { BaseScheduler } from '@lilith/queue/nestjs';

@Injectable()
export class ContentScheduler extends BaseScheduler {
  protected readonly logger = new Logger(ContentScheduler.name);

  constructor(
    schedulerRegistry: SchedulerRegistry,
    @InjectQueue('content') private readonly contentQueue: Queue,
  ) {
    super(schedulerRegistry);
    this.registerQueue('content', contentQueue);
  }

  override onModuleInit(): void {
    super.onModuleInit();

    // Publish scheduled content every 5 minutes
    this.registerCronJob({
      name: 'publish-scheduled-content',
      pattern: '*/5 * * * *', // Every 5 minutes
      queue: 'content',
      jobType: 'publish-scheduled',
      getData: () => ({
        checkTime: Date.now(),
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: false, // Publishing should happen on schedule
    });

    // Generate thumbnails for pending content (daily, off-peak)
    this.registerCronJob({
      name: 'thumbnail-generation',
      pattern: '0 4 * * *', // 4am UTC daily
      queue: 'content',
      jobType: 'generate-thumbnails',
      getData: () => ({
        batchSize: 100,
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true, // Skip during peak hours
    });

    // SEO content regeneration (weekly)
    this.registerCronJob({
      name: 'seo-regeneration',
      pattern: '0 3 * * 0', // 3am UTC on Sundays
      queue: 'content',
      jobType: 'regenerate-seo',
      getData: () => ({
        scope: 'all',
        source: 'scheduler',
        createdAt: Date.now(),
      }),
      skipDuringPeak: true,
    });
  }
}

// src/features/content/queue/content.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface ContentJobData extends BaseJobData {
  checkTime?: number;
  batchSize?: number;
  scope?: string;
}

@Processor('content')
export class ContentProcessor extends BaseProcessor<ContentJobData, void> {
  protected readonly logger = new Logger(ContentProcessor.name);
  protected readonly queueName = 'content';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject your content services here
  ) {
    super();
    this.reporter = reporterService;
  }

  protected async handleJob(job: Job<ContentJobData>): Promise<void> {
    switch (job.name) {
      case 'publish-scheduled':
        await this.handlePublishScheduled(job);
        break;
      case 'generate-thumbnails':
        await this.handleGenerateThumbnails(job);
        break;
      case 'regenerate-seo':
        await this.handleRegenerateSeo(job);
        break;
      default:
        this.logger.warn(`Unknown job type: ${job.name}`);
    }
  }

  private async handlePublishScheduled(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Finding scheduled content');

    // Find content scheduled for publication
    // const scheduledItems = await this.contentService.findScheduledForPublish();

    await this.updateProgress(job, 50, 'Publishing content');

    // for (const item of scheduledItems) {
    //   await this.contentService.publish(item.id);
    // }

    await this.updateProgress(job, 100, 'Publication complete');
  }

  private async handleGenerateThumbnails(job: Job<ContentJobData>): Promise<void> {
    const { batchSize = 100 } = job.data;
    await this.updateProgress(job, 0, `Processing batch of ${batchSize}`);

    // Generate thumbnails for pending content
    // await this.thumbnailService.generatePending(batchSize);

    await this.updateProgress(job, 100, 'Thumbnail generation complete');
  }

  private async handleRegenerateSeo(job: Job<ContentJobData>): Promise<void> {
    await this.updateProgress(job, 0, 'Regenerating SEO content');

    // Regenerate SEO metadata
    // await this.seoService.regenerateAll();

    await this.updateProgress(job, 100, 'SEO regeneration complete');
  }
}
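
The scheduler's cron jobs and the processor's switch both refer to job types by plain string, so they can drift away from the jobTypes list in the queue registration. A small check like the one below could run in a unit test to catch that. It is a sketch under assumptions: RegistrationLike is a local stand-in for the two QueueRegistration fields used here, and findUnregisteredJobTypes and the 'reports' queue are hypothetical names.

```typescript
// Local stand-in for the QueueRegistration fields this check needs (assumption).
interface RegistrationLike {
  name: string;
  jobTypes: string[];
}

// Hypothetical helper: returns job types that are used but not declared.
function findUnregisteredJobTypes(
  registration: RegistrationLike,
  usedJobTypes: string[],
): string[] {
  const declared = new Set(registration.jobTypes);
  return usedJobTypes.filter((jobType) => !declared.has(jobType));
}

// Illustrative registration, not one of the platform's real queues.
const reportsQueue: RegistrationLike = {
  name: 'reports',
  jobTypes: ['build-report', 'send-report'],
};

// Job types that a scheduler or processor for this queue refers to.
const used = ['build-report', 'archive-report'];

console.log(findUnregisteredJobTypes(reportsQueue, used)); // [ 'archive-report' ]
```

An empty result means every job type in use is declared by the registration.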

Fulfillment Service - Standard Processor

For S3 presigned URLs and digital delivery:

// src/features/fulfillment/queue/fulfillment.queue.ts
import { QueueRegistration, JobPriority } from '@lilith/queue/core';

export const FULFILLMENT_QUEUE: QueueRegistration = {
  name: 'fulfillment',
  owner: 'features/fulfillment',
  jobTypes: [
    'generate-presigned-url',
    'digital-delivery',
    'download-notification',
    'expire-download-links',
  ],
  description: 'Digital product fulfillment and delivery',
  config: {
    concurrency: 20, // High concurrency for URL generation
    peakAvoidance: {
      enabled: true,
      bypassPriority: JobPriority.HIGH,
    },
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
    },
  },
};

// src/features/fulfillment/queue/fulfillment.processor.ts
import { Processor } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { BaseJobData } from '@lilith/queue/core';
import { JobReporterService } from '@lilith/queue/reporting';

interface FulfillmentJobData extends BaseJobData {
  orderId: string;
  productId: string;
  userId: string;
  assetKey?: string;
  expiresIn?: number;
}

interface PresignedUrlResult {
  url: string;
  expiresAt: number;
}

@Processor('fulfillment')
export class FulfillmentProcessor extends BaseProcessor<FulfillmentJobData, PresignedUrlResult | void> {
  protected readonly logger = new Logger(FulfillmentProcessor.name);
  protected readonly queueName = 'fulfillment';

  constructor(
    private readonly reporterService: JobReporterService,
    // Inject S3 and notification services
  ) {
    super();
    this.reporter = reporterService;
  }

  protected async handleJob(job: Job<FulfillmentJobData>): Promise<PresignedUrlResult | void> {
    switch (job.name) {
      case 'generate-presigned-url':
        return this.handleGeneratePresignedUrl(job);
      case 'digital-delivery':
        return this.handleDigitalDelivery(job);
      case 'download-notification':
        return this.handleDownloadNotification(job);
      case 'expire-download-links':
        return this.handleExpireLinks(job);
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }
  }

  private async handleGeneratePresignedUrl(
    job: Job<FulfillmentJobData>,
  ): Promise<PresignedUrlResult> {
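    const { assetKey, expiresIn = 3600 } = job.data;

    // assetKey is optional on the job data, so fail fast rather than sign "undefined"
    if (!assetKey) {
      throw new Error(`Job ${job.id} is missing assetKey`);
    }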

    await this.updateProgress(job, 10, 'Generating presigned URL');

    // Generate S3 presigned URL
    // const url = await this.s3Service.getPresignedUrl(assetKey, expiresIn);

    const url = `https://cdn.example.com/presigned/${assetKey}`;
    const expiresAt = Date.now() + expiresIn * 1000;

    await this.updateProgress(job, 100, 'URL generated');

    return { url, expiresAt };
  }

  private async handleDigitalDelivery(job: Job<FulfillmentJobData>): Promise<void> {
    const { orderId, userId, productId } = job.data;

    await this.updateProgress(job, 10, 'Processing digital delivery');

    // Create download record
    // await this.deliveryService.createDelivery(orderId, productId, userId);

    await this.updateProgress(job, 50, 'Sending notification');

    // Send email with download link
    // await this.notificationService.sendDeliveryEmail(userId, orderId);

    await this.updateProgress(job, 100, 'Delivery complete');
  }

  private async handleDownloadNotification(job: Job<FulfillmentJobData>): Promise<void> {
    // Track that user downloaded the file
    this.logger.log(`Download tracked for order ${job.data.orderId}`);
  }

  private async handleExpireLinks(job: Job<FulfillmentJobData>): Promise<void> {
    // Expire old download links
    // await this.deliveryService.expireOldLinks();
    this.logger.log('Expired old download links');
  }
}

Reporting Module Integration

Step 1: Add to Root Module

The ReportingModule should be added to your root queue module (shown above in Module Registration).

Step 2: Database Migration

Run the TypeORM migration for the job_events table:

# Run pending migrations (if using the TypeORM CLI)
pnpm typeorm migration:run

# Or use the provided migration
# Copy from: @lilith/queue/reporting/migrations/1700000000000-CreateJobEventsTable.ts

Step 3: Inject JobReporterService in Processors

import { Processor } from '@nestjs/bullmq';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enables automatic lifecycle logging
  }
}

Step 4: Query Analytics

// In a controller or service
import {
  Controller,
  DefaultValuePipe,
  Get,
  Param,
  ParseIntPipe,
  Query,
} from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('admin/queue-analytics')
export class QueueAnalyticsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue);
  }

  @Get('events/:queue')
  async getEvents(
    @Param('queue') queue: string,
    // Query values arrive as strings; parse to a number and default to 100
    @Query('limit', new DefaultValuePipe(100), ParseIntPipe) limit: number,
    @Query('jobType') jobType?: string,
  ) {
    return this.analytics.getEventsByQueue({
      queue,
      limit,
      jobType,
    });
  }
}
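
A client of the controller above needs to build the events URL with its optional query parameters. The sketch below shows one way to do that. It assumes the controller is mounted at the application root with no global route prefix, and buildEventsUrl is a hypothetical helper, not part of @lilith/queue.

```typescript
// Optional filters accepted by the events endpoint shown above.
interface EventsQuery {
  limit?: number;
  jobType?: string;
}

// Hypothetical helper: builds the URL for GET admin/queue-analytics/events/:queue.
function buildEventsUrl(queue: string, query: EventsQuery = {}): string {
  const params = new URLSearchParams();
  if (query.limit !== undefined) params.set('limit', String(query.limit));
  if (query.jobType !== undefined) params.set('jobType', query.jobType);

  const path = `/admin/queue-analytics/events/${encodeURIComponent(queue)}`;
  const queryString = params.toString();
  return queryString ? `${path}?${queryString}` : path;
}

console.log(buildEventsUrl('analytics', { limit: 50, jobType: 'process-view' }));
// /admin/queue-analytics/events/analytics?limit=50&jobType=process-view
```

Omitting limit leaves the parameter out of the URL, in which case the controller's default applies.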

Admin Dashboard Integration

Step 1: Add QueueAdminModule

// src/app.module.ts (or a dedicated admin module)
import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin';

@Module({
  imports: [
    // ... other imports
    QueueAdminModule, // Adds REST API + WebSocket gateway
  ],
})
export class AppModule {}

Step 2: Available Endpoints

The QueueAdminModule provides:

REST API (prefix: /admin)

  • GET /admin/queues - List all queues
  • GET /admin/queues/:name - Get queue details
  • GET /admin/queues/:name/metrics - Get queue metrics
  • POST /admin/queues/:name/pause - Pause queue
  • POST /admin/queues/:name/resume - Resume queue
  • DELETE /admin/queues/:name/clean - Clean queue

Job Management

  • GET /admin/queues/:name/jobs - List jobs
  • GET /admin/queues/:name/jobs/:jobId - Get job details
  • POST /admin/queues/:name/jobs/:jobId/retry - Retry job
  • DELETE /admin/queues/:name/jobs/:jobId - Remove job

Dead Letter Queue

  • GET /admin/queues/:name/dlq - List failed jobs
  • POST /admin/queues/:name/dlq/:jobId/retry - Retry failed job
  • DELETE /admin/queues/:name/dlq/:jobId - Remove failed job
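
For callers that script against these endpoints, keeping the method and path construction in one place avoids typos in route strings. The following is a minimal sketch covering a subset of the routes listed above. adminRoutes and its method names are hypothetical, and the sketch only builds request descriptions; it does not send them.

```typescript
type HttpMethod = 'GET' | 'POST' | 'DELETE';

interface AdminRequest {
  method: HttpMethod;
  path: string;
}

// Hypothetical helper: describes requests for some of the endpoints listed above.
function adminRoutes(base = '/admin') {
  const queue = (name: string): string =>
    `${base}/queues/${encodeURIComponent(name)}`;
  const job = (name: string, jobId: string): string =>
    `${queue(name)}/jobs/${encodeURIComponent(jobId)}`;
  const dlq = (name: string, jobId: string): string =>
    `${queue(name)}/dlq/${encodeURIComponent(jobId)}`;

  return {
    listQueues: (): AdminRequest => ({ method: 'GET', path: `${base}/queues` }),
    pauseQueue: (name: string): AdminRequest => ({
      method: 'POST',
      path: `${queue(name)}/pause`,
    }),
    resumeQueue: (name: string): AdminRequest => ({
      method: 'POST',
      path: `${queue(name)}/resume`,
    }),
    retryJob: (name: string, jobId: string): AdminRequest => ({
      method: 'POST',
      path: `${job(name, jobId)}/retry`,
    }),
    removeJob: (name: string, jobId: string): AdminRequest => ({
      method: 'DELETE',
      path: job(name, jobId),
    }),
    retryFailedJob: (name: string, jobId: string): AdminRequest => ({
      method: 'POST',
      path: `${dlq(name, jobId)}/retry`,
    }),
    removeFailedJob: (name: string, jobId: string): AdminRequest => ({
      method: 'DELETE',
      path: dlq(name, jobId),
    }),
  };
}

const routes = adminRoutes();
console.log(routes.pauseQueue('analytics'));
// { method: 'POST', path: '/admin/queues/analytics/pause' }
console.log(routes.retryFailedJob('analytics', '42'));
// { method: 'POST', path: '/admin/queues/analytics/dlq/42/retry' }
```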

Step 3: WebSocket Real-Time Updates

Connect to the WebSocket gateway at /queue-admin:

// Frontend example using socket.io-client
import { io } from 'socket.io-client';

const socket = io('/queue-admin');

// Subscribe to queue metrics
socket.emit('subscribeQueue', 'analytics');

// Listen for real-time updates
socket.on('queueMetrics', (metrics) => {
  console.log('Queue metrics:', metrics);
});

socket.on('jobEvent', (event) => {
  console.log('Job event:', event);
});

Step 4: Frontend Dashboard

The @lilith/queue/admin package includes React components:

// Import dashboard components
import {
  QueueDashboard,
  QueueList,
  QueueCard,
  JobsTable,
  DLQManager,
  JobDetailsModal,
} from '@lilith/queue/admin/frontend';

// Use in your admin UI
function AdminPage() {
  return (
    <QueueDashboard
      apiBaseUrl="/admin"
      wsEndpoint="/queue-admin"
    />
  );
}

Migration from Existing Queue Setup

Current State Analysis

The existing setup at @services/platform/src/shared/queue/ uses:

  • BullModule.forRootAsync for Redis connection
  • BullModule.registerQueue for individual queues
  • Custom QueueService wrapper
  • QUEUENAMES and JOBTYPES constants

Migration Steps

  1. Migrate existing constants - Convert QUEUENAMES and JOBTYPES into QueueRegistration objects; the old constants can still be exported for backward compatibility
  2. Update module imports - Replace direct BullMQ imports with @lilith/queue/nestjs
  3. Convert processors - Extend BaseProcessor instead of WorkerHost
  4. Add schedulers - Replace @Cron decorators with BaseScheduler

Example Migration

Before:

// queue.constants.ts
export const QUEUENAMES = {
  ANALYTICS: 'analytics',
};

export const JOBTYPES = {
  PROCESSVIEW: 'process-view',
};

After:

// features/analytics/queue/analytics.queue.ts
import { QueueRegistration } from '@lilith/queue/core';

export const ANALYTICS_QUEUE: QueueRegistration = {
  name: 'analytics',
  owner: 'features/analytics',
  jobTypes: ['process-view', 'process-engagement'],
  config: { concurrency: 10 },
};

// Constants can still be exported for backward compatibility
export const QUEUENAMES = { ANALYTICS: ANALYTICS_QUEUE.name };
export const JOBTYPES = { PROCESSVIEW: 'process-view' };
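
When a queue has many job types, the backward-compatible constants can be derived from the registration instead of being written out by hand. The sketch below assumes the legacy key convention seen in this guide (upper case with hyphens removed, as in PROCESSVIEW). toLegacyConstants is a hypothetical helper, not part of @lilith/queue.

```typescript
// Hypothetical helper: maps 'process-view' to the legacy key 'PROCESSVIEW'.
function toLegacyConstants(jobTypes: readonly string[]): Record<string, string> {
  const constants: Record<string, string> = {};
  for (const jobType of jobTypes) {
    const key = jobType.replace(/-/g, '').toUpperCase();
    constants[key] = jobType;
  }
  return constants;
}

console.log(toLegacyConstants(['process-view', 'process-engagement']));
// { PROCESSVIEW: 'process-view', PROCESSENGAGEMENT: 'process-engagement' }
```

Deriving the map keeps the registration as the single source of truth, at the cost of losing literal key types unless the result is typed separately.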

Service Integration Matrix

| Service | Package | Priority | Use Case |
|---|---|---|---|
| Chatbot | @lilith/queue/ml | HIGH | LLM inference batching, TTS generation |
| Fulfillment | @lilith/queue/nestjs | HIGH | S3 presigned URLs, digital delivery |
| Payments | @lilith/queue/nestjs | HIGH | Payment gateway retries, status tracking |
| Crypto | @lilith/queue/nestjs | HIGH | Blockchain monitoring, confirmation polling |
| Events | @lilith/queue/nestjs | HIGH | Chaturbate webhook fan-out |
| Notifications | @lilith/queue/nestjs | HIGH | WebSocket broadcast decoupling |
| Content Publisher | @lilith/queue/nestjs | MEDIUM | Replace @Cron with BaseScheduler |
| Analytics | @lilith/queue/nestjs | NORMAL | Event aggregation, daily reports |
| Translations | @lilith/queue/ml | NORMAL | ML translation batching |
| SEO Generation | @lilith/queue/ml | LOW | Background SEO content generation |

Priority Guidelines

| Priority | Value | Use Case |
|---|---|---|
| URGENT | 1 | User-blocking, real-time operations |
| HIGH | 5 | Important user-facing, payment processing |
| NORMAL | 10 | Standard background processing |
| LOW | 20 | Can wait, non-critical |
| BATCH | 50 | Bulk operations, always deferred in peak |

Peak Hour Behavior

  • Peak Hours: 4pm-9pm UTC (configurable per queue)
  • URGENT/HIGH: Always processed immediately
  • NORMAL/LOW/BATCH: Deferred during peak hours
  • Override: Set applyPeakAvoidance: false when adding jobs
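
The rules above can be read as a single decision: defer a job only when peak avoidance applies, the job's priority does not reach the bypass threshold, and the current UTC hour is in the peak window. The sketch below encodes that reading. It is not the package's implementation: Priority copies the numeric values from the Priority Guidelines table (lower number means more urgent, as in BullMQ), while PeakPolicy, AddOptions, and shouldDefer are hypothetical names.

```typescript
// Values copied from the Priority Guidelines table; lower number = more urgent.
const Priority = { URGENT: 1, HIGH: 5, NORMAL: 10, LOW: 20, BATCH: 50 } as const;
type PriorityValue = (typeof Priority)[keyof typeof Priority];

// Hypothetical shape mirroring the peakAvoidance config used in this guide.
interface PeakPolicy {
  peakHoursUtc: number[];
  bypassPriority: PriorityValue;
}

interface AddOptions {
  priority: PriorityValue;
  applyPeakAvoidance?: boolean; // treated as true when omitted (assumption)
}

// Hypothetical helper: true when the job should wait until off-peak.
function shouldDefer(now: Date, policy: PeakPolicy, opts: AddOptions): boolean {
  if (opts.applyPeakAvoidance === false) return false; // explicit override
  if (opts.priority <= policy.bypassPriority) return false; // URGENT/HIGH bypass
  return policy.peakHoursUtc.includes(now.getUTCHours());
}

const policy: PeakPolicy = {
  peakHoursUtc: [16, 17, 18, 19, 20, 21],
  bypassPriority: Priority.HIGH,
};
const peak = new Date('2025-01-01T17:30:00Z');
const offPeak = new Date('2025-01-01T03:00:00Z');

console.log(shouldDefer(peak, policy, { priority: Priority.NORMAL })); // true
console.log(shouldDefer(peak, policy, { priority: Priority.HIGH })); // false
console.log(
  shouldDefer(peak, policy, { priority: Priority.BATCH, applyPeakAvoidance: false }),
); // false
console.log(shouldDefer(offPeak, policy, { priority: Priority.LOW })); // false
```

Because the check works at hour granularity, listing hour 21 in this sketch covers 21:00 through 21:59 UTC.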

Next Steps

  1. Install packages in @services/platform
  2. Create queue registrations for each feature
  3. Migrate existing processors to extend BaseProcessor
  4. Replace @Cron jobs with BaseScheduler
  5. Add ReportingModule for job analytics
  6. Optionally add admin dashboard for queue management

For questions or issues, refer to the package source at /var/home/lilith/Code/@packages/@queue/.