lilith-platform.live/codebase/@features/api/PHASE_4B_IMPLEMENTATION.md

377 lines
12 KiB
Markdown
Raw Normal View History

# Phase 4b Implementation: Mac-Sync Send Queue + Outreach Dispatcher
**Status**: Complete. Both codebases pass typecheck.
---
## Part 1: Mac-Sync Send Queue Endpoints
### Location
- Schema: `@applications/@mac-sync/src/server/src/entities/send-queue/`
- Endpoints: `@applications/@mac-sync/src/server/src/surfaces/client/imessage.ts`
- Admin surface: `@applications/@mac-sync/src/server/src/surfaces/admin/send-queue.ts`
### Table Schema
```sql
CREATE TABLE IF NOT EXISTS icloud.send_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
device_id UUID NOT NULL REFERENCES icloud.devices(id) ON DELETE CASCADE,
to_handle TEXT NOT NULL,
body TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued', -- 'queued' | 'sent' | 'failed'
send_queue_id TEXT,
sent_at TIMESTAMPTZ,
failure_reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### Endpoint Contracts
#### `GET /client/imessage/send-queue/pending`
**Auth**: Device token (via `deviceTokenAuth` middleware)
**Response** (200 OK):
```json
{
"success": true,
"data": {
"items": [
{
"id": "uuid",
"toHandle": "phone or email",
"body": "message text",
"createdAt": "2026-04-22T12:00:00Z"
}
]
}
}
```
**Behavior**:
- Queries `icloud.send_queue WHERE device_id = $1 AND status = 'queued'`
- Orders by `created_at ASC`
- Limit 50 items per call
- Returns camelCase keys in response
---
#### `POST /client/imessage/send-queue/:id/result`
**Auth**: Device token
**Request Body**:
```json
{
"status": "sent" | "failed",
"error": "optional error message if failed"
}
```
**Response** (200 OK):
```json
{
"success": true,
"data": {
"id": "uuid",
"status": "sent" | "failed"
}
}
```
**Behavior**:
- If status is `'sent'`: updates row with `status='sent'`, `sent_at=now()`
- If status is `'failed'`: updates row with `status='failed'`, `failure_reason=error`
- Verifies `device_id` matches authenticated device
---
#### `POST /admin/send-queue/enqueue`
**Auth**: Service token (via `serviceTokenAuth` middleware)
**Request Body**:
```json
{
"batchItemId": "uuid from outreach_batch_items.id",
"deviceId": "uuid of target device",
"toHandle": "+1234567890 or email",
"body": "rendered message body"
}
```
**Response** (200 OK):
```json
{
"success": true,
"data": {
"sendQueueId": "uuid of inserted send_queue row"
}
}
```
**Behavior**:
- Inserts into `icloud.send_queue` with `status='queued'`
- Returns the generated `id` (PostgreSQL UUID)
- Returns 400 on validation error (ZodError)
- Returns 500 on database error
---
## Part 2: Outreach Dispatcher Processor
### Location
`@features/api/src/processors/outreach-dispatcher/index.ts`
### Configuration
Add to `.env` or environment:
```bash
MAC_SYNC_BASE_URL=http://localhost:3201 # default
MAC_SYNC_SERVICE_TOKEN=<required-16-char-min> # required to enable dispatcher
```
Updated `config.ts` accepts these vars; if `MAC_SYNC_SERVICE_TOKEN` is not set, the processor logs a warning and disables itself (no crash).
### Processor Behavior
Runs two parallel loops on 30-second intervals:
#### Loop 1: Dispatch Pending Items
1. Fetch all `outreach_batches` WHERE `status='sending'`
2. For each batch:
- Fetch `outreach_settings` (id=1) singleton
- Check `paused`: if true, skip batch
- Check quiet hours: if within `quiet_hours_start`..`quiet_hours_end`, skip batch
- Compute usage in last minute/hour/day: if any limit exceeded, skip batch
- Check `min_gap_seconds`: if most recent `sent_at` is < now() - min_gap_seconds, skip batch
- Get oldest pending item (`status='pending'`)
- If none: check if all items are sent/failed/skipped; if so, mark batch `status='sent'`
- Otherwise:
- Look up client's primary `contact_relationships` handle
- If no primary handle: mark item `status='skipped'`, `failure_reason='no primary handle found'`
- Query first device from `pg.icloud.devices` (placeholder for real device selection)
- POST to mac-sync `/admin/send-queue/enqueue` with item data
- On success: update item `status='queued'`, `send_queue_id=<returned>`
- On failure: update item `status='failed'`, `failure_reason=<http error>`
#### Loop 2: Poll & Sync Sent Messages
1. Every 30 seconds: query `pg.icloud.send_queue WHERE status='sent' AND sent_at > last_poll_time`
2. For each sent row:
- Find corresponding `outreach_batch_items` by `send_queue_id`
- Update item `status='sent'`, `sent_at=<from send_queue>`
3. Update `lastSyncedAt` to now
### Rate Limits (from `outreach_settings`)
- `max_per_minute`: max items sent in last 60 seconds
- `max_per_hour`: max items sent in last 3600 seconds
- `max_per_day`: max items sent in last 86400 seconds
- `min_gap_seconds`: minimum gap between consecutive sends in the batch
- `quiet_hours_start`, `quiet_hours_end`: HH:MM format (both must be set to enable)
### Error Handling
- Invalid quiet hours format: treated as disabled, returns false
- Device not found: item marked `'failed'` with reason
- Mac-sync HTTP error: item marked `'failed'` with HTTP error text
- Processor loop crash: logged at ERROR level; systemd restarts service
### Logging
All operations logged at appropriate levels (DEBUG for skips, INFO for queued/completed, WARN/ERROR for failures).
---
## Integration Points
### Quinn.api server.ts
```typescript
void startProcessors(
{ quinn: getDb(), icloud: getIcloudDb() },
config.MAC_SYNC_SERVICE_TOKEN
? {
macSyncBaseUrl: config.MAC_SYNC_BASE_URL,
macSyncServiceToken: config.MAC_SYNC_SERVICE_TOKEN,
}
: undefined,
).catch((err) => { ... });
```
### Repo usage
- `@entities/outreach-batch/repo.ts`: `updateStatus()`
- `@entities/outreach-batch-item/repo.ts`: `updateStatus()`, `countByBatchAndStatus()`
- `@entities/outreach-settings/repo.ts`: `getSingleton()`
- `@entities/contact-relationship/repo.ts`: `listByClient()`
---
## Manual Smoke Test Plan
**Prerequisites**:
- `pg.quinn` running on `:25435` (dev) or configured prod URL
- `pg.icloud` running on `:25436` (dev) or configured prod URL
- `mac-sync` server running on `:3201` (dev) or configured prod URL
- `quinn.api` running on `:3030` (dev) or configured prod URL
### Test Steps
1. **Verify send_queue table exists**
```bash
psql $QUINN_ICLOUD_DB_URL -c '\d icloud.send_queue'
```
Should show columns: id, device_id, to_handle, body, status, etc.
2. **Insert a device** (if needed)
```bash
psql $QUINN_ICLOUD_DB_URL -c "
INSERT INTO icloud.devices (name, token)
VALUES ('test-device', 'test-token-abc123')
RETURNING id;
"
```
Record the device UUID as `DEVICE_ID`.
3. **Test mac-sync `/admin/send-queue/enqueue` endpoint**
```bash
curl -X POST http://localhost:3201/admin/send-queue/enqueue \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $MAC_SYNC_SERVICE_TOKEN" \
-d '{
"batchItemId": "550e8400-e29b-41d4-a716-446655440000",
"deviceId": "'$DEVICE_ID'",
"toHandle": "+14155552671",
"body": "Test message"
}'
```
Should return `{ "success": true, "data": { "sendQueueId": "..." } }`
4. **Test mac-sync `/client/imessage/send-queue/pending` endpoint**
```bash
curl http://localhost:3201/client/imessage/send-queue/pending \
-H "Authorization: Bearer $DEVICE_TOKEN"
```
Should return `{ "success": true, "data": { "items": [{ id, toHandle, body, createdAt }] } }`
5. **Test mac-sync `/client/imessage/send-queue/:id/result` endpoint**
```bash
curl -X POST http://localhost:3201/client/imessage/send-queue/$QUEUE_ID/result \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEVICE_TOKEN" \
-d '{ "status": "sent" }'
```
Verify the row in `icloud.send_queue` updates to `status='sent'`, `sent_at` is set.
6. **Verify quinn.api configuration**
```bash
# In quinn.api environment:
echo $MAC_SYNC_BASE_URL
echo $MAC_SYNC_SERVICE_TOKEN
```
7. **Check quinn.api processor logs** (if running with systemd or docker logs)
```bash
journalctl -u quinn-api -f | grep outreach-dispatcher
```
Should see "outreach-dispatcher starting" message.
8. **Create a test outreach batch** (POST to `/my/outreach/batch` or directly insert)
```bash
psql $QUINN_DB_URL -c "
INSERT INTO outreach_batches
(query_json, template, vars, dry_run, status, created_by)
VALUES (
'{}',
'Test message',
'{}',
false,
'sending',
'test-user'
)
RETURNING id;
"
```
Record batch UUID as `BATCH_ID`.
9. **Insert a test batch item**
```bash
# First, get or create a client in pg.quinn:
psql $QUINN_DB_URL -c "
SELECT id FROM clients LIMIT 1;
" # or INSERT and get id
# Then insert batch item:
psql $QUINN_DB_URL -c "
INSERT INTO outreach_batch_items
(batch_id, client_id, rendered_body, status)
VALUES (
'$BATCH_ID',
<CLIENT_ID>,
'Test rendered body',
'pending'
)
RETURNING id;
"
```
Record item UUID as `ITEM_ID`.
10. **Verify dispatcher processes the item**
- Wait 30 seconds for processor loop
- Check batch item status: should transition from `pending``queued``sent`
- Verify log message: "item queued to mac-sync" with sendQueueId
11. **Monitor rate limiting**
- Create multiple batch items in rapid succession
- Observe dispatcher respects `max_per_minute` / `max_per_hour` / `min_gap_seconds`
- Verify log messages indicate "limit reached" reasons
---
## Implementation Notes
### Design Decisions
1. **Device selection**: Currently uses `SELECT id FROM icloud.devices LIMIT 1`. In production, this should be persisted per client or determined from `outreach_settings`.
2. **Polling cadence**: Fixed 30-second intervals. Could be made configurable.
3. **mac-sync HTTP client**: Uses native `fetch()` with Bearer token. Assumes mac-sync is reachable.
4. **Quiet hours**: Local server time (no timezone conversion). Use UTC in `quiet_hours_start`/`quiet_hours_end` for consistency.
5. **Processor startup**: If `MAC_SYNC_SERVICE_TOKEN` is undefined, processor logs warning and returns early (no-op, no crash).
### Type Safety
- All queries use Zod schemas for request validation
- Result types explicitly typed (no `any`)
- Error propagation via try/catch with proper logging
### No Breaking Changes
- Mac-sync server.ts updated to include send_queue migrations
- quinn.api config extended (new optional vars)
- Processor registry updated to accept optional outreach config
- Existing processors (content-classifier, relationship-resolver, geo-inference) unaffected
---
## Files Created/Modified
### Created
- `@mac-sync/src/server/src/entities/send-queue/{schema,types,index}.ts`
- `@mac-sync/src/server/src/surfaces/admin/send-queue.ts`
- `@features/api/src/processors/outreach-dispatcher/index.ts`
### Modified
- `@mac-sync/src/server/src/app/server.ts` (import + migration registration)
- `@mac-sync/src/server/src/surfaces/admin/index.ts` (route mounting)
- `@mac-sync/src/server/src/surfaces/client/imessage.ts` (endpoint implementation)
- `@features/api/src/app/config.ts` (add MAC_SYNC_* vars)
- `@features/api/src/app/server.ts` (pass config to startProcessors)
- `@features/api/src/processors/index.ts` (signature update, optional processor registration)
---
## Typecheck Status
`@mac-sync/src/server`: bun run typecheck → no errors
`@features/api`: bun run typecheck → no errors (in api feature context only)
---
## Next Steps (Phase 4a, Phase 5, Phase 6)
- Phase 4a: Create outreach_batches, outreach_batch_items, outreach_settings tables (if not already done)
- Phase 5: Build `/my/outreach/*` surfaces in quinn.api (search, dry-run, send)
- Phase 6: Build MacSync.app Swift Sender (polls `/client/send-queue/pending`, sends via Messages.app, POSTs ack)
- Future: Liquid templating for message rendering, UI in quinn.my frontend