@transquinnftw/queue-reporting
Job lifecycle database persistence using TypeORM.
Overview
The reporting package provides database persistence for job lifecycle events, enabling:
- Historical analysis of job processing
- Failure rate monitoring
- Performance metrics and percentiles
- Queue health dashboards
- Debug tracing for failed jobs
Installation
pnpm add @transquinnftw/queue-reporting
Setup
1. Configure TypeORM
First, set up TypeORM in your application:
// app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'password',
      database: 'queue_db',
      autoLoadEntities: true,
      synchronize: false, // Use migrations in production
    }),
  ],
})
export class AppModule {}
2. Import ReportingModule
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReportingModule } from '@transquinnftw/queue-reporting';
@Module({
  imports: [
    TypeOrmModule.forRoot({ /* ... */ }),
    ReportingModule.forRoot(),
  ],
})
export class AppModule {}
3. Run Migrations
Generate and run the migration for the job_events table:
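# Generate migration (TypeORM 0.2.x syntax; 0.3+ takes the migration path as a positional argument plus -d <data-source> instead of -n)
pnpm typeorm migration:generate -n CreateJobEventsTable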
# Run migration
pnpm typeorm migration:run
Usage
Automatic Event Logging
Integrate with BaseProcessor from @transquinnftw/queue-nestjs:
import { Logger } from '@nestjs/common';
import { BaseProcessor } from '@transquinnftw/queue-nestjs';
import { JobReporterService } from '@transquinnftw/queue-reporting';
@Processor('analytics')
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData> {
  protected readonly logger = new Logger(AnalyticsProcessor.name);
  protected readonly queueName = 'analytics';
  constructor(private readonly reporterService: JobReporterService) {
    super();
    this.reporter = reporterService; // Enable automatic logging
  }
  protected async handleJob(job: Job<AnalyticsJobData>): Promise<void> {
    // Job events (started, progress, completed, failed) are logged automatically
    await this.updateProgress(job, 50, 'Halfway done');
    // Process job...
  }
}
Manual Event Logging
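import { Injectable } from '@nestjs/common';
import { JobReporterService } from '@transquinnftw/queue-reporting';
@Injectable()
export class CustomService {
  constructor(private readonly reporter: JobReporterService) {}
  async processCustomJob(jobId: string): Promise<void> {
    const startedAt = Date.now(); // Measure the real duration instead of hard-coding it
    await this.reporter.logJobStarted(jobId, 'custom', 'process-data');
    try {
      // Process...
      await this.reporter.logJobCompleted(jobId, 'custom', 'process-data', Date.now() - startedAt);
    } catch (error) {
      // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
      const message = error instanceof Error ? error.message : String(error);
      await this.reporter.logJobFailed(jobId, 'custom', 'process-data', message, Date.now() - startedAt);
    }
  }
}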
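The start/complete/fail pattern above tends to repeat across services, so it can be factored into a wrapper. The following is a minimal sketch, not part of the package: `withJobReporting` and the `LifecycleReporter` interface are hypothetical names, and the interface only mirrors the three calls used in the example above.

```typescript
// Hypothetical subset of the reporter API, mirroring the calls used in the example above.
interface LifecycleReporter {
  logJobStarted(jobId: string, queue: string, jobType: string): Promise<void>;
  logJobCompleted(jobId: string, queue: string, jobType: string, durationMs: number): Promise<void>;
  logJobFailed(jobId: string, queue: string, jobType: string, error: string, durationMs: number): Promise<void>;
}

// Runs `work`, logging the start and then either completion or failure
// together with the measured wall-clock duration.
async function withJobReporting<T>(
  reporter: LifecycleReporter,
  jobId: string,
  queue: string,
  jobType: string,
  work: () => Promise<T>,
): Promise<T> {
  const startedAt = Date.now();
  await reporter.logJobStarted(jobId, queue, jobType);
  let result: T;
  try {
    result = await work();
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await reporter.logJobFailed(jobId, queue, jobType, message, Date.now() - startedAt);
    throw error; // Let the caller decide how to handle the failure
  }
  await reporter.logJobCompleted(jobId, queue, jobType, Date.now() - startedAt);
  return result;
}
```

Unlike the inline version, this sketch rethrows the original error after logging it, and it keeps the completion log outside the `try` block so that a failed completion write is not recorded as a job failure.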
Analytics Queries
import { Controller, Get, Param } from '@nestjs/common';
import { JobAnalyticsService } from '@transquinnftw/queue-reporting';
@Controller('metrics')
export class MetricsController {
  constructor(private readonly analytics: JobAnalyticsService) {}
  @Get('health/:queue')
  async getQueueHealth(@Param('queue') queue: string) {
    return this.analytics.getQueueHealth(queue, 3600000); // Last hour
  }
  @Get('failures/:queue')
  async getFailureRate(@Param('queue') queue: string) {
    const rate = await this.analytics.getFailureRate(queue, 3600000); // Last hour
    return { queue, failureRate: `${(rate * 100).toFixed(2)}%` };
  }
  @Get('performance/:queue')
  async getPerformance(@Param('queue') queue: string) {
    const [avgTime, percentiles] = await Promise.all([
      this.analytics.getAverageProcessingTime(queue, 100), // Sample size: 100 jobs
      this.analytics.getProcessingTimePercentiles(queue, 1000), // Sample size: 1000 jobs
    ]);
    return { avgTime, percentiles };
  }
  @Get('errors/:queue')
  async getTopErrors(@Param('queue') queue: string) {
    return this.analytics.getTopErrors(queue, 10, 86400000); // Top 10, last 24h
  }
}
Scheduled Cleanup
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule'; // Requires ScheduleModule.forRoot() in your module imports
import { JobReporterService } from '@transquinnftw/queue-reporting';
@Injectable()
export class CleanupService {
  constructor(private readonly reporter: JobReporterService) {}
  @Cron('0 2 * * *') // Daily at 2 AM
  async cleanupOldEvents() {
    const deleted = await this.reporter.cleanupOldEvents(30); // Keep 30 days
    console.log(`Cleaned up ${deleted} old job events`);
  }
}
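To make the retention window concrete, here is a small sketch of how a "keep N days" rule could translate into a cutoff timestamp. This is an assumption about the semantics of `cleanupOldEvents(daysToKeep)`, not the package's actual implementation; `retentionCutoff` and `isExpired` are hypothetical helpers.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Returns the instant before which events would count as expired.
// Assumes "keep N days" means "keep everything newer than now minus N * 24h".
function retentionCutoff(daysToKeep: number, now: Date = new Date()): Date {
  if (!Number.isFinite(daysToKeep) || daysToKeep < 0) {
    throw new RangeError(`daysToKeep must be a non-negative number, got ${daysToKeep}`);
  }
  return new Date(now.getTime() - daysToKeep * MS_PER_DAY);
}

// In-memory stand-in for the delete condition: true when the event predates the cutoff.
function isExpired(eventTimestamp: Date, daysToKeep: number, now: Date = new Date()): boolean {
  return eventTimestamp.getTime() < retentionCutoff(daysToKeep, now).getTime();
}
```

Passing `now` explicitly keeps the helpers deterministic in tests; in production code the default of the current time would normally be used.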
API Reference
JobReporterService
- logEvent(event: JobEvent): Promise<void> - Log a job lifecycle event
- logJobQueued(jobId, queue, jobType, metadata?): Promise<void> - Log queued event
- logJobStarted(jobId, queue, jobType, metadata?): Promise<void> - Log started event
- logJobCompleted(jobId, queue, jobType, durationMs, metadata?): Promise<void> - Log completion
- logJobFailed(jobId, queue, jobType, error, durationMs, metadata?): Promise<void> - Log failure
- cleanupOldEvents(daysToKeep: number): Promise<number> - Delete old events
JobAnalyticsService
- getJobEvents(jobId: string): Promise<JobEvent[]> - Get all events for a job
- getEventsByQueue(queue, options): Promise<JobEvent[]> - Get events for a queue
- getFailureRate(queue, periodMs): Promise<number> - Calculate failure rate (0-1)
- getAverageProcessingTime(queue, sampleSize): Promise<number> - Avg time in ms
- getProcessingTimePercentiles(queue, sampleSize): Promise<{p50, p95, p99, min, max}> - Time distribution
- getThroughput(queue, periodMs): Promise<number> - Jobs per second
- getTopErrors(queue, limit, periodMs?): Promise<Array<{error, count}>> - Most common errors
- getQueueHealth(queue, periodMs): Promise<HealthSummary> - Aggregated health metrics
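To illustrate what the failure rate and percentile figures mean, the sketch below computes them over an in-memory sample. It is an assumption-laden illustration, not the package's query logic: it takes the failure rate as failed / (completed + failed) and uses the nearest-rank percentile method. The names `failureRate`, `percentile`, and `processingTimePercentiles` are hypothetical.

```typescript
type TerminalEvent = { type: 'completed' | 'failed'; durationMs: number };

// Fraction of terminal events that failed, in the range 0-1 (0 for an empty sample).
function failureRate(events: TerminalEvent[]): number {
  if (events.length === 0) return 0;
  const failed = events.filter((e) => e.type === 'failed').length;
  return failed / events.length;
}

// Nearest-rank percentile over an ascending-sorted, non-empty array.
function percentile(sortedAsc: number[], p: number): number {
  if (sortedAsc.length === 0) throw new RangeError('no samples');
  // Multiply before dividing so whole-number ranks stay exact.
  const rank = Math.ceil((p * sortedAsc.length) / 100);
  const clamped = Math.min(sortedAsc.length, Math.max(1, rank));
  return sortedAsc[clamped - 1];
}

// Summarises a sample of processing durations (milliseconds).
function processingTimePercentiles(durationsMs: number[]) {
  const sorted = [...durationsMs].sort((a, b) => a - b);
  return {
    p50: percentile(sorted, 50),
    p95: percentile(sorted, 95),
    p99: percentile(sorted, 99),
    min: percentile(sorted, 0),
    max: percentile(sorted, 100),
  };
}
```

Other percentile definitions (for example, linear interpolation as used by PostgreSQL's `percentile_cont`) give slightly different values on small samples, so results from this sketch need not match the service exactly.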
Database Schema
job_events table
| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key |
| job_id | VARCHAR | Job identifier |
| queue | VARCHAR | Queue name |
| job_type | VARCHAR | Job type |
| type | VARCHAR | Event type (queued, started, progress, completed, failed, retrying, moved_to_dlq) |
| progress | INTEGER | Progress percentage (0-100) |
| duration_ms | INTEGER | Processing duration in milliseconds |
| error | TEXT | Error message (for failed events) |
| metadata | JSONB | Additional event data |
| timestamp | TIMESTAMP | Event timestamp |
Indexes
- job_id - For job lifecycle queries
- queue - For queue-specific analytics
- type - For event type filtering
- timestamp - For time-based queries
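When reading rows through raw queries, it can help to have the table shape written down as a type. The sketch below mirrors the columns listed above; which columns are nullable is an assumption, and `JobEventRow` and `validateJobEventRow` are hypothetical names rather than exports of this package.

```typescript
const JOB_EVENT_TYPES = [
  'queued', 'started', 'progress', 'completed', 'failed', 'retrying', 'moved_to_dlq',
] as const;
type JobEventType = (typeof JOB_EVENT_TYPES)[number];

// One row of job_events. Nullability of the optional columns is assumed.
interface JobEventRow {
  id: string; // UUID primary key
  job_id: string;
  queue: string;
  job_type: string;
  type: JobEventType;
  progress: number | null; // 0-100
  duration_ms: number | null;
  error: string | null;
  metadata: Record<string, unknown> | null;
  timestamp: Date;
}

// Returns a list of human-readable problems; an empty list means the row looks consistent.
function validateJobEventRow(row: JobEventRow): string[] {
  const problems: string[] = [];
  if (!(JOB_EVENT_TYPES as readonly string[]).includes(row.type)) {
    problems.push(`unknown event type: ${row.type}`);
  }
  if (row.progress !== null && (!Number.isInteger(row.progress) || row.progress < 0 || row.progress > 100)) {
    problems.push('progress must be an integer between 0 and 100');
  }
  if (row.duration_ms !== null && (!Number.isInteger(row.duration_ms) || row.duration_ms < 0)) {
    problems.push('duration_ms must be a non-negative integer');
  }
  if (row.type === 'failed' && !row.error) {
    problems.push('failed events should carry an error message');
  }
  return problems;
}
```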
Performance Considerations
Non-blocking Logging
All event logging is non-blocking - failures to log events will not affect job processing:
// If database is down, job processing continues
await this.updateProgress(job, 50); // Logs progress (non-blocking)
// Job continues even if logging fails
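One way this kind of failure isolation might be achieved is to wrap each write so that a rejected promise is reported but never rethrown. The sketch below is an illustration under that assumption, not the package's internals; `logSafely` is a hypothetical helper.

```typescript
// Runs a logging call and swallows any failure, reporting it through `onError`.
// Resolves to true when the write succeeded and false when it failed.
async function logSafely(
  write: () => Promise<void>,
  onError: (error: unknown) => void = (error) => console.warn('job event logging failed', error),
): Promise<boolean> {
  try {
    await write();
    return true;
  } catch (error) {
    onError(error); // Report, but do not propagate: job processing must continue
    return false;
  }
}
```

A caller would then write something like `await logSafely(() => reporter.logJobStarted(jobId, queue, jobType))`, and a database outage would surface as a warning rather than as a failed job.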
Query Optimization
- Use the limit parameter to cap query results
- Specify time ranges (from/to) to leverage the timestamp index
- Run cleanup regularly to prevent unbounded table growth
- Consider partitioning by timestamp for high-volume queues
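The first two tips can be combined in a small helper that always produces a bounded, time-limited query. This is a sketch only: the `EventQueryOptions` shape is inferred from the parameter names mentioned above (limit, from, to) and may not match the real `options` argument of `getEventsByQueue`; `recentWindow` is a hypothetical name.

```typescript
// Assumed shape of the query options; verify against the package's own types.
interface EventQueryOptions {
  limit?: number;
  from?: Date;
  to?: Date;
}

// Builds options covering the last `windowMs` milliseconds, capped at `limit` rows.
function recentWindow(windowMs: number, limit = 500, now: Date = new Date()): EventQueryOptions {
  if (!(windowMs > 0)) {
    throw new RangeError('windowMs must be positive');
  }
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError('limit must be a positive integer');
  }
  return { from: new Date(now.getTime() - windowMs), to: now, limit };
}
```

If the assumed shape holds, a call might look like `analytics.getEventsByQueue('analytics', recentWindow(3_600_000, 100))` for at most 100 events from the last hour.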
Storage Planning
Estimate storage needs:
- Average event size: ~500 bytes (without large metadata)
- Events per job: ~5 (e.g. queued, started, one or two progress updates, completed)
- Daily volume: 1M jobs × 5 events × 500 bytes ≈ 2.5 GB/day uncompressed
- With 30-day retention: ~75GB
PostgreSQL compresses only large column values (through TOAST), so small event rows stay close to their uncompressed size; consider archiving old events to cold storage.
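The estimate above is simple multiplication, so it is easy to rerun with your own numbers. The sketch below uses the figures from this section as defaults (500 bytes per event, 5 events per job, 30 days of retention); `estimateStorage` is a hypothetical helper, and the result ignores index and row overhead.

```typescript
interface StorageEstimate {
  bytesPerDay: number;
  bytesRetained: number;
}

// Rough, uncompressed storage estimate for the job_events table.
function estimateStorage(
  jobsPerDay: number,
  eventsPerJob = 5,
  bytesPerEvent = 500,
  retentionDays = 30,
): StorageEstimate {
  const bytesPerDay = jobsPerDay * eventsPerJob * bytesPerEvent;
  return { bytesPerDay, bytesRetained: bytesPerDay * retentionDays };
}

// Converts bytes to decimal gigabytes (1 GB = 10^9 bytes).
function toGigabytes(bytes: number): number {
  return bytes / 1e9;
}
```

For 1M jobs per day this gives 2.5 GB per day and 75 GB at 30-day retention, matching the figures in the list.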
Integration with @transquinnftw/queue-nestjs
JobReporterService implements the JobReporter interface from @transquinnftw/queue-nestjs, so you can inject it directly into your processors:
@Processor('my-queue')
export class MyProcessor extends BaseProcessor<MyJobData> {
  constructor(reporter: JobReporterService) {
    super();
    this.reporter = reporter; // BaseProcessor uses this
  }
}
BaseProcessor automatically logs:
- Job start
- Progress updates
- Successful completion
- Failures with retry metadata
- DLQ moves (when max retries exceeded)
License
MIT