queue/admin/backend/README.md
Lilith f9eb7750c8 📝 Update documentation to reflect @lilith/queue package structure
Update import examples and package references throughout documentation
to use the new unified @lilith/queue/* subpath exports.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 20:28:34 -08:00

7.6 KiB

@lilith/queue/admin/backend

REST API backend and WebSocket gateway for queue administration and monitoring.

Features

  • REST API - Full CRUD operations for queue management
  • WebSocket Gateway - Real-time metrics and job event streaming
  • Dead Letter Queue - Failed job management and retry mechanisms
  • Type Safety - Full TypeScript support with proper DTOs
  • NestJS Integration - Native dependency injection and decorators

Installation

pnpm add @lilith/queue/admin/backend

Usage

Basic Setup

import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin/backend';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    QueueAdminModule,
  ],
})
export class AppModule {}

REST API Endpoints

Queue Management

GET    /admin/queues              # List all queues
GET    /admin/queues/:name        # Get queue details
GET    /admin/queues/:name/metrics?period=1h # Get queue metrics
POST   /admin/queues/:name/pause  # Pause queue
POST   /admin/queues/:name/resume # Resume queue
DELETE /admin/queues/:name/clean?grace=5000&limit=100 # Clean queue

Job Management

GET    /admin/queues/:queueName/jobs?state=waiting&limit=50 # List jobs
GET    /admin/queues/:queueName/jobs/:jobId  # Get job details
POST   /admin/queues/:queueName/jobs/:jobId/retry # Retry job
DELETE /admin/queues/:queueName/jobs/:jobId  # Remove job

Dead Letter Queue

GET    /admin/queues/:name/dlq?page=1&limit=50 # List failed jobs
POST   /admin/queues/:name/dlq/:jobId/retry    # Retry failed job
DELETE /admin/queues/:name/dlq/:jobId          # Remove failed job

WebSocket Gateway

Connect to the WebSocket gateway for real-time updates:

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000', {
  transports: ['websocket'],
});

// Subscribe to queue metrics
socket.emit('subscribe', { queueName: 'analytics' });

// Listen for metrics updates
socket.on('metrics', (data) => {
  console.log('Queue metrics:', data);
});

// Listen for job events
socket.on('job_event', (event) => {
  console.log('Job event:', event);
});

API Reference

DTOs

QueueSummaryDto

{
  name: string;
  owner: string;
  isPaused: boolean;
  counts: {
    waiting: number;
    active: number;
    failed: number;
  };
  description?: string;
}

QueueMetricsDto

{
  name: string;
  counts: {
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    paused: number;
  };
  isPaused: boolean;
  avgProcessingTime?: number;
  errorRate?: number;
  throughput?: {
    completedLastHour: number;
    failedLastHour: number;
  };
  oldestJob?: {
    id: string;
    state: string;
    age: number;
  } | null;
  timestamp: number;
}

JobDetailsDto

{
  id: string;
  name: string;
  data: unknown;
  opts: unknown;
  progress?: number;
  delay?: number;
  timestamp: number;
  attemptsMade: number;
  failedReason?: string;
  stacktrace?: string[];
  returnvalue?: unknown;
  finishedOn?: number;
  processedOn?: number;
}

Services

QueueAdminService

Injectable service for programmatic queue management:

import { QueueAdminService } from '@lilith/queue/admin/backend';

@Injectable()
export class MyService {
  constructor(private queueAdmin: QueueAdminService) {}

  async pauseAnalytics() {
    await this.queueAdmin.pauseQueue('analytics');
  }

  async getMetrics() {
    return this.queueAdmin.getQueueMetrics('analytics', '1h');
  }
}

Architecture

┌─────────────────────────────────────────────────────────┐
│                 QueueAdminModule                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────┐         ┌──────────────────────┐ │
│  │   Controllers   │         │   WebSocket Gateway  │ │
│  │                 │         │                      │ │
│  │ • QueueController│        │ • Real-time metrics │ │
│  │ • JobsController│         │ • Job events        │ │
│  │ • DlqController │         │ • Subscriptions     │ │
│  └────────┬────────┘         └──────────┬───────────┘ │
│           │                             │             │
│           └──────────┬──────────────────┘             │
│                      │                                │
│              ┌───────▼────────┐                       │
│              │ QueueAdminService│                     │
│              └───────┬─────────┘                      │
│                      │                                │
│              ┌───────▼──────────┐                     │
│              │ QueueManagerService│                   │
│              │ (@queue/nestjs)   │                    │
│              └───────┬───────────┘                    │
│                      │                                │
│                      ▼                                │
│              ┌──────────────┐                         │
│              │   BullMQ     │                         │
│              └──────────────┘                         │
└─────────────────────────────────────────────────────────┘

Dependencies

  • @lilith/queue/core - Core types and constants
  • @lilith/queue/nestjs - NestJS queue integration

Peer Dependencies

  • @nestjs/common ^10.0.0
  • @nestjs/core ^10.0.0
  • @nestjs/websockets ^10.0.0
  • @nestjs/platform-socket.io ^10.0.0
  • @nestjs/schedule ^4.0.0
  • socket.io ^4.0.0

Error Handling

All endpoints return proper HTTP status codes:

  • 200 - Success
  • 204 - Success with no content
  • 400 - Bad request (e.g., trying to retry non-failed job)
  • 404 - Queue or job not found
  • 500 - Internal server error

Example error response:

{
  "statusCode": 404,
  "message": "Queue \"analytics\" not found",
  "error": "Not Found"
}

Performance Considerations

Pagination

All list endpoints support pagination to prevent memory issues:

// List jobs with pagination
GET /admin/queues/analytics/jobs?start=0&limit=50

// List failed jobs with pagination
GET /admin/queues/analytics/dlq?page=2&limit=25

Metrics Calculation

Metrics are calculated on-demand. For high-frequency monitoring, use the WebSocket gateway instead of polling REST endpoints.

Queue Cleaning

When cleaning queues, use appropriate grace periods to avoid removing active jobs:

// Clean completed jobs older than 1 hour
DELETE /admin/queues/analytics/clean?status=completed&grace=3600000&limit=1000

License

MIT