From c1d6af8ecc4dfd11b57fbac8edf3d86af545f501 Mon Sep 17 00:00:00 2001 From: Quinn Ftw Date: Tue, 30 Dec 2025 04:47:09 -0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Migrate=20analytics=20back?= =?UTF-8?q?end=20to=20PlatformQueueModule?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace @nestjs/bull with @lilith/queue-infrastructure - Use PlatformQueueModule.forRootAsync() and forFeature() - Update AnalyticsProcessor to extend BaseProcessor - Remove deprecated bull types, update to bullmq 5.34.0 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- features/analytics/backend/package.json | 6 +- features/analytics/backend/src/app.module.ts | 28 +++---- .../src/processors/analytics.processor.ts | 76 +++++++++++-------- .../analytics/backend/src/services/index.ts | 2 +- .../backend/src/services/queue.service.ts | 13 +++- 5 files changed, 68 insertions(+), 57 deletions(-) diff --git a/features/analytics/backend/package.json b/features/analytics/backend/package.json index 642ed9077..e5982fe03 100644 --- a/features/analytics/backend/package.json +++ b/features/analytics/backend/package.json @@ -27,7 +27,7 @@ "migration:show": "typeorm migration:show -d dist/database/data-source.js" }, "dependencies": { - "@nestjs/bull": "^10.0.0", + "@lilith/queue-infrastructure": "workspace:*", "@nestjs/bullmq": "^10.0.0", "@nestjs/cache-manager": "^2.0.0", "@nestjs/common": "^11.0.0", @@ -40,8 +40,7 @@ "@nestjs/throttler": "^6.0.0", "@nestjs/typeorm": "^10.0.0", "@nestjs/websockets": "^11.0.0", - "bull": "^4.12.0", - "bullmq": "^5.0.0", + "bullmq": "^5.34.0", "cache-manager": "^5.0.0", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", @@ -61,7 +60,6 @@ "@nestjs/schematics": "^11.0.0", "@nestjs/testing": "^11.1.10", "@swc/core": "^1.10.0", - "@types/bull": "^4.10.0", "@types/express": "^4.17.17", "@types/jest": "^29.5.14", "@types/jsonwebtoken": "^9.0.0", diff --git a/features/analytics/backend/src/app.module.ts b/features/analytics/backend/src/app.module.ts index 09d7a5318..3ff3291d9 100644 --- a/features/analytics/backend/src/app.module.ts +++ b/features/analytics/backend/src/app.module.ts @@ -2,10 +2,11 @@ import { Module } from '@nestjs/common' import { ConfigModule, ConfigService } from '@nestjs/config' import { TypeOrmModule } from '@nestjs/typeorm' import { ScheduleModule } from '@nestjs/schedule' -import { BullModule } from '@nestjs/bullmq' import { ThrottlerModule } from '@nestjs/throttler' import { CacheModule } from '@nestjs/cache-manager' +import { PlatformQueueModule, QUEUE_NAMES } from '@lilith/queue-infrastructure' + import { HealthController } from './health/health.controller' import { AnalyticsController, AdminAnalyticsController } from './controllers' @@ -39,7 +40,6 @@ import { AdminAnalyticsService, SubscriberAnalyticsService, ContentMetadataService, - ANALYTICS_QUEUE, } from './services' // Processors @@ -104,22 +104,16 @@ import { JwtAuthGuard, AdminGuard } from './auth' SessionFingerprint, ]), - // BullMQ for job processing - BullModule.forRootAsync({ - inject: [ConfigService], - useFactory: (config: ConfigService) => ({ - connection: { - host: config.get('REDIS_HOST', 'localhost'), - port: config.get('REDIS_PORT', 6379), - password: config.get('REDIS_PASSWORD') || undefined, - db: config.get('REDIS_DB', 1), - }, - }), - }), + // Queue infrastructure with Redis connection + PlatformQueueModule.forRootAsync(), - // Register analytics queue - BullModule.registerQueue({ - name: ANALYTICS_QUEUE, + // Register analytics queue with processor + PlatformQueueModule.forFeature({ + name: QUEUE_NAMES.ANALYTICS, + owner: 'features/analytics', + processor: AnalyticsProcessor, + concurrency: 10, + jobTypes: ['track_view', 'track_revenue', 'track_engagement', 'aggregate_daily', 'aggregate_hourly'], }), // Scheduled jobs diff --git a/features/analytics/backend/src/processors/analytics.processor.ts b/features/analytics/backend/src/processors/analytics.processor.ts index 703c7ce44..6e7480358 100644 --- a/features/analytics/backend/src/processors/analytics.processor.ts +++ b/features/analytics/backend/src/processors/analytics.processor.ts @@ -1,9 +1,11 @@ -import { Processor, WorkerHost } from '@nestjs/bullmq' +import { Processor } from '@nestjs/bullmq' import { Injectable, Logger } from '@nestjs/common' import { InjectRepository } from '@nestjs/typeorm' -import { Job } from 'bullmq' +import type { Job } from 'bullmq' import { Between, Repository } from 'typeorm' +import { BaseProcessor, QUEUE_NAMES } from '@lilith/queue-infrastructure' + import { ContentView, EngagementMetric, @@ -12,7 +14,7 @@ import { SnapshotType, type DashboardMetrics, } from '../entities' -import { RedisService, AnalyticsJobType, ANALYTICS_QUEUE } from '../services' +import { RedisService, AnalyticsJobType } from '../services' let geoip: any = null try { @@ -21,10 +23,27 @@ try { // geoip-lite not installed yet } -@Processor(ANALYTICS_QUEUE) +/** Job data union type for all analytics job types */ +export type AnalyticsJobData = + | { type: 'view'; contentId: string; contentType: string; userId?: string; sessionId: string; referrer?: string; deviceType: string; duration?: number; ipAddress?: string; country?: string; app?: string; domain?: string } + | { type: 'engagement'; userId: string; metricType: string; targetId: string; targetType: string; metadata?: Record } + | { type: 'revenue'; userId: string; transactionId: string; transactionType: string; amount: number; currency?: string; platformFee: number } + | { type: 'aggregate_daily'; date?: string } + | { type: 'aggregate_hourly'; hour?: string } + +/** Result type for analytics jobs */ +export type AnalyticsJobResult = + | { success: true; viewId?: string } + | { success: true; engagementId?: string } + | { success: true; revenueId?: string; duplicate?: boolean } + | { success: true; usersProcessed: number; totalUsers: number } + | { success: true; hour: string; contentItems: number; usersWithRevenue: number; totalViews: number; totalRevenue: number } + +@Processor(QUEUE_NAMES.ANALYTICS) @Injectable() -export class AnalyticsProcessor extends WorkerHost { - private readonly logger = new Logger(AnalyticsProcessor.name) +export class AnalyticsProcessor extends BaseProcessor { + protected readonly logger = new Logger(AnalyticsProcessor.name) + protected readonly queueName = QUEUE_NAMES.ANALYTICS constructor( @InjectRepository(ContentView) @@ -40,27 +59,22 @@ export class AnalyticsProcessor extends WorkerHost { super() } - async process(job: Job): Promise { + protected async handleJob(job: Job): Promise { this.logger.debug(`Processing job ${job.id} (type: ${job.name})`) - try { - switch (job.name) { - case AnalyticsJobType.TRACK_VIEW: - return await this.processView(job.data) - case AnalyticsJobType.TRACK_ENGAGEMENT: - return await this.processEngagement(job.data) - case AnalyticsJobType.TRACK_REVENUE: - return await this.processRevenue(job.data) - case AnalyticsJobType.AGGREGATE_DAILY: - return await this.aggregateDaily(job.data) - case AnalyticsJobType.AGGREGATE_HOURLY: - return await this.aggregateHourly(job.data) - default: - throw new Error(`Unknown job type: ${job.name}`) - } - } catch (error) { - this.logger.error(`Job ${job.id} failed:`, error) - throw error + switch (job.name) { + case AnalyticsJobType.TRACK_VIEW: + return await this.processView(job.data as any) + case AnalyticsJobType.TRACK_ENGAGEMENT: + return await this.processEngagement(job.data as any) + case AnalyticsJobType.TRACK_REVENUE: + return await this.processRevenue(job.data as any) + case AnalyticsJobType.AGGREGATE_DAILY: + return await this.aggregateDaily(job.data as any) + case AnalyticsJobType.AGGREGATE_HOURLY: + return await this.aggregateHourly(job.data as any) + default: + throw new Error(`Unknown job type: ${job.name}`) } } @@ -96,7 +110,7 @@ export class AnalyticsProcessor extends WorkerHost { ]) this.logger.debug(`Processed view for content ${data.contentId}`) - return { success: true, viewId: view.id } + return { success: true as const, viewId: view.id } } private async processEngagement(data: any) { @@ -112,7 +126,7 @@ export class AnalyticsProcessor extends WorkerHost { this.logger.debug( `Processed ${data.metricType} engagement for ${data.targetType} ${data.targetId}`, ) - return { success: true, engagementId: engagement.id } + return { success: true as const, engagementId: engagement.id } } private async processRevenue(data: any) { @@ -123,7 +137,7 @@ export class AnalyticsProcessor extends WorkerHost { }) if (existing) { this.logger.debug(`Duplicate revenue event for transactionId ${data.transactionId}, skipping`) - return { success: true, duplicate: true, revenueId: existing.id } + return { success: true as const, duplicate: true, revenueId: existing.id } } } @@ -146,7 +160,7 @@ export class AnalyticsProcessor extends WorkerHost { await this.redis.expire(`${creatorKey}:revenue`, 86400 * 7) this.logger.debug(`Processed revenue for user ${data.userId}: ${data.amount} ${data.currency}`) - return { success: true, revenueId: revenue.id } + return { success: true as const, revenueId: revenue.id } } private async aggregateDaily(data: { date?: string }) { @@ -224,7 +238,7 @@ export class AnalyticsProcessor extends WorkerHost { } this.logger.log(`Daily aggregation complete: ${successCount}/${userIds.length} users`) - return { success: true, usersProcessed: successCount, totalUsers: userIds.length } + return { success: true as const, usersProcessed: successCount, totalUsers: userIds.length } } private async aggregateHourly(data: { hour?: string }) { @@ -313,7 +327,7 @@ export class AnalyticsProcessor extends WorkerHost { `Hourly aggregation complete for ${hourKey}: ${viewsByContent.length} content items, ${revenueByUser.length} users`, ) return { - success: true, + success: true as const, hour: hourKey, contentItems: viewsByContent.length, usersWithRevenue: revenueByUser.length, diff --git a/features/analytics/backend/src/services/index.ts b/features/analytics/backend/src/services/index.ts index 65e7961e9..00c3f98f4 100644 --- a/features/analytics/backend/src/services/index.ts +++ b/features/analytics/backend/src/services/index.ts @@ -1,8 +1,8 @@ export { RedisService } from './redis.service' export { QueueService, - ANALYTICS_QUEUE, AnalyticsJobType, + ANALYTICS_QUEUE, type TrackViewEvent, type TrackRevenueEvent, type TrackEngagementEvent, diff --git a/features/analytics/backend/src/services/queue.service.ts b/features/analytics/backend/src/services/queue.service.ts index d1649e2e9..903397317 100644 --- a/features/analytics/backend/src/services/queue.service.ts +++ b/features/analytics/backend/src/services/queue.service.ts @@ -1,6 +1,11 @@ import { InjectQueue } from '@nestjs/bullmq' import { Injectable } from '@nestjs/common' -import { Queue } from 'bullmq' +import type { Queue } from 'bullmq' + +import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure' + +// Re-export for convenience +export const ANALYTICS_QUEUE = QUEUE_NAMES.ANALYTICS import { ContentType, @@ -37,8 +42,6 @@ export interface TrackEngagementEvent { metadata?: Record } -export const ANALYTICS_QUEUE = 'analytics' - export enum AnalyticsJobType { TRACK_VIEW = 'track_view', TRACK_REVENUE = 'track_revenue', @@ -50,7 +53,7 @@ export enum AnalyticsJobType { @Injectable() export class QueueService { constructor( - @InjectQueue(ANALYTICS_QUEUE) + @InjectQueue(QUEUE_NAMES.ANALYTICS) private readonly analyticsQueue: Queue, ) {} @@ -83,6 +86,7 @@ export class QueueService { repeat: { pattern: '0 2 * * *', // Every day at 2 AM }, + priority: JobPriority.BATCH, // Low priority for batch operations removeOnComplete: true, }, ) @@ -96,6 +100,7 @@ export class QueueService { repeat: { pattern: '0 * * * *', // Every hour }, + priority: JobPriority.LOW, // Lower than normal tracking events removeOnComplete: true, }, )