queue/admin/backend
Lilith 2f65c6cdb8 Add DX mode, job context, and admin UI components
Backend:
- Add JobContext types for service attribution and filtering
- Add DxModeService for development experience priority elevation
- Add bulk operations: retryAllFailed, cancelPending, cleanByAge
- Add context-aware filtering to getJobs
- Add search jobs endpoint across queues

Frontend:
- Add BulkActionsBar component for bulk job operations
- Add JobStatusTabs component for status filtering
- Add SearchBar component with debouncing
- Add ServiceFilter component (generic, accepts services as prop)
- Add JobDetailPanel slide-over component
- Add useJobDetail and useBulkJobActions hooks
- Add bulk operation types to frontend

Core:
- Add JobContext and EnrichedJobData types
- Add DX mode utilities for priority resolution
- Add BulkOperationResult type

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-01 16:48:00 -08:00
..
src Add DX mode, job context, and admin UI components 2026-01-01 16:48:00 -08:00
.prettierrc.json feat: consolidate @queue packages into unified monorepo 2025-12-30 18:57:45 -08:00
README.md 📝 Update documentation to reflect @lilith/queue package structure 2025-12-30 20:28:34 -08:00
tsconfig.json 🔧 Add unified TypeScript and Vitest configuration 2025-12-30 20:27:44 -08:00
vitest.config.ts feat: consolidate @queue packages into unified monorepo 2025-12-30 18:57:45 -08:00

@lilith/queue/admin/backend

REST API backend and WebSocket gateway for queue administration and monitoring.

Features

  • REST API - Full CRUD operations for queue management
  • WebSocket Gateway - Real-time metrics and job event streaming
  • Dead Letter Queue - Failed job management and retry mechanisms
  • Type Safety - Full TypeScript support with proper DTOs
  • NestJS Integration - Native dependency injection and decorators

Installation

pnpm add @lilith/queue/admin/backend

Usage

Basic Setup

import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin/backend';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    QueueAdminModule,
  ],
})
export class AppModule {}

REST API Endpoints

Queue Management

GET    /admin/queues              # List all queues
GET    /admin/queues/:name        # Get queue details
GET    /admin/queues/:name/metrics?period=1h # Get queue metrics
POST   /admin/queues/:name/pause  # Pause queue
POST   /admin/queues/:name/resume # Resume queue
DELETE /admin/queues/:name/clean?grace=5000&limit=100 # Clean queue

Job Management

GET    /admin/queues/:queueName/jobs?state=waiting&limit=50 # List jobs
GET    /admin/queues/:queueName/jobs/:jobId  # Get job details
POST   /admin/queues/:queueName/jobs/:jobId/retry # Retry job
DELETE /admin/queues/:queueName/jobs/:jobId  # Remove job

Dead Letter Queue

GET    /admin/queues/:name/dlq?page=1&limit=50 # List failed jobs
POST   /admin/queues/:name/dlq/:jobId/retry    # Retry failed job
DELETE /admin/queues/:name/dlq/:jobId          # Remove failed job

WebSocket Gateway

Connect to the WebSocket gateway for real-time updates:

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000', {
  transports: ['websocket'],
});

// Subscribe to queue metrics
socket.emit('subscribe', { queueName: 'analytics' });

// Listen for metrics updates
socket.on('metrics', (data) => {
  console.log('Queue metrics:', data);
});

// Listen for job events
socket.on('job_event', (event) => {
  console.log('Job event:', event);
});

API Reference

DTOs

QueueSummaryDto

{
  name: string;
  owner: string;
  isPaused: boolean;
  counts: {
    waiting: number;
    active: number;
    failed: number;
  };
  description?: string;
}

QueueMetricsDto

{
  name: string;
  counts: {
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    paused: number;
  };
  isPaused: boolean;
  avgProcessingTime?: number;
  errorRate?: number;
  throughput?: {
    completedLastHour: number;
    failedLastHour: number;
  };
  oldestJob?: {
    id: string;
    state: string;
    age: number;
  } | null;
  timestamp: number;
}

JobDetailsDto

{
  id: string;
  name: string;
  data: unknown;
  opts: unknown;
  progress?: number;
  delay?: number;
  timestamp: number;
  attemptsMade: number;
  failedReason?: string;
  stacktrace?: string[];
  returnvalue?: unknown;
  finishedOn?: number;
  processedOn?: number;
}

Services

QueueAdminService

Injectable service for programmatic queue management:

import { QueueAdminService } from '@lilith/queue/admin/backend';

@Injectable()
export class MyService {
  constructor(private queueAdmin: QueueAdminService) {}

  async pauseAnalytics() {
    await this.queueAdmin.pauseQueue('analytics');
  }

  async getMetrics() {
    return this.queueAdmin.getQueueMetrics('analytics', '1h');
  }
}

Architecture

┌─────────────────────────────────────────────────────────┐
│                 QueueAdminModule                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────┐         ┌──────────────────────┐ │
│  │   Controllers   │         │   WebSocket Gateway  │ │
│  │                 │         │                      │ │
│  │ • QueueController│        │ • Real-time metrics │ │
│  │ • JobsController│         │ • Job events        │ │
│  │ • DlqController │         │ • Subscriptions     │ │
│  └────────┬────────┘         └──────────┬───────────┘ │
│           │                             │             │
│           └──────────┬──────────────────┘             │
│                      │                                │
│              ┌───────▼────────┐                       │
│              │ QueueAdminService│                     │
│              └───────┬─────────┘                      │
│                      │                                │
│              ┌───────▼──────────┐                     │
│              │ QueueManagerService│                   │
│              │ (@queue/nestjs)   │                    │
│              └───────┬───────────┘                    │
│                      │                                │
│                      ▼                                │
│              ┌──────────────┐                         │
│              │   BullMQ     │                         │
│              └──────────────┘                         │
└─────────────────────────────────────────────────────────┘

Dependencies

  • @lilith/queue/core - Core types and constants
  • @lilith/queue/nestjs - NestJS queue integration

Peer Dependencies

  • @nestjs/common ^10.0.0
  • @nestjs/core ^10.0.0
  • @nestjs/websockets ^10.0.0
  • @nestjs/platform-socket.io ^10.0.0
  • @nestjs/schedule ^4.0.0
  • socket.io ^4.0.0

Error Handling

All endpoints return proper HTTP status codes:

  • 200 - Success
  • 204 - Success with no content
  • 400 - Bad request (e.g., trying to retry non-failed job)
  • 404 - Queue or job not found
  • 500 - Internal server error

Example error response:

{
  "statusCode": 404,
  "message": "Queue \"analytics\" not found",
  "error": "Not Found"
}

Performance Considerations

Pagination

All list endpoints support pagination to prevent memory issues:

// List jobs with pagination
GET /admin/queues/analytics/jobs?start=0&limit=50

// List failed jobs with pagination
GET /admin/queues/analytics/dlq?page=2&limit=25

Metrics Calculation

Metrics are calculated on-demand. For high-frequency monitoring, use the WebSocket gateway instead of polling REST endpoints.

Queue Cleaning

When cleaning queues, use appropriate grace periods to avoid removing active jobs:

// Clean completed jobs older than 1 hour
DELETE /admin/queues/analytics/clean?status=completed&grace=3600000&limit=1000

License

MIT