queue/cli/src/queue-client.ts

167 lines
3.8 KiB
TypeScript

import { Queue, Job } from 'bullmq';
export type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' | 'paused';
export interface QueueClientOptions {
redisUrl?: string;
queueName: string;
}
export interface JobCounts {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: number;
total: number;
}
export interface JobInfo {
id: string;
name: string;
state: JobState;
timestamp: number;
data: Record<string, unknown>;
failedReason?: string;
}
export class QueueClient {
private queue: Queue;
readonly queueName: string;
readonly redisUrl: string;
constructor(options: QueueClientOptions) {
this.queueName = options.queueName;
this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379';
this.queue = new Queue(this.queueName, {
connection: {
url: this.redisUrl,
maxRetriesPerRequest: null,
enableReadyCheck: false,
},
});
}
async getJobCounts(): Promise<JobCounts> {
const counts = await this.queue.getJobCounts(
'waiting',
'active',
'completed',
'failed',
'delayed',
'paused'
);
return {
waiting: counts.waiting || 0,
active: counts.active || 0,
completed: counts.completed || 0,
failed: counts.failed || 0,
delayed: counts.delayed || 0,
paused: counts.paused || 0,
total: Object.values(counts).reduce((a, b) => a + b, 0),
};
}
async getJobs(
states: JobState[],
options?: { limit?: number; filter?: string }
): Promise<JobInfo[]> {
const limit = options?.limit ?? 100;
const filter = options?.filter?.toLowerCase();
let jobs: Job[] = [];
for (const state of states) {
const stateJobs = await this.queue.getJobs([state], 0, -1);
jobs.push(...stateJobs);
}
// Sort by timestamp (newest first)
jobs.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0));
// Apply filter
if (filter) {
jobs = jobs.filter((job) => {
const name = (job.data?.name as string) || job.id || '';
const desc = (job.data?.description as string) || '';
return name.toLowerCase().includes(filter) || desc.toLowerCase().includes(filter);
});
}
// Limit
jobs = jobs.slice(0, limit);
// Map to JobInfo
const result: JobInfo[] = [];
for (const job of jobs) {
const state = await job.getState();
result.push({
id: job.id || '',
name: (job.data?.name as string) || job.id || 'unnamed',
state: state as JobState,
timestamp: job.timestamp || 0,
data: job.data as Record<string, unknown>,
failedReason: job.failedReason,
});
}
return result;
}
async clearJobs(
states: JobState[],
options?: { filter?: string }
): Promise<{ cleared: number; errors: number }> {
const filter = options?.filter?.toLowerCase();
let jobs: Job[] = [];
for (const state of states) {
const stateJobs = await this.queue.getJobs([state], 0, -1);
jobs.push(...stateJobs);
}
// Apply filter
if (filter) {
jobs = jobs.filter((job) => {
const name = (job.data?.name as string) || job.id || '';
return name.toLowerCase().includes(filter);
});
}
let cleared = 0;
let errors = 0;
for (const job of jobs) {
try {
await job.remove();
cleared++;
} catch {
errors++;
}
}
return { cleared, errors };
}
async pause(): Promise<void> {
await this.queue.pause();
}
async resume(): Promise<void> {
await this.queue.resume();
}
async drain(): Promise<void> {
await this.queue.drain();
}
async isPaused(): Promise<boolean> {
return this.queue.isPaused();
}
async close(): Promise<void> {
await this.queue.close();
}
}