
@lilith/queue/admin/backend

REST API backend and WebSocket gateway for queue administration and monitoring.

Features

  • REST API - List, inspect, pause, resume, and clean queues; list, inspect, retry, and remove jobs
  • WebSocket Gateway - Real-time metrics and job event streaming
  • Dead Letter Queue - Failed job management and retry mechanisms
  • Type Safety - Full TypeScript support with proper DTOs
  • NestJS Integration - Native dependency injection and decorators

Installation

pnpm add @lilith/queue/admin/backend

Usage

Basic Setup

import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin/backend';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    QueueAdminModule,
  ],
})
export class AppModule {}

REST API Endpoints

Queue Management

GET    /admin/queues              # List all queues
GET    /admin/queues/:name        # Get queue details
GET    /admin/queues/:name/metrics?period=1h # Get queue metrics
POST   /admin/queues/:name/pause  # Pause queue
POST   /admin/queues/:name/resume # Resume queue
DELETE /admin/queues/:name/clean?grace=5000&limit=100 # Clean queue

Job Management

GET    /admin/queues/:queueName/jobs?state=waiting&limit=50 # List jobs
GET    /admin/queues/:queueName/jobs/:jobId  # Get job details
POST   /admin/queues/:queueName/jobs/:jobId/retry # Retry job
DELETE /admin/queues/:queueName/jobs/:jobId  # Remove job

Dead Letter Queue

GET    /admin/queues/:name/dlq?page=1&limit=50 # List failed jobs
POST   /admin/queues/:name/dlq/:jobId/retry    # Retry failed job
DELETE /admin/queues/:name/dlq/:jobId          # Remove failed job
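
The routes above can be wrapped in small path builders so that callers do not concatenate strings by hand. The following is a minimal sketch, not part of the package: the helper names (`withQuery`, `queuePath`, `jobsPath`, `dlqPath`, `retryJobPath`) are hypothetical, and only the query parameters shown in the listings above are used.

```typescript
// Hypothetical helpers (not exported by @lilith/queue/admin/backend) that
// build request paths for the routes listed above.

type QueryValue = string | number | undefined;

// Append defined query parameters to a path, URL-encoding keys and values.
function withQuery(path: string, query: Record<string, QueryValue> = {}): string {
  const pairs = Object.entries(query)
    .filter(([, value]) => value !== undefined)
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`);
  return pairs.length > 0 ? `${path}?${pairs.join('&')}` : path;
}

// GET /admin/queues/:name
function queuePath(name: string): string {
  return `/admin/queues/${encodeURIComponent(name)}`;
}

// GET /admin/queues/:queueName/jobs?state=...&limit=...
function jobsPath(queueName: string, query: { state?: string; limit?: number } = {}): string {
  return withQuery(`${queuePath(queueName)}/jobs`, query);
}

// GET /admin/queues/:name/dlq?page=...&limit=...
function dlqPath(name: string, query: { page?: number; limit?: number } = {}): string {
  return withQuery(`${queuePath(name)}/dlq`, query);
}

// POST /admin/queues/:queueName/jobs/:jobId/retry
function retryJobPath(queueName: string, jobId: string): string {
  return `${queuePath(queueName)}/jobs/${encodeURIComponent(jobId)}/retry`;
}
```

Encoding the queue name and job ID keeps names containing spaces or slashes from changing the route that is matched.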

WebSocket Gateway

Connect to the WebSocket gateway for real-time updates:

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000', {
  transports: ['websocket'],
});

// Subscribe to queue metrics
socket.emit('subscribe', { queueName: 'analytics' });

// Listen for metrics updates
socket.on('metrics', (data) => {
  console.log('Queue metrics:', data);
});

// Listen for job events
socket.on('job_event', (event) => {
  console.log('Job event:', event);
});
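
Long-lived clients usually need to detach listeners when a view is torn down. The sketch below wraps the `subscribe` and `metrics` events from the example above in a helper that returns a cleanup function. `watchQueueMetrics` and `SocketLike` are hypothetical names, and the sketch assumes each `metrics` payload carries the queue name in a `name` field, as `QueueMetricsDto` does.

```typescript
// Minimal structural type covering the socket methods used here, so the
// helper can be exercised without a live connection.
interface SocketLike {
  emit(event: string, payload: unknown): unknown;
  on(event: string, listener: (data: any) => void): unknown;
  off(event: string, listener: (data: any) => void): unknown;
}

// Assumption: every 'metrics' payload includes the queue name.
interface MetricsPayload {
  name: string;
}

// Hypothetical helper: subscribe to one queue and return a function that
// detaches the listener again.
function watchQueueMetrics(
  socket: SocketLike,
  queueName: string,
  onMetrics: (data: MetricsPayload) => void,
): () => void {
  const listener = (data: MetricsPayload) => {
    // Ignore updates for other queues the same socket may be subscribed to.
    if (data.name === queueName) onMetrics(data);
  };
  socket.on('metrics', listener);
  socket.emit('subscribe', { queueName });
  return () => {
    socket.off('metrics', listener);
  };
}
```

The returned function only removes the local listener; whether the gateway offers a matching server-side unsubscribe event is not covered here.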

API Reference

DTOs

QueueSummaryDto

{
  name: string;
  owner: string;
  isPaused: boolean;
  counts: {
    waiting: number;
    active: number;
    failed: number;
  };
  description?: string;
}

QueueMetricsDto

{
  name: string;
  counts: {
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    paused: number;
  };
  isPaused: boolean;
  avgProcessingTime?: number;
  errorRate?: number;
  throughput?: {
    completedLastHour: number;
    failedLastHour: number;
  };
  oldestJob?: {
    id: string;
    state: string;
    age: number;
  } | null;
  timestamp: number;
}
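
As an illustration of consuming this DTO, the sketch below derives the fraction of recently finished jobs that failed from the `throughput` fields. `failedFraction` is a hypothetical helper, and this is not necessarily how the backend computes `errorRate`.

```typescript
// Subset of QueueMetricsDto used by this sketch.
interface ThroughputSample {
  throughput?: { completedLastHour: number; failedLastHour: number };
}

// Hypothetical helper: fraction of jobs finished in the last hour that failed.
// Returns undefined when throughput is absent or no jobs finished.
function failedFraction(metrics: ThroughputSample): number | undefined {
  const t = metrics.throughput;
  if (!t) return undefined;
  const finished = t.completedLastHour + t.failedLastHour;
  return finished === 0 ? undefined : t.failedLastHour / finished;
}
```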

JobDetailsDto

{
  id: string;
  name: string;
  data: unknown;
  opts: unknown;
  progress?: number;
  delay?: number;
  timestamp: number;
  attemptsMade: number;
  failedReason?: string;
  stacktrace?: string[];
  returnvalue?: unknown;
  finishedOn?: number;
  processedOn?: number;
}
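
In BullMQ, `timestamp`, `processedOn`, and `finishedOn` are epoch times in milliseconds. Assuming the DTO passes them through unchanged, wait and run durations can be derived as in this sketch (`jobTimings` is a hypothetical helper, not part of the package):

```typescript
// Subset of JobDetailsDto used by this sketch.
interface JobTimestamps {
  timestamp: number;    // when the job was created
  processedOn?: number; // when a worker started it
  finishedOn?: number;  // when it completed or failed
}

// Hypothetical helper: time spent waiting before processing and time spent
// running, both in milliseconds. A field is undefined until it can be computed.
function jobTimings(job: JobTimestamps): { waitMs?: number; runMs?: number } {
  const waitMs =
    job.processedOn !== undefined ? job.processedOn - job.timestamp : undefined;
  const runMs =
    job.processedOn !== undefined && job.finishedOn !== undefined
      ? job.finishedOn - job.processedOn
      : undefined;
  return { waitMs, runMs };
}
```

For delayed jobs, `waitMs` includes the configured delay as well as time spent in the queue.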

Services

QueueAdminService

Injectable service for programmatic queue management:

import { Injectable } from '@nestjs/common';
import { QueueAdminService } from '@lilith/queue/admin/backend';

@Injectable()
export class MyService {
  constructor(private queueAdmin: QueueAdminService) {}

  async pauseAnalytics() {
    await this.queueAdmin.pauseQueue('analytics');
  }

  async getMetrics() {
    return this.queueAdmin.getQueueMetrics('analytics', '1h');
  }
}
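
The two calls shown above can be combined into simple operational logic. The sketch below pauses a queue once its failed-job count passes a threshold. It is written against a structural subset of the service so it can be tested with a stub; `QueueAdminLike` and `pauseIfFailing` are hypothetical names, and the sketch assumes `getQueueMetrics` resolves to an object shaped like `QueueMetricsDto`.

```typescript
// Structural subset of QueueAdminService, limited to the two methods used
// in the example above. The return shapes are assumptions.
interface QueueAdminLike {
  pauseQueue(name: string): Promise<unknown>;
  getQueueMetrics(
    name: string,
    period: string,
  ): Promise<{ counts: { failed: number }; isPaused: boolean }>;
}

// Hypothetical guard: pause the queue when more than maxFailed jobs have failed.
// Resolves to true only when this call paused the queue.
async function pauseIfFailing(
  admin: QueueAdminLike,
  queueName: string,
  maxFailed: number,
): Promise<boolean> {
  const metrics = await admin.getQueueMetrics(queueName, '1h');
  if (metrics.isPaused || metrics.counts.failed <= maxFailed) return false;
  await admin.pauseQueue(queueName);
  return true;
}
```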

Architecture

┌─────────────────────────────────────────────────────────┐
│                    QueueAdminModule                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌──────────────────┐        ┌──────────────────────┐   │
│  │   Controllers    │        │  WebSocket Gateway   │   │
│  │                  │        │                      │   │
│  │ • QueueController│        │ • Real-time metrics  │   │
│  │ • JobsController │        │ • Job events         │   │
│  │ • DlqController  │        │ • Subscriptions      │   │
│  └────────┬─────────┘        └──────────┬───────────┘   │
│           │                             │               │
│           └──────────┬──────────────────┘               │
│                      │                                  │
│          ┌───────────▼────────────┐                     │
│          │   QueueAdminService    │                     │
│          └───────────┬────────────┘                     │
│                      │                                  │
│          ┌───────────▼────────────┐                     │
│          │  QueueManagerService   │                     │
│          │ (@lilith/queue/nestjs) │                     │
│          └───────────┬────────────┘                     │
│                      │                                  │
│                      ▼                                  │
│          ┌────────────────────────┐                     │
│          │         BullMQ         │                     │
│          └────────────────────────┘                     │
└─────────────────────────────────────────────────────────┘

Dependencies

  • @lilith/queue/core - Core types and constants
  • @lilith/queue/nestjs - NestJS queue integration

Peer Dependencies

  • @nestjs/common ^10.0.0
  • @nestjs/core ^10.0.0
  • @nestjs/websockets ^10.0.0
  • @nestjs/platform-socket.io ^10.0.0
  • @nestjs/schedule ^4.0.0
  • socket.io ^4.0.0

Error Handling

Endpoints respond with standard HTTP status codes:

  • 200 - Success
  • 204 - Success with no content
  • 400 - Bad request (e.g., retrying a job that has not failed)
  • 404 - Queue or job not found
  • 500 - Internal server error

Example error response:

{
  "statusCode": 404,
  "message": "Queue \"analytics\" not found",
  "error": "Not Found"
}
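
On the client side, the error body can be narrowed with a type guard before it is shown to a user. This is a minimal sketch assuming every error has the three fields shown above; `AdminApiError`, `isAdminApiError`, and `describeFailure` are hypothetical names. NestJS validation errors can carry an array in `message`, and this sketch treats those as unrecognized.

```typescript
// Shape of the example error response above.
interface AdminApiError {
  statusCode: number;
  message: string;
  error: string;
}

// Type guard: true only when all three fields are present with the expected types.
function isAdminApiError(body: unknown): body is AdminApiError {
  if (typeof body !== 'object' || body === null) return false;
  const candidate = body as Record<string, unknown>;
  return (
    typeof candidate.statusCode === 'number' &&
    typeof candidate.message === 'string' &&
    typeof candidate.error === 'string'
  );
}

// Turn a failed response into a one-line description, falling back to the
// bare HTTP status when the body is not in the expected shape.
function describeFailure(status: number, body: unknown): string {
  return isAdminApiError(body)
    ? `${body.error} (${body.statusCode}): ${body.message}`
    : `Request failed with status ${status}`;
}
```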

Performance Considerations

Pagination

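All list endpoints support pagination so that large result sets are not loaded into memory at once. As the examples show, job listings take start and limit, while DLQ listings take page and limit: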

// List jobs with pagination
GET /admin/queues/analytics/jobs?start=0&limit=50

// List failed jobs with pagination
GET /admin/queues/analytics/dlq?page=2&limit=25
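
A client that needs every failed job can walk the pages one at a time instead of requesting a single large page. The sketch below takes the page fetcher as a parameter so it stays independent of any HTTP library. `forEachPage` and `FetchPage` are hypothetical names, and it assumes that pages are numbered from 1 (as in the `page=1` example earlier) and that each page is returned as a plain array.

```typescript
// Assumption: a page is returned as a plain array of items.
type FetchPage<T> = (page: number, limit: number) => Promise<T[]>;

// Hypothetical helper: fetch pages in order, hand each non-empty page to
// handlePage, and stop at the first page shorter than limit.
// Resolves to the total number of items seen.
async function forEachPage<T>(
  fetchPage: FetchPage<T>,
  limit: number,
  handlePage: (items: T[], page: number) => void | Promise<void>,
): Promise<number> {
  let total = 0;
  for (let page = 1; ; page++) {
    const items = await fetchPage(page, limit);
    if (items.length > 0) await handlePage(items, page);
    total += items.length;
    // A short (or empty) page means there is nothing left to fetch.
    if (items.length < limit) return total;
  }
}
```

Only one page is held in memory at a time, which matches the purpose of paginating in the first place.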

Metrics Calculation

Metrics are calculated on demand. For high-frequency monitoring, subscribe through the WebSocket gateway instead of polling the REST endpoints.

Queue Cleaning

When cleaning queues, set the grace period (in milliseconds) high enough to avoid removing jobs that are still in use:

// Clean completed jobs older than 1 hour
DELETE /admin/queues/analytics/clean?status=completed&grace=3600000&limit=1000

License

MIT