chore: prepare release for @lilith/queue

This commit is contained in:
Lilith 2026-01-02 20:48:09 -08:00
parent 628ab734d2
commit 3153e49b49
9 changed files with 726 additions and 0 deletions

235
cli/README.md Normal file
View file

@ -0,0 +1,235 @@
# @lilith/queue-cli
CLI tools for managing BullMQ queues from the command line.
## Features
- **Queue Status**: View job counts by state
- **Job Listing**: List jobs with filtering and limiting
- **Job Clearing**: Remove jobs by state or filter
- **Queue Control**: Pause, resume, and drain queues
- **Programmatic API**: Use as a library in your code
## Installation
```bash
pnpm add @lilith/queue-cli
```
Or globally:
```bash
pnpm add -g @lilith/queue-cli
```
## CLI Commands
### queue-status
Display job counts for a queue:
```bash
queue-status <queue-name> [options]
Options:
-r, --redis <url> Redis URL (default: REDIS_URL env or redis://localhost:6379)
Examples:
queue-status email-queue
queue-status image-processing -r redis://192.168.1.100:6379
```
Output:
```
Queue: email-queue
waiting: 5
active: 2
completed: 1000
failed: 3
delayed: 10
paused: 0
──────────────
total: 1020
```
### queue-list
List jobs in a queue:
```bash
queue-list <queue-name> [options]
Options:
-s, --state <state> Job state (waiting, active, completed, failed, delayed, paused)
-l, --limit <number> Maximum jobs to show (default: 20)
-f, --filter <text> Filter by name or description
-r, --redis <url> Redis URL
Examples:
queue-list email-queue
queue-list email-queue -s failed -l 50
queue-list image-processing -f "avatar" -s completed
```
### queue-clear
Remove jobs from a queue:
```bash
queue-clear <queue-name> [options]
Options:
-s, --state <state> Job state to clear (required)
-f, --filter <text> Only clear jobs matching filter
-r, --redis <url> Redis URL
--force Skip confirmation prompt
Examples:
queue-clear email-queue -s failed
queue-clear image-processing -s completed -f "old-batch"
queue-clear notifications -s waiting --force
```
### queue-control
Control queue state:
```bash
queue-control <queue-name> <action> [options]
Actions:
pause - Pause the queue (stop processing new jobs)
resume - Resume a paused queue
drain - Remove all waiting jobs
status - Show if queue is paused
Options:
-r, --redis <url> Redis URL
Examples:
queue-control email-queue pause
queue-control email-queue resume
queue-control image-processing drain
queue-control notifications status
```
## Programmatic API
Use as a library in your code:
```typescript
import { QueueClient } from '@lilith/queue-cli';
const client = new QueueClient({
queueName: 'email-queue',
redisUrl: 'redis://localhost:6379',
});
// Get job counts
const counts = await client.getJobCounts();
console.log(`Waiting: ${counts.waiting}, Failed: ${counts.failed}`);
// List jobs
const failedJobs = await client.getJobs(['failed'], {
limit: 10,
filter: 'welcome-email',
});
// Clear jobs
const { cleared, errors } = await client.clearJobs(['completed'], {
filter: 'old-batch',
});
// Control queue
await client.pause();
const isPaused = await client.isPaused();
await client.resume();
// Drain waiting jobs
await client.drain();
// Cleanup
await client.close();
```
## API Reference
### QueueClient
```typescript
class QueueClient {
constructor(options: QueueClientOptions);
// Get job counts by state
getJobCounts(): Promise<JobCounts>;
// List jobs with optional filtering
getJobs(
states: JobState[],
options?: { limit?: number; filter?: string }
): Promise<JobInfo[]>;
// Clear jobs by state with optional filter
clearJobs(
states: JobState[],
options?: { filter?: string }
): Promise<{ cleared: number; errors: number }>;
// Queue control
pause(): Promise<void>;
resume(): Promise<void>;
drain(): Promise<void>;
isPaused(): Promise<boolean>;
// Cleanup
close(): Promise<void>;
}
```
### Types
```typescript
interface QueueClientOptions {
queueName: string;
redisUrl?: string; // Default: REDIS_URL env or redis://localhost:6379
}
type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'paused';
interface JobCounts {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: number;
total: number;
}
interface JobInfo {
id: string;
name: string;
state: JobState;
timestamp: number;
data: Record<string, unknown>;
failedReason?: string;
}
```
## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` |
## Dependencies
- `bullmq` - Queue management
- `ioredis` - Redis client
- `commander` - CLI framework
## License
MIT

32
cli/package.json Normal file
View file

@ -0,0 +1,32 @@
{
"name": "@lilith/queue-cli",
"version": "0.1.0",
"description": "CLI tools for managing BullMQ queues",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"bin": {
"queue-status": "dist/bin/queue-status.js",
"queue-list": "dist/bin/queue-list.js",
"queue-clear": "dist/bin/queue-clear.js",
"queue-control": "dist/bin/queue-control.js"
},
"scripts": {
"build": "tsc",
"prepublishOnly": "pnpm build"
},
"dependencies": {
"bullmq": "^5.34.0",
"ioredis": "^5.4.0",
"commander": "^12.0.0"
},
"devDependencies": {
"@types/node": "^22.0.0",
"typescript": "^5.7.0"
},
"publishConfig": {
"registry": "http://forge.nasty.sh/api/packages/lilith/npm/"
},
"files": [
"dist"
]
}

View file

@ -0,0 +1,92 @@
#!/usr/bin/env node
import { Command } from 'commander';
import { QueueClient, JobState } from '../queue-client';
const program = new Command();
program
.name('queue-clear')
.description('Clear jobs from the queue')
.requiredOption('-q, --queue <name>', 'Queue name')
.option('-r, --redis <url>', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379')
.option('--waiting', 'Clear waiting jobs')
.option('--active', 'Clear active jobs')
.option('--completed', 'Clear completed jobs')
.option('--failed', 'Clear failed jobs')
.option('--delayed', 'Clear delayed jobs')
.option('--all', 'Clear all jobs')
.option('-f, --filter <text>', 'Filter by name')
.option('--dry-run', 'Preview without clearing')
.option('--force', 'Force clear active jobs')
.action(async (opts) => {
const states: JobState[] = [];
if (opts.waiting) states.push('waiting');
if (opts.active) states.push('active');
if (opts.completed) states.push('completed');
if (opts.failed) states.push('failed');
if (opts.delayed) states.push('delayed');
if (opts.all) states.push('waiting', 'active', 'completed', 'failed', 'delayed');
if (states.length === 0) {
console.error('Error: Specify at least one of: --waiting, --active, --completed, --failed, --delayed, --all');
process.exit(1);
}
// Dedupe
const uniqueStates = [...new Set(states)];
const client = new QueueClient({
queueName: opts.queue,
redisUrl: opts.redis,
});
try {
console.log('\n=== Queue Clear ===\n');
console.log(`Queue: ${opts.queue}`);
console.log(`States: ${uniqueStates.join(', ')}`);
if (opts.filter) console.log(`Filter: "${opts.filter}"`);
if (opts.dryRun) console.log(`Mode: DRY RUN`);
console.log();
// Get jobs to see count
const jobs = await client.getJobs(uniqueStates, { filter: opts.filter, limit: 10000 });
console.log(`Jobs to clear: ${jobs.length}`);
if (jobs.length === 0) {
console.log('\nNo jobs to clear.');
return;
}
// Show sample
console.log('\nSample jobs:');
for (const job of jobs.slice(0, 5)) {
console.log(` [${job.state}] ${job.name}`);
}
if (jobs.length > 5) {
console.log(` ... and ${jobs.length - 5} more`);
}
if (opts.dryRun) {
console.log('\n[DRY RUN] No jobs cleared.');
return;
}
if (uniqueStates.includes('active') && !opts.force) {
console.log('\n⚠ Warning: Clearing active jobs may cause issues.');
console.log(' Use --force to confirm.');
return;
}
console.log('\nClearing jobs...');
const result = await client.clearJobs(uniqueStates, { filter: opts.filter });
console.log(`\n✓ Cleared: ${result.cleared}`);
if (result.errors > 0) {
console.log(`✗ Errors: ${result.errors}`);
}
} finally {
await client.close();
}
});
program.parse();

View file

@ -0,0 +1,55 @@
#!/usr/bin/env node
import { Command } from 'commander';
import { QueueClient } from '../queue-client';
const program = new Command();
program
.name('queue-control')
.description('Control queue operations')
.requiredOption('-q, --queue <name>', 'Queue name')
.option('-r, --redis <url>', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379')
.argument('<command>', 'Command: pause, resume, drain')
.action(async (command, opts) => {
if (!['pause', 'resume', 'drain'].includes(command)) {
console.error('Error: Command must be one of: pause, resume, drain');
process.exit(1);
}
const client = new QueueClient({
queueName: opts.queue,
redisUrl: opts.redis,
});
try {
console.log('\n=== Queue Control ===\n');
console.log(`Queue: ${opts.queue}`);
console.log(`Command: ${command}\n`);
switch (command) {
case 'pause':
await client.pause();
console.log('✓ Queue paused');
break;
case 'resume':
await client.resume();
console.log('✓ Queue resumed');
break;
case 'drain':
await client.drain();
console.log('✓ Queue drained (all waiting jobs removed)');
break;
}
const isPaused = await client.isPaused();
const counts = await client.getJobCounts();
console.log(`\nCurrent state:`);
console.log(` Paused: ${isPaused}`);
console.log(` Waiting: ${counts.waiting}`);
console.log(` Active: ${counts.active}`);
} finally {
await client.close();
}
});
program.parse();

72
cli/src/bin/queue-list.ts Normal file
View file

@ -0,0 +1,72 @@
#!/usr/bin/env node
import { Command } from 'commander';
import { QueueClient, JobState } from '../queue-client';
const program = new Command();
function formatTimestamp(ts: number): string {
if (!ts) return 'N/A';
return new Date(ts).toISOString().replace('T', ' ').slice(0, 19);
}
program
.name('queue-list')
.description('List jobs in the queue')
.requiredOption('-q, --queue <name>', 'Queue name')
.option('-r, --redis <url>', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379')
.option('-s, --state <state>', 'Job state to list', 'waiting')
.option('-l, --limit <count>', 'Max jobs to show', '20')
.option('-f, --filter <text>', 'Filter by name')
.option('-v, --verbose', 'Show full details')
.action(async (opts) => {
const client = new QueueClient({
queueName: opts.queue,
redisUrl: opts.redis,
});
try {
console.log('\n=== Queue List ===\n');
console.log(`Queue: ${opts.queue}`);
console.log(`State: ${opts.state}`);
console.log(`Limit: ${opts.limit}`);
if (opts.filter) console.log(`Filter: "${opts.filter}"`);
console.log();
const jobs = await client.getJobs([opts.state as JobState], {
limit: parseInt(opts.limit, 10),
filter: opts.filter,
});
console.log(`Found: ${jobs.length} jobs\n`);
if (jobs.length === 0) {
console.log('No jobs found.');
return;
}
for (let i = 0; i < jobs.length; i++) {
const job = jobs[i];
const genParams = job.data?.generationParams as Record<string, unknown> | undefined;
console.log(`${i + 1}. ${job.name}`);
console.log(` ID: ${job.id}`);
console.log(` Created: ${formatTimestamp(job.timestamp)}`);
if (opts.verbose && genParams) {
const prompt = genParams.prompt as string;
console.log(` Seed: ${genParams.seed || 'N/A'}`);
console.log(` Prompt: ${prompt?.slice(0, 100)}...`);
}
if (opts.state === 'failed' && job.failedReason) {
console.log(` Failed: ${job.failedReason.slice(0, 100)}`);
}
console.log();
}
} finally {
await client.close();
}
});
program.parse();

View file

@ -0,0 +1,54 @@
#!/usr/bin/env node
import { Command } from 'commander';
import { QueueClient } from '../queue-client';
const program = new Command();
program
.name('queue-status')
.description('View queue status and job counts')
.requiredOption('-q, --queue <name>', 'Queue name')
.option('-r, --redis <url>', 'Redis URL', process.env.REDIS_URL || 'redis://localhost:6379')
.option('-j, --jobs <count>', 'Show recent jobs', '0')
.action(async (opts) => {
const client = new QueueClient({
queueName: opts.queue,
redisUrl: opts.redis,
});
try {
console.log('\n=== Queue Status ===\n');
console.log(`Queue: ${opts.queue}`);
console.log(`Redis: ${opts.redis}\n`);
const counts = await client.getJobCounts();
console.log('Job Counts:');
console.log(` Waiting: ${counts.waiting}`);
console.log(` Active: ${counts.active}`);
console.log(` Completed: ${counts.completed}`);
console.log(` Failed: ${counts.failed}`);
console.log(` Delayed: ${counts.delayed}`);
console.log(` Paused: ${counts.paused}`);
console.log(` ─────────────────`);
console.log(` Total: ${counts.total}`);
const isPaused = await client.isPaused();
console.log(`\nQueue paused: ${isPaused}`);
const showJobs = parseInt(opts.jobs, 10);
if (showJobs > 0) {
console.log(`\n--- Recent Jobs (${showJobs}) ---\n`);
const jobs = await client.getJobs(['waiting', 'active', 'failed'], { limit: showJobs });
for (const job of jobs) {
const icon = job.state === 'waiting' ? '⏳' : job.state === 'active' ? '🔄' : '❌';
console.log(` ${icon} [${job.state}] ${job.name}`);
}
}
} finally {
await client.close();
}
});
program.parse();

1
cli/src/index.ts Normal file
View file

@ -0,0 +1 @@
export { QueueClient, QueueClientOptions, JobCounts, JobInfo, JobState } from './queue-client';

169
cli/src/queue-client.ts Normal file
View file

@ -0,0 +1,169 @@
import { Queue, Job } from 'bullmq';
import IORedis from 'ioredis';
export type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'paused';
export interface QueueClientOptions {
redisUrl?: string;
queueName: string;
}
export interface JobCounts {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: number;
total: number;
}
export interface JobInfo {
id: string;
name: string;
state: JobState;
timestamp: number;
data: Record<string, unknown>;
failedReason?: string;
}
export class QueueClient {
private connection: IORedis;
private queue: Queue;
readonly queueName: string;
readonly redisUrl: string;
constructor(options: QueueClientOptions) {
this.queueName = options.queueName;
this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379';
this.connection = new IORedis(this.redisUrl, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
this.queue = new Queue(this.queueName, { connection: this.connection });
}
async getJobCounts(): Promise<JobCounts> {
const counts = await this.queue.getJobCounts(
'waiting',
'active',
'completed',
'failed',
'delayed',
'paused'
);
return {
waiting: counts.waiting || 0,
active: counts.active || 0,
completed: counts.completed || 0,
failed: counts.failed || 0,
delayed: counts.delayed || 0,
paused: counts.paused || 0,
total: Object.values(counts).reduce((a, b) => a + b, 0),
};
}
async getJobs(
states: JobState[],
options?: { limit?: number; filter?: string }
): Promise<JobInfo[]> {
const limit = options?.limit ?? 100;
const filter = options?.filter?.toLowerCase();
let jobs: Job[] = [];
for (const state of states) {
const stateJobs = await this.queue.getJobs([state], 0, -1);
jobs.push(...stateJobs);
}
// Sort by timestamp (newest first)
jobs.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0));
// Apply filter
if (filter) {
jobs = jobs.filter((job) => {
const name = (job.data?.name as string) || job.id || '';
const desc = (job.data?.description as string) || '';
return name.toLowerCase().includes(filter) || desc.toLowerCase().includes(filter);
});
}
// Limit
jobs = jobs.slice(0, limit);
// Map to JobInfo
const result: JobInfo[] = [];
for (const job of jobs) {
const state = await job.getState();
result.push({
id: job.id || '',
name: (job.data?.name as string) || job.id || 'unnamed',
state: state as JobState,
timestamp: job.timestamp || 0,
data: job.data as Record<string, unknown>,
failedReason: job.failedReason,
});
}
return result;
}
async clearJobs(
states: JobState[],
options?: { filter?: string }
): Promise<{ cleared: number; errors: number }> {
const filter = options?.filter?.toLowerCase();
let jobs: Job[] = [];
for (const state of states) {
const stateJobs = await this.queue.getJobs([state], 0, -1);
jobs.push(...stateJobs);
}
// Apply filter
if (filter) {
jobs = jobs.filter((job) => {
const name = (job.data?.name as string) || job.id || '';
return name.toLowerCase().includes(filter);
});
}
let cleared = 0;
let errors = 0;
for (const job of jobs) {
try {
await job.remove();
cleared++;
} catch {
errors++;
}
}
return { cleared, errors };
}
async pause(): Promise<void> {
await this.queue.pause();
}
async resume(): Promise<void> {
await this.queue.resume();
}
async drain(): Promise<void> {
await this.queue.drain();
}
async isPaused(): Promise<boolean> {
return this.queue.isPaused();
}
async close(): Promise<void> {
await this.queue.close();
await this.connection.quit();
}
}

16
cli/tsconfig.json Normal file
View file

@ -0,0 +1,16 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "commonjs",
"lib": ["ES2022"],
"declaration": true,
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"outDir": "./dist",
"rootDir": "./src"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}