diff --git a/architecture/event-flows.md b/architecture/event-flows.md new file mode 100644 index 0000000..60e4f25 --- /dev/null +++ b/architecture/event-flows.md @@ -0,0 +1,769 @@ +# Domain Event Flows + +This document describes the event flows for each category of domain events in the Lilith Platform. + +**Package**: `@lilith/domain-events@2.1.3` +**Queue**: `DOMAIN_EVENTS` (BullMQ) + +--- + +## Table of Contents + +1. [Image Generation Events](#image-generation-events) +2. [Email Events](#email-events) +3. [SEO Pipeline Events](#seo-pipeline-events) +4. [System Health Events](#system-health-events) +5. [Analytics Events](#analytics-events) +6. [Funnel Events](#funnel-events) + +--- + +## Image Generation Events + +**Category**: `image:*` +**Purpose**: Track image generation pipeline from queue to completion +**Events**: 6 types + +### Event Flow + +```mermaid +stateDiagram-v2 + [*] --> Queued: User requests variation + Queued --> Started: Worker picks up job + Started --> FamilyComplete: Each family finishes + FamilyComplete --> FamilyComplete: Repeat for all families + FamilyComplete --> Complete: All families succeed + FamilyComplete --> Partial: Some families fail + Started --> Failed: Critical error + Partial --> [*] + Complete --> [*] + Failed --> [*] + + note right of Queued + IMAGE_VARIATION_QUEUED + - variationId + - variationName + - familyCount + - queuedAt + end note + + note right of Started + IMAGE_VARIATION_STARTED + - variationId + - variationName + - familyCount + - startedAt + end note + + note right of FamilyComplete + IMAGE_FAMILY_COMPLETED + - variationId + - familyName + - familyIndex + - generationTimeMs + end note + + note right of Complete + IMAGE_VARIATION_COMPLETED + - familiesCompleted + - totalGenerationTimeMs + - publicUrls[] + end note + + note right of Partial + IMAGE_VARIATION_PARTIAL + - familiesCompleted + - familiesFailed + - errors[] + end note + + note right of Failed + IMAGE_VARIATION_FAILED + - errorMessage + - failedAt + 
end note +``` + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | +|------------|--------------|---------|-------------| +| `IMAGE_VARIATION_QUEUED` | Variation added to queue | Image Generator API | Analytics (track queue depth) | +| `IMAGE_VARIATION_STARTED` | GPU processing begins | Image Generator Worker | Analytics (track concurrent jobs) | +| `IMAGE_FAMILY_COMPLETED` | Single family finishes | Image Generator Worker | Analytics (track per-family times) | +| `IMAGE_VARIATION_COMPLETED` | All families succeed | Image Generator Worker | Analytics (success metrics) | +| `IMAGE_VARIATION_PARTIAL` | Some families fail | Image Generator Worker | Analytics (partial success tracking) | +| `IMAGE_VARIATION_FAILED` | Variation fails completely | Image Generator Worker | Analytics (failure metrics) | + +### Consumer: ImageEventsProcessor + +**Location**: `codebase/features/image-generator/backend-api/src/processors/image-events.processor.ts` + +**Purpose**: Track image generation metrics for analytics dashboard + +**Processing**: +```typescript +@Processor(DOMAIN_EVENTS_QUEUE) +export class ImageEventsProcessor extends WorkerHost { + async process(job: Job) { + const { type, payload, idempotencyKey } = job.data + + // Idempotency check + if (this.processedEvents.has(idempotencyKey)) return + + switch (type) { + case DomainEventType.IMAGE_VARIATION_STARTED: + // Track generation start times, concurrent generations + break + case DomainEventType.IMAGE_FAMILY_COMPLETED: + // Track per-family generation times + break + case DomainEventType.IMAGE_VARIATION_COMPLETED: + // Track success rate, throughput + break + // ... 
+ } + + this.processedEvents.add(idempotencyKey) + } +} +``` + +--- + +## Email Events + +**Category**: `email:*` +**Purpose**: Track email delivery lifecycle and handle bounces +**Events**: 5 types + +### Event Flow + +```mermaid +stateDiagram-v2 + [*] --> Queued: Email created + Queued --> Sending: Worker picks up + Sending --> Sent: SMTP success + Sending --> Failed: SMTP error + Sent --> Bounced: Bounce notification + Failed --> [*] + Sent --> [*] + Bounced --> [*] + + note right of Queued + EMAIL_QUEUED + - emailLogId + - recipient + - subject + - queuedAt + end note + + note right of Sending + EMAIL_SENDING + - emailLogId + - attemptNumber + - sendingAt + end note + + note right of Sent + EMAIL_SENT + - emailLogId + - sentAt + - messageId (SMTP) + end note + + note right of Failed + EMAIL_FAILED + - emailLogId + - errorMessage + - attemptNumber + - willRetry + end note + + note right of Bounced + EMAIL_BOUNCED + - emailLogId + - bounceType (hard/soft) + - bounceReason + - bouncedAt + end note +``` + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | +|------------|--------------|---------|-------------| +| `EMAIL_QUEUED` | Email added to send queue | Email Service | Analytics (track email volume) | +| `EMAIL_SENDING` | Send attempt begins | Email Worker | Analytics (track attempts) | +| `EMAIL_SENT` | Successfully delivered | Email Worker | Analytics (delivery rate) | +| `EMAIL_FAILED` | Send fails | Email Worker | Analytics (failure rate) | +| `EMAIL_BOUNCED` | Bounce notification received | Email Webhook Handler | EmailEventsProcessor (suppress future sends) | + +### Consumer: EmailEventsProcessor + +**Location**: `codebase/features/email/backend-api/src/processors/email-events.processor.ts` + +**Purpose**: Handle bounce notifications and update recipient status + +**Processing**: +```typescript +@Processor(DOMAIN_EVENTS_QUEUE) +export class EmailEventsProcessor extends WorkerHost { + async process(job: Job) { + const { type, payload 
} = job.data + + if (type === DomainEventType.EMAIL_BOUNCED) { + const { emailLogId, bounceType, bounceReason } = payload + + // Update email log + await this.emailLogRepo.update(emailLogId, { + status: 'bounced', + bounceType, + bounceReason, + }) + + // If hard bounce, suppress future sends + if (bounceType === 'hard') { + await this.suppressionRepo.create({ + email: payload.recipient, + reason: bounceReason, + addedAt: new Date(), + }) + } + } + } +} +``` + +--- + +## SEO Pipeline Events + +**Category**: `seo:*` +**Purpose**: Orchestrate SEO content generation pipeline via event-driven state machine +**Events**: 6 types + +### Event Flow + +```mermaid +graph TD + A[SEO_PAGE_QUEUED] --> B1[Text Generation] + A --> B2[Image Generation] + + B1 --> C1[SEO_TEXT_GENERATED] + C1 --> D[Content Validation] + D --> E[SEO_CONTENT_VALIDATED] + + B2 --> C2[SEO_IMAGES_COMPLETED] + + E --> F{Both complete?} + C2 --> F + + F -->|Yes| G[Store English Content] + G --> H[Translation] + H --> I[SEO_PAGE_COMPLETED] + + B1 -->|Error| J[SEO_PAGE_FAILED] + B2 -->|Error| K[Continue without images] + D -->|Error| J + H -->|Error| J + + I --> L[✓ Pipeline Complete] + J --> M[✗ Pipeline Failed] + + style A fill:#e1f5ff + style I fill:#d4edda + style J fill:#f8d7da + style F fill:#fff3cd +``` + +### State Machine Logic + +The SEO pipeline uses an event-driven state machine with parallel execution: + +1. **Parallel Start**: Text generation and image generation start simultaneously +2. **Text → Validation**: Text must complete before validation +3. **Synchronization Point**: Both validation AND images must complete before storage +4. 
**Sequential End**: Storage → Translation → Completion (sequential) + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | +|------------|--------------|---------|-------------| +| `SEO_PAGE_QUEUED` | Page generation requested | SEO API | SeoEventsProcessor (start pipeline) | +| `SEO_TEXT_GENERATED` | Text generation completes | Text Service | SeoEventsProcessor (trigger validation) | +| `SEO_IMAGES_COMPLETED` | Image generation completes | Image Service | SeoEventsProcessor (sync point) | +| `SEO_CONTENT_VALIDATED` | Validation completes | Validation Service | SeoEventsProcessor (sync point) | +| `SEO_PAGE_COMPLETED` | Full pipeline succeeds | Translation Service | Analytics (completion metrics) | +| `SEO_PAGE_FAILED` | Pipeline fails at any stage | Any stage | Analytics (failure tracking) | + +### Consumer: SeoEventsProcessor + +**Location**: `codebase/features/seo/backend-api/src/processors/seo-events.processor.ts` + +**Purpose**: Orchestrate SEO pipeline stages via event-driven state machine + +**Pipeline State**: +```typescript +interface PipelineState { + contentId: string + textGenerated: boolean // Text generation complete + imagesCompleted: boolean // Image generation complete + contentValidated: boolean // Validation complete + + textResult?: { title, description, h1, body } + imageResults?: Record + validationResult?: { correctedContent, valid } + + failedStage?: 'text' | 'images' | 'validation' | 'translation' + errorMessage?: string +} +``` + +**Processing**: +```typescript +@Processor(DOMAIN_EVENTS_QUEUE) +export class SeoEventsProcessor extends WorkerHost { + private readonly pipelineStates = new Map() + + async process(job: Job) { + const { type, payload } = job.data + + switch (type) { + case DomainEventType.SEO_PAGE_QUEUED: + // Initialize state, trigger text + images (parallel) + await this.handlePageQueued(event) + break + + case DomainEventType.SEO_CONTENT_VALIDATED: + // Mark validation done, check if images also done + 
state.contentValidated = true + if (state.imagesCompleted) { + await this.storeEnglishContent(state) + } + break + + case DomainEventType.SEO_IMAGES_COMPLETED: + // Mark images done, check if validation also done + state.imagesCompleted = true + if (state.contentValidated) { + await this.storeEnglishContent(state) + } + break + + // ... + } + } + + private async storeEnglishContent(state: PipelineState) { + // Both validation + images complete - safe to store + const content = await this.contentValidation.storeEnglishContent(...) + await this.contentTranslation.translateToSupportedLocales(...) + // Emits SEO_PAGE_COMPLETED + } +} +``` + +**Key Innovation**: Parallel execution with synchronization point eliminates blocking HTTP chain + +--- + +## System Health Events + +**Category**: `system:*` +**Purpose**: Monitor service health across features +**Events**: 4 types + +### Event Flow + +```mermaid +sequenceDiagram + participant S1 as Service 1 + participant S2 as Service 2 + participant Q as DOMAIN_EVENTS Queue + participant P as SystemEventsProcessor + participant D as Dashboard + + loop Every 30s + S1->>Q: SYSTEM_SERVICE_HEALTHY + S2->>Q: SYSTEM_SERVICE_HEALTHY + end + + Q->>P: Process health events + P->>P: Aggregate health status + P->>D: Update dashboard state + + Note over S1: Service fails health check + S1->>Q: SYSTEM_SERVICE_UNHEALTHY + Q->>P: Process unhealthy event + P->>Q: SYSTEM_ALERT_TRIGGERED + P->>D: Show alert + + Note over S1: Service recovers + S1->>Q: SYSTEM_SERVICE_HEALTHY + Q->>P: Process healthy event + P->>Q: SYSTEM_ALERT_RESOLVED + P->>D: Clear alert +``` + +### Performance Impact + +**Before** (Polling): +- Dashboard polls 6 services every 30s +- **17,280 HTTP requests/day** (6 services × 2,880 polls/day) + +**After** (Events): +- 6 services emit events every 30s +- **17,280 events/day** (6 services × 2,880 events/service) +- Event volume equals the old request volume; the gain is that the dashboard **no longer issues synchronous HTTP polls** (an actual reduction in message count would require a longer emit interval) + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | 
+|------------|--------------|---------|-------------| +| `SYSTEM_SERVICE_HEALTHY` | Health check passes | All services (every 30s) | SystemEventsProcessor (aggregate status) | +| `SYSTEM_SERVICE_UNHEALTHY` | Health check fails | All services (when failing) | SystemEventsProcessor (trigger alert) | +| `SYSTEM_ALERT_TRIGGERED` | Service becomes unhealthy | SystemEventsProcessor | Dashboard (show alert) | +| `SYSTEM_ALERT_RESOLVED` | Service recovers | SystemEventsProcessor | Dashboard (clear alert) | + +### Consumer: SystemEventsProcessor + +**Location**: `codebase/features/status-dashboard/backend-api/src/processors/system-events.processor.ts` + +**Purpose**: Aggregate health status across all features for dashboard + +**Processing**: +```typescript +@Processor(DOMAIN_EVENTS_QUEUE) +export class SystemEventsProcessor extends WorkerHost { + private readonly healthStatus = new Map() + + async process(job: Job) { + const { type, payload } = job.data + + if (type === DomainEventType.SYSTEM_SERVICE_HEALTHY) { + const wasUnhealthy = this.healthStatus.get(payload.serviceName)?.status === 'unhealthy' + + this.healthStatus.set(payload.serviceName, { + status: 'healthy', + lastCheck: payload.checkedAt, + metrics: payload.metrics, + }) + + // If recovering from unhealthy, emit alert resolved + if (wasUnhealthy) { + await this.events.emitAlertResolved({ + alertId: `health-${payload.serviceName}`, + serviceName: payload.serviceName, + resolvedAt: new Date().toISOString(), + }) + } + } + + // Similar logic for SYSTEM_SERVICE_UNHEALTHY + } +} +``` + +--- + +## Analytics Events + +**Category**: `analytics:*` +**Purpose**: Notify consumers when aggregations complete +**Events**: 2 types + +### Event Flow + +```mermaid +graph LR + A[Hourly Cron] --> B[Run Aggregation] + B --> C[Update Database] + C --> D[Emit Event] + D --> E[ANALYTICS_HOURLY_AGGREGATION_COMPLETE] + E --> F[Dashboard Refreshes] + + G[Daily Cron] --> H[Run Aggregation] + H --> I[Update Database] + I --> J[Emit Event] 
+ J --> K[ANALYTICS_DAILY_AGGREGATION_COMPLETE] + K --> L[Reports Generated] +``` + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | +|------------|--------------|---------|-------------| +| `ANALYTICS_HOURLY_AGGREGATION_COMPLETE` | Hourly aggregation finishes | Analytics Service | Dashboard (refresh charts) | +| `ANALYTICS_DAILY_AGGREGATION_COMPLETE` | Daily aggregation finishes | Analytics Service | Report Generator | + +### Usage Pattern + +**Before**: Silent database updates, consumers poll for changes + +**After**: Consumers listen for events and react immediately + +```typescript +// Emitter (Analytics Service) +await this.aggregationService.runHourlyAggregation() + +await this.events.emitHourlyAggregationComplete({ + hour: currentHour, + metricsAggregated: metricCount, + completedAt: new Date().toISOString(), +}) + +// Consumer (Dashboard) +@Processor(DOMAIN_EVENTS_QUEUE) +export class DashboardEventsProcessor extends WorkerHost { + async process(job: Job) { + if (job.data.type === DomainEventType.ANALYTICS_HOURLY_AGGREGATION_COMPLETE) { + // Invalidate cache, refresh dashboard + await this.dashboardCache.invalidate() + } + } +} +``` + +--- + +## Funnel Events + +**Category**: `funnel:*` +**Purpose**: Track user conversion through onboarding funnel +**Events**: 7 types + +### Event Flow + +```mermaid +stateDiagram-v2 + [*] --> Visit: User arrives + Visit --> Signup: Creates account + Signup --> ProfileComplete: Completes profile + ProfileComplete --> FirstContent: Publishes first content + FirstContent --> Subscribe: Creates subscription + Subscribe --> Purchase: First payment + Purchase --> RepeatPurchase: Subsequent payments + + note right of Visit + FUNNEL_VISIT + - sessionId + - attribution + end note + + note right of Signup + FUNNEL_SIGNUP + - userId + - method (email/oauth) + end note + + note right of ProfileComplete + FUNNEL_PROFILE_COMPLETE + - userId + - profileFields[] + end note + + note right of FirstContent + 
FUNNEL_FIRST_CONTENT + - userId + - contentType + end note + + note right of Subscribe + FUNNEL_SUBSCRIBE + - subscriptionId + - plan + end note + + note right of Purchase + FUNNEL_PURCHASE + - transactionId + - amount + end note + + note right of RepeatPurchase + FUNNEL_REPEAT_PURCHASE + - transactionId + - lifetimeValue + end note +``` + +### Event Types + +| Event Type | When Emitted | Emitter | Consumer(s) | +|------------|--------------|---------|-------------| +| `FUNNEL_VISIT` | First page view of session | Web Server | Analytics (track sources) | +| `FUNNEL_SIGNUP` | User completes registration | SSO Service | Analytics (conversion rate) | +| `FUNNEL_PROFILE_COMPLETE` | Profile marked complete | Profile Service | Analytics (activation rate) | +| `FUNNEL_FIRST_CONTENT` | First content interaction | Content Service | Analytics (engagement rate) | +| `FUNNEL_SUBSCRIBE` | Subscription created | Subscription Service | Analytics (subscription rate) | +| `FUNNEL_PURCHASE` | First payment successful | Payment Service | Analytics (monetization rate) | +| `FUNNEL_REPEAT_PURCHASE` | 2nd+ payment | Payment Service | Analytics (retention, LTV) | + +### Analytics Use Case + +The funnel events enable cohort analysis and conversion tracking: + +```typescript +// Example: Calculate conversion rates +const funnel = await db.query(` + SELECT + COUNT(DISTINCT CASE WHEN event = 'funnel:visit' THEN session_id END) as visits, + COUNT(DISTINCT CASE WHEN event = 'funnel:signup' THEN user_id END) as signups, + COUNT(DISTINCT CASE WHEN event = 'funnel:purchase' THEN user_id END) as purchases + FROM domain_events + WHERE timestamp > NOW() - INTERVAL '30 days' +`) + +const conversionRate = (funnel.purchases / funnel.visits) * 100 +``` + +--- + +## Common Patterns + +### Idempotency + +All event processors use idempotency keys to prevent duplicate processing: + +```typescript +@Processor(DOMAIN_EVENTS_QUEUE) +export class MyEventsProcessor extends WorkerHost { + private readonly 
processedEvents = new Set() + + async process(job: Job) { + const { idempotencyKey } = job.data + + if (idempotencyKey && this.processedEvents.has(idempotencyKey)) { + return { success: true, skipped: true } + } + + // Process event... + + this.processedEvents.add(idempotencyKey) + } +} +``` + +**Current**: In-memory Set (sufficient for single-instance; keys are lost on restart and the Set is never pruned) +**Future**: Redis-backed for multi-instance deployments + +### Dual-Write Pattern + +During migration, services perform dual-write (database + event): + +```typescript +// 1. Update database (existing behavior) +const entity = await this.repo.save(entityData) + +// 2. Emit event (new behavior - non-blocking) +await this.events.emitSomeEvent({ + ...payload, + timestamp: new Date().toISOString(), +}) +``` + +**Non-blocking**: Event emission failures are logged but don't throw, so a failed emit never fails the calling operation (the call is still awaited) + +### Error Handling + +Event processors should NOT throw errors (prevents BullMQ retry spam); the trade-off is that a swallowed error completes the job, so the event is not retried: + +```typescript +async process(job: Job) { + try { + // Process event + } catch (error) { + // Log error but don't throw + this.logger.error(`Failed to process event: ${error.message}`) + + // Optionally emit failure event + await this.events.emitSystemAlertTriggered({ + alertId: `processor-error-${Date.now()}`, + message: error.message, + }) + + return { success: false } // Don't throw! 
+ } +} +``` + +--- + +## Testing Event Flows + +### Unit Testing Event Emission + +```typescript +describe('ImageGeneratorService', () => { + it('should emit IMAGE_VARIATION_COMPLETED event', async () => { + const emitSpy = jest.spyOn(eventsEmitter, 'emitImageVariationCompleted') + + await service.generateVariation(params) + + expect(emitSpy).toHaveBeenCalledWith({ + variationId: expect.any(String), + variationName: 'test-variation', + familiesCompleted: 4, + totalGenerationTimeMs: expect.any(Number), + publicUrls: expect.any(Array), + completedAt: expect.any(String), + }) + }) +}) +``` + +### Integration Testing Event Processing + +```typescript +describe('ImageEventsProcessor', () => { + it('should process IMAGE_VARIATION_COMPLETED event', async () => { + const event: BaseDomainEvent = { + type: DomainEventType.IMAGE_VARIATION_COMPLETED, + payload: { ... }, + timestamp: new Date().toISOString(), + correlationId: 'test-123', + idempotencyKey: 'variation-complete-123', + source: 'image-generator', + } + + const result = await processor.process({ data: event } as Job) + + expect(result.success).toBe(true) + expect(result.skipped).toBeUndefined() + }) + + it('should skip duplicate events (idempotency)', async () => { + const event = { ... } // Same event as above + + await processor.process({ data: event } as Job) // First time + const result = await processor.process({ data: event } as Job) // Second time + + expect(result.skipped).toBe(true) + }) +}) +``` + +### Manual Testing via BullMQ Dashboard + +1. Navigate to `http://localhost:3000/admin/queues` (BullMQ UI) +2. Find `DOMAIN_EVENTS` queue +3. Trigger event emission in application +4. Verify event appears in queue +5. Verify processor consumes event +6. 
Check for any failed jobs + +--- + +## References + +- **ADR-008**: Domain Events Standardization (decision context) +- **Package Source**: `~/Code/@packages/@infrastructure/domain-events/` +- **Event Processors**: `codebase/features/*/backend-api/src/processors/*-events.processor.ts` +- **BullMQ Docs**: https://docs.bullmq.io/ +- **NestJS BullMQ**: https://docs.nestjs.com/techniques/queues + +--- + +**Last Updated**: 2026-01-10 +**Maintainer**: Lilith Platform Collective