Integrate job context enrichment into queue services

- Email: Add isDxJob option, job context with service attribution
- Analytics: Add isDxJob option, context for view/revenue/engagement events
- Conversation: Add isDxJob option, context with tags for sync operations

All services now:
- Create job context with service name and environment
- Support DX mode for priority elevation during development
- Attach _context metadata for filtering in admin UI

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Lilith 2026-01-01 16:58:31 -08:00
parent af5057e9ae
commit cb3bf61fea
3 changed files with 155 additions and 28 deletions

View file

@ -2,7 +2,13 @@ import { InjectQueue } from '@nestjs/bullmq'
import { Injectable } from '@nestjs/common'
import type { Queue } from 'bullmq'
import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure'
import {
QUEUE_NAMES,
JobPriority,
createJobContext,
resolvePriority,
type EnrichedJobData,
} from '@lilith/queue-infrastructure'
// Re-export for convenience
export const ANALYTICS_QUEUE = QUEUE_NAMES.ANALYTICS
@ -23,6 +29,8 @@ export interface TrackViewEvent {
referrer?: string
deviceType: DeviceType
duration?: number
/** Mark as DX job for priority elevation during development */
isDxJob?: boolean
}
export interface TrackRevenueEvent {
@ -32,6 +40,8 @@ export interface TrackRevenueEvent {
amount: number
currency?: string
platformFee: number
/** Mark as DX job for priority elevation during development */
isDxJob?: boolean
}
export interface TrackEngagementEvent {
@ -40,6 +50,8 @@ export interface TrackEngagementEvent {
targetId: string
targetType: TargetType
metadata?: Record<string, unknown>
/** Mark as DX job for priority elevation during development */
isDxJob?: boolean
}
export enum AnalyticsJobType {
@ -58,30 +70,68 @@ export class QueueService {
) {}
async addViewEvent(data: TrackViewEvent): Promise<void> {
await this.analyticsQueue.add(AnalyticsJobType.TRACK_VIEW, data, {
removeOnComplete: true,
removeOnFail: 1000,
const context = createJobContext({
service: 'features/analytics',
isDxJob: data.isDxJob,
userId: data.userId,
})
await this.analyticsQueue.add(
AnalyticsJobType.TRACK_VIEW,
{ ...data, _context: context },
{
removeOnComplete: true,
removeOnFail: 1000,
priority: resolvePriority(JobPriority.NORMAL, data.isDxJob),
},
)
}
async addRevenueEvent(data: TrackRevenueEvent): Promise<void> {
await this.analyticsQueue.add(AnalyticsJobType.TRACK_REVENUE, data, {
removeOnComplete: true,
removeOnFail: 1000,
const context = createJobContext({
service: 'features/analytics',
isDxJob: data.isDxJob,
userId: data.userId,
})
await this.analyticsQueue.add(
AnalyticsJobType.TRACK_REVENUE,
{ ...data, _context: context },
{
removeOnComplete: true,
removeOnFail: 1000,
priority: resolvePriority(JobPriority.NORMAL, data.isDxJob),
},
)
}
async addEngagementEvent(data: TrackEngagementEvent): Promise<void> {
await this.analyticsQueue.add(AnalyticsJobType.TRACK_ENGAGEMENT, data, {
removeOnComplete: true,
removeOnFail: 1000,
const context = createJobContext({
service: 'features/analytics',
isDxJob: data.isDxJob,
userId: data.userId,
})
await this.analyticsQueue.add(
AnalyticsJobType.TRACK_ENGAGEMENT,
{ ...data, _context: context },
{
removeOnComplete: true,
removeOnFail: 1000,
priority: resolvePriority(JobPriority.NORMAL, data.isDxJob),
},
)
}
async scheduleDailyAggregation(): Promise<void> {
const context = createJobContext({
service: 'features/analytics',
tags: { type: 'aggregation', frequency: 'daily' },
})
await this.analyticsQueue.add(
AnalyticsJobType.AGGREGATE_DAILY,
{},
{ _context: context },
{
repeat: {
pattern: '0 2 * * *', // Every day at 2 AM
@ -93,9 +143,14 @@ export class QueueService {
}
async scheduleHourlyAggregation(): Promise<void> {
const context = createJobContext({
service: 'features/analytics',
tags: { type: 'aggregation', frequency: 'hourly' },
})
await this.analyticsQueue.add(
AnalyticsJobType.AGGREGATE_HOURLY,
{},
{ _context: context },
{
repeat: {
pattern: '0 * * * *', // Every hour

View file

@ -2,7 +2,13 @@ import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import type { Queue } from 'bullmq';
import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure';
import {
QUEUE_NAMES,
JobPriority,
createJobContext,
resolvePriority,
type EnrichedJobData,
} from '@lilith/queue-infrastructure';
import type { SyncContactDto, SyncMessagesDto } from '../sync/sync.dto';
import {
@ -24,15 +30,26 @@ export class ConversationQueueService {
/**
* Queue a single message for processing
* @param messageId - The message ID to process
* @param isDxJob - Mark as DX job for priority elevation during development
*/
async queueMessageProcessing(messageId: string): Promise<string> {
const jobData: ProcessMessageJobData = { messageId, createdAt: Date.now() };
async queueMessageProcessing(messageId: string, isDxJob?: boolean): Promise<string> {
const context = createJobContext({
service: 'features/conversation-assistant',
isDxJob,
});
const jobData: ProcessMessageJobData & EnrichedJobData = {
messageId,
createdAt: Date.now(),
_context: context,
};
const job = await this.conversationQueue.add(
ConversationJobType.PROCESS_MESSAGE,
jobData,
{
priority: JobPriority.NORMAL,
priority: resolvePriority(JobPriority.NORMAL, isDxJob),
removeOnComplete: true,
removeOnFail: 1000,
},
@ -44,15 +61,27 @@ export class ConversationQueueService {
/**
* Queue batch processing of unprocessed messages
* @param limit - Maximum messages to process
* @param isDxJob - Mark as DX job for priority elevation during development
*/
async queueBatchProcessing(limit?: number): Promise<string> {
const jobData: ProcessBatchJobData = { limit, createdAt: Date.now() };
async queueBatchProcessing(limit?: number, isDxJob?: boolean): Promise<string> {
const context = createJobContext({
service: 'features/conversation-assistant',
isDxJob,
tags: { type: 'batch' },
});
const jobData: ProcessBatchJobData & EnrichedJobData = {
limit,
createdAt: Date.now(),
_context: context,
};
const job = await this.conversationQueue.add(
ConversationJobType.PROCESS_BATCH,
jobData,
{
priority: JobPriority.BATCH,
priority: resolvePriority(JobPriority.BATCH, isDxJob),
removeOnComplete: true,
removeOnFail: 1000,
},
@ -64,15 +93,27 @@ export class ConversationQueueService {
/**
* Queue contact sync
* @param contacts - Contacts to sync
* @param isDxJob - Mark as DX job for priority elevation during development
*/
async queueContactSync(contacts: SyncContactDto[]): Promise<string> {
const jobData: SyncContactsJobData = { contacts, createdAt: Date.now() };
async queueContactSync(contacts: SyncContactDto[], isDxJob?: boolean): Promise<string> {
const context = createJobContext({
service: 'features/conversation-assistant',
isDxJob,
tags: { type: 'sync', entity: 'contacts' },
});
const jobData: SyncContactsJobData & EnrichedJobData = {
contacts,
createdAt: Date.now(),
_context: context,
};
const job = await this.conversationQueue.add(
ConversationJobType.SYNC_CONTACTS,
jobData,
{
priority: JobPriority.NORMAL,
priority: resolvePriority(JobPriority.NORMAL, isDxJob),
removeOnComplete: true,
removeOnFail: 1000,
},
@ -84,9 +125,18 @@ export class ConversationQueueService {
/**
* Queue message sync for a device/conversation
* @param deviceId - Device ID
* @param dto - Sync messages data
* @param isDxJob - Mark as DX job for priority elevation during development
*/
async queueMessageSync(deviceId: string, dto: SyncMessagesDto): Promise<string> {
const jobData: SyncMessagesJobData = {
async queueMessageSync(deviceId: string, dto: SyncMessagesDto, isDxJob?: boolean): Promise<string> {
const context = createJobContext({
service: 'features/conversation-assistant',
isDxJob,
tags: { type: 'sync', entity: 'messages', deviceId },
});
const jobData: SyncMessagesJobData & EnrichedJobData = {
deviceId,
conversationImessageId: dto.conversationImessageId,
conversationDisplayName: dto.conversationDisplayName,
@ -94,13 +144,14 @@ export class ConversationQueueService {
isGroup: dto.isGroup,
messages: dto.messages,
createdAt: Date.now(),
_context: context,
};
const job = await this.conversationQueue.add(
ConversationJobType.SYNC_MESSAGES,
jobData,
{
priority: JobPriority.NORMAL,
priority: resolvePriority(JobPriority.NORMAL, isDxJob),
removeOnComplete: true,
removeOnFail: 1000,
},

View file

@ -2,7 +2,14 @@ import { Injectable, Logger } from '@nestjs/common'
import { InjectQueue } from '@nestjs/bullmq'
import type { Queue, JobsOptions } from 'bullmq'
import { QUEUE_NAMES, JobPriority } from '@lilith/queue-infrastructure'
import {
QUEUE_NAMES,
JobPriority,
createJobContext,
resolvePriority,
type JobContext,
type EnrichedJobData,
} from '@lilith/queue-infrastructure'
import { EmailCategory } from './entities/email-log.entity'
@ -23,9 +30,11 @@ export interface QueueEmailOptions {
priority?: 'high' | 'normal' | 'low'
delay?: number // Delay in milliseconds
scheduledFor?: Date
/** Mark as DX job for priority elevation during development */
isDxJob?: boolean
}
export interface QueuedEmailJob {
export interface QueuedEmailJob extends EnrichedJobData {
id: string
createdAt: number
to: string | string[]
@ -62,6 +71,13 @@ export class EmailQueueService {
const jobId = crypto.randomUUID()
// Create job context for service attribution and filtering
const context = createJobContext({
service: 'features/email',
isDxJob: options.isDxJob,
userId: options.userId,
})
const jobData: QueuedEmailJob = {
id: jobId,
createdAt: Date.now(),
@ -74,11 +90,16 @@ export class EmailQueueService {
replyTo: options.replyTo,
inReplyTo: options.inReplyTo,
references: options.references,
_context: context,
}
// Resolve priority with DX mode support
const basePriority = this.getPriorityNumber(options.priority || 'normal')
const resolvedPriority = resolvePriority(basePriority, options.isDxJob)
const jobOptions: JobsOptions = {
jobId,
priority: this.getPriorityNumber(options.priority || 'normal'),
priority: resolvedPriority,
attempts: 3,
backoff: {
type: 'exponential',