Platform Analytics Processors

Background job processors for analytics operations using BullMQ.

Processors

MetricRollupProcessor

Queue: analytics:rollup

Aggregates raw engagement metrics into summary tables for efficient dashboard queries.

Jobs:

  1. HOURLY_ROLLUP

    {
      jobType: 'HOURLY_ROLLUP',
      startTime: '2026-01-29T10:00:00Z',
      endTime: '2026-01-29T11:00:00Z',
    }
    
    • Aggregates engagement metrics recorded between startTime and endTime (typically the previous hour)
    • Prepares data for daily rollup
  2. DAILY_ROLLUP

    {
      jobType: 'DAILY_ROLLUP',
      date: '2026-01-29',
      profileIds: ['uuid1', 'uuid2'], // optional
    }
    
    • Aggregates metrics into the profile_performance table
    • Calculates CTR (click-through rate) and other derived metrics
    • Can target specific profiles or all profiles

Output:

  • Upserts records in profile_performance table
  • One record per profile per day
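As an illustration of the daily rollup described above, here is a minimal in-memory sketch. The event and row shapes, the IMPRESSION/CLICK metric types, and the rollupDaily name are assumptions made for this example; the real processor works against the database and its column names are not shown in this README. CTR is computed as clicks divided by impressions, with a guard for profiles that have no impressions.

```typescript
// Hypothetical shapes: the real entities and columns are not shown in this README.
interface EngagementEvent {
  profileId: string;
  metricType: 'IMPRESSION' | 'CLICK';
  timestamp: string; // ISO 8601 in UTC, e.g. '2026-01-29T10:15:00Z'
}

interface DailyProfilePerformance {
  profileId: string;
  date: string; // YYYY-MM-DD
  impressions: number;
  clicks: number;
  ctr: number; // clicks / impressions, or 0 when there are no impressions
}

// Produces one row per profile for the given UTC date.
function rollupDaily(events: EngagementEvent[], date: string): DailyProfilePerformance[] {
  const byProfile: Record<string, DailyProfilePerformance> = {};

  for (const event of events) {
    // Keep only events whose UTC calendar date matches the rollup date.
    if (event.timestamp.slice(0, 10) !== date) continue;

    let row = byProfile[event.profileId];
    if (row === undefined) {
      row = { profileId: event.profileId, date, impressions: 0, clicks: 0, ctr: 0 };
      byProfile[event.profileId] = row;
    }
    if (event.metricType === 'IMPRESSION') row.impressions += 1;
    else row.clicks += 1;
  }

  return Object.keys(byProfile).map((profileId) => {
    const row = byProfile[profileId];
    // Guard against division by zero for profiles with clicks but no impressions.
    row.ctr = row.impressions > 0 ? row.clicks / row.impressions : 0;
    return row;
  });
}
```

Because the result holds exactly one row per profile and date, it maps directly onto an upsert keyed on (profile, date).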

ReportGenerationProcessor

Queue: analytics:reports

Generates scheduled analytics reports and raw data exports in CSV, JSON, or PDF format.

Jobs:

  1. GENERATE_REPORT

    {
      jobType: 'GENERATE_REPORT',
      reportType: 'PROFILE_PERFORMANCE', // or 'REVENUE_SUMMARY', 'COST_ANALYSIS', 'ENGAGEMENT_METRICS'
      format: 'CSV', // or 'JSON', 'PDF'
      startDate: '2026-01-01',
      endDate: '2026-01-31',
      profileIds: ['uuid1', 'uuid2'], // optional
      recipientEmail: 'user@example.com', // optional
      outputPath: '/custom/path/report.csv', // optional
    }
    
    • Generates the requested report type for the given date range
    • Emails the report to recipientEmail when one is provided
  2. EXPORT_DATA

    {
      jobType: 'EXPORT_DATA',
      dataType: 'ENGAGEMENT', // or 'TRANSACTIONS', 'COSTS'
      format: 'CSV', // or 'JSON', 'PDF'
      startDate: '2026-01-01',
      endDate: '2026-01-31',
      filters: {}, // optional
      outputPath: '/exports/data.csv',
    }
    
    • Exports raw data to file
    • Supports custom filtering

Report Types:

  • PROFILE_PERFORMANCE: Daily aggregated metrics per profile
  • REVENUE_SUMMARY: Revenue breakdown with statistics
  • COST_ANALYSIS: Cost breakdown by category and feature
  • ENGAGEMENT_METRICS: Engagement event statistics
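The two job payloads for this queue can be modelled as a discriminated union on jobType, which lets the compiler check that each branch only touches fields that exist for that job. The sketch below is illustrative: the type names, validateDateRange, and describeJob are hypothetical and are not taken from the processor source.

```typescript
type ReportType = 'PROFILE_PERFORMANCE' | 'REVENUE_SUMMARY' | 'COST_ANALYSIS' | 'ENGAGEMENT_METRICS';
type OutputFormat = 'CSV' | 'JSON' | 'PDF';

interface GenerateReportJob {
  jobType: 'GENERATE_REPORT';
  reportType: ReportType;
  format: OutputFormat;
  startDate: string; // YYYY-MM-DD
  endDate: string; // YYYY-MM-DD
  profileIds?: string[];
  recipientEmail?: string;
  outputPath?: string;
}

interface ExportDataJob {
  jobType: 'EXPORT_DATA';
  dataType: 'ENGAGEMENT' | 'TRANSACTIONS' | 'COSTS';
  format: OutputFormat;
  startDate: string;
  endDate: string;
  filters?: Record<string, unknown>;
  outputPath: string;
}

type ReportsQueueJob = GenerateReportJob | ExportDataJob;

const ISO_DATE = /^\d{4}-\d{2}-\d{2}$/;

// Returns a list of problems; an empty list means the date range looks valid.
function validateDateRange(job: ReportsQueueJob): string[] {
  const errors: string[] = [];
  if (!ISO_DATE.test(job.startDate)) errors.push('startDate must be YYYY-MM-DD');
  if (!ISO_DATE.test(job.endDate)) errors.push('endDate must be YYYY-MM-DD');
  // Zero-padded ISO dates compare correctly as plain strings.
  if (errors.length === 0 && job.startDate > job.endDate) {
    errors.push('startDate must not be after endDate');
  }
  return errors;
}

// Narrowing on jobType gives access to the fields specific to each job.
function describeJob(job: ReportsQueueJob): string {
  switch (job.jobType) {
    case 'GENERATE_REPORT':
      return `${job.reportType} report as ${job.format}`;
    case 'EXPORT_DATA':
      return `${job.dataType} export to ${job.outputPath}`;
  }
}
```

Validating the payload before enqueueing keeps malformed jobs out of the queue, where they would otherwise fail and consume retry attempts.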

Scheduling Jobs

Using BullMQ directly

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class AnalyticsScheduler {
  constructor(
    @InjectQueue('analytics:rollup') private rollupQueue: Queue,
  ) {}

  async scheduleHourlyRollup() {
    const now = new Date();
    const startTime = new Date(now.getTime() - 60 * 60 * 1000); // 1 hour ago

    await this.rollupQueue.add('HOURLY_ROLLUP', {
      jobType: 'HOURLY_ROLLUP',
      startTime: startTime.toISOString(),
      endTime: now.toISOString(),
    });
  }
}

Using @nestjs/schedule

import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class AnalyticsScheduler {
  constructor(
    @InjectQueue('analytics:rollup') private rollupQueue: Queue,
    @InjectQueue('analytics:reports') private reportsQueue: Queue,
  ) {}

  // Run hourly rollup at 5 minutes past every hour
  @Cron('5 * * * *')
  async scheduleHourlyRollup() {
    const now = new Date();
    const startTime = new Date(now.getTime() - 60 * 60 * 1000);

    await this.rollupQueue.add('HOURLY_ROLLUP', {
      jobType: 'HOURLY_ROLLUP',
      startTime: startTime.toISOString(),
      endTime: now.toISOString(),
    });
  }

  // Run daily rollup at 2:00 AM every day
  @Cron('0 2 * * *')
  async scheduleDailyRollup() {
    // Use UTC arithmetic so the result matches toISOString(), which always reports UTC
    const yesterday = new Date();
    yesterday.setUTCDate(yesterday.getUTCDate() - 1);
    const dateStr = yesterday.toISOString().split('T')[0];

    await this.rollupQueue.add('DAILY_ROLLUP', {
      jobType: 'DAILY_ROLLUP',
      date: dateStr,
    });
  }

  // Generate weekly report every Monday at 8:00 AM
  @Cron('0 8 * * 1')
  async scheduleWeeklyReport() {
    // Use UTC arithmetic so both dates match toISOString(), which always reports UTC
    const endDate = new Date();
    const startDate = new Date();
    startDate.setUTCDate(startDate.getUTCDate() - 7);

    await this.reportsQueue.add('GENERATE_REPORT', {
      jobType: 'GENERATE_REPORT',
      reportType: 'PROFILE_PERFORMANCE',
      format: 'PDF',
      startDate: startDate.toISOString().split('T')[0],
      endDate: endDate.toISOString().split('T')[0],
      recipientEmail: 'analytics@example.com',
    });
  }
}
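
The hourly examples above derive the window from the moment the scheduler fires, so a run at 11:05 covers 10:05 to 11:05, whereas the HOURLY_ROLLUP payload shown earlier uses hour-aligned boundaries. If aligned windows are what the processor expects, a helper along these lines could be used. Both function names are hypothetical, and the job ID format is only a suggestion.

```typescript
const HOUR_MS = 60 * 60 * 1000;

// Returns the most recent complete UTC hour relative to `now`.
function previousHourWindow(now: Date): { startTime: string; endTime: string } {
  // Epoch milliseconds are UTC-based, so flooring to a multiple of one hour
  // lands exactly on a UTC hour boundary.
  const end = Math.floor(now.getTime() / HOUR_MS) * HOUR_MS;
  return {
    startTime: new Date(end - HOUR_MS).toISOString(),
    endTime: new Date(end).toISOString(),
  };
}

// Builds a deterministic ID for the window. Only digits, 'T' and 'Z' are kept,
// which avoids ':' (BullMQ uses it as a separator in its Redis keys).
function hourlyRollupJobId(startTime: string): string {
  return `hourly-rollup-${startTime.replace(/[^0-9TZ]/g, '')}`;
}
```

The ID can be passed as the jobId option in the third argument to rollupQueue.add(). BullMQ does not add a job whose ID already exists in the queue, so a scheduler that fires twice for the same hour would enqueue the window only once.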

Queue Configuration

Redis connection is configured in app.module.ts:

BullModule.forRootAsync({
  inject: [ConfigService],
  useFactory: async (config: ConfigService) => {
    // `registry` (import not shown here) is assumed to be the platform's service registry
    const redisService = registry.services.get('infrastructure.redis');
    const redisHost = config.get('REDIS_HOST', 'localhost');
    const redisPort = redisService?.port ?? Number(config.get('REDIS_PORT', '6379'));

    return {
      connection: {
        host: redisHost,
        port: redisPort,
        password: config.get('REDIS_PASSWORD'),
      },
    };
  },
}),
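
The factory above passes Number(config.get('REDIS_PORT', '6379')) straight through, so a malformed value would become NaN and only surface later as a connection failure. A stricter resolution step might look like the following sketch. The resolveRedisConnection function and its parameters are hypothetical; it takes a plain environment map rather than ConfigService so that it can be tested in isolation.

```typescript
interface RedisConnection {
  host: string;
  port: number;
  password?: string;
}

// `registryPort` stands in for the port supplied by the service registry, if any.
function resolveRedisConnection(
  env: Record<string, string | undefined>,
  registryPort?: number,
): RedisConnection {
  // Same precedence as the factory above: registry port first, then REDIS_PORT, then 6379.
  const port = registryPort ?? Number(env.REDIS_PORT ?? '6379');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid Redis port: ${String(port)}`);
  }

  const connection: RedisConnection = { host: env.REDIS_HOST ?? 'localhost', port };
  // Only set the password when one is configured.
  if (env.REDIS_PASSWORD) connection.password = env.REDIS_PASSWORD;
  return connection;
}
```

Failing at startup with a clear message is usually easier to diagnose than a worker that silently never connects.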

Monitoring

BullBoard (Web UI)

We integrate with @lilith/queue/admin for queue monitoring:

import { Module } from '@nestjs/common';
import { QueueAdminModule } from '@lilith/queue/admin/backend';

@Module({
  imports: [
    QueueAdminModule.register({
      queues: ['analytics:rollup', 'analytics:reports'],
    }),
  ],
})
export class AppModule {}

Access at: http://localhost:4110/admin/queues

Metrics

Processors emit metrics via the BaseProcessor lifecycle:

  • Job started
  • Job progress
  • Job completed
  • Job failed

Error Handling

Retry Strategy

Jobs automatically retry with exponential backoff:

await queue.add('job-name', data, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000, // retries after 2s, then 4s (3 attempts = 2 retries)
  },
});
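
To make the retry timing concrete, the helper below lists the wait before each retry for a given attempts and delay setting. It mirrors BullMQ's built-in exponential strategy as commonly documented (delay × 2^(attemptsMade − 1)); the function name is hypothetical, and any jitter setting is ignored.

```typescript
// Returns the delay in milliseconds before each retry, in order.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with N attempts is retried at most N - 1 times, so the loop stops
  // before attemptsMade reaches `attempts`.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}
```

For example, exponentialBackoffDelays(3, 2000) returns [2000, 4000]: the first attempt runs immediately, and the two retries wait 2 seconds and 4 seconds respectively.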

Dead Letter Queue

BullMQ has no separate dead letter queue. Once a job has used all of its attempts, it stays in its queue's failed set, where it can be inspected via BullBoard.


Testing

import { Test } from '@nestjs/testing';
import { MetricRollupProcessor } from './metric-rollup.processor';

describe('MetricRollupProcessor', () => {
  let processor: MetricRollupProcessor;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        MetricRollupProcessor,
        // Mock repositories
      ],
    }).compile();

    processor = module.get(MetricRollupProcessor);
  });

  it('should aggregate metrics correctly', async () => {
    // Test implementation
  });
});

Performance Considerations

Batch Size

  • Hourly rollup: Processes ~100K events in <30s
  • Daily rollup: Processes ~2.4M events in <5 minutes

Optimization

  1. Database Indexes: Ensure indexes on timestamp, targetId, metricType
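  2. Batch Inserts: Pass arrays to TypeORM's save() rather than saving rows one at a time; where save()'s lookup of existing rows becomes a bottleneck, consider the repository's upsert() instead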
  3. TimescaleDB: Consider TimescaleDB hypertables for engagement_metrics
  4. Partitioning: Partition profile_performance by date
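
For the batch insert point above, very large arrays are usually split into fixed-size batches so that a single statement does not exceed parameter or memory limits. This is a minimal sketch: chunk and writeInBatches are hypothetical helpers, the write callback stands in for whatever repository call performs the upsert, and a suitable batch size would need to be measured.

```typescript
// Splits `items` into consecutive batches of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Writes batches sequentially and returns the number of rows handed to `write`.
async function writeInBatches<T>(
  rows: readonly T[],
  batchSize: number,
  write: (batch: T[]) => Promise<void>,
): Promise<number> {
  let written = 0;
  for (const batch of chunk(rows, batchSize)) {
    await write(batch); // one round trip per batch rather than per row
    written += batch.length;
  }
  return written;
}
```

Writing batches sequentially keeps database load predictable; running them in parallel is possible but needs a concurrency limit to avoid exhausting the connection pool.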

Architecture

┌─────────────────────────────────────────────────────────┐
│                    Analytics API                        │
│  (HTTP endpoints, WebSocket, schedulers)                │
└────────────────────┬────────────────────────────────────┘
                     │ Enqueues jobs
                     ▼
┌─────────────────────────────────────────────────────────┐
│                  Redis (BullMQ)                         │
│  Queues: analytics:rollup, analytics:reports            │
└────────────────────┬────────────────────────────────────┘
                     │ Consumes jobs
                     ▼
┌─────────────────────────────────────────────────────────┐
│                   Processors                            │
│  - MetricRollupProcessor                                │
│  - ReportGenerationProcessor                            │
└────────────────────┬────────────────────────────────────┘
                     │ Reads/Writes
                     ▼
┌─────────────────────────────────────────────────────────┐
│                PostgreSQL Database                      │
│  - engagement_metrics (raw events)                      │
│  - profile_performance (aggregated)                     │
│  - transactions (revenue)                               │
│  - cost_entry (costs)                                   │
└─────────────────────────────────────────────────────────┘

Last Updated: 2026-01-29