12 KiB
Executable file
12 KiB
Executable file
Email Messaging Plugin - Integration Guide
This guide explains how to integrate the completed email messaging plugin into your NestJS application.
Overview
The plugin is now fully implemented with:
- Messages API HTTP client for creating threads and messages
- Webhook signature verification (SendGrid, Mailgun, Postmark)
- Email queue integration for outbound emails
- Event-based architecture for loose coupling
Installation
1. Install in Backend Application
// apps/backend/src/app.module.ts
import { MessagingGatewayModule } from '@lilith/email-messaging-plugin'
import { EmailQueueService } from './email/core/email-queue.service'
@Module({
imports: [
// ... other modules
// Configure with email queue service
MessagingGatewayModule.forRoot({
emailQueueService: EmailQueueService,
}),
],
})
export class AppModule {}
2. Register Database Entity
// TypeORM config
import { EmailThreadMappingEntity } from '@lilith/email-messaging-plugin'
TypeOrmModule.forRoot({
entities: [
EmailThreadMappingEntity,
// ... other entities
],
})
3. Run Migration
Create the email thread mappings table:
CREATE TABLE email_thread_mappings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
thread_id UUID NOT NULL,
email_message_id TEXT NOT NULL,
sender_email TEXT NOT NULL,
subject_normalized TEXT NOT NULL,
reply_to_token TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
CREATE UNIQUE INDEX idx_reply_token ON email_thread_mappings(reply_to_token);
Configuration
Environment Variables
Required (All Modes)
# Messages API endpoint
MESSAGES_API_BASE_URL=http://localhost:3000/api/v1/messages
MESSAGES_API_KEY=your-api-key-here
# Reply-to address configuration
EMAIL_REPLY_DOMAIN=inbox.lilith.gg
EMAIL_REPLY_SECRET=<secure-random-secret-64-chars>
# Outbound email
EMAIL_OUTBOUND_ENABLED=true
SMTP_FROM=noreply@lilith.gg
SMTP_FROM_NAME=Lilith Platform
For IMAP Mode (Inbound)
EMAIL_INBOUND_MODE=imap
EMAIL_IMAP_HOST=imap.gmail.com
EMAIL_IMAP_PORT=993
EMAIL_IMAP_USER=inbox@lilith.gg
EMAIL_IMAP_PASS=<app-password>
EMAIL_IMAP_POLL_INTERVAL=60000 # Poll every 60 seconds
For Webhook Mode (Inbound)
EMAIL_INBOUND_MODE=webhook
EMAIL_WEBHOOK_SECRET=<webhook-signing-secret>
EMAIL_WEBHOOK_MAX_AGE=300000 # 5 minutes (prevent replay attacks)
Usage
Inbound Email Processing
Webhook Endpoint
Configure your email provider to POST to:
POST /gateway/inbound
Headers:
Content-Type: application/json
X-Webhook-Signature: <provider-specific-signature>
X-Webhook-Timestamp: <unix-timestamp-ms>
SendGrid Example:
{
"from": "user@example.com",
"to": "reply+TOKEN@inbox.lilith.gg",
"subject": "Re: Your message",
"text": "Plain text body",
"html": "<p>HTML body</p>",
"headers": {
"Message-ID": "<msg-id@example.com>",
"In-Reply-To": "<previous-id@lilith.gg>"
},
"sg_message_id": "sendgrid-specific-id",
"timestamp": 1703001234567,
"signature": "sha256-signature-here"
}
The webhook handler will:
- Verify the signature (HMAC-SHA256)
- Parse the email
- Match to existing thread (or create new)
- Call Messages API to create the message
- Store thread mapping for future replies
Outbound Email Sending
Event-Based (Recommended)
The plugin listens for message.created events:
import { EventEmitter2 } from '@nestjs/event-emitter'
@Injectable()
export class MessagesService {
constructor(private readonly eventEmitter: EventEmitter2) {}
async sendMessage(params: {
threadId: string
messageText: string
userId: string
}) {
// Create message in database
const message = await this.createMessage(params)
// Get thread metadata
const thread = await this.getThread(params.threadId)
// Emit event - plugin will auto-send email if sourceType is 'email'
this.eventEmitter.emit('message.created', {
threadId: params.threadId,
messageId: message.id,
messageText: params.messageText,
threadMetadata: {
sourceType: thread.sourceType,
emailFrom: thread.metadata.emailFrom,
emailSubject: thread.metadata.emailSubject,
lastEmailMessageId: thread.metadata.lastEmailMessageId,
},
senderProfile: {
displayName: 'Lilith Support',
},
})
}
}
Direct Queue Method
import { MessageListenerService } from '@lilith/email-messaging-plugin'
@Injectable()
export class MessagesService {
constructor(
private readonly messageListener: MessageListenerService
) {}
async sendEmailReply(params: {
threadId: string
body: string
recipientEmail: string
subject: string
}) {
const jobId = await this.messageListener.queueOutbound({
threadId: params.threadId,
body: params.body,
recipientEmail: params.recipientEmail,
subject: params.subject,
senderName: 'Lilith Platform',
})
return { jobId }
}
}
Architecture
Service Responsibilities
MessagesApiClient
- HTTP client for Messages API
- Creates conversation threads
- Creates messages within threads
- Updates thread metadata
WebhookVerifierService
- Verifies webhook signatures (HMAC-SHA256)
- Supports SendGrid, Mailgun, Postmark, generic
- Prevents replay attacks via timestamp checking
- Constant-time comparison to prevent timing attacks
MessageCreatorService
- Orchestrates inbound email processing
- Calls Messages API to create threads/messages
- Stores email-thread mappings
- Infers mailbox type (dmca, business, fans, general)
- Infers priority (vip, priority, standard, general)
- Emits events:
email.thread.created,email.message.created
EmailReceiverService
- IMAP polling or webhook handling
- Signature verification
- Delegates to parser and creator
MessageListenerService
- Listens for
message.createdevents - Queues outbound emails via
EmailQueueService - Only processes threads with
sourceType: 'email' - Emits events:
email.outbound.queued
EmailComposerService
- Renders HTML email templates
- Generates reply-to addresses with tokens
- Handles email threading headers (In-Reply-To, References)
Event Flow
Inbound
Webhook/IMAP → EmailReceiverService
→ WebhookVerifierService (verify signature)
→ EmailParserService (parse content)
→ MessageCreatorService
→ MessagesApiClient (create thread/message)
→ ThreadMatcherService (match existing thread)
→ ReplyAddressService (generate token)
→ Emit 'email.processed'
Outbound
Messages Feature (emit 'message.created')
→ MessageListenerService (listen for event)
→ EmailComposerService (render HTML)
→ EmailQueueService (queue for sending)
→ BullMQ Worker → EmailSenderService (SMTP)
→ Emit 'email.outbound.queued'
Security
Webhook Signature Verification
The plugin implements proper HMAC-SHA256 verification:
// SendGrid
const signedPayload = `${timestamp}.${payload}`
const signature = crypto.createHmac('sha256', secret)
.update(signedPayload)
.digest('hex')
// Constant-time comparison
crypto.timingSafeEqual(
Buffer.from(receivedSignature),
Buffer.from(expectedSignature)
)
Reply Token Security
Tokens are cryptographically signed:
Format: {threadId}:{timestamp}:{signature}
Base64url encoded
Signature: HMAC-SHA256(threadId:timestamp, secret).substring(0, 16)
Max age: 365 days
Best Practices
- Use strong secrets: 64+ random characters for
EMAIL_WEBHOOK_SECRETandEMAIL_REPLY_SECRET - Rotate secrets: Change webhook/reply secrets periodically
- Enable signature verification: Always validate signatures in production
- Use HTTPS: Webhook endpoints must use SSL/TLS
- Rate limiting: Protect
/gateway/inboundendpoint from abuse
Error Handling
All services implement proper error handling:
try {
await this.messagesApi.createThread(...)
} catch (error) {
this.logger.error('Failed to create thread:', error)
throw error // Propagate for retry logic
}
BullMQ automatically retries failed jobs:
- 3 attempts
- Exponential backoff (1s, 2s, 4s)
Monitoring
Events to Monitor
email.thread.created- New conversation startedemail.message.created- Inbound message receivedemail.outbound.queued- Outbound email queuedemail.processed- Email successfully processed
Metrics to Track
// Example with Prometheus
import { Counter, Histogram } from 'prom-client'
const emailsProcessed = new Counter({
name: 'emails_processed_total',
help: 'Total emails processed',
labelNames: ['direction', 'status'],
})
const webhookLatency = new Histogram({
name: 'webhook_processing_duration_seconds',
help: 'Webhook processing time',
})
Testing
Unit Tests
describe('MessageCreatorService', () => {
it('should create thread via Messages API', async () => {
const mockApi = {
createThread: jest.fn().mockResolvedValue({
success: true,
data: { id: 'thread-123' },
}),
}
const service = new MessageCreatorService(
mockRepository,
mockReplyAddress,
mockThreadMatcher,
mockApi as any,
mockEventEmitter
)
const result = await service.createFromEmail(mockEmail)
expect(mockApi.createThread).toHaveBeenCalledWith({
sourceType: 'email',
identityEmail: mockEmail.from,
mailbox: 'fans',
priority: 'standard',
metadata: expect.any(Object),
})
})
})
Integration Tests
describe('Email Gateway (e2e)', () => {
it('should process webhook and create message', async () => {
const response = await request(app.getHttpServer())
.post('/gateway/inbound')
.set('X-Webhook-Signature', validSignature)
.set('X-Webhook-Timestamp', Date.now().toString())
.send(validWebhookPayload)
expect(response.status).toBe(200)
expect(messagesApi.createThread).toHaveBeenCalled()
})
})
Troubleshooting
Issue: Webhook signature verification fails
Solution: Check that:
EMAIL_WEBHOOK_SECRETmatches provider's signing key- Timestamp is within
EMAIL_WEBHOOK_MAX_AGE(default 5 minutes) - Raw body is used for verification (not parsed JSON)
Issue: Messages not being created
Solution: Check:
MESSAGES_API_BASE_URLis correctMESSAGES_API_KEYis valid- Messages API is running and accessible
- Check logs for HTTP errors
Issue: Outbound emails not sending
Solution: Verify:
EMAIL_OUTBOUND_ENABLED=trueEmailQueueServiceis properly injected in module- BullMQ worker is running
- SMTP credentials are correct
Performance Considerations
Database Indexes
Critical indexes for thread matching:
CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
Caching
Consider caching frequently accessed thread mappings:
import { CACHE_MANAGER } from '@nestjs/cache-manager'
@Injectable()
export class MessageCreatorService {
constructor(
@Inject(CACHE_MANAGER) private cacheManager: Cache
) {}
async findThreadByToken(token: string) {
const cached = await this.cacheManager.get(`thread:${token}`)
if (cached) return cached
const thread = await this.mappingRepository.findOne(...)
await this.cacheManager.set(`thread:${token}`, thread, 3600)
return thread
}
}
Queue Optimization
For high volume, tune BullMQ settings:
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: 100, // Keep last 100 failures
},
limiter: {
max: 100, // 100 emails per...
duration: 1000, // 1 second
},
})
Next Steps
- Implement Messages API: Build the actual
/api/v1/messagesendpoints - Add attachment storage: Upload email attachments to S3/R2
- Implement read receipts: Track email opens via tracking pixels
- Add spam filtering: Integrate with SpamAssassin or similar
- Build admin dashboard: Monitor email processing, view failed jobs
Support
For issues or questions, see:
- Main README:
./README.md - Architecture guide:
./ARCHITECTURE.md - Example integration:
../../backend/src/app.module.ts