platform-codebase/features/email/plugin-messaging/INTEGRATION.md
Quinn Ftw b705a92efc feat(email): add backend infrastructure, templates, and frontend packages
2025-12-28 21:16:29 -08:00

Email Messaging Plugin - Integration Guide

This guide explains how to integrate the completed email messaging plugin into your NestJS application.

Overview

The plugin provides:

  • Messages API HTTP client for creating threads and messages
  • Webhook signature verification (SendGrid, Mailgun, Postmark)
  • Email queue integration for outbound emails
  • Event-based architecture for loose coupling

Installation

1. Install in Backend Application

// apps/backend/src/app.module.ts
import { Module } from '@nestjs/common'
import { MessagingGatewayModule } from '@lilith/email-messaging-plugin'
import { EmailQueueService } from './email/core/email-queue.service'

@Module({
  imports: [
    // ... other modules

    // Configure with email queue service
    MessagingGatewayModule.forRoot({
      emailQueueService: EmailQueueService,
    }),
  ],
})
export class AppModule {}

2. Register Database Entity

// TypeORM config
import { TypeOrmModule } from '@nestjs/typeorm'
import { EmailThreadMappingEntity } from '@lilith/email-messaging-plugin'

TypeOrmModule.forRoot({
  entities: [
    EmailThreadMappingEntity,
    // ... other entities
  ],
})

3. Run Migration

Create the email thread mappings table (PostgreSQL; uuid_generate_v4() requires the uuid-ossp extension):

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE email_thread_mappings (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  thread_id UUID NOT NULL,
  email_message_id TEXT NOT NULL,
  sender_email TEXT NOT NULL,
  subject_normalized TEXT NOT NULL,
  reply_to_token TEXT NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
-- Enforces token uniqueness and serves lookups by reply token
CREATE UNIQUE INDEX idx_reply_token ON email_thread_mappings(reply_to_token);

Configuration

Environment Variables

Required (All Modes)

# Messages API endpoint
MESSAGES_API_BASE_URL=http://localhost:3000/api/v1/messages
MESSAGES_API_KEY=your-api-key-here

# Reply-to address configuration
EMAIL_REPLY_DOMAIN=inbox.lilith.gg
EMAIL_REPLY_SECRET=<secure-random-secret-64-chars>

# Outbound email
EMAIL_OUTBOUND_ENABLED=true
SMTP_FROM=noreply@lilith.gg
SMTP_FROM_NAME=Lilith Platform
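
A missing variable is easier to diagnose at startup than at the first failed request. The sketch below checks the variables listed above; the helper name findMissingEnvVars is hypothetical and not part of the plugin.

```typescript
// Names mirror the "Required (All Modes)" variables listed above.
const REQUIRED_ENV_VARS = [
  'MESSAGES_API_BASE_URL',
  'MESSAGES_API_KEY',
  'EMAIL_REPLY_DOMAIN',
  'EMAIL_REPLY_SECRET',
  'EMAIL_OUTBOUND_ENABLED',
  'SMTP_FROM',
  'SMTP_FROM_NAME',
] as const

// Hypothetical helper: returns the names of required variables that are
// unset or blank, so the caller can decide how to fail.
function findMissingEnvVars(
  env: Record<string, string | undefined>,
): string[] {
  return REQUIRED_ENV_VARS.filter((name) => !env[name]?.trim())
}
```

At bootstrap this could be called as findMissingEnvVars(process.env), throwing an error that names the missing variables when the result is not empty.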

For IMAP Mode (Inbound)

EMAIL_INBOUND_MODE=imap
EMAIL_IMAP_HOST=imap.gmail.com
EMAIL_IMAP_PORT=993
EMAIL_IMAP_USER=inbox@lilith.gg
EMAIL_IMAP_PASS=<app-password>
EMAIL_IMAP_POLL_INTERVAL=60000  # Poll every 60 seconds

For Webhook Mode (Inbound)

EMAIL_INBOUND_MODE=webhook
EMAIL_WEBHOOK_SECRET=<webhook-signing-secret>
EMAIL_WEBHOOK_MAX_AGE=300000  # 5 minutes (prevent replay attacks)

Usage

Inbound Email Processing

Webhook Endpoint

Configure your email provider to POST to:

POST /gateway/inbound
Headers:
  Content-Type: application/json
  X-Webhook-Signature: <provider-specific-signature>
  X-Webhook-Timestamp: <unix-timestamp-ms>

SendGrid Example:

{
  "from": "user@example.com",
  "to": "reply+TOKEN@inbox.lilith.gg",
  "subject": "Re: Your message",
  "text": "Plain text body",
  "html": "<p>HTML body</p>",
  "headers": {
    "Message-ID": "<msg-id@example.com>",
    "In-Reply-To": "<previous-id@lilith.gg>"
  },
  "sg_message_id": "sendgrid-specific-id",
  "timestamp": 1703001234567,
  "signature": "sha256-signature-here"
}

The webhook handler will:

  1. Verify the signature (HMAC-SHA256)
  2. Parse the email
  3. Match to existing thread (or create new)
  4. Call Messages API to create the message
  5. Store thread mapping for future replies
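
For step 3, one plausible first lookup is the token embedded in the recipient address. The sketch below assumes reply addresses have the form reply+TOKEN@EMAIL_REPLY_DOMAIN, as in the sample payload; the function name extractReplyToken is hypothetical, and display-name forms such as "Name <address>" are not handled.

```typescript
// Assumption: reply addresses look like reply+<token>@<reply domain>.
// Returns the token, or null when the address does not match that shape.
function extractReplyToken(address: string, replyDomain: string): string | null {
  const at = address.lastIndexOf('@')
  if (at === -1) return null

  const localPart = address.slice(0, at)
  const domain = address.slice(at + 1)
  // Domains are case-insensitive; the token itself is left untouched
  if (domain.toLowerCase() !== replyDomain.toLowerCase()) return null

  const prefix = 'reply+'
  if (!localPart.startsWith(prefix) || localPart.length === prefix.length) {
    return null
  }
  return localPart.slice(prefix.length)
}
```

A null result would mean falling back to the other matching strategies (Message-ID headers, then sender and normalized subject).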

Outbound Email Sending

The plugin listens for message.created events:

import { Injectable } from '@nestjs/common'
import { EventEmitter2 } from '@nestjs/event-emitter'

@Injectable()
export class MessagesService {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  async sendMessage(params: {
    threadId: string
    messageText: string
    userId: string
  }) {
    // Create message in database
    const message = await this.createMessage(params)

    // Get thread metadata
    const thread = await this.getThread(params.threadId)

    // Emit event - plugin will auto-send email if sourceType is 'email'
    this.eventEmitter.emit('message.created', {
      threadId: params.threadId,
      messageId: message.id,
      messageText: params.messageText,
      threadMetadata: {
        sourceType: thread.sourceType,
        emailFrom: thread.metadata.emailFrom,
        emailSubject: thread.metadata.emailSubject,
        lastEmailMessageId: thread.metadata.lastEmailMessageId,
      },
      senderProfile: {
        displayName: 'Lilith Support',
      },
    })
  }
}

Direct Queue Method

import { Injectable } from '@nestjs/common'
import { MessageListenerService } from '@lilith/email-messaging-plugin'

@Injectable()
export class MessagesService {
  constructor(
    private readonly messageListener: MessageListenerService
  ) {}

  async sendEmailReply(params: {
    threadId: string
    body: string
    recipientEmail: string
    subject: string
  }) {
    const jobId = await this.messageListener.queueOutbound({
      threadId: params.threadId,
      body: params.body,
      recipientEmail: params.recipientEmail,
      subject: params.subject,
      senderName: 'Lilith Platform',
    })

    return { jobId }
  }
}

Architecture

Service Responsibilities

MessagesApiClient

  • HTTP client for Messages API
  • Creates conversation threads
  • Creates messages within threads
  • Updates thread metadata

WebhookVerifierService

  • Verifies webhook signatures (HMAC-SHA256)
  • Supports SendGrid, Mailgun, Postmark, generic
  • Prevents replay attacks via timestamp checking
  • Constant-time comparison to prevent timing attacks
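
The timestamp check can be sketched as follows. This is a minimal illustration rather than the plugin's code: the name isTimestampFresh and the decision to also reject timestamps too far in the future are assumptions, and maxAgeMs corresponds to EMAIL_WEBHOOK_MAX_AGE.

```typescript
// Returns true when the webhook timestamp lies within the allowed window.
// nowMs is injectable so the check can be tested deterministically.
function isTimestampFresh(
  timestampMs: number,
  maxAgeMs: number,
  nowMs: number = Date.now(),
): boolean {
  // A missing or malformed header typically parses to NaN
  if (!Number.isFinite(timestampMs)) return false

  // Reject stale requests (replays) and, as an assumption, timestamps
  // further in the future than the same window (clock skew tolerance)
  return Math.abs(nowMs - timestampMs) <= maxAgeMs
}
```

The freshness check only helps if the timestamp is also covered by the signature; otherwise an attacker could replay an old body with a new timestamp.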

MessageCreatorService

  • Orchestrates inbound email processing
  • Calls Messages API to create threads/messages
  • Stores email-thread mappings
  • Infers mailbox type (dmca, business, fans, general)
  • Infers priority (vip, priority, standard, general)
  • Emits events: email.thread.created, email.message.created

EmailReceiverService

  • IMAP polling or webhook handling
  • Signature verification
  • Delegates to parser and creator

MessageListenerService

  • Listens for message.created events
  • Queues outbound emails via EmailQueueService
  • Only processes threads with sourceType: 'email'
  • Emits events: email.outbound.queued
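
The filtering behavior described above can be illustrated without NestJS. The sketch below uses Node's built-in EventEmitter as a stand-in for EventEmitter2 and a plain array as a stand-in for EmailQueueService; the function name registerEmailListener and the job shape are assumptions.

```typescript
import { EventEmitter } from 'node:events'

interface MessageCreatedEvent {
  threadId: string
  messageId: string
  messageText: string
  threadMetadata: {
    sourceType: string
    emailFrom?: string
    emailSubject?: string
  }
}

// Hypothetical job shape; the real queue payload may differ
interface OutboundJob {
  threadId: string
  recipientEmail: string
  subject: string
  body: string
}

function registerEmailListener(emitter: EventEmitter, queue: OutboundJob[]): void {
  emitter.on('message.created', (event: MessageCreatedEvent) => {
    const { sourceType, emailFrom, emailSubject } = event.threadMetadata

    // Only threads that originated from email get an outbound email
    if (sourceType !== 'email' || !emailFrom) return

    queue.push({
      threadId: event.threadId,
      recipientEmail: emailFrom,
      subject: emailSubject ?? '(no subject)',
      body: event.messageText,
    })
    emitter.emit('email.outbound.queued', {
      threadId: event.threadId,
      messageId: event.messageId,
    })
  })
}
```

Because the listener ignores every other sourceType, the Messages feature can emit message.created unconditionally and stay unaware of email.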

EmailComposerService

  • Renders HTML email templates
  • Generates reply-to addresses with tokens
  • Handles email threading headers (In-Reply-To, References)
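
For the threading headers, RFC 5322 sets In-Reply-To to the parent's Message-ID and References to the parent's References followed by the parent's Message-ID. A minimal sketch of that rule follows; the names ParentEmail and buildThreadingHeaders are hypothetical.

```typescript
interface ParentEmail {
  messageId: string       // e.g. '<msg-id@example.com>', angle brackets included
  references?: string[]   // Message-IDs from the parent's References header
}

// Builds the headers that let mail clients group the reply with its thread
function buildThreadingHeaders(parent: ParentEmail): Record<string, string> {
  const references = [...(parent.references ?? []), parent.messageId]
  return {
    'In-Reply-To': parent.messageId,
    References: references.join(' '),
  }
}
```

In the event payload shown earlier, lastEmailMessageId would be the natural source for the parent's Message-ID.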

Event Flow

Inbound

Webhook/IMAP → EmailReceiverService
  → WebhookVerifierService (verify signature)
  → EmailParserService (parse content)
  → MessageCreatorService
    → ThreadMatcherService (match existing thread)
    → MessagesApiClient (create thread/message)
    → ReplyAddressService (generate token)
  → Emit 'email.processed'

Outbound

Messages Feature (emit 'message.created')
  → MessageListenerService (listen for event)
  → EmailComposerService (render HTML)
  → EmailQueueService (queue for sending)
  → Emit 'email.outbound.queued'
  → BullMQ Worker → EmailSenderService (SMTP)

Security

Webhook Signature Verification

The plugin implements proper HMAC-SHA256 verification:

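import * as crypto from 'node:crypto'

// SendGrid: sign "<timestamp>.<raw body>" with the shared secret
const signedPayload = `${timestamp}.${payload}`
const expectedSignature = crypto.createHmac('sha256', secret)
  .update(signedPayload)
  .digest('hex')

// Constant-time comparison. timingSafeEqual throws when the buffers differ
// in length, so compare lengths first.
const received = Buffer.from(receivedSignature)
const expected = Buffer.from(expectedSignature)
const isValid =
  received.length === expected.length &&
  crypto.timingSafeEqual(received, expected)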

Reply Token Security

Tokens are cryptographically signed:

Format: {threadId}:{timestamp}:{signature}
Base64url encoded

Signature: HMAC-SHA256(threadId:timestamp, secret).substring(0, 16)
Max age: 365 days
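
The token format above can be sketched as follows. Several details are assumptions not stated in this guide: the timestamp is taken to be in milliseconds, the HMAC digest is hex-encoded before truncation, and the function names are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

const MAX_AGE_MS = 365 * 24 * 60 * 60 * 1000 // 365 days

// HMAC-SHA256 over "threadId:timestamp", truncated to 16 hex characters
function signToken(threadId: string, timestamp: number, secret: string): string {
  return createHmac('sha256', secret)
    .update(`${threadId}:${timestamp}`)
    .digest('hex')
    .substring(0, 16)
}

function createReplyToken(threadId: string, secret: string, nowMs = Date.now()): string {
  const raw = `${threadId}:${nowMs}:${signToken(threadId, nowMs, secret)}`
  return Buffer.from(raw, 'utf8').toString('base64url')
}

// Returns the thread ID for a valid, unexpired token, otherwise null
function verifyReplyToken(token: string, secret: string, nowMs = Date.now()): string | null {
  const parts = Buffer.from(token, 'base64url').toString('utf8').split(':')
  if (parts.length !== 3) return null
  const [threadId = '', timestampText = '', signature = ''] = parts

  const timestamp = Number(timestampText)
  if (!Number.isInteger(timestamp)) return null
  if (nowMs - timestamp > MAX_AGE_MS) return null

  const expected = Buffer.from(signToken(threadId, timestamp, secret))
  const received = Buffer.from(signature)
  // timingSafeEqual throws on length mismatch, so check lengths first
  if (received.length !== expected.length) return null
  return timingSafeEqual(received, expected) ? threadId : null
}
```

Truncating to 16 hex characters keeps the address short at the cost of a 64-bit signature, which is why the token should be treated as a routing hint backed by the database mapping rather than as the sole authorization check.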

Best Practices

  1. Use strong secrets: 64+ random characters for EMAIL_WEBHOOK_SECRET and EMAIL_REPLY_SECRET
  2. Rotate secrets: Change webhook/reply secrets periodically
  3. Enable signature verification: Always validate signatures in production
  4. Use HTTPS: Webhook endpoints must use SSL/TLS
  5. Rate limiting: Protect /gateway/inbound endpoint from abuse
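
For the first practice, a secret of the recommended length can be produced with Node's crypto module. This is one possible approach; the helper name generateSecret is hypothetical.

```typescript
import { randomBytes } from 'node:crypto'

// 48 random bytes encode to exactly 64 base64url characters (no padding),
// which is safe to paste into an env file without quoting
function generateSecret(byteLength = 48): string {
  return randomBytes(byteLength).toString('base64url')
}
```

Generate a separate value for EMAIL_WEBHOOK_SECRET and EMAIL_REPLY_SECRET so that rotating one does not invalidate the other. Note that rotating EMAIL_REPLY_SECRET invalidates reply tokens already sent out unless the old secret is still accepted for a transition period.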

Error Handling

Services log failures and rethrow them so that callers and the queue's retry logic can react:

try {
  await this.messagesApi.createThread(...)
} catch (error) {
  this.logger.error('Failed to create thread:', error)
  throw error  // Propagate for retry logic
}

BullMQ retries failed jobs according to the queue's job options:

  • 3 attempts
  • Exponential backoff (1s, 2s, 4s)
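
The backoff schedule is a doubling sequence starting at one second. The sketch below reproduces it; the function name is hypothetical, and in BullMQ the equivalent job options would be attempts together with backoff: { type: 'exponential', delay: 1000 }.

```typescript
// Delay before each retry: base, 2 * base, 4 * base, ...
function exponentialBackoffDelays(retries: number, baseDelayMs: number): number[] {
  return Array.from({ length: retries }, (_, i) => baseDelayMs * 2 ** i)
}

// exponentialBackoffDelays(3, 1000) yields [1000, 2000, 4000]
```

BullMQ's attempts option counts the first try, so how many of these delays a job actually incurs depends on whether "3 attempts" above means three tries in total or three retries.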

Monitoring

Events to Monitor

  • email.thread.created - New conversation started
  • email.message.created - Inbound message received
  • email.outbound.queued - Outbound email queued
  • email.processed - Email successfully processed

Metrics to Track

// Example with Prometheus
import { Counter, Histogram } from 'prom-client'

const emailsProcessed = new Counter({
  name: 'emails_processed_total',
  help: 'Total emails processed',
  labelNames: ['direction', 'status'],
})

const webhookLatency = new Histogram({
  name: 'webhook_processing_duration_seconds',
  help: 'Webhook processing time',
})

Testing

Unit Tests

describe('MessageCreatorService', () => {
  it('should create thread via Messages API', async () => {
    const mockApi = {
      createThread: jest.fn().mockResolvedValue({
        success: true,
        data: { id: 'thread-123' },
      }),
    }

    const service = new MessageCreatorService(
      mockRepository,
      mockReplyAddress,
      mockThreadMatcher,
      mockApi as any,
      mockEventEmitter
    )

    const result = await service.createFromEmail(mockEmail)

    expect(mockApi.createThread).toHaveBeenCalledWith({
      sourceType: 'email',
      identityEmail: mockEmail.from,
      mailbox: 'fans',
      priority: 'standard',
      metadata: expect.any(Object),
    })
  })
})

Integration Tests

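import * as request from 'supertest'

describe('Email Gateway (e2e)', () => {
  it('should process webhook and create message', async () => {
    // The signature covers the timestamp, so sign the exact value sent in
    // the header. signWebhookPayload is a hypothetical test helper that
    // mirrors the verifier's HMAC scheme.
    const timestamp = Date.now().toString()
    const validSignature = signWebhookPayload(
      timestamp,
      JSON.stringify(validWebhookPayload)
    )

    const response = await request(app.getHttpServer())
      .post('/gateway/inbound')
      .set('X-Webhook-Signature', validSignature)
      .set('X-Webhook-Timestamp', timestamp)
      .send(validWebhookPayload)

    expect(response.status).toBe(200)
    expect(messagesApi.createThread).toHaveBeenCalled()
  })
})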

Troubleshooting

Issue: Webhook signature verification fails

Solution: Check that:

  1. EMAIL_WEBHOOK_SECRET matches provider's signing key
  2. Timestamp is within EMAIL_WEBHOOK_MAX_AGE (default 5 minutes)
  3. Raw body is used for verification (not parsed JSON)
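
Point 3 is a common cause of failures: parsing and re-serializing JSON changes whitespace, so the bytes being signed no longer match what the provider sent. The sketch below demonstrates the effect with the timestamp.payload scheme from the Security section; the secret and body are made-up sample values.

```typescript
import { createHmac } from 'node:crypto'

function signBody(timestamp: string, body: string, secret: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex')
}

// Body exactly as received on the wire (note the extra spaces)
const rawBody = '{ "from": "user@example.com",  "subject": "Re: Your message" }'
// What you get after a JSON body parser and JSON.stringify
const reserialized = JSON.stringify(JSON.parse(rawBody))

const timestamp = '1703001234567'
const secret = 'example-secret'

const signatureOverRaw = signBody(timestamp, rawBody, secret)
const signatureOverReserialized = signBody(timestamp, reserialized, secret)

// Same JSON content, different bytes, therefore different signatures
console.log(signatureOverRaw === signatureOverReserialized) // false
```

In NestJS, creating the application with the rawBody: true option exposes the unparsed request body as req.rawBody, which is one way to get the original bytes to the verifier.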

Issue: Messages not being created

Solution: Check:

  1. MESSAGES_API_BASE_URL is correct
  2. MESSAGES_API_KEY is valid
  3. Messages API is running and accessible
  4. Check logs for HTTP errors

Issue: Outbound emails not sending

Solution: Verify:

  1. EMAIL_OUTBOUND_ENABLED=true
  2. EmailQueueService is properly injected in module
  3. BullMQ worker is running
  4. SMTP credentials are correct

Performance Considerations

Database Indexes

Critical indexes for thread matching (both are created by the migration in step 3 and are repeated here for reference):

CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
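
The second index only helps if subjects are normalized the same way on write and on lookup. This guide does not specify the plugin's normalization rules, so the sketch below is an assumption: it strips leading reply and forward prefixes, collapses whitespace, and lowercases.

```typescript
// Hypothetical normalization for the subject_normalized column
function normalizeSubject(subject: string): string {
  const prefix = /^(re|fwd?)\s*:\s*/i
  let result = subject.trim()

  // Strip stacked prefixes such as "Re: RE: Fwd:"
  while (prefix.test(result)) {
    result = result.replace(prefix, '')
  }
  return result.replace(/\s+/g, ' ').toLowerCase()
}
```

With this rule, "Re: Your message" and "RE: Fwd: your  message" map to the same key, so a lookup by (sender_email, subject_normalized) finds the original thread.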

Caching

Consider caching frequently accessed thread mappings:

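import { Inject, Injectable } from '@nestjs/common'
import { CACHE_MANAGER } from '@nestjs/cache-manager'
import type { Cache } from 'cache-manager'

@Injectable()
export class MessageCreatorService {
  constructor(
    @Inject(CACHE_MANAGER) private cacheManager: Cache
    // ... plus the service's existing dependencies, including mappingRepository
  ) {}

  async findThreadByToken(token: string) {
    const cached = await this.cacheManager.get(`thread:${token}`)
    if (cached) return cached

    const thread = await this.mappingRepository.findOne(...)
    // Cache only successful lookups. TTL units depend on the cache-manager
    // version (seconds in v4, milliseconds in v5 and later), so check which
    // one is installed before relying on 3600 meaning one hour.
    if (thread) await this.cacheManager.set(`thread:${token}`, thread, 3600)
    return thread
  }
}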

Queue Optimization

For high volume, tune BullMQ settings:

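BullModule.registerQueue({
  name: 'email',
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: 100,  // Keep last 100 failures
  },
  // With BullMQ the rate limiter is a Worker option rather than a queue
  // option; if it has no effect here, move it to the worker/processor options
  limiter: {
    max: 100,  // At most 100 emails...
    duration: 1000,  // ...per 1000 ms (1 second)
  },
})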

Next Steps

  1. Implement Messages API: Build the actual /api/v1/messages endpoints
  2. Add attachment storage: Upload email attachments to S3/R2
  3. Implement read receipts: Track email opens via tracking pixels
  4. Add spam filtering: Integrate with SpamAssassin or similar
  5. Build admin dashboard: Monitor email processing, view failed jobs

Support

For issues or questions, see:

  • Main README: ./README.md
  • Architecture guide: ./ARCHITECTURE.md
  • Example integration: ../../backend/src/app.module.ts