#!/usr/bin/env ts-node
/**
 * Queue image generation jobs from batch files.
 *
 * Usage:
 *   pnpm queue:batch --file ../../../@packages/@ui/ui-error-pages/asset-prompts/batch-003-empty-states.json --filter cyberpunk --size-limit 500
 *
 * Options:
 *   --file        Path to batch JSON file (required)
 *   --filter      Filter prompts by keyword in description/prompt (optional)
 *   --motif       Filter by specific motif keyword (optional)
 *   --size-limit  Maximum output size in MB (default: unlimited; 0 also means unlimited)
 *   --dry-run     Show what would be queued without actually queuing
 *   --families    Comma-separated families to generate (default: all)
 *   --dx          Mark as DX job for priority elevation
 */

import * as fs from 'fs';
import * as path from 'path';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { v4 as uuid } from 'uuid';

// Constants
const { getServiceRegistry } = await import('@lilith/service-registry');
const DATABASE_REDIS_URL = process.env.DATABASE_REDIS_URL || getServiceRegistry().getRedisUrl('infrastructure');
if (!DATABASE_REDIS_URL) throw new Error('Redis URL not found: set DATABASE_REDIS_URL or check service-registry');
const QUEUE_NAME = 'IMAGE_GENERATOR_QUEUE';

// Average file size per derivative in bytes (WebP, ~100KB)
const AVG_BYTES_PER_DERIVATIVE = 100 * 1024;

// Family -> derivative count mapping
const FAMILY_DERIVATIVE_COUNTS: Record<string, number> = {
  square: 5,
  hero: 3,
  portrait: 2,
  og: 5,
  compact: 3,
  tall: 3,
  ultrawide: 3,
  sidebar: 2,
  header: 3,
};

// All families
const ALL_FAMILIES = Object.keys(FAMILY_DERIVATIVE_COUNTS);

/** One prompt entry of a batch file. */
interface BatchPrompt {
  empty_state_code?: string;
  empty_state_type?: string;
  error_type?: string;
  description: string;
  prompt: string;
  layouts?: string[];
  steps?: number;
  guidance_scale?: number;
  seed?: number; // If provided, use this seed instead of generating
}

/** Top-level shape of a batch JSON file. */
interface BatchFile {
  batch_name: string;
  purpose?: string;
  description?: string;
  prompts: BatchPrompt[];
  negative_prompt?: string;
  variations_per_prompt?: number;
}

/** Metadata attached to every queued job under `_context`. */
interface JobContext {
  correlationId: string;
  service: string;
  createdAt: string;
  isDxJob: boolean;
  tags?: Record<string, string>;
}

/** Payload of a `GENERATE_VARIATION` job. */
interface GenerateVariationJobData {
  variationId: string;
  name: string;
  families: string[];
  generationParams: {
    prompt: string;
    negativePrompt?: string;
    seed: number;
    model: string;
    inferenceSteps?: number;
    guidanceScale?: number;
  };
  _context: JobContext;
}

/** Print an error message and terminate the process with exit code 1. */
function fail(message: string): never {
  console.error(`Error: ${message}`);
  process.exit(1);
}

/**
 * Return the token following `flag` in `args`, or `undefined` when the flag is absent.
 * Exits with an error when the flag is present but has no value (end of the
 * argument list, or the next token is another `--option`).
 */
function flagValue(args: string[], flag: string): string | undefined {
  const idx = args.indexOf(flag);
  if (idx < 0) return undefined;
  const value = args[idx + 1];
  if (value === undefined || value.startsWith('--')) {
    return fail(`${flag} requires a value`);
  }
  return value;
}

/**
 * Parse the command-line options from `process.argv`.
 *
 * Exits the process with code 1 when `--file` is missing, when a value-bearing
 * flag has no value, when `--size-limit` is not a non-negative integer, or when
 * `--families` contains no family names.
 */
function parseArgs(): {
  file: string;
  filter?: string;
  motif?: string;
  sizeLimitMB?: number;
  dryRun: boolean;
  families: string[];
  isDxJob: boolean;
} {
  const args = process.argv.slice(2);

  const file = flagValue(args, '--file');
  if (file === undefined) {
    console.error('Error: --file is required');
    console.error('Usage: pnpm queue:batch --file <path> [--filter <keyword>] [--size-limit <MB>]');
    process.exit(1);
  }

  // A non-numeric limit used to parse to NaN and be silently ignored; reject it instead.
  const sizeLimitRaw = flagValue(args, '--size-limit');
  let sizeLimitMB: number | undefined;
  if (sizeLimitRaw !== undefined) {
    sizeLimitMB = parseInt(sizeLimitRaw, 10);
    if (Number.isNaN(sizeLimitMB) || sizeLimitMB < 0) {
      fail(`--size-limit must be a non-negative number of MB, got "${sizeLimitRaw}"`);
    }
  }

  // Drop empty entries so "a,,b" or a trailing comma does not produce a '' family.
  const familiesRaw = flagValue(args, '--families');
  const families =
    familiesRaw === undefined
      ? ALL_FAMILIES
      : familiesRaw
          .split(',')
          .map(f => f.trim())
          .filter(f => f.length > 0);
  if (families.length === 0) {
    fail('--families requires at least one family name');
  }

  return {
    file,
    filter: flagValue(args, '--filter'),
    motif: flagValue(args, '--motif'),
    sizeLimitMB,
    dryRun: args.includes('--dry-run'),
    families,
    isDxJob: args.includes('--dx'),
  };
}

/**
 * Estimate the output size in bytes of one variation rendered for `families`.
 * Families missing from FAMILY_DERIVATIVE_COUNTS are counted as 3 derivatives.
 */
function estimateSizeBytes(families: string[]): number {
  return families.reduce((sum, family) => {
    const count = FAMILY_DERIVATIVE_COUNTS[family] || 3;
    return sum + (count * AVG_BYTES_PER_DERIVATIVE);
  }, 0);
}

/** Format a byte count as a short string using B, KB, MB or GB (1024-based). */
function formatBytes(bytes: number): string {
  if (bytes < 1024) return `${bytes}B`;
  if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)}KB`;
  if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)}MB`;
  return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)}GB`;
}

/**
 * Read and minimally validate a batch JSON file.
 *
 * Only the fields this script dereferences unconditionally are checked
 * (`prompts` must be an array whose entries have string `description` and
 * `prompt`), so a malformed file produces a clear error instead of a
 * TypeError deep inside the filtering code.
 *
 * @throws Error when the file is not valid JSON or fails the checks above.
 */
function loadBatchFile(filePath: string): BatchFile {
  const parsed: unknown = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error(`Invalid batch file (expected a JSON object): ${filePath}`);
  }
  const candidate = parsed as Partial<BatchFile>;
  if (!Array.isArray(candidate.prompts)) {
    throw new Error(`Invalid batch file (missing "prompts" array): ${filePath}`);
  }
  candidate.prompts.forEach((p, i) => {
    if (typeof p?.description !== 'string' || typeof p?.prompt !== 'string') {
      throw new Error(`Invalid batch file: prompts[${i}] needs string "description" and "prompt" fields`);
    }
  });
  return candidate as BatchFile;
}

/** Mask the password part of a `scheme://user:password@host` URL for logging. */
function redactUrlPassword(url: string): string {
  return url.replace(/\/\/([^:@/]*):[^@/]*@/, '//$1:***@');
}

/**
 * Entry point: load the batch file, apply the keyword filters and the size
 * limit, print a summary, and (unless `--dry-run`) add one
 * `GENERATE_VARIATION` job per variation to the queue.
 */
async function main() {
  const options = parseArgs();

  console.log('\n=== Batch Queue Tool ===\n');

  // Resolve file path
  const filePath = path.isAbsolute(options.file)
    ? options.file
    : path.resolve(process.cwd(), options.file);

  if (!fs.existsSync(filePath)) {
    console.error(`Error: File not found: ${filePath}`);
    process.exit(1);
  }

  // Load batch file
  const batchData = loadBatchFile(filePath);
  console.log(`Batch: ${batchData.batch_name}`);
  console.log(`Total prompts in file: ${batchData.prompts.length}`);

  // Filter prompts
  let filteredPrompts = batchData.prompts;

  if (options.filter) {
    const filterLower = options.filter.toLowerCase();
    filteredPrompts = filteredPrompts.filter(p =>
      p.description.toLowerCase().includes(filterLower) ||
      p.prompt.toLowerCase().includes(filterLower)
    );
    console.log(`After filter "${options.filter}": ${filteredPrompts.length} prompts`);
  }

  if (options.motif) {
    const motifLower = options.motif.toLowerCase();
    filteredPrompts = filteredPrompts.filter(p =>
      p.prompt.toLowerCase().includes(motifLower)
    );
    console.log(`After motif "${options.motif}": ${filteredPrompts.length} prompts`);
  }

  if (filteredPrompts.length === 0) {
    console.log('\nNo prompts match the filters. Exiting.');
    process.exit(0);
  }

  // Calculate size per variation
  const sizePerVariation = estimateSizeBytes(options.families);
  console.log(`\nFamilies: ${options.families.join(', ')}`);
  console.log(`Estimated size per variation: ${formatBytes(sizePerVariation)}`);

  // Check if prompts have pre-assigned seeds (already expanded variations).
  // This is decided BEFORE the size limit so the limit is computed with the
  // number of jobs that will actually be queued per prompt.
  const variationsPerPrompt = batchData.variations_per_prompt || 1;
  const hasPreAssignedSeeds = filteredPrompts.some(p => p.seed !== undefined);
  const effectiveVariationsPerPrompt = hasPreAssignedSeeds ? 1 : variationsPerPrompt;

  // Apply size limit
  let promptsToQueue = filteredPrompts;

  if (options.sizeLimitMB) {
    const limitBytes = options.sizeLimitMB * 1024 * 1024;
    const maxVariations = Math.floor(limitBytes / sizePerVariation);
    const maxPrompts = Math.floor(maxVariations / effectiveVariationsPerPrompt);

    if (maxPrompts < promptsToQueue.length) {
      console.log(`\nSize limit: ${options.sizeLimitMB}MB`);
      console.log(`Max variations: ${maxVariations}`);
      console.log(`Max prompts (${effectiveVariationsPerPrompt} variations each): ${maxPrompts}`);
      promptsToQueue = promptsToQueue.slice(0, maxPrompts);
    }

    if (promptsToQueue.length === 0) {
      // Nothing fits: stop here rather than connecting to Redis to queue zero jobs.
      console.log('\nSize limit too small to queue any prompts. Exiting.');
      process.exit(0);
    }
  }

  // Calculate totals
  const totalVariations = promptsToQueue.length * effectiveVariationsPerPrompt;
  const estimatedTotalSize = totalVariations * sizePerVariation;

  console.log(`\n--- Queue Summary ---`);
  console.log(`Prompts to queue: ${promptsToQueue.length}`);
  if (hasPreAssignedSeeds) {
    console.log(`Pre-expanded variations: yes (seeds provided)`);
  } else {
    console.log(`Variations per prompt: ${variationsPerPrompt}`);
  }
  console.log(`Total variations: ${totalVariations}`);
  console.log(`Estimated total size: ${formatBytes(estimatedTotalSize)}`);

  if (options.dryRun) {
    console.log(`\n[DRY RUN] Would queue the following:\n`);
    for (const p of promptsToQueue) {
      console.log(` - ${p.description}`);
      console.log(` Code: ${p.empty_state_code || p.error_type || 'unknown'}`);
      console.log(` Prompt: ${p.prompt.slice(0, 80)}...`);
      console.log();
    }
    console.log(`[DRY RUN] No jobs queued.`);
    process.exit(0);
  }

  // Connect to Redis (the password, if any, is masked in the log line)
  console.log(`\nConnecting to Redis: ${redactUrlPassword(String(DATABASE_REDIS_URL))}`);
  const redisConnection = new IORedis(DATABASE_REDIS_URL!, {
    maxRetriesPerRequest: null,
    enableReadyCheck: false,
  });

  // Type assertion to work around ioredis version mismatch between dependencies
  const queue = new Queue(QUEUE_NAME, { connection: redisConnection as never });

  // Queue jobs
  console.log(`\nQueuing ${totalVariations} jobs...\n`);

  const queuedJobs: string[] = [];

  for (const prompt of promptsToQueue) {
    for (let variation = 1; variation <= effectiveVariationsPerPrompt; variation++) {
      const variationId = uuid();
      // Use pre-assigned seed or generate new one
      const seed = prompt.seed ?? Math.floor(Math.random() * 2147483647);
      // Only add _v suffix if not pre-expanded
      const name = hasPreAssignedSeeds
        ? prompt.description
        : `${prompt.description}_v${variation}`;

      const context: JobContext = {
        correlationId: uuid(),
        service: 'features/image-generator',
        createdAt: new Date().toISOString(),
        isDxJob: options.isDxJob,
        tags: {
          type: 'batch',
          batch: batchData.batch_name,
          code: prompt.empty_state_code || prompt.error_type || 'unknown',
          variation: String(variation),
        },
      };

      const jobData: GenerateVariationJobData = {
        variationId,
        name,
        families: options.families,
        generationParams: {
          prompt: prompt.prompt,
          negativePrompt: batchData.negative_prompt,
          seed,
          model: 'anime', // Default model
          inferenceSteps: prompt.steps || 60,
          guidanceScale: prompt.guidance_scale || 8.5,
        },
        _context: context,
      };

      const job = await queue.add('GENERATE_VARIATION', jobData, {
        priority: options.isDxJob ? 2 : 3, // HIGH (2) for DX, NORMAL (3) otherwise
        attempts: 2,
        backoff: {
          type: 'exponential',
          delay: 30000,
        },
        removeOnComplete: true,
        removeOnFail: 100,
      });

      queuedJobs.push(job.id ?? variationId);
      console.log(` ✓ Queued: ${name} (seed: ${seed})`);
    }
  }

  // Summary
  console.log(`\n=== Complete ===`);
  console.log(`Jobs queued: ${queuedJobs.length}`);
  console.log(`Estimated output: ${formatBytes(estimatedTotalSize)}`);

  // Cleanup
  await queue.close();
  await redisConnection.quit();
}

main().catch((err) => {
  console.error('Error:', err);
  process.exit(1);
});