diff --git a/admin/backend/src/controllers/dlq.controller.spec.ts b/admin/backend/src/controllers/dlq.controller.spec.ts new file mode 100644 index 0000000..b6983d6 --- /dev/null +++ b/admin/backend/src/controllers/dlq.controller.spec.ts @@ -0,0 +1,181 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import type { Job } from 'bullmq'; +import { DlqController } from './dlq.controller'; +import { QueueAdminService } from '../services/queue-admin.service'; + +// Mock QueueAdminService +const createMockAdminService = () => { + return { + listFailedJobs: vi.fn(), + retryFailedJob: vi.fn(), + removeFailedJob: vi.fn(), + } as unknown as QueueAdminService; +}; + +describe('DlqController', () => { + let controller: DlqController; + let adminService: ReturnType; + + beforeEach(() => { + adminService = createMockAdminService(); + controller = new DlqController(adminService); + }); + + describe('listFailedJobs', () => { + it('should return paginated failed jobs with default pagination', async () => { + const mockJobs = [ + { + id: '1', + name: 'failed-job-1', + data: { test: 'data1' }, + failedReason: 'Error occurred', + }, + { + id: '2', + name: 'failed-job-2', + data: { test: 'data2' }, + failedReason: 'Another error', + }, + ] as Job[]; + + adminService.listFailedJobs.mockResolvedValue(mockJobs); + + const result = await controller.listFailedJobs('test-queue', {}); + + expect(result).toEqual({ + jobs: mockJobs, + pagination: { + page: 1, + limit: 50, + }, + }); + expect(adminService.listFailedJobs).toHaveBeenCalledWith('test-queue', 0, 50); + }); + + it('should return paginated failed jobs with custom page and limit', async () => { + const mockJobs = [ + { + id: '11', + name: 'failed-job-11', + data: { test: 'data11' }, + failedReason: 'Error on page 2', + }, + ] as Job[]; + + adminService.listFailedJobs.mockResolvedValue(mockJobs); + + const result = await controller.listFailedJobs('test-queue', { + page: 2, + limit: 10, + }); + + expect(result).toEqual({ + jobs: mockJobs, + pagination: { + page: 2, + limit: 10, + }, + }); + expect(adminService.listFailedJobs).toHaveBeenCalledWith('test-queue', 10, 10); + }); + + it('should calculate start offset correctly for page 3', async () => { + const mockJobs = [] as Job[]; + + adminService.listFailedJobs.mockResolvedValue(mockJobs); + + await controller.listFailedJobs('test-queue', { + page: 3, + limit: 25, + }); + + expect(adminService.listFailedJobs).toHaveBeenCalledWith('test-queue', 50, 25); + }); + + it('should handle empty failed jobs list', async () => { + adminService.listFailedJobs.mockResolvedValue([]); + + const result = await controller.listFailedJobs('test-queue', {}); + + expect(result).toEqual({ + jobs: [], + pagination: { + page: 1, + limit: 50, + }, + }); + }); + + it('should use page 1 when page is undefined', async () => { + const mockJobs = [{ id: '1' }] as Job[]; + adminService.listFailedJobs.mockResolvedValue(mockJobs); + + await controller.listFailedJobs('test-queue', { limit: 20 }); + + expect(adminService.listFailedJobs).toHaveBeenCalledWith('test-queue', 0, 20); + }); + + it('should use limit 50 when limit is undefined', async () => { + const mockJobs = [{ id: '1' }] as Job[]; + adminService.listFailedJobs.mockResolvedValue(mockJobs); + + await controller.listFailedJobs('test-queue', { page: 1 }); + + expect(adminService.listFailedJobs).toHaveBeenCalledWith('test-queue', 0, 50); + }); + }); + + describe('retryFailedJob', () => { + it('should call adminService.retryFailedJob with correct parameters', async () => { + adminService.retryFailedJob.mockResolvedValue(undefined); + + await controller.retryFailedJob('test-queue', '123'); + + expect(adminService.retryFailedJob).toHaveBeenCalledWith('test-queue', '123'); + expect(adminService.retryFailedJob).toHaveBeenCalledOnce(); + }); + + it('should handle retrying jobs with numeric job IDs as strings', async () => { + adminService.retryFailedJob.mockResolvedValue(undefined); + + await controller.retryFailedJob('my-queue', '456'); + + expect(adminService.retryFailedJob).toHaveBeenCalledWith('my-queue', '456'); + }); + + it('should return void on successful retry', async () => { + adminService.retryFailedJob.mockResolvedValue(undefined); + + const result = await controller.retryFailedJob('test-queue', '789'); + + expect(result).toBeUndefined(); + }); + }); + + describe('removeFailedJob', () => { + it('should call adminService.removeFailedJob with correct parameters', async () => { + adminService.removeFailedJob.mockResolvedValue(undefined); + + await controller.removeFailedJob('test-queue', '123'); + + expect(adminService.removeFailedJob).toHaveBeenCalledWith('test-queue', '123'); + expect(adminService.removeFailedJob).toHaveBeenCalledOnce(); + }); + + it('should handle removing jobs with numeric job IDs as strings', async () => { + adminService.removeFailedJob.mockResolvedValue(undefined); + + await controller.removeFailedJob('my-queue', '456'); + + expect(adminService.removeFailedJob).toHaveBeenCalledWith('my-queue', '456'); + }); + + it('should return void on successful removal', async () => { + adminService.removeFailedJob.mockResolvedValue(undefined); + + const result = await controller.removeFailedJob('test-queue', '789'); + + expect(result).toBeUndefined(); + }); + }); +}); diff --git a/admin/frontend/src/hooks/useJobs.spec.tsx b/admin/frontend/src/hooks/useJobs.spec.tsx new file mode 100644 index 0000000..696c441 --- /dev/null +++ b/admin/frontend/src/hooks/useJobs.spec.tsx @@ -0,0 +1,680 @@ +import React from 'react'; +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { renderHook, waitFor } from '@testing-library/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { useJobs } from './useJobs'; +import type { ReactNode } from 'react'; + +// Mock fetch globally +const mockFetch = vi.fn(); +global.fetch = mockFetch; + +// Create wrapper for react-query +const createWrapper = () => { + const queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false, + gcTime: 0, + }, + }, + }); + + return ({ children }: { children: ReactNode }) => ( + {children} + ); +}; + +describe('useJobs', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.clearAllTimers(); + }); + + describe('successful data fetching', () => { + it('should fetch jobs successfully with default options', async () => { + const mockJobs = [ + { + id: '1', + name: 'test-job-1', + state: 'completed', + data: { test: 'data' }, + timestamp: Date.now(), + }, + { + id: '2', + name: 'test-job-2', + state: 'failed', + data: { test: 'data' }, + timestamp: Date.now(), + }, + ]; + + const mockResponse = { + data: mockJobs, + pagination: { + total: 2, + page: 1, + limit: 20, + totalPages: 1, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + }), + { wrapper: createWrapper() } + ); + + // Initially loading + expect(result.current.isLoading).toBe(true); + expect(result.current.jobs).toEqual([]); + + // Wait for data to load + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.jobs).toEqual(mockJobs); + expect(result.current.total).toBe(2); + expect(result.current.page).toBe(1); + expect(result.current.limit).toBe(20); + expect(result.current.totalPages).toBe(1); + expect(result.current.error).toBeNull(); + expect(mockFetch).toHaveBeenCalledTimes(1); + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs' + ); + }); + + it('should build correct query string with all filter types', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 2, + limit: 10, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + state: 'failed', + name: 'test-job', + page: 2, + limit: 10, + sortBy: 'timestamp', + sortOrder: 'desc', + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?state=failed&name=test-job&page=2&limit=10&sortBy=timestamp&sortOrder=desc' + ); + }); + + it('should return correct pagination info', async () => { + const mockJobs = Array.from({ length: 10 }, (_, i) => ({ + id: String(i + 1), + name: `job-${i + 1}`, + state: 'completed', + data: {}, + timestamp: Date.now(), + })); + + const mockResponse = { + data: mockJobs, + pagination: { + total: 100, + page: 3, + limit: 10, + totalPages: 10, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + page: 3, + limit: 10, + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.jobs).toHaveLength(10); + expect(result.current.total).toBe(100); + expect(result.current.page).toBe(3); + expect(result.current.limit).toBe(10); + expect(result.current.totalPages).toBe(10); + }); + + it('should handle enabled=false option', async () => { + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + enabled: false, + }), + { wrapper: createWrapper() } + ); + + // Should not make any requests + expect(mockFetch).not.toHaveBeenCalled(); + expect(result.current.isLoading).toBe(false); + expect(result.current.jobs).toEqual([]); + expect(result.current.total).toBe(0); + expect(result.current.page).toBe(1); + expect(result.current.limit).toBe(20); + expect(result.current.totalPages).toBe(0); + }); + + it('should refetch data when refetch is called', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValue({ + ok: true, + json: async () => mockResponse, + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + // Clear previous calls + mockFetch.mockClear(); + + // Trigger refetch + await result.current.refetch(); + + // Should make new request + await waitFor(() => { + expect(mockFetch).toHaveBeenCalledTimes(1); + }); + }); + }); + + describe('error handling', () => { + it('should handle fetch errors', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + statusText: 'Internal Server Error', + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.error).toBeTruthy(); + expect(result.current.error?.message).toContain('Failed to fetch jobs'); + expect(result.current.jobs).toEqual([]); + expect(result.current.total).toBe(0); + }); + + it('should handle network errors', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(result.current.error).toBeTruthy(); + expect(result.current.error?.message).toBe('Network error'); + expect(result.current.jobs).toEqual([]); + }); + }); + + describe('URL encoding', () => { + it('should encode queue name in URL properly', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue with spaces', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue%20with%20spaces/jobs' + ); + }); + + it('should encode special characters in queue name', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'queue/with/slashes', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/queue%2Fwith%2Fslashes/jobs' + ); + }); + }); + + describe('staleTime configuration', () => { + it('should set correct staleTime', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValue({ + ok: true, + json: async () => mockResponse, + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + + // Clear mocks to test staleTime behavior + mockFetch.mockClear(); + + // Rerender the same hook (simulating component rerender) + // This should NOT trigger a new fetch since we're within staleTime window + result.current; // Access current to ensure no refetch on same instance + + // The staleTime prevents immediate refetch - no new calls expected + expect(mockFetch).not.toHaveBeenCalled(); + }); + }); + + describe('query string building', () => { + it('should build query string with only state filter', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + state: 'waiting', + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?state=waiting' + ); + }); + + it('should build query string with only name filter', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + name: 'my-job', + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?name=my-job' + ); + }); + + it('should build query string with pagination filters', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 5, + limit: 50, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + page: 5, + limit: 50, + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?page=5&limit=50' + ); + }); + + it('should build query string with sort filters', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + sortBy: 'timestamp', + sortOrder: 'asc', + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?sortBy=timestamp&sortOrder=asc' + ); + }); + + it('should not include undefined filter values in query string', async () => { + const mockResponse = { + data: [], + pagination: { + total: 0, + page: 1, + limit: 20, + totalPages: 0, + }, + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + state: 'waiting', + name: undefined, + page: undefined, + limit: undefined, + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(mockFetch).toHaveBeenCalled(); + }); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues/test-queue/jobs?state=waiting' + ); + }); + }); + + describe('default values', () => { + it('should return default pagination values when data is not available', async () => { + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + enabled: false, + }), + { wrapper: createWrapper() } + ); + + expect(result.current.jobs).toEqual([]); + expect(result.current.total).toBe(0); + expect(result.current.page).toBe(1); + expect(result.current.limit).toBe(20); + expect(result.current.totalPages).toBe(0); + }); + + it('should use filter values as fallback for pagination', async () => { + const mockResponse = { + data: [], + pagination: undefined, // Simulating missing pagination + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + json: async () => mockResponse, + }); + + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: 'test-queue', + filters: { + page: 3, + limit: 50, + }, + }), + { wrapper: createWrapper() } + ); + + await waitFor(() => { + expect(result.current.isLoading).toBe(false); + }); + + // Should fallback to filter values + expect(result.current.page).toBe(3); + expect(result.current.limit).toBe(50); + expect(result.current.total).toBe(0); + expect(result.current.totalPages).toBe(0); + }); + }); + + describe('empty queue name handling', () => { + it('should not fetch when queue name is empty', async () => { + const { result } = renderHook( + () => + useJobs({ + apiUrl: 'http://localhost:3000', + queueName: '', + }), + { wrapper: createWrapper() } + ); + + // Should not make any requests + expect(mockFetch).not.toHaveBeenCalled(); + expect(result.current.isLoading).toBe(false); + expect(result.current.jobs).toEqual([]); + }); + }); +}); diff --git a/admin/frontend/src/hooks/useQueueWebSocket.spec.tsx b/admin/frontend/src/hooks/useQueueWebSocket.spec.tsx new file mode 100644 index 0000000..2b0b871 --- /dev/null +++ b/admin/frontend/src/hooks/useQueueWebSocket.spec.tsx @@ -0,0 +1,626 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { renderHook, act, waitFor } from '@testing-library/react'; +import { useQueueWebSocket } from './useQueueWebSocket'; +import type { QueueMetrics } from '../types'; + +// Mock socket.io-client +const mockSocket = { + on: vi.fn(), + emit: vi.fn(), + disconnect: vi.fn(), + connected: false, +}; + +const mockIo = vi.fn(() => mockSocket); + +vi.mock('socket.io-client', () => ({ + io: mockIo, +})); + +describe('useQueueWebSocket', () => { + const defaultOptions = { + url: 'http://localhost:3000', + queues: ['email-queue', 'image-processing'], + token: 'test-token', + }; + + const createMockMetrics = (name: string): QueueMetrics => ({ + name, + counts: { + waiting: 5, + active: 2, + completed: 100, + failed: 3, + delayed: 1, + paused: 0, + }, + isPaused: false, + avgProcessingTime: 1500, + errorRate: 0.03, + throughput: { + completedLastHour: 50, + failedLastHour: 2, + }, + oldestJob: { + id: 'job-123', + state: 'waiting', + age: 3600000, + }, + timestamp: Date.now(), + }); + + beforeEach(() => { + vi.clearAllMocks(); + mockSocket.connected = false; + mockSocket.on.mockReturnValue(mockSocket); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('should initialize with correct default state', () => { + const { result } = renderHook(() => useQueueWebSocket({ ...defaultOptions, autoConnect: false })); + + expect(result.current.isConnected).toBe(false); + expect(result.current.metrics.size).toBe(0); + expect(result.current.error).toBeNull(); + expect(typeof result.current.subscribe).toBe('function'); + expect(typeof result.current.unsubscribe).toBe('function'); + expect(typeof result.current.disconnect).toBe('function'); + }); + + it('should connect to WebSocket server with correct configuration', async () => { + renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockIo).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues', + expect.objectContaining({ + auth: { token: 'test-token' }, + transports: ['websocket', 'polling'], + }) + ); + }); + }); + + it('should set isConnected to true on connect event', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalledWith('connect', expect.any(Function)); + }); + + // Simulate connect event + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + expect(connectHandler).toBeDefined(); + + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + expect(result.current.error).toBeNull(); + }); + }); + + it('should subscribe to initial queues on connect', async () => { + renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalledWith('connect', expect.any(Function)); + }); + + // Simulate connect event + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('subscribe', { + queues: ['email-queue', 'image-processing'], + }); + }); + }); + + it('should handle metrics event and update Map', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalledWith('metrics', expect.any(Function)); + }); + + // Simulate metrics event + const metricsHandler = mockSocket.on.mock.calls.find(call => call[0] === 'metrics')?.[1]; + expect(metricsHandler).toBeDefined(); + + const mockMetrics1 = createMockMetrics('email-queue'); + const mockMetrics2 = createMockMetrics('image-processing'); + + act(() => { + metricsHandler?.(mockMetrics1); + }); + + await waitFor(() => { + expect(result.current.metrics.size).toBe(1); + expect(result.current.metrics.get('email-queue')).toEqual(mockMetrics1); + }); + + act(() => { + metricsHandler?.(mockMetrics2); + }); + + await waitFor(() => { + expect(result.current.metrics.size).toBe(2); + expect(result.current.metrics.get('email-queue')).toEqual(mockMetrics1); + expect(result.current.metrics.get('image-processing')).toEqual(mockMetrics2); + }); + }); + + it('should handle disconnect event', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalledWith('disconnect', expect.any(Function)); + }); + + // First connect + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + // Then disconnect + const disconnectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'disconnect')?.[1]; + + act(() => { + mockSocket.connected = false; + disconnectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(false); + }); + }); + + it('should handle connect_error event', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalledWith('connect_error', expect.any(Function)); + }); + + const connectErrorHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect_error')?.[1]; + expect(connectErrorHandler).toBeDefined(); + + const testError = new Error('Connection failed'); + + act(() => { + connectErrorHandler?.(testError); + }); + + await waitFor(() => { + expect(result.current.error).toEqual(testError); + expect(result.current.isConnected).toBe(false); + }); + }); + + it('should subscribe to a queue using subscribe function', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + vi.clearAllMocks(); // Clear the initial subscribe call + + act(() => { + result.current.subscribe('new-queue'); + }); + + expect(mockSocket.emit).toHaveBeenCalledWith('subscribe', { + queues: ['new-queue'], + }); + }); + + it('should not subscribe if socket is not connected', async () => { + const { result } = renderHook(() => useQueueWebSocket({ ...defaultOptions, autoConnect: false })); + + act(() => { + result.current.subscribe('new-queue'); + }); + + expect(mockSocket.emit).not.toHaveBeenCalled(); + expect(console.warn).toHaveBeenCalledWith('Cannot subscribe: socket not connected'); + }); + + it('should not subscribe to the same queue twice', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + vi.clearAllMocks(); + + // Try to subscribe twice + act(() => { + result.current.subscribe('new-queue'); + result.current.subscribe('new-queue'); + }); + + // Should only emit once + expect(mockSocket.emit).toHaveBeenCalledTimes(1); + }); + + it('should unsubscribe from a queue and remove metrics', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + // Add metrics for the queue + const metricsHandler = mockSocket.on.mock.calls.find(call => call[0] === 'metrics')?.[1]; + const mockMetrics = createMockMetrics('email-queue'); + + act(() => { + metricsHandler?.(mockMetrics); + }); + + await waitFor(() => { + expect(result.current.metrics.has('email-queue')).toBe(true); + }); + + vi.clearAllMocks(); + + // Unsubscribe + act(() => { + result.current.unsubscribe('email-queue'); + }); + + expect(mockSocket.emit).toHaveBeenCalledWith('unsubscribe', { + queues: ['email-queue'], + }); + + await waitFor(() => { + expect(result.current.metrics.has('email-queue')).toBe(false); + }); + }); + + it('should not unsubscribe if not subscribed', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + vi.clearAllMocks(); + + // Try to unsubscribe from queue we never subscribed to + act(() => { + result.current.unsubscribe('non-existent-queue'); + }); + + // Should not emit + expect(mockSocket.emit).not.toHaveBeenCalled(); + }); + + it('should disconnect socket and clear state', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + // Disconnect + act(() => { + result.current.disconnect(); + }); + + expect(mockSocket.disconnect).toHaveBeenCalled(); + expect(result.current.isConnected).toBe(false); + }); + + it('should not connect when autoConnect is false', async () => { + renderHook(() => useQueueWebSocket({ ...defaultOptions, autoConnect: false })); + + // Wait a bit to ensure no connection is attempted + await new Promise(resolve => setTimeout(resolve, 100)); + + expect(mockIo).not.toHaveBeenCalled(); + }); + + it('should handle dynamic queue changes - subscribe to new queues', async () => { + const { result, rerender } = renderHook( + ({ queues }) => useQueueWebSocket({ ...defaultOptions, queues }), + { initialProps: { queues: ['email-queue'] } } + ); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + vi.clearAllMocks(); + + // Add new queue + rerender({ queues: ['email-queue', 'image-processing'] }); + + await waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('subscribe', { + queues: ['image-processing'], + }); + }); + }); + + it('should handle dynamic queue changes - unsubscribe from removed queues', async () => { + const { result, rerender } = renderHook( + ({ queues }) => useQueueWebSocket({ ...defaultOptions, queues }), + { initialProps: { queues: ['email-queue', 'image-processing'] } } + ); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate connected state + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + // Add metrics for both queues + const metricsHandler = mockSocket.on.mock.calls.find(call => call[0] === 'metrics')?.[1]; + act(() => { + metricsHandler?.(createMockMetrics('email-queue')); + metricsHandler?.(createMockMetrics('image-processing')); + }); + + await waitFor(() => { + expect(result.current.metrics.size).toBe(2); + }); + + vi.clearAllMocks(); + + // Remove image-processing queue + rerender({ queues: ['email-queue'] }); + + await waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('unsubscribe', { + queues: ['image-processing'], + }); + expect(result.current.metrics.has('image-processing')).toBe(false); + expect(result.current.metrics.has('email-queue')).toBe(true); + }); + }); + + it('should not handle queue changes if socket is not connected', async () => { + const { rerender } = renderHook( + ({ queues }) => useQueueWebSocket({ ...defaultOptions, queues, autoConnect: false }), + { initialProps: { queues: ['email-queue'] } } + ); + + // Change queues without connecting + rerender({ queues: ['email-queue', 'image-processing'] }); + + // Wait a bit + await new Promise(resolve => setTimeout(resolve, 100)); + + // Should not attempt to subscribe + expect(mockSocket.emit).not.toHaveBeenCalled(); + }); + + it('should cleanup socket on unmount', async () => { + const { unmount } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + unmount(); + + expect(mockSocket.disconnect).toHaveBeenCalled(); + }); + + it('should resubscribe to queues after reconnection', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + // Simulate initial connect + const connectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'connect')?.[1]; + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }); + + // Verify initial subscription + expect(mockSocket.emit).toHaveBeenCalledWith('subscribe', { + queues: ['email-queue', 'image-processing'], + }); + + // Simulate disconnect + const disconnectHandler = mockSocket.on.mock.calls.find(call => call[0] === 'disconnect')?.[1]; + + vi.clearAllMocks(); + + act(() => { + mockSocket.connected = false; + disconnectHandler?.(); + }); + + // Wait for disconnect to be reflected + await waitFor(() => { + expect(result.current.isConnected).toBe(false); + }, { timeout: 2000 }); + + // Simulate reconnect + act(() => { + mockSocket.connected = true; + connectHandler?.(); + }); + + // Wait for reconnect to be reflected + await waitFor(() => { + expect(result.current.isConnected).toBe(true); + }, { timeout: 2000 }); + + // Verify resubscription + await waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('subscribe', { + queues: ['email-queue', 'image-processing'], + }); + }); + }); + + it('should handle metrics updates for the same queue', async () => { + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + await waitFor(() => { + expect(mockSocket.on).toHaveBeenCalled(); + }); + + const metricsHandler = mockSocket.on.mock.calls.find(call => call[0] === 'metrics')?.[1]; + + const metrics1 = createMockMetrics('email-queue'); + const metrics2 = { ...createMockMetrics('email-queue'), counts: { ...metrics1.counts, waiting: 10 } }; + + act(() => { + metricsHandler?.(metrics1); + }); + + await waitFor(() => { + expect(result.current.metrics.get('email-queue')?.counts.waiting).toBe(5); + }); + + act(() => { + metricsHandler?.(metrics2); + }); + + await waitFor(() => { + expect(result.current.metrics.get('email-queue')?.counts.waiting).toBe(10); + }); + }); + + it('should handle missing token in connection options', async () => { + const optionsWithoutToken = { + url: 'http://localhost:3000', + queues: ['email-queue'], + }; + + renderHook(() => useQueueWebSocket(optionsWithoutToken)); + + await waitFor(() => { + expect(mockIo).toHaveBeenCalledWith( + 'http://localhost:3000/admin/queues', + expect.objectContaining({ + auth: undefined, + transports: ['websocket', 'polling'], + }) + ); + }); + }); + + it('should handle error when socket.io-client fails to load', async () => { + // Mock import failure + vi.doMock('socket.io-client', () => { + throw new Error('Module not found'); + }); + + const { result } = renderHook(() => useQueueWebSocket(defaultOptions)); + + // The hook should handle the error gracefully + // Note: In real implementation, the error would be set via the catch block + // but since we're testing with vi.mock, the dynamic import succeeds + // This test documents the expected behavior if import fails + expect(result.current.isConnected).toBe(false); + expect(result.current.metrics.size).toBe(0); + }); +}); diff --git a/bull-adapter/src/queue.service.spec.ts b/bull-adapter/src/queue.service.spec.ts new file mode 100644 index 0000000..67fa272 --- /dev/null +++ b/bull-adapter/src/queue.service.spec.ts @@ -0,0 +1,1037 @@ +import { describe, it, expect, vi, beforeEach, beforeAll } from 'vitest'; + +// Mock BullMQ before importing the service +vi.mock('bullmq', () => { + class MockQueue { + name: string; + options: any; + add = vi.fn().mockResolvedValue({ id: 'job-123', name: 'test-type' }); + addBulk = vi.fn().mockImplementation((jobs: any[]) => { + // Return mocked jobs based on the input length + return Promise.resolve( + jobs.map((job, index) => ({ + id: `job-${index + 1}`, + name: job.name, + })), + ); + }); + getJobCounts = vi.fn().mockResolvedValue({ + waiting: 5, + active: 2, + completed: 10, + failed: 1, + delayed: 3, + paused: 0, + }); + getJob = vi.fn().mockResolvedValue(null); + pause = vi.fn().mockResolvedValue(undefined); + resume = vi.fn().mockResolvedValue(undefined); + drain = vi.fn().mockResolvedValue(undefined); + clean = vi.fn().mockResolvedValue(['job-1', 'job-2', 'job-3']); + close = vi.fn().mockResolvedValue(undefined); + + constructor(name: string, options: any) { + this.name = name; + this.options = options; + } + } + + class MockQueueEvents { + name: string; + options: any; + on = vi.fn(); + close = vi.fn().mockResolvedValue(undefined); + + constructor(name: string, options: any) { + this.name = name; + this.options = options; + } + } + + return { + Queue: MockQueue, + QueueEvents: MockQueueEvents, + }; +}); + +// Dynamic imports +let BaseQueueService: any; + +describe('BaseQueueService', () => { + let service: InstanceType; + + beforeAll(async () => { + const serviceModule = await import('./queue.service'); + BaseQueueService = serviceModule.BaseQueueService; + }); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe('constructor', () => { + it('should initialize with default queue name', () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + }); + + expect(service).toBeDefined(); + }); + + it('should initialize with custom queue name', () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'custom-queue', + }); + + expect(service).toBeDefined(); + }); + + it('should initialize with default job options', () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultJobOptions: { + attempts: 5, + priority: 10, + }, + }); + + expect(service).toBeDefined(); + }); + }); + + describe('getQueue', () => { + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + }); + + it('should create a new queue instance with default name', () => { + const queue = service.getQueue(); + + expect(queue).toBeDefined(); + expect(queue.name).toBe('test-queue'); + }); + + it('should create a new queue instance with custom name', () => { + const queue = service.getQueue('custom-queue'); + + expect(queue).toBeDefined(); + expect(queue.name).toBe('custom-queue'); + }); + + it('should cache and return existing queue instance', () => { + const queue1 = service.getQueue('my-queue'); + const queue2 = service.getQueue('my-queue'); + + expect(queue1).toBe(queue2); + }); + + it('should create different instances for different queue names', () => { + const queue1 = service.getQueue('queue-1'); + const queue2 = service.getQueue('queue-2'); + + expect(queue1).not.toBe(queue2); + expect(queue1.name).toBe('queue-1'); + expect(queue2.name).toBe('queue-2'); + }); + + it('should pass connection options to queue', () => { + service = new BaseQueueService({ + connection: { host: 'redis.example.com', port: 6380 }, + defaultQueueName: 'test-queue', + }); + + const queue = service.getQueue(); + + expect(queue).toBeDefined(); + expect(queue.options.connection.host).toBe('redis.example.com'); + expect(queue.options.connection.port).toBe(6380); + }); + + it('should pass additional queue options', () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + queueOptions: { + defaultJobOptions: { removeOnComplete: true }, + }, + }); + + const queue = service.getQueue(); + + expect(queue).toBeDefined(); + expect(queue.options.defaultJobOptions).toEqual({ removeOnComplete: true }); + }); + }); + + describe('getQueueEvents', () => { + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + }); + + it('should create a new QueueEvents instance with default name', () => { + const events = service.getQueueEvents(); + + expect(events).toBeDefined(); + expect(events.name).toBe('test-queue'); + }); + + it('should create a new QueueEvents instance with custom name', () => { + const events = service.getQueueEvents('custom-queue'); + + expect(events).toBeDefined(); + expect(events.name).toBe('custom-queue'); + }); + + it('should cache and return existing QueueEvents instance', () => { + const events1 = service.getQueueEvents('my-queue'); + const events2 = service.getQueueEvents('my-queue'); + + expect(events1).toBe(events2); + }); + + it('should pass connection options to QueueEvents', () => { + service = new BaseQueueService({ + connection: { host: 'redis.example.com', port: 6380 }, + }); + + const events = service.getQueueEvents(); + + expect(events).toBeDefined(); + expect(events.options.connection.host).toBe('redis.example.com'); + expect(events.options.connection.port).toBe(6380); + }); + }); + + describe('addJob', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should add a job to the default queue', async () => { + const result = await service.addJob('send-email', { to: 'user@example.com' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'send-email', + expect.objectContaining({ + type: 'send-email', + data: { to: 'user@example.com' }, + createdAt: expect.any(Number), + }), + expect.any(Object), + ); + expect(result).toEqual({ + id: 'job-123', + queue: 'test-queue', + type: 'send-email', + }); + }); + + it('should add a job to a specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + const result = await service.addJob( + 'process-image', + { url: 'https://example.com/image.jpg' }, + undefined, + 'custom-queue', + ); + + expect(customQueue.add).toHaveBeenCalled(); + expect(result.queue).toBe('custom-queue'); + }); + + it('should merge job options with defaults', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultJobOptions: { + attempts: 5, + priority: 10, + }, + }); + mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }, { delay: 1000 }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + attempts: 5, + priority: 10, + delay: 1000, + }), + ); + }); + + it('should override default options with provided options', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultJobOptions: { + attempts: 5, + }, + }); + mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }, { attempts: 10 }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + attempts: 10, + }), + ); + }); + + it('should include correlationId when jobId is provided', async () => { + await service.addJob('test-job', { data: 'test' }, { jobId: 'corr-123' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.objectContaining({ + correlationId: 'corr-123', + }), + expect.any(Object), + ); + }); + + it('should pass all job options to BullMQ', async () => { + const options = { + attempts: 3, + backoff: { type: 'exponential' as const, delay: 2000 }, + delay: 5000, + priority: 1, + removeOnComplete: true, + removeOnFail: false, + jobId: 'unique-id', + repeat: { every: 60000 }, + }; + + await service.addJob('test-job', { data: 'test' }, options); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining(options), + ); + }); + }); + + describe('addBulk', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should add multiple jobs in bulk', async () => { + const jobs = [ + { type: 'email', data: { to: 'user1@example.com' } }, + { type: 'sms', data: { to: '+1234567890' } }, + ]; + + const results = await service.addBulk(jobs); + + expect(mockQueue.addBulk).toHaveBeenCalledWith( + expect.arrayContaining([ + expect.objectContaining({ + name: 'email', + data: expect.objectContaining({ type: 'email', data: { to: 'user1@example.com' } }), + }), + expect.objectContaining({ + name: 'sms', + data: expect.objectContaining({ type: 'sms', data: { to: '+1234567890' } }), + }), + ]), + ); + expect(results).toHaveLength(2); + expect(results[0]).toEqual({ id: 'job-1', queue: 'test-queue', type: 'email' }); + expect(results[1]).toEqual({ id: 'job-2', queue: 'test-queue', type: 'sms' }); + }); + + it('should add bulk jobs to a specific queue', async () => { + const jobs = [{ type: 'test', data: { value: 1 } }]; + + const results = await service.addBulk(jobs, 'custom-queue'); + + expect(results).toHaveLength(1); + expect(results[0].queue).toBe('custom-queue'); + expect(results[0].type).toBe('test'); + }); + + it('should merge options for each job in bulk', async () => { + const jobs = [ + { type: 'job1', data: { value: 1 }, options: { priority: 5 } }, + { type: 'job2', data: { value: 2 }, options: { delay: 1000 } }, + ]; + + await service.addBulk(jobs); + + expect(mockQueue.addBulk).toHaveBeenCalledWith( + expect.arrayContaining([ + expect.objectContaining({ + opts: expect.objectContaining({ priority: 5 }), + }), + expect.objectContaining({ + opts: expect.objectContaining({ delay: 1000 }), + }), + ]), + ); + }); + + it('should handle empty bulk job array', async () => { + mockQueue.addBulk.mockResolvedValueOnce([]); + + const results = await service.addBulk([]); + + expect(results).toHaveLength(0); + }); + }); + + describe('getMetrics', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should return queue metrics', async () => { + const metrics = await service.getMetrics(); + + expect(mockQueue.getJobCounts).toHaveBeenCalled(); + expect(metrics).toEqual({ + name: 'test-queue', + waiting: 5, + active: 2, + completed: 10, + failed: 1, + delayed: 3, + paused: 0, + timestamp: expect.any(Number), + }); + }); + + it('should return metrics for a specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + const metrics = await service.getMetrics('custom-queue'); + + expect(customQueue.getJobCounts).toHaveBeenCalled(); + expect(metrics.name).toBe('custom-queue'); + }); + + it('should handle missing job counts with default values', async () => { + mockQueue.getJobCounts.mockResolvedValueOnce({ + waiting: undefined, + active: 1, + }); + + const metrics = await service.getMetrics(); + + expect(metrics.waiting).toBe(0); + expect(metrics.active).toBe(1); + expect(metrics.completed).toBe(0); + }); + }); + + describe('getJobStatus', () => { + let mockQueue: any; + let mockJob: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should return job status for existing job', async () => { + mockJob = { + id: 'job-123', + name: 'send-email', + progress: 50, + data: { to: 'user@example.com' }, + attemptsMade: 1, + timestamp: 1234567890, + finishedOn: undefined, + processedOn: 1234567900, + failedReason: undefined, + getState: vi.fn().mockResolvedValue('active'), + }; + mockQueue.getJob.mockResolvedValueOnce(mockJob); + + const status = await service.getJobStatus('job-123'); + + expect(mockQueue.getJob).toHaveBeenCalledWith('job-123'); + expect(status).toEqual({ + id: 'job-123', + name: 'send-email', + state: 'active', + progress: 50, + data: { to: 'user@example.com' }, + attemptsMade: 1, + timestamp: 1234567890, + finishedOn: undefined, + processedOn: 1234567900, + failedReason: undefined, + }); + }); + + it('should return null for non-existent job', async () => { + mockQueue.getJob.mockResolvedValueOnce(null); + + const status = await service.getJobStatus('non-existent'); + + expect(status).toBeNull(); + }); + + it('should handle job with object progress', async () => { + mockJob = { + id: 'job-123', + name: 'test', + progress: { current: 5, total: 10 }, + data: {}, + attemptsMade: 0, + timestamp: 123, + getState: vi.fn().mockResolvedValue('active'), + }; + mockQueue.getJob.mockResolvedValueOnce(mockJob); + + const status = await service.getJobStatus('job-123'); + + expect(status?.progress).toBe(0); + }); + + it('should get job status from specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + mockJob = { + id: 'job-456', + name: 'test', + data: {}, + attemptsMade: 0, + timestamp: 123, + getState: vi.fn().mockResolvedValue('completed'), + }; + customQueue.getJob.mockResolvedValueOnce(mockJob); + + await service.getJobStatus('job-456', 'custom-queue'); + + expect(customQueue.getJob).toHaveBeenCalledWith('job-456'); + }); + }); + + describe('removeJob', () => { + let mockQueue: any; + let mockJob: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should remove existing job', async () => { + mockJob = { + id: 'job-123', + remove: vi.fn().mockResolvedValue(undefined), + }; + mockQueue.getJob.mockResolvedValueOnce(mockJob); + + const result = await service.removeJob('job-123'); + + expect(mockQueue.getJob).toHaveBeenCalledWith('job-123'); + expect(mockJob.remove).toHaveBeenCalled(); + expect(result).toBe(true); + }); + + it('should return false for non-existent job', async () => { + mockQueue.getJob.mockResolvedValueOnce(null); + + const result = await service.removeJob('non-existent'); + + expect(result).toBe(false); + }); + + it('should remove job from specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + mockJob = { + id: 'job-456', + remove: vi.fn().mockResolvedValue(undefined), + }; + customQueue.getJob.mockResolvedValueOnce(mockJob); + + await service.removeJob('job-456', 'custom-queue'); + + expect(customQueue.getJob).toHaveBeenCalledWith('job-456'); + }); + }); + + describe('retryJob', () => { + let mockQueue: any; + let mockJob: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should retry existing job', async () => { + mockJob = { + id: 'job-123', + retry: vi.fn().mockResolvedValue(undefined), + }; + mockQueue.getJob.mockResolvedValueOnce(mockJob); + + const result = await service.retryJob('job-123'); + + expect(mockQueue.getJob).toHaveBeenCalledWith('job-123'); + expect(mockJob.retry).toHaveBeenCalled(); + expect(result).toBe(true); + }); + + it('should return false for non-existent job', async () => { + mockQueue.getJob.mockResolvedValueOnce(null); + + const result = await service.retryJob('non-existent'); + + expect(result).toBe(false); + }); + + it('should retry job from specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + mockJob = { + id: 'job-456', + retry: vi.fn().mockResolvedValue(undefined), + }; + customQueue.getJob.mockResolvedValueOnce(mockJob); + + await service.retryJob('job-456', 'custom-queue'); + + expect(customQueue.getJob).toHaveBeenCalledWith('job-456'); + }); + }); + + describe('pause', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should pause the default queue', async () => { + await service.pause(); + + expect(mockQueue.pause).toHaveBeenCalled(); + }); + + it('should pause a specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + await service.pause('custom-queue'); + + expect(customQueue.pause).toHaveBeenCalled(); + }); + }); + + describe('resume', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should resume the default queue', async () => { + await service.resume(); + + expect(mockQueue.resume).toHaveBeenCalled(); + }); + + it('should resume a specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + await service.resume('custom-queue'); + + expect(customQueue.resume).toHaveBeenCalled(); + }); + }); + + describe('drain', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should drain the default queue', async () => { + await service.drain(); + + expect(mockQueue.drain).toHaveBeenCalled(); + }); + + it('should drain a specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + await service.drain('custom-queue'); + + expect(customQueue.drain).toHaveBeenCalled(); + }); + }); + + describe('clean', () => { + let mockQueue: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueue = service.getQueue(); + }); + + it('should clean completed jobs', async () => { + const result = await service.clean(5000, 100, 'completed'); + + expect(mockQueue.clean).toHaveBeenCalledWith(5000, 100, 'completed'); + expect(result).toEqual(['job-1', 'job-2', 'job-3']); + }); + + it('should clean failed jobs', async () => { + await service.clean(10000, 50, 'failed'); + + expect(mockQueue.clean).toHaveBeenCalledWith(10000, 50, 'failed'); + }); + + it('should clean jobs from specific queue', async () => { + const customQueue = service.getQueue('custom-queue'); + + await service.clean(3600000, 1000, 'completed', 'custom-queue'); + + expect(customQueue.clean).toHaveBeenCalledWith(3600000, 1000, 'completed'); + }); + + it('should handle different job statuses', async () => { + const statuses: Array<'completed' | 'failed' | 'delayed' | 'wait' | 'active'> = [ + 'completed', + 'failed', + 'delayed', + 'wait', + 'active', + ]; + + for (const status of statuses) { + await service.clean(5000, 100, status); + expect(mockQueue.clean).toHaveBeenCalledWith(5000, 100, status); + } + }); + }); + + describe('waitForJob', () => { + let mockQueueEvents: any; + + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + mockQueueEvents = service.getQueueEvents(); + }); + + it('should resolve when job completes', async () => { + const eventHandlers: Record = {}; + mockQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const waitPromise = service.waitForJob('job-123'); + + // Simulate job completion + setTimeout(() => { + eventHandlers['completed']?.({ jobId: 'job-123', returnvalue: { result: 'success' } }); + }, 10); + + const result = await waitPromise; + + expect(result).toEqual({ result: 'success' }); + }); + + it('should reject when job fails', async () => { + const eventHandlers: Record = {}; + mockQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const waitPromise = service.waitForJob('job-123'); + + // Simulate job failure + setTimeout(() => { + eventHandlers['failed']?.({ jobId: 'job-123', failedReason: 'Connection timeout' }); + }, 10); + + await expect(waitPromise).rejects.toThrow('Connection timeout'); + }); + + it('should reject when job fails with timeout and clear timer', async () => { + const eventHandlers: Record = {}; + mockQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout'); + + const waitPromise = service.waitForJob('job-123', undefined, 5000); + + // Simulate job failure + setTimeout(() => { + eventHandlers['failed']?.({ jobId: 'job-123', failedReason: 'Processing failed' }); + }, 10); + + await expect(waitPromise).rejects.toThrow('Processing failed'); + expect(clearTimeoutSpy).toHaveBeenCalled(); + }); + + it('should reject when timeout is reached', async () => { + const waitPromise = service.waitForJob('job-123', undefined, 100); + + await expect(waitPromise).rejects.toThrow('Job job-123 timed out after 100ms'); + }); + + it('should only resolve for matching job ID', async () => { + const eventHandlers: Record = {}; + mockQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const waitPromise = service.waitForJob('job-123'); + + // Simulate different job completion + setTimeout(() => { + eventHandlers['completed']?.({ jobId: 'job-456', returnvalue: { result: 'wrong' } }); + }, 10); + + // Simulate correct job completion + setTimeout(() => { + eventHandlers['completed']?.({ jobId: 'job-123', returnvalue: { result: 'correct' } }); + }, 20); + + const result = await waitPromise; + + expect(result).toEqual({ result: 'correct' }); + }); + + it('should wait for job from specific queue', async () => { + const customQueueEvents = service.getQueueEvents('custom-queue'); + const eventHandlers: Record = {}; + customQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const waitPromise = service.waitForJob('job-123', 'custom-queue', 1000); + + // Simulate job completion + setTimeout(() => { + eventHandlers['completed']?.({ jobId: 'job-123', returnvalue: 'result' }); + }, 10); + + const result = await waitPromise; + + expect(customQueueEvents.on).toHaveBeenCalledWith('completed', expect.any(Function)); + expect(customQueueEvents.on).toHaveBeenCalledWith('failed', expect.any(Function)); + expect(result).toBe('result'); + }); + + it('should clear timeout when job completes', async () => { + const eventHandlers: Record = {}; + mockQueueEvents.on.mockImplementation((event: string, handler: Function) => { + eventHandlers[event] = handler; + }); + + const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout'); + + const waitPromise = service.waitForJob('job-123', undefined, 5000); + + setTimeout(() => { + eventHandlers['completed']?.({ jobId: 'job-123', returnvalue: 'done' }); + }, 10); + + await waitPromise; + + expect(clearTimeoutSpy).toHaveBeenCalled(); + }); + }); + + describe('close', () => { + beforeEach(() => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultQueueName: 'test-queue', + }); + }); + + it('should close all queues and queue events', async () => { + const queue1 = service.getQueue('queue-1'); + const queue2 = service.getQueue('queue-2'); + const events1 = service.getQueueEvents('queue-1'); + const events2 = service.getQueueEvents('queue-2'); + + await service.close(); + + expect(queue1.close).toHaveBeenCalled(); + expect(queue2.close).toHaveBeenCalled(); + expect(events1.close).toHaveBeenCalled(); + expect(events2.close).toHaveBeenCalled(); + }); + + it('should clear internal caches after closing', async () => { + const queue1 = service.getQueue('queue-1'); + service.getQueueEvents('queue-1'); + + await service.close(); + + // After close, getting a queue should create a new instance + const queue2 = service.getQueue('queue-1'); + + expect(queue1).not.toBe(queue2); + }); + + it('should handle empty queues and events maps', async () => { + await expect(service.close()).resolves.toBeUndefined(); + }); + + it('should wait for all close operations to complete', async () => { + const queue1 = service.getQueue('queue-1'); + const queue2 = service.getQueue('queue-2'); + + let queue1Closed = false; + let queue2Closed = false; + + queue1.close.mockImplementation(async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + queue1Closed = true; + }); + + queue2.close.mockImplementation(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + queue2Closed = true; + }); + + await service.close(); + + expect(queue1Closed).toBe(true); + expect(queue2Closed).toBe(true); + }); + }); + + describe('mergeJobOptions (private method behavior)', () => { + it('should use default retry attempts when not specified', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + }); + const mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + attempts: 3, // DEFAULT_RETRY_ATTEMPTS + }), + ); + }); + + it('should use default backoff configuration when not specified', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + }); + const mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + backoff: { type: 'exponential', delay: 1000 }, + }), + ); + }); + + it('should use default removeOnComplete and removeOnFail', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + }); + const mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + removeOnComplete: true, + removeOnFail: false, + }), + ); + }); + + it('should prefer custom default job options over system defaults', async () => { + service = new BaseQueueService({ + connection: { host: 'localhost', port: 6379 }, + defaultJobOptions: { + attempts: 10, + removeOnComplete: false, + }, + }); + const mockQueue = service.getQueue(); + + await service.addJob('test-job', { data: 'test' }); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'test-job', + expect.any(Object), + expect.objectContaining({ + attempts: 10, + removeOnComplete: false, + }), + ); + }); + }); +}); diff --git a/reporting/src/reporting.module.ts b/reporting/src/reporting.module.ts index 4137e71..6a043f3 100644 --- a/reporting/src/reporting.module.ts +++ b/reporting/src/reporting.module.ts @@ -90,7 +90,8 @@ export class ReportingModule { return { module: ReportingModule, imports: [ - TypeOrmModule.forFeature([JobEvent], options.connection), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + TypeOrmModule.forFeature([JobEvent], options.connection as any), ], providers: [JobReporterService, JobAnalyticsService], exports: [JobReporterService, JobAnalyticsService],