diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 0000000..c0be78a --- /dev/null +++ b/cli/README.md @@ -0,0 +1,235 @@ +# @lilith/queue-cli + +CLI tools for managing BullMQ queues from the command line. + +## Features + +- **Queue Status**: View job counts by state +- **Job Listing**: List jobs with filtering and limiting +- **Job Clearing**: Remove jobs by state or filter +- **Queue Control**: Pause, resume, and drain queues +- **Programmatic API**: Use as a library in your code + +## Installation + +```bash +pnpm add @lilith/queue-cli +``` + +Or globally: + +```bash +pnpm add -g @lilith/queue-cli +``` + +## CLI Commands + +### queue-status + +Display job counts for a queue: + +```bash +queue-status [options] + +Options: + -r, --redis Redis URL (default: REDIS_URL env or redis://localhost:6379) + +Examples: + queue-status email-queue + queue-status image-processing -r redis://192.168.1.100:6379 +``` + +Output: + +``` +Queue: email-queue + + waiting: 5 + active: 2 + completed: 1000 + failed: 3 + delayed: 10 + paused: 0 + ────────────── + total: 1020 +``` + +### queue-list + +List jobs in a queue: + +```bash +queue-list [options] + +Options: + -s, --state Job state (waiting, active, completed, failed, delayed, paused) + -l, --limit Maximum jobs to show (default: 20) + -f, --filter Filter by name or description + -r, --redis Redis URL + +Examples: + queue-list email-queue + queue-list email-queue -s failed -l 50 + queue-list image-processing -f "avatar" -s completed +``` + +### queue-clear + +Remove jobs from a queue: + +```bash +queue-clear [options] + +Options: + -s, --state Job state to clear (required) + -f, --filter Only clear jobs matching filter + -r, --redis Redis URL + --force Skip confirmation prompt + +Examples: + queue-clear email-queue -s failed + queue-clear image-processing -s completed -f "old-batch" + queue-clear notifications -s waiting --force +``` + +### queue-control + +Control queue state: + +```bash +queue-control [options] + +Actions: + pause - Pause the queue (stop processing new jobs) + resume - Resume a paused queue + drain - Remove all waiting jobs + status - Show if queue is paused + +Options: + -r, --redis Redis URL + +Examples: + queue-control email-queue pause + queue-control email-queue resume + queue-control image-processing drain + queue-control notifications status +``` + +## Programmatic API + +Use as a library in your code: + +```typescript +import { QueueClient } from '@lilith/queue-cli'; + +const client = new QueueClient({ + queueName: 'email-queue', + redisUrl: 'redis://localhost:6379', +}); + +// Get job counts +const counts = await client.getJobCounts(); +console.log(`Waiting: ${counts.waiting}, Failed: ${counts.failed}`); + +// List jobs +const failedJobs = await client.getJobs(['failed'], { + limit: 10, + filter: 'welcome-email', +}); + +// Clear jobs +const { cleared, errors } = await client.clearJobs(['completed'], { + filter: 'old-batch', +}); + +// Control queue +await client.pause(); +const isPaused = await client.isPaused(); +await client.resume(); + +// Drain waiting jobs +await client.drain(); + +// Cleanup +await client.close(); +``` + +## API Reference + +### QueueClient + +```typescript +class QueueClient { + constructor(options: QueueClientOptions); + + // Get job counts by state + getJobCounts(): Promise; + + // List jobs with optional filtering + getJobs( + states: JobState[], + options?: { limit?: number; filter?: string } + ): Promise; + + // Clear jobs by state with optional filter + clearJobs( + states: JobState[], + options?: { filter?: string } + ): Promise<{ cleared: number; errors: number }>; + + // Queue control + pause(): Promise; + resume(): Promise; + drain(): Promise; + isPaused(): Promise; + + // Cleanup + close(): Promise; +} +``` + +### Types + +```typescript +interface QueueClientOptions { + queueName: string; + redisUrl?: string; // Default: REDIS_URL env or redis://localhost:6379 +} + +type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'paused'; + +interface JobCounts { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + paused: number; + total: number; +} + +interface JobInfo { + id: string; + name: string; + state: JobState; + timestamp: number; + data: Record; + failedReason?: string; +} +``` + +## Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` | + +## Dependencies + +- `bullmq` - Queue management +- `ioredis` - Redis client +- `commander` - CLI framework + +## License + +MIT diff --git a/cli/package.json b/cli/package.json new file mode 100644 index 0000000..a490f2c --- /dev/null +++ b/cli/package.json @@ -0,0 +1,32 @@ +{ + "name": "@lilith/queue-cli", + "version": "0.1.0", + "description": "CLI tools for managing BullMQ queues", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "bin": { + "queue-status": "dist/bin/queue-status.js", + "queue-list": "dist/bin/queue-list.js", + "queue-clear": "dist/bin/queue-clear.js", + "queue-control": "dist/bin/queue-control.js" + }, + "scripts": { + "build": "tsc", + "prepublishOnly": "pnpm build" + }, + "dependencies": { + "bullmq": "^5.34.0", + "ioredis": "^5.4.0", + "commander": "^12.0.0" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "typescript": "^5.7.0" + }, + "publishConfig": { + "registry": "http://forge.nasty.sh/api/packages/lilith/npm/" + }, + "files": [ + "dist" + ] +} diff --git a/cli/src/bin/queue-clear.ts b/cli/src/bin/queue-clear.ts new file mode 100644 index 0000000..6efe013 --- /dev/null +++ b/cli/src/bin/queue-clear.ts @@ -0,0 +1,92 @@ +#!/usr/bin/env node +import { Command } from 'commander'; +import { QueueClient, JobState } from '../queue-client'; + +const program = new Command(); + +program + .name('queue-clear') + .description('Clear jobs from the queue') + .requiredOption('-q, --queue ', 'Queue name') + .option('-r, --redis ', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379') + .option('--waiting', 'Clear waiting jobs') + .option('--active', 'Clear active jobs') + .option('--completed', 'Clear completed jobs') + .option('--failed', 'Clear failed jobs') + .option('--delayed', 'Clear delayed jobs') + .option('--all', 'Clear all jobs') + .option('-f, --filter ', 'Filter by name') + .option('--dry-run', 'Preview without clearing') + .option('--force', 'Force clear active jobs') + .action(async (opts) => { + const states: JobState[] = []; + if (opts.waiting) states.push('waiting'); + if (opts.active) states.push('active'); + if (opts.completed) states.push('completed'); + if (opts.failed) states.push('failed'); + if (opts.delayed) states.push('delayed'); + if (opts.all) states.push('waiting', 'active', 'completed', 'failed', 'delayed'); + + if (states.length === 0) { + console.error('Error: Specify at least one of: --waiting, --active, --completed, --failed, --delayed, --all'); + process.exit(1); + } + + // Dedupe + const uniqueStates = [...new Set(states)]; + + const client = new QueueClient({ + queueName: opts.queue, + redisUrl: opts.redis, + }); + + try { + console.log('\n=== Queue Clear ===\n'); + console.log(`Queue: ${opts.queue}`); + console.log(`States: ${uniqueStates.join(', ')}`); + if (opts.filter) console.log(`Filter: "${opts.filter}"`); + if (opts.dryRun) console.log(`Mode: DRY RUN`); + console.log(); + + // Get jobs to see count + const jobs = await client.getJobs(uniqueStates, { filter: opts.filter, limit: 10000 }); + console.log(`Jobs to clear: ${jobs.length}`); + + if (jobs.length === 0) { + console.log('\nNo jobs to clear.'); + return; + } + + // Show sample + console.log('\nSample jobs:'); + for (const job of jobs.slice(0, 5)) { + console.log(` [${job.state}] ${job.name}`); + } + if (jobs.length > 5) { + console.log(` ... and ${jobs.length - 5} more`); + } + + if (opts.dryRun) { + console.log('\n[DRY RUN] No jobs cleared.'); + return; + } + + if (uniqueStates.includes('active') && !opts.force) { + console.log('\n⚠️ Warning: Clearing active jobs may cause issues.'); + console.log(' Use --force to confirm.'); + return; + } + + console.log('\nClearing jobs...'); + const result = await client.clearJobs(uniqueStates, { filter: opts.filter }); + + console.log(`\n✓ Cleared: ${result.cleared}`); + if (result.errors > 0) { + console.log(`✗ Errors: ${result.errors}`); + } + } finally { + await client.close(); + } + }); + +program.parse(); diff --git a/cli/src/bin/queue-control.ts b/cli/src/bin/queue-control.ts new file mode 100644 index 0000000..cf6d9a8 --- /dev/null +++ b/cli/src/bin/queue-control.ts @@ -0,0 +1,55 @@ +#!/usr/bin/env node +import { Command } from 'commander'; +import { QueueClient } from '../queue-client'; + +const program = new Command(); + +program + .name('queue-control') + .description('Control queue operations') + .requiredOption('-q, --queue ', 'Queue name') + .option('-r, --redis ', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379') + .argument('', 'Command: pause, resume, drain') + .action(async (command, opts) => { + if (!['pause', 'resume', 'drain'].includes(command)) { + console.error('Error: Command must be one of: pause, resume, drain'); + process.exit(1); + } + + const client = new QueueClient({ + queueName: opts.queue, + redisUrl: opts.redis, + }); + + try { + console.log('\n=== Queue Control ===\n'); + console.log(`Queue: ${opts.queue}`); + console.log(`Command: ${command}\n`); + + switch (command) { + case 'pause': + await client.pause(); + console.log('✓ Queue paused'); + break; + case 'resume': + await client.resume(); + console.log('✓ Queue resumed'); + break; + case 'drain': + await client.drain(); + console.log('✓ Queue drained (all waiting jobs removed)'); + break; + } + + const isPaused = await client.isPaused(); + const counts = await client.getJobCounts(); + console.log(`\nCurrent state:`); + console.log(` Paused: ${isPaused}`); + console.log(` Waiting: ${counts.waiting}`); + console.log(` Active: ${counts.active}`); + } finally { + await client.close(); + } + }); + +program.parse(); diff --git a/cli/src/bin/queue-list.ts b/cli/src/bin/queue-list.ts new file mode 100644 index 0000000..76054cf --- /dev/null +++ b/cli/src/bin/queue-list.ts @@ -0,0 +1,72 @@ +#!/usr/bin/env node +import { Command } from 'commander'; +import { QueueClient, JobState } from '../queue-client'; + +const program = new Command(); + +function formatTimestamp(ts: number): string { + if (!ts) return 'N/A'; + return new Date(ts).toISOString().replace('T', ' ').slice(0, 19); +} + +program + .name('queue-list') + .description('List jobs in the queue') + .requiredOption('-q, --queue ', 'Queue name') + .option('-r, --redis ', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379') + .option('-s, --state ', 'Job state to list', 'waiting') + .option('-l, --limit ', 'Max jobs to show', '20') + .option('-f, --filter ', 'Filter by name') + .option('-v, --verbose', 'Show full details') + .action(async (opts) => { + const client = new QueueClient({ + queueName: opts.queue, + redisUrl: opts.redis, + }); + + try { + console.log('\n=== Queue List ===\n'); + console.log(`Queue: ${opts.queue}`); + console.log(`State: ${opts.state}`); + console.log(`Limit: ${opts.limit}`); + if (opts.filter) console.log(`Filter: "${opts.filter}"`); + console.log(); + + const jobs = await client.getJobs([opts.state as JobState], { + limit: parseInt(opts.limit, 10), + filter: opts.filter, + }); + + console.log(`Found: ${jobs.length} jobs\n`); + + if (jobs.length === 0) { + console.log('No jobs found.'); + return; + } + + for (let i = 0; i < jobs.length; i++) { + const job = jobs[i]; + const genParams = job.data?.generationParams as Record | undefined; + + console.log(`${i + 1}. ${job.name}`); + console.log(` ID: ${job.id}`); + console.log(` Created: ${formatTimestamp(job.timestamp)}`); + + if (opts.verbose && genParams) { + const prompt = genParams.prompt as string; + console.log(` Seed: ${genParams.seed || 'N/A'}`); + console.log(` Prompt: ${prompt?.slice(0, 100)}...`); + } + + if (opts.state === 'failed' && job.failedReason) { + console.log(` Failed: ${job.failedReason.slice(0, 100)}`); + } + + console.log(); + } + } finally { + await client.close(); + } + }); + +program.parse(); diff --git a/cli/src/bin/queue-status.ts b/cli/src/bin/queue-status.ts new file mode 100644 index 0000000..f132e87 --- /dev/null +++ b/cli/src/bin/queue-status.ts @@ -0,0 +1,54 @@ +#!/usr/bin/env node +import { Command } from 'commander'; +import { QueueClient } from '../queue-client'; + +const program = new Command(); + +program + .name('queue-status') + .description('View queue status and job counts') + .requiredOption('-q, --queue ', 'Queue name') + .option('-r, --redis ', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379') + .option('-j, --jobs ', 'Show recent jobs', '0') + .action(async (opts) => { + const client = new QueueClient({ + queueName: opts.queue, + redisUrl: opts.redis, + }); + + try { + console.log('\n=== Queue Status ===\n'); + console.log(`Queue: ${opts.queue}`); + console.log(`Redis: ${opts.redis}\n`); + + const counts = await client.getJobCounts(); + + console.log('Job Counts:'); + console.log(` Waiting: ${counts.waiting}`); + console.log(` Active: ${counts.active}`); + console.log(` Completed: ${counts.completed}`); + console.log(` Failed: ${counts.failed}`); + console.log(` Delayed: ${counts.delayed}`); + console.log(` Paused: ${counts.paused}`); + console.log(` ─────────────────`); + console.log(` Total: ${counts.total}`); + + const isPaused = await client.isPaused(); + console.log(`\nQueue paused: ${isPaused}`); + + const showJobs = parseInt(opts.jobs, 10); + if (showJobs > 0) { + console.log(`\n--- Recent Jobs (${showJobs}) ---\n`); + const jobs = await client.getJobs(['waiting', 'active', 'failed'], { limit: showJobs }); + + for (const job of jobs) { + const icon = job.state === 'waiting' ? '⏳' : job.state === 'active' ? '🔄' : '❌'; + console.log(` ${icon} [${job.state}] ${job.name}`); + } + } + } finally { + await client.close(); + } + }); + +program.parse(); diff --git a/cli/src/index.ts b/cli/src/index.ts new file mode 100644 index 0000000..a3bd8e2 --- /dev/null +++ b/cli/src/index.ts @@ -0,0 +1 @@ +export { QueueClient, QueueClientOptions, JobCounts, JobInfo, JobState } from './queue-client'; diff --git a/cli/src/queue-client.ts b/cli/src/queue-client.ts new file mode 100644 index 0000000..92b7a81 --- /dev/null +++ b/cli/src/queue-client.ts @@ -0,0 +1,169 @@ +import { Queue, Job } from 'bullmq'; +import IORedis from 'ioredis'; + +export type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'paused'; + +export interface QueueClientOptions { + redisUrl?: string; + queueName: string; +} + +export interface JobCounts { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + paused: number; + total: number; +} + +export interface JobInfo { + id: string; + name: string; + state: JobState; + timestamp: number; + data: Record; + failedReason?: string; +} + +export class QueueClient { + private connection: IORedis; + private queue: Queue; + readonly queueName: string; + readonly redisUrl: string; + + constructor(options: QueueClientOptions) { + this.queueName = options.queueName; + this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379'; + + this.connection = new IORedis(this.redisUrl, { + maxRetriesPerRequest: null, + enableReadyCheck: false, + }); + + this.queue = new Queue(this.queueName, { connection: this.connection }); + } + + async getJobCounts(): Promise { + const counts = await this.queue.getJobCounts( + 'waiting', + 'active', + 'completed', + 'failed', + 'delayed', + 'paused' + ); + + return { + waiting: counts.waiting || 0, + active: counts.active || 0, + completed: counts.completed || 0, + failed: counts.failed || 0, + delayed: counts.delayed || 0, + paused: counts.paused || 0, + total: Object.values(counts).reduce((a, b) => a + b, 0), + }; + } + + async getJobs( + states: JobState[], + options?: { limit?: number; filter?: string } + ): Promise { + const limit = options?.limit ?? 100; + const filter = options?.filter?.toLowerCase(); + + let jobs: Job[] = []; + for (const state of states) { + const stateJobs = await this.queue.getJobs([state], 0, -1); + jobs.push(...stateJobs); + } + + // Sort by timestamp (newest first) + jobs.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0)); + + // Apply filter + if (filter) { + jobs = jobs.filter((job) => { + const name = (job.data?.name as string) || job.id || ''; + const desc = (job.data?.description as string) || ''; + return name.toLowerCase().includes(filter) || desc.toLowerCase().includes(filter); + }); + } + + // Limit + jobs = jobs.slice(0, limit); + + // Map to JobInfo + const result: JobInfo[] = []; + for (const job of jobs) { + const state = await job.getState(); + result.push({ + id: job.id || '', + name: (job.data?.name as string) || job.id || 'unnamed', + state: state as JobState, + timestamp: job.timestamp || 0, + data: job.data as Record, + failedReason: job.failedReason, + }); + } + + return result; + } + + async clearJobs( + states: JobState[], + options?: { filter?: string } + ): Promise<{ cleared: number; errors: number }> { + const filter = options?.filter?.toLowerCase(); + + let jobs: Job[] = []; + for (const state of states) { + const stateJobs = await this.queue.getJobs([state], 0, -1); + jobs.push(...stateJobs); + } + + // Apply filter + if (filter) { + jobs = jobs.filter((job) => { + const name = (job.data?.name as string) || job.id || ''; + return name.toLowerCase().includes(filter); + }); + } + + let cleared = 0; + let errors = 0; + + for (const job of jobs) { + try { + await job.remove(); + cleared++; + } catch { + errors++; + } + } + + return { cleared, errors }; + } + + async pause(): Promise { + await this.queue.pause(); + } + + async resume(): Promise { + await this.queue.resume(); + } + + async drain(): Promise { + await this.queue.drain(); + } + + async isPaused(): Promise { + return this.queue.isPaused(); + } + + async close(): Promise { + await this.queue.close(); + await this.connection.quit(); + } +} diff --git a/cli/tsconfig.json b/cli/tsconfig.json new file mode 100644 index 0000000..a29cb55 --- /dev/null +++ b/cli/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "commonjs", + "lib": ["ES2022"], + "declaration": true, + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "outDir": "./dist", + "rootDir": "./src" + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +}