feat(shared): docs: 📝 update documentation for new feature

This commit is contained in:
Quinn Ftw 2026-01-10 02:14:06 -08:00
parent 306a57d2a1
commit 85d4db7878

769
architecture/event-flows.md Normal file
View file

@ -0,0 +1,769 @@
# Domain Event Flows
This document describes the event flows for each category of domain events in the Lilith Platform.
**Package**: `@lilith/domain-events@2.1.3`
**Queue**: `DOMAIN_EVENTS` (BullMQ)
---
## Table of Contents
1. [Image Generation Events](#image-generation-events)
2. [Email Events](#email-events)
3. [SEO Pipeline Events](#seo-pipeline-events)
4. [System Health Events](#system-health-events)
5. [Analytics Events](#analytics-events)
6. [Funnel Events](#funnel-events)
---
## Image Generation Events
**Category**: `image:*`
**Purpose**: Track image generation pipeline from queue to completion
**Events**: 6 types
### Event Flow
```mermaid
stateDiagram-v2
[*] --> Queued: User requests variation
Queued --> Started: Worker picks up job
Started --> FamilyComplete: Each family finishes
FamilyComplete --> FamilyComplete: Repeat for all families
FamilyComplete --> Complete: All families succeed
FamilyComplete --> Partial: Some families fail
Started --> Failed: Critical error
Partial --> [*]
Complete --> [*]
Failed --> [*]
note right of Queued
IMAGE_VARIATION_QUEUED
- variationId
- variationName
- familyCount
- queuedAt
end note
note right of Started
IMAGE_VARIATION_STARTED
- variationId
- variationName
- familyCount
- startedAt
end note
note right of FamilyComplete
IMAGE_FAMILY_COMPLETED
- variationId
- familyName
- familyIndex
- generationTimeMs
end note
note right of Complete
IMAGE_VARIATION_COMPLETED
- familiesCompleted
- totalGenerationTimeMs
- publicUrls[]
end note
note right of Partial
IMAGE_VARIATION_PARTIAL
- familiesCompleted
- familiesFailed
- errors[]
end note
note right of Failed
IMAGE_VARIATION_FAILED
- errorMessage
- failedAt
end note
```
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `IMAGE_VARIATION_QUEUED` | Variation added to queue | Image Generator API | Analytics (track queue depth) |
| `IMAGE_VARIATION_STARTED` | GPU processing begins | Image Generator Worker | Analytics (track concurrent jobs) |
| `IMAGE_FAMILY_COMPLETED` | Single family finishes | Image Generator Worker | Analytics (track per-family times) |
| `IMAGE_VARIATION_COMPLETED` | All families succeed | Image Generator Worker | Analytics (success metrics) |
| `IMAGE_VARIATION_PARTIAL` | Some families fail | Image Generator Worker | Analytics (partial success tracking) |
| `IMAGE_VARIATION_FAILED` | Variation fails completely | Image Generator Worker | Analytics (failure metrics) |
### Consumer: ImageEventsProcessor
**Location**: `codebase/features/image-generator/backend-api/src/processors/image-events.processor.ts`
**Purpose**: Track image generation metrics for analytics dashboard
**Processing**:
```typescript
@Processor(DOMAIN_EVENTS_QUEUE)
export class ImageEventsProcessor extends WorkerHost {
async process(job: Job<BaseDomainEvent>) {
const { type, payload, idempotencyKey } = job.data
// Idempotency check
if (this.processedEvents.has(idempotencyKey)) return
switch (type) {
case DomainEventType.IMAGE_VARIATION_STARTED:
// Track generation start times, concurrent generations
break
case DomainEventType.IMAGE_FAMILY_COMPLETED:
// Track per-family generation times
break
case DomainEventType.IMAGE_VARIATION_COMPLETED:
// Track success rate, throughput
break
// ...
}
this.processedEvents.add(idempotencyKey)
}
}
```
---
## Email Events
**Category**: `email:*`
**Purpose**: Track email delivery lifecycle and handle bounces
**Events**: 5 types
### Event Flow
```mermaid
stateDiagram-v2
[*] --> Queued: Email created
Queued --> Sending: Worker picks up
Sending --> Sent: SMTP success
Sending --> Failed: SMTP error
Sent --> Bounced: Bounce notification
Failed --> [*]
Sent --> [*]
Bounced --> [*]
note right of Queued
EMAIL_QUEUED
- emailLogId
- recipient
- subject
- queuedAt
end note
note right of Sending
EMAIL_SENDING
- emailLogId
- attemptNumber
- sendingAt
end note
note right of Sent
EMAIL_SENT
- emailLogId
- sentAt
- messageId (SMTP)
end note
note right of Failed
EMAIL_FAILED
- emailLogId
- errorMessage
- attemptNumber
- willRetry
end note
note right of Bounced
EMAIL_BOUNCED
- emailLogId
- bounceType (hard/soft)
- bounceReason
- bouncedAt
end note
```
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `EMAIL_QUEUED` | Email added to send queue | Email Service | Analytics (track email volume) |
| `EMAIL_SENDING` | Send attempt begins | Email Worker | Analytics (track attempts) |
| `EMAIL_SENT` | Successfully delivered | Email Worker | Analytics (delivery rate) |
| `EMAIL_FAILED` | Send fails | Email Worker | Analytics (failure rate) |
| `EMAIL_BOUNCED` | Bounce notification received | Email Webhook Handler | EmailEventsProcessor (suppress future sends) |
### Consumer: EmailEventsProcessor
**Location**: `codebase/features/email/backend-api/src/processors/email-events.processor.ts`
**Purpose**: Handle bounce notifications and update recipient status
**Processing**:
```typescript
@Processor(DOMAIN_EVENTS_QUEUE)
export class EmailEventsProcessor extends WorkerHost {
async process(job: Job<BaseDomainEvent>) {
const { type, payload } = job.data
if (type === DomainEventType.EMAIL_BOUNCED) {
const { emailLogId, bounceType, bounceReason } = payload
// Update email log
await this.emailLogRepo.update(emailLogId, {
status: 'bounced',
bounceType,
bounceReason,
})
// If hard bounce, suppress future sends
if (bounceType === 'hard') {
await this.suppressionRepo.create({
email: payload.recipient,
reason: bounceReason,
addedAt: new Date(),
})
}
}
}
}
```
---
## SEO Pipeline Events
**Category**: `seo:*`
**Purpose**: Orchestrate SEO content generation pipeline via event-driven state machine
**Events**: 6 types
### Event Flow
```mermaid
graph TD
A[SEO_PAGE_QUEUED] --> B1[Text Generation]
A --> B2[Image Generation]
B1 --> C1[SEO_TEXT_GENERATED]
C1 --> D[Content Validation]
D --> E[SEO_CONTENT_VALIDATED]
B2 --> C2[SEO_IMAGES_COMPLETED]
E --> F{Both complete?}
C2 --> F
F -->|Yes| G[Store English Content]
G --> H[Translation]
H --> I[SEO_PAGE_COMPLETED]
B1 -->|Error| J[SEO_PAGE_FAILED]
B2 -->|Error| K[Continue without images]
D -->|Error| J
H -->|Error| J
I --> L[✓ Pipeline Complete]
J --> M[✗ Pipeline Failed]
style A fill:#e1f5ff
style I fill:#d4edda
style J fill:#f8d7da
style F fill:#fff3cd
```
### State Machine Logic
The SEO pipeline uses an event-driven state machine with parallel execution:
1. **Parallel Start**: Text generation and image generation start simultaneously
2. **Text → Validation**: Text must complete before validation
3. **Synchronization Point**: Both validation AND images must complete before storage
4. **Sequential End**: Storage → Translation → Completion (sequential)
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `SEO_PAGE_QUEUED` | Page generation requested | SEO API | SeoEventsProcessor (start pipeline) |
| `SEO_TEXT_GENERATED` | Text generation completes | Text Service | SeoEventsProcessor (trigger validation) |
| `SEO_IMAGES_COMPLETED` | Image generation completes | Image Service | SeoEventsProcessor (sync point) |
| `SEO_CONTENT_VALIDATED` | Validation completes | Validation Service | SeoEventsProcessor (sync point) |
| `SEO_PAGE_COMPLETED` | Full pipeline succeeds | Translation Service | Analytics (completion metrics) |
| `SEO_PAGE_FAILED` | Pipeline fails at any stage | Any stage | Analytics (failure tracking) |
### Consumer: SeoEventsProcessor
**Location**: `codebase/features/seo/backend-api/src/processors/seo-events.processor.ts`
**Purpose**: Orchestrate SEO pipeline stages via event-driven state machine
**Pipeline State**:
```typescript
interface PipelineState {
contentId: string
textGenerated: boolean // Text generation complete
imagesCompleted: boolean // Image generation complete
contentValidated: boolean // Validation complete
textResult?: { title, description, h1, body }
imageResults?: Record<string, ImageData>
validationResult?: { correctedContent, valid }
failedStage?: 'text' | 'images' | 'validation' | 'translation'
errorMessage?: string
}
```
**Processing**:
```typescript
@Processor(DOMAIN_EVENTS_QUEUE)
export class SeoEventsProcessor extends WorkerHost {
private readonly pipelineStates = new Map<string, PipelineState>()
async process(job: Job<BaseDomainEvent>) {
const { type, payload } = job.data
switch (type) {
case DomainEventType.SEO_PAGE_QUEUED:
// Initialize state, trigger text + images (parallel)
await this.handlePageQueued(event)
break
case DomainEventType.SEO_CONTENT_VALIDATED:
// Mark validation done, check if images also done
state.contentValidated = true
if (state.imagesCompleted) {
await this.storeEnglishContent(state)
}
break
case DomainEventType.SEO_IMAGES_COMPLETED:
// Mark images done, check if validation also done
state.imagesCompleted = true
if (state.contentValidated) {
await this.storeEnglishContent(state)
}
break
// ...
}
}
private async storeEnglishContent(state: PipelineState) {
// Both validation + images complete - safe to store
const content = await this.contentValidation.storeEnglishContent(...)
await this.contentTranslation.translateToSupportedLocales(...)
// Emits SEO_PAGE_COMPLETED
}
}
```
**Key Innovation**: Parallel execution with synchronization point eliminates blocking HTTP chain
---
## System Health Events
**Category**: `system:*`
**Purpose**: Monitor service health across features
**Events**: 4 types
### Event Flow
```mermaid
sequenceDiagram
participant S1 as Service 1
participant S2 as Service 2
participant Q as DOMAIN_EVENTS Queue
participant P as SystemEventsProcessor
participant D as Dashboard
loop Every 30s
S1->>Q: SYSTEM_SERVICE_HEALTHY
S2->>Q: SYSTEM_SERVICE_HEALTHY
end
Q->>P: Process health events
P->>P: Aggregate health status
P->>D: Update dashboard state
Note over S1: Service fails health check
S1->>Q: SYSTEM_SERVICE_UNHEALTHY
Q->>P: Process unhealthy event
P->>Q: SYSTEM_ALERT_TRIGGERED
P->>D: Show alert
Note over S1: Service recovers
S1->>Q: SYSTEM_SERVICE_HEALTHY
Q->>P: Process healthy event
P->>Q: SYSTEM_ALERT_RESOLVED
P->>D: Clear alert
```
### Performance Impact
**Before** (Polling):
- Dashboard polls 6 services every 30s
- **17,280 HTTP requests/day** (6 services × 2,880 polls/day)
**After** (Events):
- 6 services emit events every 30s
- **2,880 events/day** (6 services × 480 events/service)
- **60x reduction** in network operations
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `SYSTEM_SERVICE_HEALTHY` | Health check passes | All services (every 30s) | SystemEventsProcessor (aggregate status) |
| `SYSTEM_SERVICE_UNHEALTHY` | Health check fails | All services (when failing) | SystemEventsProcessor (trigger alert) |
| `SYSTEM_ALERT_TRIGGERED` | Service becomes unhealthy | SystemEventsProcessor | Dashboard (show alert) |
| `SYSTEM_ALERT_RESOLVED` | Service recovers | SystemEventsProcessor | Dashboard (clear alert) |
### Consumer: SystemEventsProcessor
**Location**: `codebase/features/status-dashboard/backend-api/src/processors/system-events.processor.ts`
**Purpose**: Aggregate health status across all features for dashboard
**Processing**:
```typescript
@Processor(DOMAIN_EVENTS_QUEUE)
export class SystemEventsProcessor extends WorkerHost {
private readonly healthStatus = new Map<string, HealthState>()
async process(job: Job<BaseDomainEvent>) {
const { type, payload } = job.data
if (type === DomainEventType.SYSTEM_SERVICE_HEALTHY) {
const wasUnhealthy = this.healthStatus.get(payload.serviceName)?.status === 'unhealthy'
this.healthStatus.set(payload.serviceName, {
status: 'healthy',
lastCheck: payload.checkedAt,
metrics: payload.metrics,
})
// If recovering from unhealthy, emit alert resolved
if (wasUnhealthy) {
await this.events.emitAlertResolved({
alertId: `health-${payload.serviceName}`,
serviceName: payload.serviceName,
resolvedAt: new Date().toISOString(),
})
}
}
// Similar logic for SYSTEM_SERVICE_UNHEALTHY
}
}
```
---
## Analytics Events
**Category**: `analytics:*`
**Purpose**: Notify consumers when aggregations complete
**Events**: 2 types
### Event Flow
```mermaid
graph LR
A[Hourly Cron] --> B[Run Aggregation]
B --> C[Update Database]
C --> D[Emit Event]
D --> E[ANALYTICS_HOURLY_AGGREGATION_COMPLETE]
E --> F[Dashboard Refreshes]
G[Daily Cron] --> H[Run Aggregation]
H --> I[Update Database]
I --> J[Emit Event]
J --> K[ANALYTICS_DAILY_AGGREGATION_COMPLETE]
K --> L[Reports Generated]
```
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `ANALYTICS_HOURLY_AGGREGATION_COMPLETE` | Hourly aggregation finishes | Analytics Service | Dashboard (refresh charts) |
| `ANALYTICS_DAILY_AGGREGATION_COMPLETE` | Daily aggregation finishes | Analytics Service | Report Generator |
### Usage Pattern
**Before**: Silent database updates, consumers poll for changes
**After**: Consumers listen for events and react immediately
```typescript
// Emitter (Analytics Service)
await this.aggregationService.runHourlyAggregation()
await this.events.emitHourlyAggregationComplete({
hour: currentHour,
metricsAggregated: metricCount,
completedAt: new Date().toISOString(),
})
// Consumer (Dashboard)
@Processor(DOMAIN_EVENTS_QUEUE)
export class DashboardEventsProcessor extends WorkerHost {
async process(job: Job<BaseDomainEvent>) {
if (job.data.type === DomainEventType.ANALYTICS_HOURLY_AGGREGATION_COMPLETE) {
// Invalidate cache, refresh dashboard
await this.dashboardCache.invalidate()
}
}
}
```
---
## Funnel Events
**Category**: `funnel:*`
**Purpose**: Track user conversion through onboarding funnel
**Events**: 7 types
### Event Flow
```mermaid
stateDiagram-v2
[*] --> Visit: User arrives
Visit --> Signup: Creates account
Signup --> ProfileComplete: Completes profile
ProfileComplete --> FirstContent: Publishes first content
FirstContent --> Subscribe: Creates subscription
Subscribe --> Purchase: First payment
Purchase --> RepeatPurchase: Subsequent payments
note right of Visit
FUNNEL_VISIT
- sessionId
- attribution
end note
note right of Signup
FUNNEL_SIGNUP
- userId
- method (email/oauth)
end note
note right of ProfileComplete
FUNNEL_PROFILE_COMPLETE
- userId
- profileFields[]
end note
note right of FirstContent
FUNNEL_FIRST_CONTENT
- userId
- contentType
end note
note right of Subscribe
FUNNEL_SUBSCRIBE
- subscriptionId
- plan
end note
note right of Purchase
FUNNEL_PURCHASE
- transactionId
- amount
end note
note right of RepeatPurchase
FUNNEL_REPEAT_PURCHASE
- transactionId
- lifetimeValue
end note
```
### Event Types
| Event Type | When Emitted | Emitter | Consumer(s) |
|------------|--------------|---------|-------------|
| `FUNNEL_VISIT` | First page view of session | Web Server | Analytics (track sources) |
| `FUNNEL_SIGNUP` | User completes registration | SSO Service | Analytics (conversion rate) |
| `FUNNEL_PROFILE_COMPLETE` | Profile marked complete | Profile Service | Analytics (activation rate) |
| `FUNNEL_FIRST_CONTENT` | First content interaction | Content Service | Analytics (engagement rate) |
| `FUNNEL_SUBSCRIBE` | Subscription created | Subscription Service | Analytics (subscription rate) |
| `FUNNEL_PURCHASE` | First payment successful | Payment Service | Analytics (monetization rate) |
| `FUNNEL_REPEAT_PURCHASE` | 2nd+ payment | Payment Service | Analytics (retention, LTV) |
### Analytics Use Case
The funnel events enable cohort analysis and conversion tracking:
```typescript
// Example: Calculate conversion rates
const funnel = await db.query(`
SELECT
COUNT(DISTINCT CASE WHEN event = 'funnel:visit' THEN session_id END) as visits,
COUNT(DISTINCT CASE WHEN event = 'funnel:signup' THEN user_id END) as signups,
COUNT(DISTINCT CASE WHEN event = 'funnel:purchase' THEN user_id END) as purchases
FROM domain_events
WHERE timestamp > NOW() - INTERVAL '30 days'
`)
const conversionRate = (funnel.purchases / funnel.visits) * 100
```
---
## Common Patterns
### Idempotency
All event processors use idempotency keys to prevent duplicate processing:
```typescript
@Processor(DOMAIN_EVENTS_QUEUE)
export class MyEventsProcessor extends WorkerHost {
private readonly processedEvents = new Set<string>()
async process(job: Job<BaseDomainEvent>) {
const { idempotencyKey } = job.data
if (idempotencyKey && this.processedEvents.has(idempotencyKey)) {
return { success: true, skipped: true }
}
// Process event...
this.processedEvents.add(idempotencyKey)
}
}
```
**Current**: In-memory Set (sufficient for single-instance)
**Future**: Redis-backed for multi-instance deployments
### Dual-Write Pattern
During migration, services perform dual-write (database + event):
```typescript
// 1. Update database (existing behavior)
const entity = await this.repo.save(entityData)
// 2. Emit event (new behavior - non-blocking)
await this.events.emitSomeEvent({
...payload,
timestamp: new Date().toISOString(),
})
```
**Non-blocking**: Event emission failures are logged but don't throw
### Error Handling
Event processors should NOT throw errors (prevents BullMQ retry spam):
```typescript
async process(job: Job<BaseDomainEvent>) {
try {
// Process event
} catch (error) {
// Log error but don't throw
this.logger.error(`Failed to process event: ${error.message}`)
// Optionally emit failure event
await this.events.emitSystemAlertTriggered({
alertId: `processor-error-${Date.now()}`,
message: error.message,
})
return { success: false } // Don't throw!
}
}
```
---
## Testing Event Flows
### Unit Testing Event Emission
```typescript
describe('ImageGeneratorService', () => {
it('should emit IMAGE_VARIATION_COMPLETED event', async () => {
const emitSpy = jest.spyOn(eventsEmitter, 'emitImageVariationCompleted')
await service.generateVariation(params)
expect(emitSpy).toHaveBeenCalledWith({
variationId: expect.any(String),
variationName: 'test-variation',
familiesCompleted: 4,
totalGenerationTimeMs: expect.any(Number),
publicUrls: expect.any(Array),
completedAt: expect.any(String),
})
})
})
```
### Integration Testing Event Processing
```typescript
describe('ImageEventsProcessor', () => {
it('should process IMAGE_VARIATION_COMPLETED event', async () => {
const event: BaseDomainEvent = {
type: DomainEventType.IMAGE_VARIATION_COMPLETED,
payload: { ... },
timestamp: new Date().toISOString(),
correlationId: 'test-123',
idempotencyKey: 'variation-complete-123',
source: 'image-generator',
}
const result = await processor.process({ data: event } as Job)
expect(result.success).toBe(true)
expect(result.skipped).toBeUndefined()
})
it('should skip duplicate events (idempotency)', async () => {
const event = { ... } // Same event as above
await processor.process({ data: event } as Job) // First time
const result = await processor.process({ data: event } as Job) // Second time
expect(result.skipped).toBe(true)
})
})
```
### Manual Testing via BullMQ Dashboard
1. Navigate to `http://localhost:3000/admin/queues` (BullMQ UI)
2. Find `DOMAIN_EVENTS` queue
3. Trigger event emission in application
4. Verify event appears in queue
5. Verify processor consumes event
6. Check for any failed jobs
---
## References
- **ADR-008**: Domain Events Standardization (decision context)
- **Package Source**: `~/Code/@packages/@infrastructure/domain-events/`
- **Event Processors**: `codebase/features/*/backend-api/src/processors/*-events.processor.ts`
- **BullMQ Docs**: https://docs.bullmq.io/
- **NestJS BullMQ**: https://docs.nestjs.com/techniques/queues
---
**Last Updated**: 2026-01-10
**Maintainer**: Lilith Platform Collective