- Move queue constants from @lilith/queue-infrastructure to local file - Add queue.constants.ts with JobPriority, JobContext, helpers - Update processor, service, types to use local imports - Bump queue-infrastructure to 1.0.1 Reduces external dependency coupling for queue implementation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
165 lines
4.2 KiB
TypeScript
165 lines
4.2 KiB
TypeScript
import { Injectable, Logger } from '@nestjs/common';
|
|
import { InjectQueue } from '@nestjs/bullmq';
|
|
import type { Queue } from 'bullmq';
|
|
import type { FamilyName } from '@lilith/image-generator-types';
|
|
|
|
import {
|
|
IMAGE_GENERATOR_QUEUE,
|
|
JobPriority,
|
|
createJobContext,
|
|
resolvePriority,
|
|
} from './queue.constants';
|
|
|
|
import {
|
|
ImageJobType,
|
|
type GenerateVariationJobData,
|
|
type RegenerateVariationJobData,
|
|
} from './image-queue.types';
|
|
|
|
/**
 * Input accepted by `ImageQueueService.queueVariationGeneration`.
 */
export interface QueueVariationOptions {
  /**
   * Identifier of the variation. Copied into the job payload and used as the
   * returned ID when the queue does not report a job ID.
   */
  variationId: string;

  /** Variation name; copied into the job payload, the context tags and the log line. */
  name: string;

  /** Copied into the job payload and comma-joined into the context tags. */
  families: FamilyName[];

  /** Passed through unchanged as the job payload's `generationParams`. */
  generationParams: {
    prompt: string;
    negativePrompt?: string;
    seed: number;
    model: string;
    inferenceSteps?: number;
    guidanceScale?: number;
  };

  /** Mark as DX job for priority elevation during development */
  isDxJob?: boolean;
}
|
|
|
|
/**
 * Enqueues variation generation / regeneration jobs on the queue injected
 * under `IMAGE_GENERATOR_QUEUE` and exposes simple queue counters.
 */
@Injectable()
export class ImageQueueService {
  /** Value recorded as `service` in every job context created by this class. */
  private static readonly SERVICE_NAME = 'features/image-generator';

  private readonly logger = new Logger(ImageQueueService.name);

  constructor(
    @InjectQueue(IMAGE_GENERATOR_QUEUE)
    private readonly imageQueue: Queue,
  ) {}

  /**
   * Queue a new variation for generation.
   *
   * @param options - Variation generation options
   * @returns The queue's job ID, or `options.variationId` when the queue
   *   does not report one
   */
  async queueVariationGeneration(options: QueueVariationOptions): Promise<string> {
    const { variationId, name, families, generationParams, isDxJob } = options;

    const jobData: GenerateVariationJobData = {
      variationId,
      name,
      families,
      generationParams,
      _context: createJobContext({
        service: ImageQueueService.SERVICE_NAME,
        isDxJob,
        tags: {
          type: 'variation',
          name,
          families: families.join(','),
        },
      }),
    };

    const { jobId, priority } = await this.enqueue(
      ImageJobType.GENERATE_VARIATION,
      jobData,
      isDxJob,
    );

    this.logger.log(
      `Queued variation generation: ${options.name} (${options.families.length} families, priority: ${priority})`,
    );

    return jobId ?? variationId;
  }

  /**
   * Queue regeneration of an existing variation.
   *
   * @param variationId - ID of variation to regenerate
   * @param isDxJob - Mark as DX job for priority elevation
   * @returns The queue's job ID, or `variationId` when the queue does not
   *   report one
   */
  async queueVariationRegeneration(
    variationId: string,
    isDxJob?: boolean,
  ): Promise<string> {
    const jobData: RegenerateVariationJobData = {
      variationId,
      _context: createJobContext({
        service: ImageQueueService.SERVICE_NAME,
        isDxJob,
        tags: { type: 'regenerate', variationId },
      }),
    };

    const { jobId } = await this.enqueue(
      ImageJobType.REGENERATE_VARIATION,
      jobData,
      isDxJob,
    );

    this.logger.log(`Queued variation regeneration: ${variationId}`);

    return jobId ?? variationId;
  }

  /**
   * Get queue statistics.
   *
   * The five counters are fetched concurrently.
   *
   * NOTE(review): every job added by this service carries a `priority`. On
   * BullMQ versions that keep prioritized jobs in a separate `prioritized`
   * state, those jobs may not be reflected in `waiting` — confirm against the
   * installed BullMQ version.
   *
   * @returns Current job counts per state
   */
  async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    const queue = this.imageQueue;

    const [waiting, active, completed, failed, delayed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
    ]);

    return { waiting, active, completed, failed, delayed };
  }

  /**
   * Check if any jobs are currently active.
   *
   * @returns `true` when the queue reports at least one active job
   */
  async hasActiveJobs(): Promise<boolean> {
    return (await this.imageQueue.getActiveCount()) > 0;
  }

  /**
   * Adds a job using the retry/retention policy shared by every job type this
   * service enqueues, so the policy is defined in exactly one place.
   *
   * @param jobType - Job name passed to the queue
   * @param data - Job payload
   * @param isDxJob - Forwarded to `resolvePriority` for priority elevation
   * @returns The job ID reported by the queue (possibly undefined) and the
   *   priority the job was added with
   */
  private async enqueue(
    jobType: string,
    data: GenerateVariationJobData | RegenerateVariationJobData,
    isDxJob: boolean | undefined,
  ) {
    // ML jobs should use NORMAL priority by default (respects peak hours)
    // DX jobs get HIGH priority (bypasses peak hours)
    const priority = resolvePriority(JobPriority.NORMAL, isDxJob);

    // A fresh options object is built per call so nothing is shared between jobs.
    const job = await this.imageQueue.add(jobType, data, {
      priority,
      attempts: 2, // Image generation is expensive, limit retries
      backoff: {
        type: 'exponential',
        delay: 30000, // 30 second initial backoff
      },
      removeOnComplete: true,
      removeOnFail: 100,
    });

    return { jobId: job.id, priority };
  }
}
|