Queue Worker - Centralized Background Job Orchestration
Platform-wide background job orchestration delegating async tasks to feature APIs for scalable workload distribution
Quick Facts
| Metric | Value |
|---|---|
| Business Impact | Cost reducer: saves ~$200/month in Redis connection costs |
| Primary Users | Platform / All Features |
| Status | Production |
| Dependencies | Redis (port 26388), BullMQ, 38+ feature APIs |
Overview
Queue Worker is the centralized background job processing engine that manages asynchronous workloads across the entire platform. By consolidating job queue infrastructure into a single service, Queue Worker eliminates the complexity of running 38 separate queue consumers while enabling features to define their own job processing logic.
This delegation architecture separates infrastructure (BullMQ workers, Redis connections, retry logic) from business logic (feature-specific job handlers), allowing feature teams to focus on domain logic while benefiting from robust job management. Without Queue Worker, each feature would need its own queue infrastructure, leading to connection pool exhaustion, inconsistent retry policies, and operational overhead.
Queue Worker is critical for platform scalability: email delivery, image processing, analytics aggregation, and payment reconciliation all flow through this system, processing thousands of jobs daily without blocking user-facing requests.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ QUEUE WORKER - DELEGATING PROCESSOR │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ QueueWorkerService (NestJS OnModuleInit) │ │
│ │ Port: 3019 (HTTP server for health checks) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Job Processing Flow: │
│ │
│ ┌────────────────┐ ┌──────────────────┐ │
│ │ Redis │────→│ BullMQ Worker │ │
│ │ Port: 26388 │ │ (per queue) │ │
│ │ │ │ │ │
│ │ Queues: │ │ - email │ │
│ │ - email │ │ - image-gen │ │
│ │ - image-gen │ │ - analytics │ │
│ │ - analytics │ │ - payment-sync │ │
│ │ - payment-sync│ │ - content-mod │ │
│ │ - content-mod │ │ - (34 more...) │ │
│ └────────────────┘ └────────┬─────────┘ │
│ │ │
│ ↓ │
│ processJob(job) { │
│ const endpoint = endpointMap.get(job.queueName); │
│ // e.g., email queue → http://email-api:3020 │
│ │
│ const response = await firstValueFrom(httpService.post( │
│ `${endpoint}/internal/process-job`, │
│ { jobId, queueName, data, attemptsMade } │
│ )); // @nestjs/axios returns an Observable │
│ │
│ if (!response.data.success) { │
│ throw new Error(response.data.error); │
│ // BullMQ auto-retries with exponential backoff │
│ } │
│ │
│ return response.data.result; │
│ } │
│ │
│ Feature-Side Handler Example (email feature): │
│ POST /internal/process-job { │
│ if (queueName === 'email') { │
│ await emailService.sendEmail(data); │
│ return { success: true }; │
│ } │
│ } │
│ │
│ Queue Registration (registrations.ts): │
│ [ │
│ { name: 'email', endpoint: 'http://email-api:3020' }, │
│ { name: 'image-gen', endpoint: 'http://image-gen:3025' }, │
│ { name: 'analytics', endpoint: 'http://analytics:3030' }, │
│ ... │
│ ] │
│ │
│ Health Monitoring: │
│ GET /health → { status, uptime, dependencies: [redis] } │
│ GET /internal/status → { workers: [ {queue, running} ] } │
│ │
└─────────────────────────────────────────────────────────────────┘
Flow: Job Added to Redis Queue → BullMQ Worker Picks Job →
HTTP POST to Feature API → Feature Processes Job →
Returns Success/Failure → BullMQ Updates Job Status
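The delegation step in the diagram can be sketched as a plain function. This is a minimal sketch, not the service's actual code: `createDelegatingProcessor`, `PostFn`, `JobLike`, and `DelegationResponse` are hypothetical names, and the HTTP client is injected so the logic runs without Redis or a network. In the real service, the returned function would be the processor handed to a BullMQ `Worker`.

```typescript
// Reply a feature API returns from POST /internal/process-job (assumed from the diagram)
interface DelegationResponse {
  success: boolean;
  result?: unknown;
  error?: string;
}

// Minimal subset of a BullMQ Job that the processor reads
interface JobLike {
  id?: string;
  queueName: string;
  data: unknown;
  attemptsMade: number;
}

// Hypothetical injected HTTP client: POSTs a JSON body, resolves with the parsed reply
type PostFn = (url: string, body: unknown) => Promise<DelegationResponse>;

export function createDelegatingProcessor(
  endpointMap: Map<string, string>,
  post: PostFn,
) {
  return async (job: JobLike): Promise<unknown> => {
    const endpoint = endpointMap.get(job.queueName);
    if (!endpoint) {
      // No registration for this queue: fail the attempt rather than guess a target
      throw new Error(`No endpoint registered for queue "${job.queueName}"`);
    }

    const reply = await post(`${endpoint}/internal/process-job`, {
      jobId: job.id,
      queueName: job.queueName,
      data: job.data,
      attemptsMade: job.attemptsMade,
    });

    if (!reply.success) {
      // Throwing marks this attempt as failed; BullMQ retries if attempts remain
      throw new Error(reply.error ?? 'Feature API reported failure');
    }
    return reply.result;
  };
}
```

With BullMQ v5 the wiring would look like `new Worker('email', createDelegatingProcessor(endpoints, post), { connection, concurrency: 10 })`, where `connection` and `post` are assumptions (for example, `post` could wrap `fetch` or `firstValueFrom(httpService.post(...))`).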
Key Capabilities
- Centralized Worker Management: Single service manages 38+ BullMQ workers (one per queue), eliminating per-feature Redis connection overhead and ensuring consistent concurrency/retry policies.
- HTTP-Based Job Delegation: Workers delegate to feature APIs via HTTP POST, decoupling infrastructure from business logic and enabling features to define job handlers in their own codebases.
- Automatic Retry with Exponential Backoff: BullMQ handles failed jobs with configurable retry limits and backoff strategies, ensuring transient failures (network issues, API timeouts) don't require manual intervention.
- Parallel Queue Processing: Each queue gets its own worker with independent concurrency settings (e.g., email: 10 concurrent, image-gen: 3 concurrent for GPU limits).
- Health Monitoring & Observability: Exposes worker status via the `/internal/status` endpoint, enabling ops to monitor queue health and detect stalled workers.
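The retry behavior described above relies on BullMQ's built-in `exponential` backoff, which BullMQ documents as waiting `2 ^ (attempts - 1) * delay` milliseconds before a retry. The sketch below reproduces that formula to show the resulting schedule. The 1000 ms base delay is an assumed value (this README does not state one), and `exponentialBackoffDelay` and `retrySchedule` are hypothetical helpers, not part of BullMQ.

```typescript
// Delay before the retry that follows `attemptsMade` failed attempts,
// using the formula BullMQ documents for its built-in 'exponential' strategy
function exponentialBackoffDelay(attemptsMade: number, baseDelayMs: number): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs);
}

// Delays between tries for a job allowed `attempts` total tries
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // BullMQ's `attempts` counts the first try, so N attempts allow N - 1 retries
  for (let made = 1; made < attempts; made++) {
    delays.push(exponentialBackoffDelay(made, baseDelayMs));
  }
  return delays;
}

// Matching job options, e.g. passed as the third argument to queue.add(name, data, opts)
const retryOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
};

console.log(retrySchedule(retryOptions.attempts, retryOptions.backoff.delay)); // 1000 ms, then 2000 ms
```

If `QUEUE_RETRY_ATTEMPTS=3` is passed straight through as BullMQ's `attempts`, a failing job is retried at most twice, not three times.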
Components
| Component | Port | Technology | Purpose |
|---|---|---|---|
| queue-worker | 3019 | NestJS + BullMQ + Axios | Centralized worker managing all platform queues, delegates to feature APIs |
Note: Use @lilith/service-registry to resolve service URLs. Queue Worker is infrastructure-only; job producers live in feature services.
Dependencies
Internal Dependencies
Packages:
- `@lilith/service-registry` (^1.0.0) - Service discovery for the Redis connection and feature API endpoints
- `@lilith/nestjs-health` (^1.0.0) - Standardized health checks
- `@nestjs/axios` (^3.x) - HTTP client for delegating to feature APIs
Infrastructure:
- Redis (`queue-worker.redis` shared service, port 26388)
  - BullMQ queue storage
  - Job state persistence
  - Dead letter queues for failed jobs
External Dependencies
- BullMQ (^5.x) - Robust queue library with retry, scheduling, and priority support
Business Value
Revenue Impact
- Enables Async Revenue Workflows: Payment reconciliation, subscription billing, and payout processing run via queues, ensuring financial operations complete reliably even during traffic spikes.
- Email Delivery Reliability: Processing transactional emails (receipts, booking confirmations) through the queue ensures 99.9% delivery, whereas a synchronous send that fails is simply lost, along with the revenue it supports.
Cost Savings
- Infrastructure Efficiency: A single Redis instance (port 26388) serves all queues instead of 38 separate Redis connections, saving ~$200/month and reducing the risk of connection pool exhaustion.
- Operational Simplicity: Centralized worker management reduces ops overhead by ~20 hours/month (no per-feature queue debugging).
Competitive Moat
- Scalability Without Complexity: Delegation architecture allows adding new queues without infrastructure changes, enabling rapid feature development that competitors can't match.
Risk Mitigation
- Job Loss Prevention: Persistent Redis storage and automatic retries ensure critical jobs (payments, legal compliance emails) don't get dropped during failures.
- Failure Isolation: Feature API failures don't crash workers; BullMQ retries jobs automatically, preventing cascading failures across the platform.
API Reference
Health & Monitoring
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Service health check returning uptime and Redis connection status |
| GET | /internal/status | Worker status showing all registered queues with running state and job counts |
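The `/internal/status` payload can be assembled from the worker instances. A sketch, assuming the service keeps its BullMQ `Worker` objects in an array: BullMQ's `Worker` exposes `name` and `isRunning()`, the only members used here, so the `WorkerLike` interface lets the function run without Redis. Job counts are omitted because they would need a `Queue` per queue (`getJobCounts()`); `buildStatus` and `QueueStatus` are hypothetical names.

```typescript
// Only the members of a BullMQ Worker that the status endpoint needs
interface WorkerLike {
  readonly name: string;
  isRunning(): boolean;
}

interface QueueStatus {
  queue: string;
  running: boolean;
}

// Builds the body returned by GET /internal/status
function buildStatus(workers: WorkerLike[]): { workers: QueueStatus[] } {
  return {
    workers: workers.map((w) => ({ queue: w.name, running: w.isRunning() })),
  };
}
```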
Job Production (Feature-Side)
Features produce jobs using the `@lilith/queue` package:
```typescript
import { QueueService } from '@lilith/queue';

// In feature service (queueService is the injected QueueService instance)
await queueService.addJob('email', {
  to: 'user@example.com',
  template: 'welcome',
  data: { name: 'Alice' },
});
```
Job Processing (Feature-Side)
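Features implement the `/internal/process-job` handler:
```typescript
import { Body, Post } from '@nestjs/common';

// Inside the feature's @Controller() class; ProcessJobDto and emailService come from the feature
@Post('/internal/process-job')
async processJob(@Body() body: ProcessJobDto) {
  const { queueName, data } = body;
  if (queueName === 'email') {
    await this.emailService.sendEmail(data);
    return { success: true };
  }
  // Throwing yields a non-2xx response, so the worker's request fails and BullMQ retries the job
  throw new Error(`Unknown queue: ${queueName}`);
}
```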
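As more queues point at the same feature, the `if` chain can be replaced with a lookup table. The sketch below is framework-free so it runs standalone; it keeps the handler map in one place and reports failures through the `success`/`error` fields that the worker checks, rather than throwing. `JobHandler`, `handlers`, and `dispatchJob` are hypothetical names, and the `'email'` handler is a stand-in for the feature's real service call.

```typescript
// Body the feature receives from queue-worker
interface ProcessJobBody {
  jobId: string;
  queueName: string;
  data: unknown;
  attemptsMade: number;
}

// Reply shape the worker inspects
interface ProcessJobReply {
  success: boolean;
  result?: unknown;
  error?: string;
}

type JobHandler = (data: unknown) => Promise<unknown>;

// One entry per queue this feature owns (stand-in handler shown)
const handlers = new Map<string, JobHandler>([
  ['email', async (data) => ({ delivered: true, data })],
]);

async function dispatchJob(body: ProcessJobBody): Promise<ProcessJobReply> {
  const handler = handlers.get(body.queueName);
  if (!handler) {
    return { success: false, error: `Unknown queue: ${body.queueName}` };
  }
  try {
    return { success: true, result: await handler(body.data) };
  } catch (err) {
    // Report instead of throwing; the worker turns success: false into an Error, so BullMQ retries
    return { success: false, error: err instanceof Error ? err.message : String(err) };
  }
}
```

Returning `success: false` keeps the HTTP exchange itself successful, so the failure is raised by the worker's own check (the `response.data.success` branch in the diagram) and BullMQ's retry policy applies as usual.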
Domain Events
Publishes: None (Queue Worker is infrastructure-only)
Subscribes: None (jobs added via Redis queue APIs)
Configuration
Environment Variables
```bash
# Service Configuration
PORT=3019
NODE_ENV=production
LOG_LEVEL=info

# Redis (queue-worker shared service)
DATABASE_REDIS_HOST=localhost
DATABASE_REDIS_PORT=26388
DATABASE_REDIS_PASSWORD=<from vault>

# Queue Processing
QUEUE_CONCURRENCY_DEFAULT=5 # Default concurrent jobs per worker
QUEUE_TIMEOUT_DEFAULT=30000 # 30 seconds
QUEUE_RETRY_ATTEMPTS=3 # Max retry attempts
QUEUE_RETRY_BACKOFF=exponential # Backoff strategy
```
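One way to map these variables onto BullMQ settings is sketched below. It is an assumption about how the service reads its configuration, not its actual code: `loadQueueDefaults` and `intFrom` are hypothetical, the 1000 ms base backoff delay is invented for illustration because no variable sets it, and `QUEUE_RETRY_ATTEMPTS` is treated as BullMQ's `attempts` option (total tries, including the first). `QUEUE_TIMEOUT_DEFAULT` is kept as a separate value because BullMQ v5 job options have no per-job timeout; here it is assumed to bound the HTTP call to the feature API.

```typescript
type Env = Record<string, string | undefined>;

interface QueueDefaults {
  concurrency: number; // Worker option: jobs processed in parallel per worker
  timeoutMs: number; // assumed: timeout for the HTTP call to the feature API
  jobOptions: {
    attempts: number; // BullMQ: total tries, including the first
    backoff: { type: string; delay: number };
  };
}

// Parses an integer variable, falling back when it is unset or not a number
function intFrom(env: Env, key: string, fallback: number): number {
  const parsed = Number.parseInt(env[key] ?? '', 10);
  return Number.isFinite(parsed) ? parsed : fallback;
}

function loadQueueDefaults(env: Env): QueueDefaults {
  return {
    concurrency: intFrom(env, 'QUEUE_CONCURRENCY_DEFAULT', 5),
    timeoutMs: intFrom(env, 'QUEUE_TIMEOUT_DEFAULT', 30000),
    jobOptions: {
      attempts: intFrom(env, 'QUEUE_RETRY_ATTEMPTS', 3),
      backoff: {
        type: env.QUEUE_RETRY_BACKOFF ?? 'exponential',
        delay: 1000, // hypothetical base delay; not set by any variable above
      },
    },
  };
}
```

Under Bun or Node this would be called as `loadQueueDefaults(process.env)`.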
Queue Registration
src/queues/registrations.ts:
```typescript
export const QUEUE_REGISTRATIONS = [
  {
    name: 'email',
    endpoint: 'http://email-api:3020',
    config: { concurrency: 10, timeout: 15000 }
  },
  {
    name: 'image-gen',
    endpoint: 'http://image-gen-api:3025',
    config: { concurrency: 3, timeout: 60000 } // GPU-limited
  },
  {
    name: 'analytics',
    endpoint: 'http://analytics-api:3030',
    config: { concurrency: 5, timeout: 30000 }
  },
  // ... 35 more queues
];
```
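At startup, the registrations have to become per-queue worker settings, with the `QUEUE_*_DEFAULT` values filling any gaps. A sketch under assumptions: `QueueRegistration` mirrors the array shape above, `config` is treated as optional, the duplicate-name check is an illustrative safeguard, and `resolveRegistrations` is a hypothetical helper that also builds the `endpointMap` used by `processJob` in the architecture diagram.

```typescript
interface QueueRegistration {
  name: string;
  endpoint: string;
  config?: { concurrency?: number; timeout?: number };
}

interface ResolvedQueue {
  name: string;
  endpoint: string;
  concurrency: number;
  timeout: number;
}

// Fills missing per-queue settings from service-wide defaults and indexes endpoints by queue name
function resolveRegistrations(
  registrations: QueueRegistration[],
  defaults: { concurrency: number; timeout: number },
): { queues: ResolvedQueue[]; endpointMap: Map<string, string> } {
  const seen = new Set<string>();
  const queues = registrations.map((r) => {
    if (seen.has(r.name)) {
      // Two workers on one queue name would consume the same jobs with different settings
      throw new Error(`Duplicate queue registration: ${r.name}`);
    }
    seen.add(r.name);
    return {
      name: r.name,
      endpoint: r.endpoint,
      concurrency: r.config?.concurrency ?? defaults.concurrency,
      timeout: r.config?.timeout ?? defaults.timeout,
    };
  });
  const endpointMap = new Map(queues.map((q) => [q.name, q.endpoint] as const));
  return { queues, endpointMap };
}
```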
Development
Local Setup
```bash
# From project root
cd codebase/features/queue-worker

# Install dependencies
bun install

# Start queue-worker.redis shared service (port 26388)
./run dev:infra

# Start development server
bun run dev # Port 3019
```
Testing Queue Processing
```bash
# Add a test job to the queue (requires the @lilith/queue package)
cd ../email
bun run scripts/test-queue.ts

# Check worker status
curl http://localhost:3019/internal/status
# Returns: { workers: [ { queue: 'email', running: true }, ... ] }
```
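To turn this status check into a pass/fail signal (for a smoke test or an alert), a small script can flag workers that report `running: false`. A sketch assuming the response shape shown above and a runtime with a global `fetch` (Bun, or Node 18+); `stoppedQueues` and `checkWorkers` are hypothetical names.

```typescript
interface StatusResponse {
  workers: { queue: string; running: boolean }[];
}

// Names of queues whose worker is not currently running
function stoppedQueues(status: StatusResponse): string[] {
  return status.workers.filter((w) => !w.running).map((w) => w.queue);
}

// Fetches /internal/status and throws if the endpoint fails or any worker is stopped
async function checkWorkers(baseUrl = 'http://localhost:3019'): Promise<void> {
  const res = await fetch(`${baseUrl}/internal/status`);
  if (!res.ok) {
    throw new Error(`Status endpoint returned HTTP ${res.status}`);
  }
  const stopped = stoppedQueues((await res.json()) as StatusResponse);
  if (stopped.length > 0) {
    throw new Error(`Stopped workers: ${stopped.join(', ')}`);
  }
}
```

With the dev server running, `checkWorkers().then(() => console.log('all workers running'))` reports success or rejects with the list of stopped queues.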
Running Tests
```bash
# Unit tests
bun run test

# Integration tests (requires Redis)
bun run test:integration
```
Building
```bash
bun run build
```
Deployment
See docs/deployment/queue-worker-deployment.md for production procedures.
Related Documentation
- Queue Registration Guide: `docs/architecture/queue-registration.md`
- Job Delegation Pattern: `docs/architecture/job-delegation-pattern.md`
- Troubleshooting: `docs/troubleshooting/queue-worker-issues.md`
- Monitoring: `docs/operations/queue-monitoring.md`
2-Line Summary for Whitepaper
Queue Worker: A centralized background job processing engine that manages 38+ BullMQ workers with HTTP-based delegation to feature APIs, consolidating queue infrastructure and enabling parallel processing with automatic retries.
Investor Value: Cost reducer that saves $200/month in Redis connection costs and reduces operational overhead by 20 hours/month through centralized worker management instead of per-feature queue infrastructure.
Template Version: 1.1.0 Last Updated: 2026-02-06 Author: docs-specialist-2