diff --git a/features/analytics/backend-api/src/services/queue.service.ts b/features/analytics/backend-api/src/services/queue.service.ts index 903397317..df00ddf06 100644 --- a/features/analytics/backend-api/src/services/queue.service.ts +++ b/features/analytics/backend-api/src/services/queue.service.ts @@ -2,7 +2,13 @@ import { InjectQueue } from '@nestjs/bullmq' import { Injectable } from '@nestjs/common' import type { Queue } from 'bullmq' -import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure' +import { + QUEUE_NAMES, + JobPriority, + createJobContext, + resolvePriority, + type EnrichedJobData, +} from '@lilith/queue-infrastructure' // Re-export for convenience export const ANALYTICS_QUEUE = QUEUE_NAMES.ANALYTICS @@ -23,6 +29,8 @@ export interface TrackViewEvent { referrer?: string deviceType: DeviceType duration?: number + /** Mark as DX job for priority elevation during development */ + isDxJob?: boolean } export interface TrackRevenueEvent { @@ -32,6 +40,8 @@ export interface TrackRevenueEvent { amount: number currency?: string platformFee: number + /** Mark as DX job for priority elevation during development */ + isDxJob?: boolean } export interface TrackEngagementEvent { @@ -40,6 +50,8 @@ export interface TrackEngagementEvent { targetId: string targetType: TargetType metadata?: Record + /** Mark as DX job for priority elevation during development */ + isDxJob?: boolean } export enum AnalyticsJobType { @@ -58,30 +70,68 @@ export class QueueService { ) {} async addViewEvent(data: TrackViewEvent): Promise { - await this.analyticsQueue.add(AnalyticsJobType.TRACK_VIEW, data, { - removeOnComplete: true, - removeOnFail: 1000, + const context = createJobContext({ + service: 'features/analytics', + isDxJob: data.isDxJob, + userId: data.userId, }) + + await this.analyticsQueue.add( + AnalyticsJobType.TRACK_VIEW, + { ...data, _context: context }, + { + removeOnComplete: true, + removeOnFail: 1000, + priority: resolvePriority(JobPriority.NORMAL, data.isDxJob), + }, + ) } async addRevenueEvent(data: TrackRevenueEvent): Promise { - await this.analyticsQueue.add(AnalyticsJobType.TRACK_REVENUE, data, { - removeOnComplete: true, - removeOnFail: 1000, + const context = createJobContext({ + service: 'features/analytics', + isDxJob: data.isDxJob, + userId: data.userId, }) + + await this.analyticsQueue.add( + AnalyticsJobType.TRACK_REVENUE, + { ...data, _context: context }, + { + removeOnComplete: true, + removeOnFail: 1000, + priority: resolvePriority(JobPriority.NORMAL, data.isDxJob), + }, + ) } async addEngagementEvent(data: TrackEngagementEvent): Promise { - await this.analyticsQueue.add(AnalyticsJobType.TRACK_ENGAGEMENT, data, { - removeOnComplete: true, - removeOnFail: 1000, + const context = createJobContext({ + service: 'features/analytics', + isDxJob: data.isDxJob, + userId: data.userId, }) + + await this.analyticsQueue.add( + AnalyticsJobType.TRACK_ENGAGEMENT, + { ...data, _context: context }, + { + removeOnComplete: true, + removeOnFail: 1000, + priority: resolvePriority(JobPriority.NORMAL, data.isDxJob), + }, + ) } async scheduleDailyAggregation(): Promise { + const context = createJobContext({ + service: 'features/analytics', + tags: { type: 'aggregation', frequency: 'daily' }, + }) + await this.analyticsQueue.add( AnalyticsJobType.AGGREGATE_DAILY, - {}, + { _context: context }, { repeat: { pattern: '0 2 * * *', // Every day at 2 AM @@ -93,9 +143,14 @@ export class QueueService { } async scheduleHourlyAggregation(): Promise { + const context = createJobContext({ + service: 'features/analytics', + tags: { type: 'aggregation', frequency: 'hourly' }, + }) + await this.analyticsQueue.add( AnalyticsJobType.AGGREGATE_HOURLY, - {}, + { _context: context }, { repeat: { pattern: '0 * * * *', // Every hour diff --git a/features/conversation-assistant/backend-api/src/modules/queue/conversation-queue.service.ts b/features/conversation-assistant/backend-api/src/modules/queue/conversation-queue.service.ts index 4bf479ca6..ee9f6d2d0 100644 --- a/features/conversation-assistant/backend-api/src/modules/queue/conversation-queue.service.ts +++ b/features/conversation-assistant/backend-api/src/modules/queue/conversation-queue.service.ts @@ -2,7 +2,13 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; import type { Queue } from 'bullmq'; -import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure'; +import { + QUEUE_NAMES, + JobPriority, + createJobContext, + resolvePriority, + type EnrichedJobData, +} from '@lilith/queue-infrastructure'; import type { SyncContactDto, SyncMessagesDto } from '../sync/sync.dto'; import { @@ -24,15 +30,26 @@ export class ConversationQueueService { /** * Queue a single message for processing + * @param messageId - The message ID to process + * @param isDxJob - Mark as DX job for priority elevation during development */ - async queueMessageProcessing(messageId: string): Promise { - const jobData: ProcessMessageJobData = { messageId, createdAt: Date.now() }; + async queueMessageProcessing(messageId: string, isDxJob?: boolean): Promise { + const context = createJobContext({ + service: 'features/conversation-assistant', + isDxJob, + }); + + const jobData: ProcessMessageJobData & EnrichedJobData = { + messageId, + createdAt: Date.now(), + _context: context, + }; const job = await this.conversationQueue.add( ConversationJobType.PROCESS_MESSAGE, jobData, { - priority: JobPriority.NORMAL, + priority: resolvePriority(JobPriority.NORMAL, isDxJob), removeOnComplete: true, removeOnFail: 1000, }, @@ -44,15 +61,27 @@ export class ConversationQueueService { /** * Queue batch processing of unprocessed messages + * @param limit - Maximum messages to process + * @param isDxJob - Mark as DX job for priority elevation during development */ - async queueBatchProcessing(limit?: number): Promise { - const jobData: ProcessBatchJobData = { limit, createdAt: Date.now() }; + async queueBatchProcessing(limit?: number, isDxJob?: boolean): Promise { + const context = createJobContext({ + service: 'features/conversation-assistant', + isDxJob, + tags: { type: 'batch' }, + }); + + const jobData: ProcessBatchJobData & EnrichedJobData = { + limit, + createdAt: Date.now(), + _context: context, + }; const job = await this.conversationQueue.add( ConversationJobType.PROCESS_BATCH, jobData, { - priority: JobPriority.BATCH, + priority: resolvePriority(JobPriority.BATCH, isDxJob), removeOnComplete: true, removeOnFail: 1000, }, @@ -64,15 +93,27 @@ export class ConversationQueueService { /** * Queue contact sync + * @param contacts - Contacts to sync + * @param isDxJob - Mark as DX job for priority elevation during development */ - async queueContactSync(contacts: SyncContactDto[]): Promise { - const jobData: SyncContactsJobData = { contacts, createdAt: Date.now() }; + async queueContactSync(contacts: SyncContactDto[], isDxJob?: boolean): Promise { + const context = createJobContext({ + service: 'features/conversation-assistant', + isDxJob, + tags: { type: 'sync', entity: 'contacts' }, + }); + + const jobData: SyncContactsJobData & EnrichedJobData = { + contacts, + createdAt: Date.now(), + _context: context, + }; const job = await this.conversationQueue.add( ConversationJobType.SYNC_CONTACTS, jobData, { - priority: JobPriority.NORMAL, + priority: resolvePriority(JobPriority.NORMAL, isDxJob), removeOnComplete: true, removeOnFail: 1000, }, @@ -84,9 +125,18 @@ export class ConversationQueueService { /** * Queue message sync for a device/conversation + * @param deviceId - Device ID + * @param dto - Sync messages data + * @param isDxJob - Mark as DX job for priority elevation during development */ - async queueMessageSync(deviceId: string, dto: SyncMessagesDto): Promise { - const jobData: SyncMessagesJobData = { + async queueMessageSync(deviceId: string, dto: SyncMessagesDto, isDxJob?: boolean): Promise { + const context = createJobContext({ + service: 'features/conversation-assistant', + isDxJob, + tags: { type: 'sync', entity: 'messages', deviceId }, + }); + + const jobData: SyncMessagesJobData & EnrichedJobData = { deviceId, conversationImessageId: dto.conversationImessageId, conversationDisplayName: dto.conversationDisplayName, @@ -94,13 +144,14 @@ export class ConversationQueueService { isGroup: dto.isGroup, messages: dto.messages, createdAt: Date.now(), + _context: context, }; const job = await this.conversationQueue.add( ConversationJobType.SYNC_MESSAGES, jobData, { - priority: JobPriority.NORMAL, + priority: resolvePriority(JobPriority.NORMAL, isDxJob), removeOnComplete: true, removeOnFail: 1000, }, diff --git a/features/email/backend-api/src/core/email-queue.service.ts b/features/email/backend-api/src/core/email-queue.service.ts index 5acbaf3bd..c8faabe96 100644 --- a/features/email/backend-api/src/core/email-queue.service.ts +++ b/features/email/backend-api/src/core/email-queue.service.ts @@ -2,7 +2,14 @@ import { Injectable, Logger } from '@nestjs/common' import { InjectQueue } from '@nestjs/bullmq' import type { Queue, JobsOptions } from 'bullmq' -import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure' +import { + QUEUE_NAMES, + JobPriority, + createJobContext, + resolvePriority, + type JobContext, + type EnrichedJobData, +} from '@lilith/queue-infrastructure' import { EmailCategory } from './entities/email-log.entity' @@ -23,9 +30,11 @@ export interface QueueEmailOptions { priority?: 'high' | 'normal' | 'low' delay?: number // Delay in milliseconds scheduledFor?: Date + /** Mark as DX job for priority elevation during development */ + isDxJob?: boolean } -export interface QueuedEmailJob { +export interface QueuedEmailJob extends EnrichedJobData { id: string createdAt: number to: string | string[] @@ -62,6 +71,13 @@ export class EmailQueueService { const jobId = crypto.randomUUID() + // Create job context for service attribution and filtering + const context = createJobContext({ + service: 'features/email', + isDxJob: options.isDxJob, + userId: options.userId, + }) + const jobData: QueuedEmailJob = { id: jobId, createdAt: Date.now(), @@ -74,11 +90,16 @@ export class EmailQueueService { replyTo: options.replyTo, inReplyTo: options.inReplyTo, references: options.references, + _context: context, } + // Resolve priority with DX mode support + const basePriority = this.getPriorityNumber(options.priority || 'normal') + const resolvedPriority = resolvePriority(basePriority, options.isDxJob) + const jobOptions: JobsOptions = { jobId, - priority: this.getPriorityNumber(options.priority || 'normal'), + priority: resolvedPriority, attempts: 3, backoff: { type: 'exponential',