# @lilith/queue/reporting

Job lifecycle database persistence using TypeORM.

## Overview

The reporting package persists job lifecycle events to a database, enabling:

- Historical analysis of job processing
- Failure rate monitoring
- Performance metrics and percentiles
- Queue health dashboards
- Debug tracing for failed jobs

## Installation

```bash
pnpm add @lilith/queue/reporting
```

## Setup

### 1. Configure TypeORM

First, set up TypeORM in your application:

```typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'password',
      database: 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Use migrations in production
    }),
  ],
})
export class AppModule {}
```

### 2. Import ReportingModule

```typescript
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReportingModule } from '@lilith/queue/reporting';

@Module({
  imports: [
    TypeOrmModule.forRoot({ /* ... */ }),
    ReportingModule.forRoot(),
  ],
})
export class AppModule {}
```

### 3. Run Migrations

Generate and run the migration for the `job_events` table:

```bash
# Generate migration
pnpm typeorm migration:generate -n CreateJobEventsTable

# Run migration
pnpm typeorm migration:run
```

## Usage

### Automatic Event Logging

Integrate with `BaseProcessor` from `@lilith/queue/nestjs`:

```typescript
import { Logger } from '@nestjs/common';
import { Processor } from '@nestjs/bullmq'; // Assumption: BullMQ; use your queue driver's decorator
import { Job } from 'bullmq'; // Assumption: BullMQ job type
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enable automatic logging
  }

  protected async handleJob(job: Job): Promise<void> {
    // Job events (started, progress, completed, failed) are logged automatically
    await this.updateProgress(job, 50, 'Halfway done');
    // Process job...
  }
}
```

### Manual Event Logging

```typescript
import { Injectable } from '@nestjs/common';
import { JobReporterService } from '@lilith/queue/reporting';

@Injectable()
export class CustomService {
  constructor(private readonly reporter: JobReporterService) {}

  async processCustomJob(jobId: string) {
    const startedAt = Date.now(); // Used to compute durationMs below
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data');
    try {
      // Process...
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', Date.now() - startedAt);
    } catch (error) {
      // In strict TypeScript, `error` is `unknown`, so narrow it before reading `.message`
      const message = error instanceof Error ? error.message : String(error);
      await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, Date.now() - startedAt);
      throw error; // Re-throw so the caller still sees the failure
    }
  }
}
```

### Analytics Queries

```typescript
import { Controller, Get, Param } from '@nestjs/common';
import { JobAnalyticsService } from '@lilith/queue/reporting';

@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue, 3600000); // Last hour
  }

  @Get('failures/:queue')
  async getFailureRate(@Param('queue') queue: string) {
    const rate = await this.analytics.getFailureRate(queue, 3600000); // Last hour, value in 0-1
    return { queue, failureRate: `${(rate * 100).toFixed(2)}%` };
  }

  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    const [avgTime, percentiles] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100), // Sample of 100 jobs
      this.analytics.getProcessingTimePercentiles(queue, 1000), // Sample of 1000 jobs
    ]);
    return { avgTime, percentiles };
  }

  @Get('errors/:queue')
  async getTopErrors(@Param('queue') queue: string) {
    return this.analytics.getTopErrors(queue, 10, 86400000); // Top 10, last 24h
  }
}
```

### Scheduled Cleanup

```typescript
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { JobReporterService } from '@lilith/queue/reporting';

// Requires ScheduleModule.forRoot() to be imported in your app module
@Injectable()
export class CleanupService {
  constructor(private readonly reporter: JobReporterService) {}

  @Cron('0 2 * * *') // Daily at 2 AM
  async cleanupOldEvents() {
    const deleted = await this.reporter.cleanupOldEvents(30); // Keep 30 days
    console.log(`Cleaned up ${deleted} old job events`);
  }
}
```

## API Reference

### JobReporterService

- `logEvent(event: JobEvent): Promise<void>` - Log a job lifecycle event
- `logJobQueued(jobId, queue, jobType, metadata?): Promise<void>` - Log a queued event
- `logJobStarted(jobId, queue, jobType, metadata?): Promise<void>` - Log a started event
- `logJobCompleted(jobId, queue, jobType, durationMs, metadata?): Promise<void>` - Log a completion
- `logJobFailed(jobId, queue, jobType, error, durationMs, metadata?): Promise<void>` - Log a failure
- `cleanupOldEvents(daysToKeep: number): Promise<number>` - Delete events older than `daysToKeep` days; resolves to the number of deleted events

### JobAnalyticsService

- `getJobEvents(jobId: string): Promise` - Get all events for a job
- `getEventsByQueue(queue, options): Promise` - Get events for a queue
- `getFailureRate(queue, periodMs): Promise<number>` - Failure rate over the period (0-1)
- `getAverageProcessingTime(queue, sampleSize): Promise<number>` - Average processing time in ms
- `getProcessingTimePercentiles(queue, sampleSize): Promise<{ p50, p95, p99, min, max }>` - Processing time distribution
- `getThroughput(queue, periodMs): Promise<number>` - Jobs per second
- `getTopErrors(queue, limit, periodMs?): Promise` - Most common errors
- `getQueueHealth(queue, periodMs): Promise` - Aggregated health metrics

## Database Schema

### job_events table

| Column | Type | Description |
|--------|------|-------------|
| id | UUID | Primary key |
| job_id | VARCHAR | Job identifier |
| queue | VARCHAR | Queue name |
| job_type | VARCHAR | Job type |
| type | VARCHAR | Event type: `queued`, `started`, `progress`, `completed`, `failed`, `retrying`, `moved_to_dlq` |
| progress | INTEGER | Progress percentage (0-100) |
| duration_ms | INTEGER | Processing duration in milliseconds |
| error | TEXT | Error message (for failed events) |
| metadata | JSONB | Additional event data |
| timestamp | TIMESTAMP | Event timestamp |

### Indexes

- `job_id` - For job lifecycle queries
- `queue` - For queue-specific analytics
- `type` - For event type filtering
- `timestamp` - For time-based queries

## Performance Considerations

### Non-blocking Logging

Event logging never interrupts job processing: if an event cannot be written (for example, because the database is down), the job still continues.

```typescript
// If the database is down, job processing continues
await this.updateProgress(job, 50); // Logs a progress event; a logging failure does not fail the job
// Job continues even if logging fails
```

### Query Optimization

- Use the `limit` parameter to cap query results
- Specify time ranges (`from`/`to`) to leverage
  the timestamp index
- Run cleanup regularly to prevent unbounded table growth
- Consider partitioning by timestamp for high-volume queues

### Storage Planning

Estimate storage needs:

- Average event size: ~500 bytes (without large metadata)
- Events per job: ~5 (queued, started, one or more progress updates, completed)
- Daily volume: 1M jobs × 5 events × 500 bytes ≈ 2.5 GB/day uncompressed
- With 30-day retention: ~75 GB

Enable PostgreSQL compression and consider archiving old events to cold storage.

## Integration with @lilith/queue/nestjs

`JobReporterService` implements the `JobReporter` interface from `@lilith/queue/nestjs`. Inject it into your processors:

```typescript
import { Processor } from '@nestjs/bullmq'; // Assumption: BullMQ; use your queue driver's decorator
import { BaseProcessor } from '@lilith/queue/nestjs';
import { JobReporterService } from '@lilith/queue/reporting';

@Processor('my-queue')
export class MyProcessor extends BaseProcessor {
  // Define logger, queueName and handleJob as in AnalyticsProcessor above

  constructor(reporter: JobReporterService) {
    super();
    this.reporter = reporter; // BaseProcessor uses this to log events
  }
}
```

BaseProcessor automatically logs:

- Job start
- Progress updates
- Successful completion
- Failures with retry metadata
- DLQ moves (when max retries are exceeded)

## License

MIT