/**
 * Example: Using @lilith/queue/reporting
 *
 * This file demonstrates common usage patterns for the reporting package.
 */

import {
  BadRequestException,
  Module,
  Controller,
  Get,
  Param,
  Query,
  Injectable,
  Logger,
} from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Processor } from '@nestjs/bullmq';
import type { Job } from 'bullmq';
import { BaseProcessor, type BaseJobData } from '@lilith/queue/nestjs';
import {
  ReportingModule,
  JobReporterService,
  JobAnalyticsService,
} from '@lilith/queue/reporting';

// ============================================================================
// Shared helpers
// ============================================================================

/** Default look-back window used by the metrics endpoints: one hour, in ms. */
const ONE_HOUR_MS = 60 * 60 * 1000;

/** Retention (in days) used when JOB_EVENTS_RETENTION_DAYS is unset or invalid. */
const DEFAULT_RETENTION_DAYS = 30;

/**
 * Parses a query-string value as a strictly positive integer.
 *
 * @param name - Query parameter name, used only in the error message.
 * @param raw - Raw query-string value.
 * @returns The parsed integer.
 * @throws BadRequestException when the value is not a positive integer, so
 *   that `NaN` never reaches the analytics service.
 */
function parsePositiveIntParam(name: string, raw: string): number {
  const value = Number.parseInt(raw, 10);
  if (!Number.isInteger(value) || value <= 0) {
    throw new BadRequestException(
      `Query parameter "${name}" must be a positive integer`,
    );
  }
  return value;
}

/**
 * Parses an optional query-string value as a date.
 *
 * @param name - Query parameter name, used only in the error message.
 * @param raw - Raw query-string value, or `undefined` when the parameter is absent.
 * @returns The parsed `Date`, or `undefined` when `raw` is absent/empty.
 * @throws BadRequestException when the value cannot be parsed by `new Date()`.
 */
function parseDateParam(name: string, raw: string | undefined): Date | undefined {
  if (!raw) {
    return undefined;
  }
  const value = new Date(raw);
  if (Number.isNaN(value.getTime())) {
    throw new BadRequestException(
      `Query parameter "${name}" must be a valid date`,
    );
  }
  return value;
}

/** Formats a millisecond duration with two decimals, e.g. `12.35ms`. */
function formatMs(value: number): string {
  return `${value.toFixed(2)}ms`;
}

/**
 * Returns the length of the JSON serialization of `value`, or 0 when the value
 * has no JSON representation (`undefined`, functions) or cannot be serialized
 * (circular references, BigInt).
 */
function safeJsonSize(value: unknown): number {
  try {
    // JSON.stringify returns undefined at runtime for undefined/functions,
    // even though its declared return type is string.
    const serialized: string | undefined = JSON.stringify(value);
    return serialized?.length ?? 0;
  } catch {
    return 0;
  }
}

// ============================================================================
// Processor with Automatic Event Logging
// ============================================================================

interface AnalyticsJobData extends BaseJobData {
  userId: string;
  eventType: string;
  properties: Record<string, unknown>;
}

/**
 * Processor for the `analytics` queue.
 *
 * NOTE(review): BaseProcessor is assumed to be generic over the job data type
 * (the package also exports BaseJobData) — confirm against @lilith/queue/nestjs.
 */
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    // Assign reporter to enable automatic event logging
    this.reporter = reporterService;
  }

  /**
   * Handles one analytics job: processes the event, then aggregates metrics,
   * reporting progress at 0%, 50% and 100%.
   */
  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    const { userId, eventType, properties } = job.data;

    // Log progress (automatically persisted to database)
    await this.updateProgress(job, 0, 'Starting analytics processing');

    // Simulate processing
    await this.processEvent(userId, eventType, properties);
    await this.updateProgress(job, 50, 'Event processed');

    await this.aggregateMetrics(userId);
    await this.updateProgress(job, 100, 'Metrics aggregated');

    // Completion is automatically logged by BaseProcessor
  }

  /** Placeholder for real event processing; currently just waits 100 ms. */
  private async processEvent(
    userId: string,
    eventType: string,
    properties: Record<string, unknown>,
  ): Promise<void> {
    // Process event...
    await new Promise<void>((resolve) => setTimeout(resolve, 100));
  }

  /** Placeholder for real metric aggregation; currently just waits 100 ms. */
  private async aggregateMetrics(userId: string): Promise<void> {
    // Aggregate metrics...
    await new Promise<void>((resolve) => setTimeout(resolve, 100));
  }
}

// ============================================================================
// Analytics API Endpoints
// ============================================================================

@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}

  /**
   * Get overall queue health metrics.
   * GET /metrics/health/analytics
   */
  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    // Get aggregated health metrics for the last hour
    const health = await this.analytics.getQueueHealth(queue, ONE_HOUR_MS);
    const { percentiles } = health;

    return {
      queue: health.queue,
      period: health.period,
      metrics: {
        failureRate: `${(health.failureRate * 100).toFixed(2)}%`,
        avgProcessingTime: formatMs(health.avgProcessingTime),
        throughput: `${health.throughput.toFixed(2)} jobs/sec`,
        percentiles: {
          p50: formatMs(percentiles.p50),
          p95: formatMs(percentiles.p95),
          p99: formatMs(percentiles.p99),
          min: formatMs(percentiles.min),
          max: formatMs(percentiles.max),
        },
      },
    };
  }

  /**
   * Get failure rate for a queue.
   * GET /metrics/failures/analytics?period=3600000
   *
   * @throws BadRequestException when `period` is not a positive integer.
   */
  @Get('failures/:queue')
  async getFailureRate(
    @Param('queue') queue: string,
    @Query('period') periodMs: string = '3600000',
  ) {
    const period = parsePositiveIntParam('period', periodMs);
    const rate = await this.analytics.getFailureRate(queue, period);

    return {
      queue,
      period: `${period / 1000}s`,
      failureRate: rate,
      failurePercent: `${(rate * 100).toFixed(2)}%`,
    };
  }

  /**
   * Get performance metrics.
   * GET /metrics/performance/analytics
   */
  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    // The three lookups are independent, so run them concurrently.
    // NOTE(review): 100 and 1000 are presumably sample sizes — confirm
    // against the JobAnalyticsService signatures.
    const [avgTime, percentiles, throughput] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100),
      this.analytics.getProcessingTimePercentiles(queue, 1000),
      this.analytics.getThroughput(queue, ONE_HOUR_MS),
    ]);

    return {
      queue,
      averageProcessingTime: formatMs(avgTime),
      throughput: `${throughput.toFixed(2)} jobs/sec`,
      percentiles: {
        p50: formatMs(percentiles.p50),
        p95: formatMs(percentiles.p95),
        p99: formatMs(percentiles.p99),
      },
    };
  }

  /**
   * Get most common errors.
   * GET /metrics/errors/analytics?limit=10
   *
   * @throws BadRequestException when `limit` or `period` is not a positive integer.
   */
  @Get('errors/:queue')
  async getTopErrors(
    @Param('queue') queue: string,
    @Query('limit') limitStr: string = '10',
    @Query('period') periodStr?: string,
  ) {
    const limit = parsePositiveIntParam('limit', limitStr);
    // An absent or empty period means "no period filter", as before.
    const periodMs = periodStr
      ? parsePositiveIntParam('period', periodStr)
      : undefined;
    const errors = await this.analytics.getTopErrors(queue, limit, periodMs);

    return {
      queue,
      errors,
      totalUniqueErrors: errors.length,
    };
  }

  /**
   * Get all events for a specific job.
   * GET /metrics/job/job-123/events
   */
  @Get('job/:jobId/events')
  async getJobEvents(@Param('jobId') jobId: string) {
    const events = await this.analytics.getJobEvents(jobId);

    return {
      jobId,
      eventCount: events.length,
      events: events.map((e) => ({
        type: e.type,
        timestamp: e.timestamp,
        progress: e.progress,
        durationMs: e.durationMs,
        error: e.error,
        metadata: e.metadata,
      })),
    };
  }

  /**
   * Get recent events for a queue.
   * GET /metrics/analytics/events?from=2024-01-01&limit=50
   *
   * @throws BadRequestException when `from`/`to` is not a parseable date or
   *   `limit` is not a positive integer.
   */
  @Get(':queue/events')
  async getQueueEvents(
    @Param('queue') queue: string,
    @Query('from') fromStr?: string,
    @Query('to') toStr?: string,
    @Query('limit') limitStr: string = '100',
  ) {
    const options = {
      from: parseDateParam('from', fromStr),
      to: parseDateParam('to', toStr),
      limit: parsePositiveIntParam('limit', limitStr),
    };
    const events = await this.analytics.getEventsByQueue(queue, options);

    return {
      queue,
      eventCount: events.length,
      events,
    };
  }
}

// ============================================================================
// Scheduled Cleanup Service
// ============================================================================

@Injectable()
export class CleanupService {
  private readonly logger = new Logger(CleanupService.name);

  constructor(private readonly reporter: JobReporterService) {}

  /**
   * Clean up old job events daily at 2 AM.
   * Keeps events for 30 days by default; override with the
   * JOB_EVENTS_RETENTION_DAYS environment variable.
   */
  @Cron('0 2 * * *')
  async cleanupOldEvents() {
    const daysToKeep = this.resolveRetentionDays();

    this.logger.log(`Starting cleanup of events older than ${daysToKeep} days`);
    const deleted = await this.reporter.cleanupOldEvents(daysToKeep);
    this.logger.log(`Cleanup complete: deleted ${deleted} old job events`);
  }

  /**
   * Reads the retention period from the environment, falling back to the
   * default when the variable is unset, empty, or not a positive integer —
   * a NaN retention must never be handed to the delete query.
   */
  private resolveRetentionDays(): number {
    const raw = process.env.JOB_EVENTS_RETENTION_DAYS;
    if (!raw) {
      return DEFAULT_RETENTION_DAYS;
    }

    const parsed = Number.parseInt(raw, 10);
    if (!Number.isInteger(parsed) || parsed <= 0) {
      this.logger.warn(
        `Invalid JOB_EVENTS_RETENTION_DAYS "${raw}"; using ${DEFAULT_RETENTION_DAYS} days`,
      );
      return DEFAULT_RETENTION_DAYS;
    }
    return parsed;
  }
}

// ============================================================================
// Manual Event Logging (without BaseProcessor)
// ============================================================================

@Injectable()
export class CustomJobService {
  private readonly logger = new Logger(CustomJobService.name);

  constructor(private readonly reporter: JobReporterService) {}

  /**
   * Runs a custom job and reports its lifecycle (started, then completed or
   * failed) through the reporter. Processing errors are rethrown to the caller
   * after the failure has been logged.
   *
   * @param jobId - Identifier under which the events are recorded.
   * @param data - Arbitrary job payload; only its serialized size is reported.
   */
  async processCustomJob(jobId: string, data: unknown) {
    const startTime = Date.now();

    // Log job started
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data', {
      dataSize: safeJsonSize(data),
    });

    try {
      // Process job
      await this.performProcessing(data);

      const durationMs = Date.now() - startTime;

      // Log success
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', durationMs, {
        success: true,
      });
    } catch (error: unknown) {
      const durationMs = Date.now() - startTime;

      // Anything can be thrown, including null/undefined, so narrow before
      // touching properties of the caught value.
      const isError = error instanceof Error;

      // Log failure
      await this.reporter.logJobFailed(
        jobId,
        'custom',
        'process-data',
        isError ? error.message : String(error),
        durationMs,
        {
          errorType: isError ? error.constructor.name : typeof error,
        },
      );

      throw error;
    }
  }

  /** Placeholder for the actual processing logic. */
  private async performProcessing(data: unknown): Promise<void> {
    // Custom processing logic...
  }
}

// ============================================================================
// App Module Setup
// ============================================================================

// Declared last on purpose: the @Module() decorator arguments are evaluated as
// soon as the class is defined, so every class they reference must already be
// declared at that point.
@Module({
  imports: [
    // 1. Configure TypeORM with your database
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.DB_HOST || 'localhost',
      port: Number.parseInt(process.env.DB_PORT || '5432', 10),
      username: process.env.DB_USER || 'postgres',
      password: process.env.DB_PASSWORD || 'password',
      database: process.env.DB_NAME || 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Always use migrations in production
    }),

    // 2. Import ReportingModule - makes JobReporterService globally available
    ReportingModule.forRoot(),
  ],
  controllers: [MetricsController],
  providers: [AnalyticsProcessor, CleanupService],
})
export class AppModule {}