platform-codebase/features/image-generator/backend-api/scripts/queue-batch.ts
Claude Code ef43a00c5b chore(favicon): 🔧 Implement batch queue processing for image generation tasks
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-03-20 05:17:35 -07:00

320 lines
10 KiB
TypeScript
Executable file

#!/usr/bin/env ts-node
/**
* Queue image generation jobs from batch files.
*
* Usage:
* pnpm queue:batch --file ../../../@packages/@ui/ui-error-pages/asset-prompts/batch-003-empty-states.json --filter cyberpunk --size-limit 500
*
* Options:
* --file Path to batch JSON file (required)
* --filter Keep only prompts whose description or prompt text contains this keyword, case-insensitive (optional)
* --motif Keep only prompts whose prompt text contains this keyword, case-insensitive (optional)
* --size-limit Maximum estimated output size in MB, as a whole number (default: unlimited)
* --dry-run Show what would be queued without actually queuing
* --families Comma-separated families to generate (default: all)
* --dx Mark as DX job; queued at HIGH priority (2) instead of NORMAL (3)
*/
import * as fs from 'fs';
import * as path from 'path';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { v4 as uuid } from 'uuid';
// Queue and Redis configuration
// The service registry is loaded through a dynamic import (top-level await).
const { getServiceRegistry } = await import('@lilith/service-registry');

/** Name of the queue that generation jobs are added to. */
const QUEUE_NAME = 'IMAGE_GENERATOR_QUEUE';

/**
 * Redis connection URL: the DATABASE_REDIS_URL environment variable when it is set to a
 * non-empty value, otherwise the service registry's URL for 'infrastructure'.
 */
const DATABASE_REDIS_URL =
  process.env.DATABASE_REDIS_URL || getServiceRegistry().getRedisUrl('infrastructure');

// Fail at module load when neither source produced a URL.
if (!DATABASE_REDIS_URL) {
  throw new Error('Redis URL not found: set DATABASE_REDIS_URL or check service-registry');
}
/** Assumed average size of one derivative file, in bytes (WebP, ~100KB). */
const AVG_BYTES_PER_DERIVATIVE = 1024 * 100;

/** Derivative count for each family, keyed by family name. */
const FAMILY_DERIVATIVE_COUNTS: Record<string, number> = {
  square: 5,
  hero: 3,
  portrait: 2,
  og: 5,
  compact: 3,
  tall: 3,
  ultrawide: 3,
  sidebar: 2,
  header: 3,
};

/** Names of all families, in the order they appear in FAMILY_DERIVATIVE_COUNTS. */
const ALL_FAMILIES = Object.entries(FAMILY_DERIVATIVE_COUNTS).map(([familyName]) => familyName);
/** One prompt entry of a batch file. */
interface BatchPrompt {
  /** Label of the entry; searched by --filter and used to build the queued job's name. */
  description: string;
  /** Generation prompt text; searched by --filter and --motif. */
  prompt: string;

  /** First choice for the job's `code` tag. */
  empty_state_code?: string;
  /** Used for the job's `code` tag when `empty_state_code` is not set. */
  error_type?: string;
  /** Declared in batch files; not read by this script. */
  empty_state_type?: string;
  /** Declared in batch files; not read by this script. */
  layouts?: string[];

  /** Inference steps for this prompt; 60 is used when omitted. */
  steps?: number;
  /** Guidance scale for this prompt; 8.5 is used when omitted. */
  guidance_scale?: number;
  /** If provided, this seed is used instead of generating one. */
  seed?: number;
}
/** Shape of the JSON batch file passed with --file. */
interface BatchFile {
  /** Batch identifier; printed at startup and stored in each job's `batch` tag. */
  batch_name: string;
  /** Prompt entries that may be queued. */
  prompts: BatchPrompt[];

  /** Negative prompt forwarded with every job of the batch. */
  negative_prompt?: string;
  /** Variations to queue per prompt; 1 is used when omitted. */
  variations_per_prompt?: number;

  /** Declared in batch files; not read by this script. */
  purpose?: string;
  /** Declared in batch files; not read by this script. */
  description?: string;
}
/** Metadata attached to every queued job under the `_context` key. */
interface JobContext {
  /** Identifier generated per job (a fresh UUID in this script). */
  correlationId: string;
  /** Name of the service that created the job. */
  service: string;
  /** Creation time as an ISO-8601 string. */
  createdAt: string;
  /** True when the job was queued with --dx. */
  isDxJob: boolean;
  /** String tags; this script sets `type`, `batch`, `code` and `variation`. */
  tags?: Record<string, string>;
}
/** Generation settings carried inside a GENERATE_VARIATION job. */
interface GenerationParams {
  /** Prompt text taken from the batch entry. */
  prompt: string;
  /** Negative prompt shared by the whole batch, when the batch file defines one. */
  negativePrompt?: string;
  /** Seed for this variation. */
  seed: number;
  /** Model name. */
  model: string;
  /** Number of inference steps. */
  inferenceSteps?: number;
  /** Guidance scale. */
  guidanceScale?: number;
}

/** Payload of a GENERATE_VARIATION job added to the queue. */
interface GenerateVariationJobData {
  /** Identifier generated for this variation. */
  variationId: string;
  /** Display name of the variation. */
  name: string;
  /** Families to generate for this variation. */
  families: string[];
  /** Generation settings for this variation. */
  generationParams: GenerationParams;
  /** Job metadata. */
  _context: JobContext;
}
/**
 * Parse the command-line flags found in `process.argv`.
 *
 * Prints an error plus the usage line and exits with status 1 when:
 * - `--file` is missing or empty,
 * - a flag that takes a value (`--file`, `--filter`, `--motif`, `--size-limit`,
 *   `--families`) is the last argument or is directly followed by another `--flag`,
 * - `--size-limit` does not parse to a non-negative whole number,
 * - `--families` contains no family name.
 *
 * @returns The parsed options. `families` is `ALL_FAMILIES` when `--families` is not
 *   given; `filter`, `motif` and `sizeLimitMB` are `undefined` when their flag is absent.
 */
function parseArgs(): {
  file: string;
  filter?: string;
  motif?: string;
  sizeLimitMB?: number;
  dryRun: boolean;
  families: string[];
  isDxJob: boolean;
} {
  const args = process.argv.slice(2);

  // Report a usage problem and terminate with a failing exit status.
  const fail = (message: string): void => {
    console.error(`Error: ${message}`);
    console.error('Usage: pnpm queue:batch --file <path> [--filter <keyword>] [--size-limit <MB>]');
    process.exit(1);
  };

  // Value that follows `flag`, or undefined when the flag is absent.
  // A flag without a value (end of argv, or another --flag right after it) is an error;
  // previously this produced `undefined` values or swallowed the next flag.
  const valueOf = (flag: string): string | undefined => {
    const idx = args.indexOf(flag);
    if (idx < 0) return undefined;
    const value: string | undefined = args[idx + 1];
    if (value === undefined || value.startsWith('--')) {
      fail(`${flag} requires a value`);
    }
    return value;
  };

  const file = valueOf('--file') ?? '';
  if (file === '') fail('--file is required');

  // parseInt keeps the original leniency ("500MB" -> 500) but NaN/negative input is
  // rejected instead of silently disabling the limit.
  const sizeArg = valueOf('--size-limit');
  const sizeLimitMB = sizeArg === undefined ? undefined : parseInt(sizeArg, 10);
  if (sizeLimitMB !== undefined && (Number.isNaN(sizeLimitMB) || sizeLimitMB < 0)) {
    fail(`--size-limit must be a non-negative whole number of MB (got "${sizeArg}")`);
  }

  // Blank entries ("square,,hero") are dropped rather than queued as a '' family.
  const familiesArg = valueOf('--families');
  const families = familiesArg === undefined
    ? ALL_FAMILIES
    : familiesArg.split(',').map(f => f.trim()).filter(f => f !== '');
  if (families.length === 0) fail('--families must name at least one family');

  return {
    file,
    filter: valueOf('--filter'),
    motif: valueOf('--motif'),
    sizeLimitMB,
    dryRun: args.includes('--dry-run'),
    families,
    isDxJob: args.includes('--dx'),
  };
}
/**
 * Estimate the output size, in bytes, of one variation generated for `families`.
 *
 * Each family contributes its derivative count times the average derivative size;
 * a family without a (non-zero) entry in the count table is counted as 3 derivatives.
 *
 * @param families Family names to include in the estimate.
 * @returns Estimated size in bytes; 0 for an empty list.
 */
function estimateSizeBytes(families: string[]): number {
  let totalBytes = 0;
  for (const familyName of families) {
    const derivatives = FAMILY_DERIVATIVE_COUNTS[familyName] || 3;
    totalBytes += derivatives * AVG_BYTES_PER_DERIVATIVE;
  }
  return totalBytes;
}
/**
 * Render a byte count as a short string using 1024-based units.
 *
 * Values below 1024 are printed as-is with a `B` suffix, KB and MB use one decimal
 * place, and everything else is printed in GB with two decimal places.
 *
 * @param bytes Size in bytes.
 * @returns The formatted size, e.g. `1.5KB`.
 */
function formatBytes(bytes: number): string {
  const kib = 1024;
  const mib = kib * 1024;
  const gib = mib * 1024;

  let text: string;
  if (bytes < kib) {
    text = `${bytes}B`;
  } else if (bytes < mib) {
    text = `${(bytes / kib).toFixed(1)}KB`;
  } else if (bytes < gib) {
    text = `${(bytes / mib).toFixed(1)}MB`;
  } else {
    text = `${(bytes / gib).toFixed(2)}GB`;
  }
  return text;
}
/**
 * Return `rawUrl` with its password (if any) replaced by `***` so it can be logged.
 * Input that cannot be parsed as a URL is replaced wholesale by a placeholder.
 */
function redactUrlCredentials(rawUrl: string): string {
  try {
    const parsed = new URL(rawUrl);
    if (parsed.password) parsed.password = '***';
    return parsed.toString();
  } catch {
    return '<unparseable URL>';
  }
}

/**
 * Entry point of the batch queue tool.
 *
 * Steps: parse CLI options, load and check the batch file, apply the --filter / --motif
 * keyword filters, trim the prompt list to the --size-limit budget, print a summary and
 * then either list the prompts (--dry-run) or add one GENERATE_VARIATION job per
 * variation to the queue.
 *
 * Exits the process with status 1 when the batch file is missing or malformed, and with
 * status 0 when no prompt matches the filters or after a dry run. Other failures
 * (unreadable file, invalid JSON, queue errors) reject the returned promise.
 */
async function main(): Promise<void> {
  const options = parseArgs();
  console.log('\n=== Batch Queue Tool ===\n');

  // Resolve file path (relative paths are taken from the current working directory)
  const filePath = path.isAbsolute(options.file)
    ? options.file
    : path.resolve(process.cwd(), options.file);
  if (!fs.existsSync(filePath)) {
    console.error(`Error: File not found: ${filePath}`);
    process.exit(1);
  }

  // Load batch file. JSON.parse output is unchecked, so verify the fields this script
  // dereferences before using them.
  const batchData: BatchFile = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
  if (!batchData || !Array.isArray(batchData.prompts)) {
    console.error(`Error: Batch file must contain a "prompts" array: ${filePath}`);
    process.exit(1);
  }
  const malformedIdx = batchData.prompts.findIndex(
    p => !p || typeof p.description !== 'string' || typeof p.prompt !== 'string'
  );
  if (malformedIdx >= 0) {
    console.error(`Error: prompts[${malformedIdx}] needs string "description" and "prompt" fields`);
    process.exit(1);
  }
  console.log(`Batch: ${batchData.batch_name}`);
  console.log(`Total prompts in file: ${batchData.prompts.length}`);

  // Filter prompts
  let filteredPrompts = batchData.prompts;
  if (options.filter) {
    const filterLower = options.filter.toLowerCase();
    filteredPrompts = filteredPrompts.filter(p =>
      p.description.toLowerCase().includes(filterLower) ||
      p.prompt.toLowerCase().includes(filterLower)
    );
    console.log(`After filter "${options.filter}": ${filteredPrompts.length} prompts`);
  }
  if (options.motif) {
    const motifLower = options.motif.toLowerCase();
    filteredPrompts = filteredPrompts.filter(p =>
      p.prompt.toLowerCase().includes(motifLower)
    );
    console.log(`After motif "${options.motif}": ${filteredPrompts.length} prompts`);
  }
  if (filteredPrompts.length === 0) {
    console.log('\nNo prompts match the filters. Exiting.');
    process.exit(0);
  }

  // Calculate size per variation
  const sizePerVariation = estimateSizeBytes(options.families);
  console.log(`\nFamilies: ${options.families.join(', ')}`);
  console.log(`Estimated size per variation: ${formatBytes(sizePerVariation)}`);

  // Check if prompts have pre-assigned seeds (already expanded variations).
  // This is decided before the size limit is applied so that the limit budgets with the
  // same per-prompt multiplier that the queuing loop uses; budgeting with
  // variations_per_prompt for pre-expanded prompts would queue too few of them.
  const variationsPerPrompt = batchData.variations_per_prompt || 1;
  const hasPreAssignedSeeds = filteredPrompts.some(p => p.seed !== undefined);
  const effectiveVariationsPerPrompt = hasPreAssignedSeeds ? 1 : variationsPerPrompt;

  // Apply size limit (a limit of 0 means "no limit")
  let promptsToQueue = filteredPrompts;
  if (options.sizeLimitMB) {
    const limitBytes = options.sizeLimitMB * 1024 * 1024;
    const maxVariations = Math.floor(limitBytes / sizePerVariation);
    const maxPrompts = Math.floor(maxVariations / effectiveVariationsPerPrompt);
    if (maxPrompts < promptsToQueue.length) {
      console.log(`\nSize limit: ${options.sizeLimitMB}MB`);
      console.log(`Max variations: ${maxVariations}`);
      console.log(`Max prompts (${effectiveVariationsPerPrompt} variations each): ${maxPrompts}`);
      promptsToQueue = promptsToQueue.slice(0, maxPrompts);
    }
  }

  // Calculate totals
  const totalVariations = promptsToQueue.length * effectiveVariationsPerPrompt;
  const estimatedTotalSize = totalVariations * sizePerVariation;
  console.log(`\n--- Queue Summary ---`);
  console.log(`Prompts to queue: ${promptsToQueue.length}`);
  if (hasPreAssignedSeeds) {
    console.log(`Pre-expanded variations: yes (seeds provided)`);
  } else {
    console.log(`Variations per prompt: ${variationsPerPrompt}`);
  }
  console.log(`Total variations: ${totalVariations}`);
  console.log(`Estimated total size: ${formatBytes(estimatedTotalSize)}`);

  if (options.dryRun) {
    console.log(`\n[DRY RUN] Would queue the following:\n`);
    for (const p of promptsToQueue) {
      console.log(` - ${p.description}`);
      console.log(` Code: ${p.empty_state_code || p.error_type || 'unknown'}`);
      console.log(` Prompt: ${p.prompt.slice(0, 80)}...`);
      console.log();
    }
    console.log(`[DRY RUN] No jobs queued.`);
    process.exit(0);
  }

  // Connect to Redis. The URL may embed a password, so only a redacted form is logged.
  console.log(`\nConnecting to Redis: ${redactUrlCredentials(String(DATABASE_REDIS_URL))}`);
  const redisConnection = new IORedis(DATABASE_REDIS_URL!, {
    maxRetriesPerRequest: null,
    enableReadyCheck: false,
  });
  // Type assertion to work around ioredis version mismatch between dependencies
  const queue = new Queue(QUEUE_NAME, { connection: redisConnection as never });

  // Queue jobs
  console.log(`\nQueuing ${totalVariations} jobs...\n`);
  const queuedJobs: string[] = [];
  for (const prompt of promptsToQueue) {
    for (let variation = 1; variation <= effectiveVariationsPerPrompt; variation++) {
      const variationId = uuid();
      // Use pre-assigned seed or generate new one
      const seed = prompt.seed ?? Math.floor(Math.random() * 2147483647);
      // Only add _v suffix if not pre-expanded
      const name = hasPreAssignedSeeds ? prompt.description : `${prompt.description}_v${variation}`;
      const context: JobContext = {
        correlationId: uuid(),
        service: 'features/image-generator',
        createdAt: new Date().toISOString(),
        isDxJob: options.isDxJob,
        tags: {
          type: 'batch',
          batch: batchData.batch_name,
          code: prompt.empty_state_code || prompt.error_type || 'unknown',
          variation: String(variation),
        },
      };
      const jobData: GenerateVariationJobData = {
        variationId,
        name,
        families: options.families,
        generationParams: {
          prompt: prompt.prompt,
          negativePrompt: batchData.negative_prompt,
          seed,
          model: 'anime', // Default model
          // `||` on purpose: a step count of 0 is treated as "not set".
          inferenceSteps: prompt.steps || 60,
          // `??` so that an explicit guidance_scale of 0 is forwarded instead of
          // being replaced by the default.
          guidanceScale: prompt.guidance_scale ?? 8.5,
        },
        _context: context,
      };
      const job = await queue.add('GENERATE_VARIATION', jobData, {
        priority: options.isDxJob ? 2 : 3, // HIGH (2) for DX, NORMAL (3) otherwise
        attempts: 2,
        backoff: {
          type: 'exponential',
          delay: 30000,
        },
        removeOnComplete: true,
        removeOnFail: 100,
      });
      queuedJobs.push(job.id ?? variationId);
      console.log(` ✓ Queued: ${name} (seed: ${seed})`);
    }
  }

  // Summary
  console.log(`\n=== Complete ===`);
  console.log(`Jobs queued: ${queuedJobs.length}`);
  console.log(`Estimated output: ${formatBytes(estimatedTotalSize)}`);

  // Cleanup
  await queue.close();
  await redisConnection.quit();
}
// Script entry point: any rejection from main is logged and turned into exit status 1.
const exitOnFailure = (reason: unknown): void => {
  console.error('Error:', reason);
  process.exit(1);
};

main().catch(exitOnFailure);