@lilith/queue/reporting

Job lifecycle database persistence using TypeORM.

Overview

The reporting package provides database persistence for job lifecycle events, enabling:

  • Historical analysis of job processing
  • Failure rate monitoring
  • Performance metrics and percentiles
  • Queue health dashboards
  • Debug tracing for failed jobs

Installation

pnpm add @lilith/queue

Setup

1. Configure TypeORM

First, set up TypeORM in your application:

// app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'password',
      database: 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Use migrations in production
    }),
  ],
})
export class AppModule {}

2. Import ReportingModule

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReportingModule } from '@lilith/queue/reporting';

@Module({
  imports: [
    TypeOrmModule.forRoot({ /* ... */ }),
    ReportingModule.forRoot(),
  ],
})
export class AppModule {}

3. Run Migrations

Generate and run the migration for the job_events table:

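# Generate migration (TypeORM 0.2.x syntax; 0.3+ drops -n and expects
# a positional path plus -d <data-source> instead)
pnpm typeorm migration:generate -n CreateJobEventsTable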

# Run migration
pnpm typeorm migration:run

Usage

Automatic Event Logging

Integrate with BaseProcessor from @lilith/queue/nestjs:

import { Logger } from '@nestjs/common';
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enable automatic logging
  }

  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    // Job events (started, progress, completed, failed) are logged automatically
    await this.updateProgress(job, 50, 'Halfway done');
    // Process job...
  }
}

Manual Event Logging

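import { Injectable } from '@nestjs/common';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CustomService {
  constructor(private readonly reporter: JobReporterService) {}

  async processCustomJob(jobId: string) {
    const startedAt = Date.now();
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data');

    try {
      // Process...
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', Date.now() - startedAt);
    } catch (error) {
      // Catch variables are `unknown` under strict TypeScript, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, Date.now() - startedAt);
    }
  }
}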

Analytics Queries

import { Controller, Get, Param } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue, 3600000); // Last hour
  }

  @Get('failures/:queue')
  async getFailureRate(@Param('queue') queue: string) {
    const rate = await this.analytics.getFailureRate(queue, 3600000);
    return { queue, failureRate: (rate * 100).toFixed(2) + '%' };
  }

  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    const [avgTime, percentiles] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100),
      this.analytics.getProcessingTimePercentiles(queue, 1000),
    ]);

    return { avgTime, percentiles };
  }

  @Get('errors/:queue')
  async getTopErrors(@Param('queue') queue: string) {
    return this.analytics.getTopErrors(queue, 10, 86400000); // Last 24h
  }
}
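
The period arguments in the controller are raw millisecond counts. A minimal sketch of named duration constants and of the percentage formatting used by the failure-rate endpoint follows; the names SECOND_MS, MINUTE_MS, HOUR_MS, DAY_MS, and formatRate are illustrative assumptions and are not exported by the package:

```typescript
// Illustrative helpers; none of these names come from @lilith/queue/reporting.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS; // same value as the 3600000 literal
const DAY_MS = 24 * HOUR_MS;    // same value as the 86400000 literal

// Formats a 0-1 rate as a percentage string with two decimals.
function formatRate(rate: number): string {
  return (rate * 100).toFixed(2) + '%';
}

console.log(HOUR_MS, DAY_MS, formatRate(0.25));
```

With constants like these, a call such as getQueueHealth(queue, HOUR_MS) states its window without a trailing comment.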

Scheduled Cleanup

import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CleanupService {
  constructor(private readonly reporter: JobReporterService) {}

  @Cron('0 2 * * *') // Daily at 2 AM
  async cleanupOldEvents() {
    const deleted = await this.reporter.cleanupOldEvents(30); // Keep 30 days
    console.log(`Cleaned up ${deleted} old job events`);
  }
}
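
What "keep 30 days" means can be sketched with a plain cutoff calculation. This is an in-memory illustration, assuming cleanup deletes every event older than the current time minus daysToKeep days; the StoredEvent type and the retentionCutoff and pruneOldEvents helpers are hypothetical, and the package's actual delete query may differ:

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical minimal event shape for this illustration.
interface StoredEvent {
  jobId: string;
  timestamp: Date;
}

// Events strictly older than this instant are eligible for deletion.
function retentionCutoff(daysToKeep: number, now: Date = new Date()): Date {
  return new Date(now.getTime() - daysToKeep * MS_PER_DAY);
}

// Returns the surviving events and how many were dropped.
function pruneOldEvents(
  events: StoredEvent[],
  daysToKeep: number,
  now: Date = new Date(),
): { kept: StoredEvent[]; deleted: number } {
  const cutoff = retentionCutoff(daysToKeep, now).getTime();
  const kept = events.filter((e) => e.timestamp.getTime() >= cutoff);
  return { kept, deleted: events.length - kept.length };
}

const referenceNow = new Date('2025-12-31T00:00:00Z');
const pruneResult = pruneOldEvents(
  [
    { jobId: 'a', timestamp: new Date('2025-11-15T00:00:00Z') }, // 46 days old
    { jobId: 'b', timestamp: new Date('2025-12-20T00:00:00Z') }, // 11 days old
  ],
  30,
  referenceNow,
);
console.log(`deleted ${pruneResult.deleted}, kept ${pruneResult.kept.length}`);
```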

API Reference

JobReporterService

  • logEvent(event: JobEvent): Promise<void> - Log a job lifecycle event
  • logJobQueued(jobId, queue, jobType, metadata?): Promise<void> - Log queued event
  • logJobStarted(jobId, queue, jobType, metadata?): Promise<void> - Log started event
  • logJobCompleted(jobId, queue, jobType, durationMs, metadata?): Promise<void> - Log completion
  • logJobFailed(jobId, queue, jobType, error, durationMs, metadata?): Promise<void> - Log failure
  • cleanupOldEvents(daysToKeep: number): Promise<number> - Delete old events

JobAnalyticsService

  • getJobEvents(jobId: string): Promise<JobEvent[]> - Get all events for a job
  • getEventsByQueue(queue, options): Promise<JobEvent[]> - Get events for a queue
  • getFailureRate(queue, periodMs): Promise<number> - Calculate failure rate (0-1)
  • getAverageProcessingTime(queue, sampleSize): Promise<number> - Avg time in ms
  • getProcessingTimePercentiles(queue, sampleSize): Promise<{p50, p95, p99, min, max}> - Time distribution
  • getThroughput(queue, periodMs): Promise<number> - Jobs per second
  • getTopErrors(queue, limit, periodMs?): Promise<Array<{error, count}>> - Most common errors
  • getQueueHealth(queue, periodMs): Promise<HealthSummary> - Aggregated health metrics
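
To make the return values above concrete, here is a rough in-memory sketch of a failure rate and a percentile summary. It assumes the nearest-rank percentile method and a hypothetical FinishedJob shape; the package computes these from the database and may use a different method:

```typescript
// Hypothetical shape: one entry per finished job in the sample.
interface FinishedJob {
  type: 'completed' | 'failed';
  durationMs: number;
}

// Fraction of finished jobs that failed, in the range 0-1.
function failureRate(jobs: FinishedJob[]): number {
  if (jobs.length === 0) return 0;
  return jobs.filter((j) => j.type === 'failed').length / jobs.length;
}

// Nearest-rank percentile over an ascending-sorted array.
function percentile(sortedAsc: number[], p: number): number {
  if (sortedAsc.length === 0) return 0;
  const rank = Math.ceil((p * sortedAsc.length) / 100);
  const index = Math.min(sortedAsc.length - 1, Math.max(0, rank - 1));
  return sortedAsc[index] ?? 0;
}

// Mirrors the {p50, p95, p99, min, max} shape from the API reference.
function summarizeDurations(durations: number[]) {
  const sorted = [...durations].sort((a, b) => a - b);
  return {
    p50: percentile(sorted, 50),
    p95: percentile(sorted, 95),
    p99: percentile(sorted, 99),
    min: sorted[0] ?? 0,
    max: sorted[sorted.length - 1] ?? 0,
  };
}

const sampleJobs: FinishedJob[] = [
  { type: 'completed', durationMs: 120 },
  { type: 'completed', durationMs: 80 },
  { type: 'failed', durationMs: 500 },
  { type: 'completed', durationMs: 200 },
];
console.log(failureRate(sampleJobs), summarizeDurations(sampleJobs.map((j) => j.durationMs)));
```

With small samples the upper percentiles collapse onto the maximum, which is one reason to pass a generous sampleSize to getProcessingTimePercentiles.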

Database Schema

job_events table

| Column      | Type      | Description                                                                       |
|-------------|-----------|-----------------------------------------------------------------------------------|
| id          | UUID      | Primary key                                                                       |
| job_id      | VARCHAR   | Job identifier                                                                    |
| queue       | VARCHAR   | Queue name                                                                        |
| job_type    | VARCHAR   | Job type                                                                          |
| type        | VARCHAR   | Event type (queued, started, progress, completed, failed, retrying, moved_to_dlq) |
| progress    | INTEGER   | Progress percentage (0-100)                                                       |
| duration_ms | INTEGER   | Processing duration in milliseconds                                               |
| error       | TEXT      | Error message (for failed events)                                                 |
| metadata    | JSONB     | Additional event data                                                             |
| timestamp   | TIMESTAMP | Event timestamp                                                                   |
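
The columns can be pictured as a TypeScript model along the following lines. This is a sketch only: the camelCase property names, the optional fields, and the toRow helper are assumptions made for illustration, and the package's actual entity may be defined differently:

```typescript
// Event types listed in the `type` column description.
type JobEventType =
  | 'queued' | 'started' | 'progress' | 'completed'
  | 'failed' | 'retrying' | 'moved_to_dlq';

// Assumed camelCase model; the real entity's property names may differ.
interface JobEventRecord {
  id: string;
  jobId: string;
  queue: string;
  jobType: string;
  type: JobEventType;
  progress?: number;   // 0-100
  durationMs?: number; // milliseconds
  error?: string;      // set for failed events
  metadata?: Record<string, unknown>;
  timestamp: Date;
}

// Maps the model onto the snake_case column names of job_events.
// Assumes the optional columns are nullable.
function toRow(e: JobEventRecord): Record<string, unknown> {
  return {
    id: e.id,
    job_id: e.jobId,
    queue: e.queue,
    job_type: e.jobType,
    type: e.type,
    progress: e.progress ?? null,
    duration_ms: e.durationMs ?? null,
    error: e.error ?? null,
    metadata: e.metadata ?? null,
    timestamp: e.timestamp,
  };
}

const failedRow = toRow({
  id: '00000000-0000-0000-0000-000000000001',
  jobId: 'job-42',
  queue: 'analytics',
  jobType: 'process-data',
  type: 'failed',
  durationMs: 500,
  error: 'connection reset',
  timestamp: new Date('2025-12-31T00:00:00Z'),
});
console.log(Object.keys(failedRow).join(', '));
```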

Indexes

  • job_id - For job lifecycle queries
  • queue - For queue-specific analytics
  • type - For event type filtering
  • timestamp - For time-based queries

Performance Considerations

Non-blocking Logging

Event logging never blocks or fails a job - if an event cannot be written, job processing continues unaffected:

// If database is down, job processing continues
await this.updateProgress(job, 50); // Logs progress (non-blocking)
// Job continues even if logging fails
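
One way such failure-tolerant logging could be built is a wrapper that contains any error thrown by the write. The sketch below is an assumption about the general technique, not the package's implementation; logSafely and runJobWithBrokenLogging are hypothetical names:

```typescript
// Runs a logging call and contains any error so the caller never sees it.
async function logSafely(
  write: () => Promise<void>,
  onError: (err: unknown) => void = () => {},
): Promise<void> {
  try {
    await write();
  } catch (err) {
    onError(err); // e.g. emit a warning through the application logger
  }
}

// Simulated job whose progress logging hits an unavailable database.
async function runJobWithBrokenLogging(): Promise<string> {
  const loggingErrors: unknown[] = [];
  await logSafely(
    () => Promise.reject(new Error('database unavailable')),
    (err) => loggingErrors.push(err),
  );
  // Execution reaches this point even though the write failed.
  return `job finished, ${loggingErrors.length} logging error(s) contained`;
}

runJobWithBrokenLogging().then((outcome) => console.log(outcome));
```

Note that this variant still awaits the write, so a slow database adds latency to the job; dropping the await would trade that latency for less ordering certainty.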

Query Optimization

  • Use limit parameter to cap query results
  • Specify time ranges (from/to) to leverage timestamp index
  • Run cleanup regularly to prevent unbounded table growth
  • Consider partitioning by timestamp for high-volume queues
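
The first two tips amount to always bounding a query by time and by row count. As a sketch of what such a bounded query could look like in raw SQL, the hypothetical helper below builds a parameterized statement from the from/to/limit options; the function name, the default limit of 100, and the generated SQL are assumptions, not what getEventsByQueue actually runs:

```typescript
// Hypothetical options mirroring the from/to/limit parameters mentioned above.
interface EventQueryOptions {
  from?: Date;
  to?: Date;
  limit?: number;
}

// Builds a parameterized PostgreSQL query bounded by time range and row count.
function buildEventsQuery(
  queue: string,
  options: EventQueryOptions = {},
): { sql: string; params: unknown[] } {
  const params: unknown[] = [queue];
  const conditions: string[] = ['queue = $1'];
  if (options.from) {
    params.push(options.from);
    conditions.push(`"timestamp" >= $${params.length}`);
  }
  if (options.to) {
    params.push(options.to);
    conditions.push(`"timestamp" < $${params.length}`);
  }
  params.push(options.limit ?? 100); // assumed default cap
  const sql =
    `SELECT * FROM job_events WHERE ${conditions.join(' AND ')} ` +
    `ORDER BY "timestamp" DESC LIMIT $${params.length}`;
  return { sql, params };
}

const lastDayQuery = buildEventsQuery('analytics', {
  from: new Date('2025-12-30T00:00:00Z'),
  to: new Date('2025-12-31T00:00:00Z'),
  limit: 50,
});
console.log(lastDayQuery.sql);
```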

Storage Planning

Estimate storage needs:

  • Average event size: ~500 bytes (without large metadata)
  • Events per job: ~5 (queued, started, one or more progress updates, completed)
  • Daily volume: 1M jobs × 5 events × 500 bytes ≈ 2.5 GB/day uncompressed
  • With 30-day retention: ~75 GB

PostgreSQL compresses only large TEXT and JSONB values (via TOAST), so rows of this size are stored mostly uncompressed. Consider archiving old events to cold storage.
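
The estimate is a straightforward product, sketched here so it can be rerun with your own numbers. The function name is illustrative, and sizes use decimal gigabytes (1 GB = 10^9 bytes):

```typescript
// Estimates raw event storage from job volume, event count, and event size.
function estimateStorageBytes(
  jobsPerDay: number,
  eventsPerJob: number,
  bytesPerEvent: number,
  retentionDays: number,
): { perDay: number; retained: number } {
  const perDay = jobsPerDay * eventsPerJob * bytesPerEvent;
  return { perDay, retained: perDay * retentionDays };
}

// Figures from the list above: 1M jobs/day, ~5 events/job, ~500 bytes/event, 30 days.
const storageEstimate = estimateStorageBytes(1_000_000, 5, 500, 30);
console.log(
  `${storageEstimate.perDay / 1e9} GB/day, ${storageEstimate.retained / 1e9} GB retained`,
);
```

The figure excludes index overhead and large metadata payloads, so treat it as a lower bound.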

Integration with @lilith/queue/nestjs

JobReporterService implements the JobReporter interface from @lilith/queue/nestjs. Inject it into your processors:

@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  // logger, queueName, and handleJob omitted for brevity

  constructor(reporter: JobReporterService) {
    super();
    this.reporter = reporter; // BaseProcessor uses this
  }
}

BaseProcessor automatically logs:

  • Job start
  • Progress updates
  • Successful completion
  • Failures with retry metadata
  • DLQ moves (when max retries exceeded)
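
To illustrate how a base class can emit these events around a handler, here is a simplified sketch covering only start, completion, and failure. It is not the real BaseProcessor: SketchProcessor, LifecycleReporter, SketchJob, and makeRecordingReporter are hypothetical names, and progress, retry, and DLQ handling are left out:

```typescript
// Hypothetical subset of a reporter interface, for illustration only.
interface LifecycleReporter {
  logJobStarted(jobId: string, queue: string, jobType: string): Promise<void>;
  logJobCompleted(jobId: string, queue: string, jobType: string, durationMs: number): Promise<void>;
  logJobFailed(jobId: string, queue: string, jobType: string, error: string, durationMs: number): Promise<void>;
}

interface SketchJob {
  id: string;
  name: string;
}

// Wraps handleJob with lifecycle logging and measured durations.
abstract class SketchProcessor {
  protected reporter?: LifecycleReporter;
  protected abstract readonly queueName: string;
  protected abstract handleJob(job: SketchJob): Promise<void>;

  async process(job: SketchJob): Promise<void> {
    const startedAt = Date.now();
    await this.reporter?.logJobStarted(job.id, this.queueName, job.name);
    try {
      await this.handleJob(job);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      await this.reporter?.logJobFailed(job.id, this.queueName, job.name, message, Date.now() - startedAt);
      throw err; // rethrow so the queue can apply its retry policy
    }
    await this.reporter?.logJobCompleted(job.id, this.queueName, job.name, Date.now() - startedAt);
  }
}

// Reporter that records event names into the given array.
function makeRecordingReporter(log: string[]): LifecycleReporter {
  return {
    logJobStarted: async (id) => { log.push(`started:${id}`); },
    logJobCompleted: async (id) => { log.push(`completed:${id}`); },
    logJobFailed: async (id, _queue, _type, error) => { log.push(`failed:${id}:${error}`); },
  };
}

// Demo processor that fails on a job named "boom".
class DemoProcessor extends SketchProcessor {
  protected readonly queueName = 'demo';
  constructor(reporter: LifecycleReporter) {
    super();
    this.reporter = reporter;
  }
  protected async handleJob(job: SketchJob): Promise<void> {
    if (job.name === 'boom') throw new Error('exploded');
  }
}

const demoLog: string[] = [];
new DemoProcessor(makeRecordingReporter(demoLog))
  .process({ id: '1', name: 'ok' })
  .then(() => console.log(demoLog.join(', ')));
```

Logging completion outside the try block keeps a failed completion write from being recorded as a job failure.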

License

MIT