queue/reporting/examples/usage-example.ts
Lilith 7a162b4b8a ♻️ Update imports from @transquinnftw/queue-* to @lilith/queue/*
Migrate all internal imports to use the new unified package paths:
- @transquinnftw/queue-core → @lilith/queue/core
- @transquinnftw/queue-nestjs → @lilith/queue/nestjs
- @transquinnftw/queue-ml → @lilith/queue/ml
- @transquinnftw/queue-reporting → @lilith/queue/reporting

Also fix cron type compatibility issue in base-scheduler.ts.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 20:28:26 -08:00

323 lines
9.3 KiB
TypeScript

/**
* Example: Using @lilith/queue/reporting
*
* This file demonstrates common usage patterns for the reporting package.
*/
import { Module, Controller, Get, Param, Query, Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Processor } from '@nestjs/bullmq';
import type { Job } from 'bullmq';
import { BaseProcessor, type BaseJobData } from '@lilith/queue/nestjs';
import {
ReportingModule,
JobReporterService,
JobAnalyticsService,
} from '@lilith/queue/reporting';
// ============================================================================
// App Module Setup
// ============================================================================
/**
 * Root module of the example application.
 *
 * Registers the Postgres connection through TypeORM, pulls in the reporting
 * module, and declares the example controller and providers defined in this
 * file. Connection settings come from the `DB_*` environment variables, each
 * with a local-development fallback.
 *
 * NOTE(review): `MetricsController`, `AnalyticsProcessor` and `CleanupService`
 * are declared further down in this same file, while the decorator argument
 * below is evaluated as soon as this class is defined. In a real project keep
 * them in their own files (or declare them above this module) so they are
 * initialised before being referenced here.
 */
@Module({
  imports: [
    // 1. Configure TypeORM with your database
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.DB_HOST || 'localhost',
      // Always parse the port as base 10 rather than relying on prefix sniffing.
      port: Number.parseInt(process.env.DB_PORT || '5432', 10),
      username: process.env.DB_USER || 'postgres',
      // Development-only fallback; set DB_PASSWORD explicitly everywhere else.
      password: process.env.DB_PASSWORD || 'password',
      database: process.env.DB_NAME || 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Always use migrations in production
    }),
    // 2. Import ReportingModule - makes JobReporterService globally available
    ReportingModule.forRoot(),
  ],
  controllers: [MetricsController],
  providers: [AnalyticsProcessor, CleanupService],
})
export class AppModule {}
// ============================================================================
// Processor with Automatic Event Logging
// ============================================================================
/**
 * Payload of the jobs handled by `AnalyticsProcessor`.
 *
 * Extends `BaseJobData` with the fields the processor reads from `job.data`.
 */
interface AnalyticsJobData extends BaseJobData {
// Forwarded to both `processEvent` and `aggregateMetrics`.
userId: string;
// Forwarded to `processEvent`; presumably an event name/category — confirm with producers.
eventType: string;
// Free-form key/value data attached to the event; values are not validated here.
properties: Record<string, unknown>;
}
/**
 * Example worker bound to the `analytics` queue.
 *
 * The injected `JobReporterService` is handed to the base class through
 * `this.reporter`, which is what switches on automatic event logging.
 */
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';

  constructor(private readonly reporterService: JobReporterService) {
    super();
    // Wiring the reporter in is all that is needed for automatic event logging.
    this.reporter = reporterService;
  }

  /**
   * Runs the two example stages for one job, reporting progress at 0 %, 50 %
   * and 100 %. Completion itself is logged by `BaseProcessor`.
   *
   * @param job - The queue job whose `data` carries the analytics event.
   */
  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    const { data } = job;
    const userId = data.userId;
    const eventType = data.eventType;
    const properties = data.properties;

    // Progress updates are persisted to the database automatically.
    await this.updateProgress(job, 0, 'Starting analytics processing');

    // Stage 1: (simulated) event processing.
    await this.processEvent(userId, eventType, properties);
    await this.updateProgress(job, 50, 'Event processed');

    // Stage 2: (simulated) metric aggregation.
    await this.aggregateMetrics(userId);
    await this.updateProgress(job, 100, 'Metrics aggregated');
  }

  /** Placeholder for real event processing; just waits 100 ms. */
  private async processEvent(
    userId: string,
    eventType: string,
    properties: Record<string, unknown>,
  ): Promise<void> {
    await new Promise((done) => {
      setTimeout(done, 100);
    });
  }

  /** Placeholder for real metric aggregation; just waits 100 ms. */
  private async aggregateMetrics(userId: string): Promise<void> {
    await new Promise((done) => {
      setTimeout(done, 100);
    });
  }
}
// ============================================================================
// Analytics API Endpoints
// ============================================================================
/**
 * Read-only HTTP endpoints exposing queue analytics under `/metrics`.
 *
 * Every handler delegates to `JobAnalyticsService` and reshapes the result for
 * JSON output. Numeric and date query parameters arrive as raw strings, so
 * they are validated here: a missing, non-numeric or non-positive number falls
 * back to that parameter's default, and an unparseable date is ignored. This
 * keeps `NaN` and `Invalid Date` from reaching the analytics service.
 */
@Controller('metrics')
export class MetricsController {
  /** One hour in milliseconds: the default look-back window. */
  private static readonly ONE_HOUR_MS = 60 * 60 * 1000;

  constructor(private readonly analytics: JobAnalyticsService) {}

  /**
   * Parses a base-10 positive integer from a raw query value.
   *
   * @param raw - The raw query string value, if the client supplied one.
   * @param fallback - Value returned when `raw` is absent or not a positive integer.
   * @returns The parsed integer, or `fallback`.
   */
  private static parsePositiveInt<T extends number | undefined>(
    raw: string | undefined,
    fallback: T,
  ): number | T {
    const parsed = Number.parseInt(raw ?? '', 10);
    return Number.isSafeInteger(parsed) && parsed > 0 ? parsed : fallback;
  }

  /**
   * Parses a date from a raw query value.
   *
   * @param raw - Any string accepted by the `Date` constructor.
   * @returns The parsed date, or `undefined` when `raw` is absent or invalid.
   */
  private static parseDate(raw: string | undefined): Date | undefined {
    if (!raw) {
      return undefined;
    }
    const parsed = new Date(raw);
    return Number.isNaN(parsed.getTime()) ? undefined : parsed;
  }

  /** Formats a duration as milliseconds with two decimals, e.g. `12.34ms`. */
  private static formatMs(value: number): string {
    return `${value.toFixed(2)}ms`;
  }

  /** Formats a throughput figure with two decimals, e.g. `1.50 jobs/sec`. */
  private static formatThroughput(value: number): string {
    return `${value.toFixed(2)} jobs/sec`;
  }

  /**
   * Get overall queue health metrics.
   * GET /metrics/health/analytics
   *
   * @param queue - Name of the queue to inspect.
   * @returns Health figures for the last hour, formatted as display strings.
   */
  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    // Get aggregated health metrics for the last hour
    const health = await this.analytics.getQueueHealth(queue, MetricsController.ONE_HOUR_MS);
    const { percentiles } = health;
    return {
      queue: health.queue,
      period: health.period,
      metrics: {
        failureRate: `${(health.failureRate * 100).toFixed(2)}%`,
        avgProcessingTime: MetricsController.formatMs(health.avgProcessingTime),
        throughput: MetricsController.formatThroughput(health.throughput),
        percentiles: {
          p50: MetricsController.formatMs(percentiles.p50),
          p95: MetricsController.formatMs(percentiles.p95),
          p99: MetricsController.formatMs(percentiles.p99),
          min: MetricsController.formatMs(percentiles.min),
          max: MetricsController.formatMs(percentiles.max),
        },
      },
    };
  }

  /**
   * Get failure rate for a queue.
   * GET /metrics/failures/analytics?period=3600000
   *
   * @param queue - Name of the queue to inspect.
   * @param periodMs - Look-back window in milliseconds; invalid values fall
   *   back to one hour.
   * @returns The raw failure rate plus a percentage string.
   */
  @Get('failures/:queue')
  async getFailureRate(
    @Param('queue') queue: string,
    @Query('period') periodMs: string = '3600000',
  ) {
    const period = MetricsController.parsePositiveInt(periodMs, MetricsController.ONE_HOUR_MS);
    const rate = await this.analytics.getFailureRate(queue, period);
    return {
      queue,
      period: `${period / 1000}s`,
      failureRate: rate,
      failurePercent: `${(rate * 100).toFixed(2)}%`,
    };
  }

  /**
   * Get performance metrics.
   * GET /metrics/performance/analytics
   *
   * @param queue - Name of the queue to inspect.
   * @returns Average processing time, throughput and p50/p95/p99 percentiles.
   */
  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    // The three lookups are independent, so run them concurrently.
    // NOTE(review): 100 and 1000 look like sample sizes — confirm against
    // the JobAnalyticsService signatures.
    const [avgTime, percentiles, throughput] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100),
      this.analytics.getProcessingTimePercentiles(queue, 1000),
      this.analytics.getThroughput(queue, MetricsController.ONE_HOUR_MS),
    ]);
    return {
      queue,
      averageProcessingTime: MetricsController.formatMs(avgTime),
      throughput: MetricsController.formatThroughput(throughput),
      percentiles: {
        p50: MetricsController.formatMs(percentiles.p50),
        p95: MetricsController.formatMs(percentiles.p95),
        p99: MetricsController.formatMs(percentiles.p99),
      },
    };
  }

  /**
   * Get most common errors.
   * GET /metrics/errors/analytics?limit=10
   *
   * @param queue - Name of the queue to inspect.
   * @param limitStr - Maximum number of errors; invalid values fall back to 10.
   * @param periodStr - Optional look-back window in milliseconds; omitted or
   *   invalid values mean no period is passed to the service.
   * @returns The errors reported by the service and how many were returned.
   */
  @Get('errors/:queue')
  async getTopErrors(
    @Param('queue') queue: string,
    @Query('limit') limitStr: string = '10',
    @Query('period') periodStr?: string,
  ) {
    const limit = MetricsController.parsePositiveInt(limitStr, 10);
    const periodMs = MetricsController.parsePositiveInt(periodStr, undefined);
    const errors = await this.analytics.getTopErrors(queue, limit, periodMs);
    return {
      queue,
      errors,
      totalUniqueErrors: errors.length,
    };
  }

  /**
   * Get all events for a specific job.
   * GET /metrics/job/job-123/events
   *
   * @param jobId - Identifier of the job whose events are requested.
   * @returns The job's events, reduced to the fields exposed by this API.
   */
  @Get('job/:jobId/events')
  async getJobEvents(@Param('jobId') jobId: string) {
    const events = await this.analytics.getJobEvents(jobId);
    return {
      jobId,
      eventCount: events.length,
      // Expose an explicit subset of fields rather than the stored records.
      events: events.map((e) => ({
        type: e.type,
        timestamp: e.timestamp,
        progress: e.progress,
        durationMs: e.durationMs,
        error: e.error,
        metadata: e.metadata,
      })),
    };
  }

  /**
   * Get recent events for a queue.
   * GET /metrics/analytics/events?from=2024-01-01&limit=50
   *
   * @param queue - Name of the queue to inspect.
   * @param fromStr - Optional lower time bound; ignored when unparseable.
   * @param toStr - Optional upper time bound; ignored when unparseable.
   * @param limitStr - Maximum number of events; invalid values fall back to 100.
   * @returns The matching events and their count.
   */
  @Get(':queue/events')
  async getQueueEvents(
    @Param('queue') queue: string,
    @Query('from') fromStr?: string,
    @Query('to') toStr?: string,
    @Query('limit') limitStr: string = '100',
  ) {
    const options = {
      from: MetricsController.parseDate(fromStr),
      to: MetricsController.parseDate(toStr),
      limit: MetricsController.parsePositiveInt(limitStr, 100),
    };
    const events = await this.analytics.getEventsByQueue(queue, options);
    return {
      queue,
      eventCount: events.length,
      events,
    };
  }
}
// ============================================================================
// Scheduled Cleanup Service
// ============================================================================
/**
 * Scheduled housekeeping for persisted job events.
 */
@Injectable()
export class CleanupService {
  /** Retention used when `JOB_EVENTS_RETENTION_DAYS` is unset or invalid. */
  private static readonly DEFAULT_RETENTION_DAYS = 30;

  private readonly logger = new Logger(CleanupService.name);

  constructor(private readonly reporter: JobReporterService) {}

  /**
   * Clean up old job events daily at 2 AM.
   * Keeps events for 30 days by default; override with the
   * `JOB_EVENTS_RETENTION_DAYS` environment variable.
   */
  @Cron('0 2 * * *')
  async cleanupOldEvents(): Promise<void> {
    const daysToKeep = this.resolveRetentionDays();
    this.logger.log(`Starting cleanup of events older than ${daysToKeep} days`);
    const deleted = await this.reporter.cleanupOldEvents(daysToKeep);
    this.logger.log(`Cleanup complete: deleted ${deleted} old job events`);
  }

  /**
   * Reads the retention period from the environment.
   *
   * A misconfigured value (non-numeric, zero or negative) must never reach the
   * delete call, so it is rejected with a warning and the default is used.
   *
   * @returns A positive whole number of days.
   */
  private resolveRetentionDays(): number {
    const fallback = CleanupService.DEFAULT_RETENTION_DAYS;
    const raw = process.env.JOB_EVENTS_RETENTION_DAYS;
    if (!raw) {
      return fallback;
    }

    const parsed = Number.parseInt(raw, 10);
    if (Number.isSafeInteger(parsed) && parsed > 0) {
      return parsed;
    }

    this.logger.warn(
      `Ignoring invalid JOB_EVENTS_RETENTION_DAYS value "${raw}"; using ${fallback} days`,
    );
    return fallback;
  }
}
// ============================================================================
// Manual Event Logging (without BaseProcessor)
// ============================================================================
/**
 * Shows how to record job lifecycle events by hand, for work that does not run
 * through `BaseProcessor`.
 */
@Injectable()
export class CustomJobService {
  private readonly logger = new Logger(CustomJobService.name);

  constructor(private readonly reporter: JobReporterService) {}

  /**
   * Length of the JSON representation of `data`.
   *
   * @returns The character count, or 0 when `data` has no JSON form
   *   (`undefined`, functions) or cannot be serialised (cycles, `bigint`).
   */
  private static serializedSize(data: unknown): number {
    try {
      // JSON.stringify returns undefined for values with no JSON form.
      const json: string | undefined = JSON.stringify(data);
      return json === undefined ? 0 : json.length;
    } catch {
      return 0;
    }
  }

  /**
   * Processes one job and records started / completed / failed events.
   *
   * @param jobId - Identifier under which the events are recorded.
   * @param data - Arbitrary job payload handed to the processing step.
   * @throws Rethrows whatever the processing step throws, after the failure
   *   has been reported. Errors from reporting "started" or "completed" also
   *   propagate.
   */
  async processCustomJob(jobId: string, data: unknown): Promise<void> {
    const startTime = Date.now();

    // Log job started
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data', {
      dataSize: CustomJobService.serializedSize(data),
    });

    try {
      // Only the processing itself is guarded, so a reporting failure on the
      // success path is never recorded as a job failure.
      await this.performProcessing(data);
    } catch (error) {
      const durationMs = Date.now() - startTime;
      // The catch variable is `unknown`: narrow before touching its members.
      const isError = error instanceof Error;
      const message = isError ? error.message : String(error);
      const errorType = isError ? error.constructor.name : typeof error;

      try {
        // Log failure
        await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, durationMs, {
          errorType,
        });
      } catch (reportError) {
        // Keep the original processing error as the one the caller sees.
        this.logger.error(
          `Could not record failure of job ${jobId}: ${
            reportError instanceof Error ? reportError.message : String(reportError)
          }`,
        );
      }
      throw error;
    }

    // Log success
    await this.reporter.logJobCompleted(
      jobId,
      'custom',
      'process-data',
      Date.now() - startTime,
      { success: true },
    );
  }

  /** Placeholder for the real processing step. */
  private async performProcessing(data: unknown): Promise<void> {
    // Custom processing logic...
  }
}