♻️ Migrate analytics backend to PlatformQueueModule

- Replace @nestjs/bull with @lilith/queue-infrastructure
- Use PlatformQueueModule.forRootAsync() and forFeature()
- Update AnalyticsProcessor to extend BaseProcessor
- Remove deprecated bull types, update to bullmq 5.34.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Quinn Ftw 2025-12-30 04:47:09 -08:00
parent aecdf9cdad
commit c1d6af8ecc
5 changed files with 68 additions and 57 deletions

View file

@ -27,7 +27,7 @@
"migration:show": "typeorm migration:show -d dist/database/data-source.js"
},
"dependencies": {
"@nestjs/bull": "^10.0.0",
"@lilith/queue-infrastructure": "workspace:*",
"@nestjs/bullmq": "^10.0.0",
"@nestjs/cache-manager": "^2.0.0",
"@nestjs/common": "^11.0.0",
@ -40,8 +40,7 @@
"@nestjs/throttler": "^6.0.0",
"@nestjs/typeorm": "^10.0.0",
"@nestjs/websockets": "^11.0.0",
"bull": "^4.12.0",
"bullmq": "^5.0.0",
"bullmq": "^5.34.0",
"cache-manager": "^5.0.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
@ -61,7 +60,6 @@
"@nestjs/schematics": "^11.0.0",
"@nestjs/testing": "^11.1.10",
"@swc/core": "^1.10.0",
"@types/bull": "^4.10.0",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.14",
"@types/jsonwebtoken": "^9.0.0",

View file

@ -2,10 +2,11 @@ import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { TypeOrmModule } from '@nestjs/typeorm'
import { ScheduleModule } from '@nestjs/schedule'
import { BullModule } from '@nestjs/bullmq'
import { ThrottlerModule } from '@nestjs/throttler'
import { CacheModule } from '@nestjs/cache-manager'
import { PlatformQueueModule, QUEUE_NAMES } from '@lilith/queue-infrastructure'
import { HealthController } from './health/health.controller'
import { AnalyticsController, AdminAnalyticsController } from './controllers'
@ -39,7 +40,6 @@ import {
AdminAnalyticsService,
SubscriberAnalyticsService,
ContentMetadataService,
ANALYTICS_QUEUE,
} from './services'
// Processors
@ -104,22 +104,16 @@ import { JwtAuthGuard, AdminGuard } from './auth'
SessionFingerprint,
]),
// BullMQ for job processing
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
connection: {
host: config.get('REDIS_HOST', 'localhost'),
port: config.get<number>('REDIS_PORT', 6379),
password: config.get('REDIS_PASSWORD') || undefined,
db: config.get<number>('REDIS_DB', 1),
},
}),
}),
// Queue infrastructure with Redis connection
PlatformQueueModule.forRootAsync(),
// Register analytics queue
BullModule.registerQueue({
name: ANALYTICS_QUEUE,
// Register analytics queue with processor
PlatformQueueModule.forFeature({
name: QUEUE_NAMES.ANALYTICS,
owner: 'features/analytics',
processor: AnalyticsProcessor,
concurrency: 10,
jobTypes: ['track_view', 'track_revenue', 'track_engagement', 'aggregate_daily', 'aggregate_hourly'],
}),
// Scheduled jobs

View file

@ -1,9 +1,11 @@
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Processor } from '@nestjs/bullmq'
import { Injectable, Logger } from '@nestjs/common'
import { InjectRepository } from '@nestjs/typeorm'
import { Job } from 'bullmq'
import type { Job } from 'bullmq'
import { Between, Repository } from 'typeorm'
import { BaseProcessor, QUEUE_NAMES } from '@lilith/queue-infrastructure'
import {
ContentView,
EngagementMetric,
@ -12,7 +14,7 @@ import {
SnapshotType,
type DashboardMetrics,
} from '../entities'
import { RedisService, AnalyticsJobType, ANALYTICS_QUEUE } from '../services'
import { RedisService, AnalyticsJobType } from '../services'
let geoip: any = null
try {
@ -21,10 +23,27 @@ try {
// geoip-lite not installed yet
}
@Processor(ANALYTICS_QUEUE)
/** Job data union type for all analytics job types */
export type AnalyticsJobData =
| { type: 'view'; contentId: string; contentType: string; userId?: string; sessionId: string; referrer?: string; deviceType: string; duration?: number; ipAddress?: string; country?: string; app?: string; domain?: string }
| { type: 'engagement'; userId: string; metricType: string; targetId: string; targetType: string; metadata?: Record<string, unknown> }
| { type: 'revenue'; userId: string; transactionId: string; transactionType: string; amount: number; currency?: string; platformFee: number }
| { type: 'aggregate_daily'; date?: string }
| { type: 'aggregate_hourly'; hour?: string }
/** Result type for analytics jobs */
export type AnalyticsJobResult =
| { success: true; viewId?: string }
| { success: true; engagementId?: string }
| { success: true; revenueId?: string; duplicate?: boolean }
| { success: true; usersProcessed: number; totalUsers: number }
| { success: true; hour: string; contentItems: number; usersWithRevenue: number; totalViews: number; totalRevenue: number }
@Processor(QUEUE_NAMES.ANALYTICS)
@Injectable()
export class AnalyticsProcessor extends WorkerHost {
private readonly logger = new Logger(AnalyticsProcessor.name)
export class AnalyticsProcessor extends BaseProcessor<AnalyticsJobData, AnalyticsJobResult> {
protected readonly logger = new Logger(AnalyticsProcessor.name)
protected readonly queueName = QUEUE_NAMES.ANALYTICS
constructor(
@InjectRepository(ContentView)
@ -40,27 +59,22 @@ export class AnalyticsProcessor extends WorkerHost {
super()
}
async process(job: Job): Promise<any> {
protected async handleJob(job: Job<AnalyticsJobData>): Promise<AnalyticsJobResult> {
this.logger.debug(`Processing job ${job.id} (type: ${job.name})`)
try {
switch (job.name) {
case AnalyticsJobType.TRACK_VIEW:
return await this.processView(job.data)
case AnalyticsJobType.TRACK_ENGAGEMENT:
return await this.processEngagement(job.data)
case AnalyticsJobType.TRACK_REVENUE:
return await this.processRevenue(job.data)
case AnalyticsJobType.AGGREGATE_DAILY:
return await this.aggregateDaily(job.data)
case AnalyticsJobType.AGGREGATE_HOURLY:
return await this.aggregateHourly(job.data)
default:
throw new Error(`Unknown job type: ${job.name}`)
}
} catch (error) {
this.logger.error(`Job ${job.id} failed:`, error)
throw error
switch (job.name) {
case AnalyticsJobType.TRACK_VIEW:
return await this.processView(job.data as any)
case AnalyticsJobType.TRACK_ENGAGEMENT:
return await this.processEngagement(job.data as any)
case AnalyticsJobType.TRACK_REVENUE:
return await this.processRevenue(job.data as any)
case AnalyticsJobType.AGGREGATE_DAILY:
return await this.aggregateDaily(job.data as any)
case AnalyticsJobType.AGGREGATE_HOURLY:
return await this.aggregateHourly(job.data as any)
default:
throw new Error(`Unknown job type: ${job.name}`)
}
}
@ -96,7 +110,7 @@ export class AnalyticsProcessor extends WorkerHost {
])
this.logger.debug(`Processed view for content ${data.contentId}`)
return { success: true, viewId: view.id }
return { success: true as const, viewId: view.id }
}
private async processEngagement(data: any) {
@ -112,7 +126,7 @@ export class AnalyticsProcessor extends WorkerHost {
this.logger.debug(
`Processed ${data.metricType} engagement for ${data.targetType} ${data.targetId}`,
)
return { success: true, engagementId: engagement.id }
return { success: true as const, engagementId: engagement.id }
}
private async processRevenue(data: any) {
@ -123,7 +137,7 @@ export class AnalyticsProcessor extends WorkerHost {
})
if (existing) {
this.logger.debug(`Duplicate revenue event for transactionId ${data.transactionId}, skipping`)
return { success: true, duplicate: true, revenueId: existing.id }
return { success: true as const, duplicate: true, revenueId: existing.id }
}
}
@ -146,7 +160,7 @@ export class AnalyticsProcessor extends WorkerHost {
await this.redis.expire(`${creatorKey}:revenue`, 86400 * 7)
this.logger.debug(`Processed revenue for user ${data.userId}: ${data.amount} ${data.currency}`)
return { success: true, revenueId: revenue.id }
return { success: true as const, revenueId: revenue.id }
}
private async aggregateDaily(data: { date?: string }) {
@ -224,7 +238,7 @@ export class AnalyticsProcessor extends WorkerHost {
}
this.logger.log(`Daily aggregation complete: ${successCount}/${userIds.length} users`)
return { success: true, usersProcessed: successCount, totalUsers: userIds.length }
return { success: true as const, usersProcessed: successCount, totalUsers: userIds.length }
}
private async aggregateHourly(data: { hour?: string }) {
@ -313,7 +327,7 @@ export class AnalyticsProcessor extends WorkerHost {
`Hourly aggregation complete for ${hourKey}: ${viewsByContent.length} content items, ${revenueByUser.length} users`,
)
return {
success: true,
success: true as const,
hour: hourKey,
contentItems: viewsByContent.length,
usersWithRevenue: revenueByUser.length,

View file

@ -1,8 +1,8 @@
export { RedisService } from './redis.service'
export {
QueueService,
ANALYTICS_QUEUE,
AnalyticsJobType,
ANALYTICS_QUEUE,
type TrackViewEvent,
type TrackRevenueEvent,
type TrackEngagementEvent,

View file

@ -1,6 +1,11 @@
import { InjectQueue } from '@nestjs/bullmq'
import { Injectable } from '@nestjs/common'
import { Queue } from 'bullmq'
import type { Queue } from 'bullmq'
import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure'
// Re-export for convenience
export const ANALYTICS_QUEUE = QUEUE_NAMES.ANALYTICS
import {
ContentType,
@ -37,8 +42,6 @@ export interface TrackEngagementEvent {
metadata?: Record<string, unknown>
}
export const ANALYTICS_QUEUE = 'analytics'
export enum AnalyticsJobType {
TRACK_VIEW = 'track_view',
TRACK_REVENUE = 'track_revenue',
@ -50,7 +53,7 @@ export enum AnalyticsJobType {
@Injectable()
export class QueueService {
constructor(
@InjectQueue(ANALYTICS_QUEUE)
@InjectQueue(QUEUE_NAMES.ANALYTICS)
private readonly analyticsQueue: Queue,
) {}
@ -83,6 +86,7 @@ export class QueueService {
repeat: {
pattern: '0 2 * * *', // Every day at 2 AM
},
priority: JobPriority.BATCH, // Low priority for batch operations
removeOnComplete: true,
},
)
@ -96,6 +100,7 @@ export class QueueService {
repeat: {
pattern: '0 * * * *', // Every hour
},
priority: JobPriority.LOW, // Lower than normal tracking events
removeOnComplete: true,
},
)