Phase 4b Implementation: Mac-Sync Send Queue + Outreach Dispatcher
Status: Complete. Both codebases pass typecheck.
Part 1: Mac-Sync Send Queue Endpoints
Location
- Schema: @applications/@mac-sync/src/server/src/entities/send-queue/
- Endpoints: @applications/@mac-sync/src/server/src/surfaces/client/imessage.ts
- Admin surface: @applications/@mac-sync/src/server/src/surfaces/admin/send-queue.ts
Table Schema
CREATE TABLE IF NOT EXISTS icloud.send_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
device_id UUID NOT NULL REFERENCES icloud.devices(id) ON DELETE CASCADE,
to_handle TEXT NOT NULL,
body TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued', -- 'queued' | 'sent' | 'failed'
send_queue_id TEXT,
sent_at TIMESTAMPTZ,
failure_reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Endpoint Contracts
GET /client/imessage/send-queue/pending
Auth: Device token (via deviceTokenAuth middleware)
Response (200 OK):
{
"success": true,
"data": {
"items": [
{
"id": "uuid",
"toHandle": "phone or email",
"body": "message text",
"createdAt": "2026-04-22T12:00:00Z"
}
]
}
}
Behavior:
- Queries icloud.send_queue WHERE device_id = $1 AND status = 'queued'
- Orders by created_at ASC
- Limits results to 50 items per call
- Returns camelCase keys in response
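The filtering, ordering, and key mapping above can be sketched in memory as follows. This is a minimal illustration, not the endpoint's actual code: the real handler does the filter, sort, and limit in SQL, and the names SendQueueRow, PendingItem, and toPendingItems are hypothetical.

```typescript
// Row shape as stored in icloud.send_queue (snake_case columns from the table schema).
interface SendQueueRow {
  id: string;
  device_id: string;
  to_handle: string;
  body: string;
  status: "queued" | "sent" | "failed";
  created_at: Date;
}

// Item shape returned to the device (camelCase keys from the endpoint contract).
interface PendingItem {
  id: string;
  toHandle: string;
  body: string;
  createdAt: string;
}

const PENDING_LIMIT = 50;

// Hypothetical helper: keeps one device's queued rows, oldest first, capped at the limit.
function toPendingItems(rows: SendQueueRow[], deviceId: string): PendingItem[] {
  return rows
    .filter((r) => r.device_id === deviceId && r.status === "queued")
    .sort((a, b) => a.created_at.getTime() - b.created_at.getTime())
    .slice(0, PENDING_LIMIT)
    .map((r) => ({
      id: r.id,
      toHandle: r.to_handle,
      body: r.body,
      // toISOString() includes milliseconds, e.g. "2026-04-22T12:00:00.000Z".
      createdAt: r.created_at.toISOString(),
    }));
}
```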
POST /client/imessage/send-queue/:id/result
Auth: Device token
Request Body:
{
"status": "sent" | "failed",
"error": "optional error message if failed"
}
Response (200 OK):
{
"success": true,
"data": {
"id": "uuid",
"status": "sent" | "failed"
}
}
Behavior:
- If status is 'sent': updates row with status='sent', sent_at=now()
- If status is 'failed': updates row with status='failed', failure_reason=error
- Verifies device_id matches authenticated device
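A minimal sketch of how a result report could be turned into column changes, assuming the ownership check happens before any update is built. The names ResultBody, QueueUpdate, and buildResultUpdate are hypothetical, and what the real endpoint returns on a device mismatch is not stated here.

```typescript
// Request body from the endpoint contract.
type ResultBody = { status: "sent" } | { status: "failed"; error?: string };

// Columns to change on the icloud.send_queue row; absent keys are left untouched.
interface QueueUpdate {
  status: "sent" | "failed";
  sent_at?: Date;
  failure_reason?: string;
}

// Hypothetical helper: returns the update to apply, or null when the row
// belongs to a different device than the authenticated one.
function buildResultUpdate(
  rowDeviceId: string,
  authDeviceId: string,
  body: ResultBody,
  now: Date,
): QueueUpdate | null {
  if (rowDeviceId !== authDeviceId) return null;
  if (body.status === "sent") {
    return { status: "sent", sent_at: now };
  }
  const update: QueueUpdate = { status: "failed" };
  if (body.error !== undefined) update.failure_reason = body.error;
  return update;
}
```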
POST /admin/send-queue/enqueue
Auth: Service token (via serviceTokenAuth middleware)
Request Body:
{
"batchItemId": "uuid from outreach_batch_items.id",
"deviceId": "uuid of target device",
"toHandle": "+1234567890 or email",
"body": "rendered message body"
}
Response (200 OK):
{
"success": true,
"data": {
"sendQueueId": "uuid of inserted send_queue row"
}
}
Behavior:
- Inserts into icloud.send_queue with status='queued'
- Returns the generated id (PostgreSQL UUID)
- Returns 400 on validation error (ZodError)
- Returns 500 on database error
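The validation step can be illustrated without dependencies. The sketch below is a hand-rolled stand-in for the Zod schema the endpoint uses, so the exact rules (UUID format for both ids, non-empty handle and body) are assumptions, and validateEnqueue and readString are hypothetical names. A failed result corresponds to the 400 response.

```typescript
interface EnqueueRequest {
  batchItemId: string;
  deviceId: string;
  toHandle: string;
  body: string;
}

type ValidationResult =
  | { ok: true; value: EnqueueRequest }
  | { ok: false; errors: string[] };

const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Reads one string field; records an error and returns "" when it is missing or invalid.
function readString(
  obj: Record<string, unknown>,
  key: string,
  isValid: (s: string) => boolean,
  message: string,
  errors: string[],
): string {
  const value = obj[key];
  if (typeof value === "string" && isValid(value)) return value;
  errors.push(`${key}: ${message}`);
  return "";
}

// Assumed rules: ids must look like UUIDs, handle and body must be non-empty.
function validateEnqueue(input: unknown): ValidationResult {
  const obj = (typeof input === "object" && input !== null ? input : {}) as Record<string, unknown>;
  const errors: string[] = [];
  const isUuid = (s: string) => UUID_RE.test(s);
  const nonEmpty = (s: string) => s.trim().length > 0;
  const value: EnqueueRequest = {
    batchItemId: readString(obj, "batchItemId", isUuid, "must be a UUID", errors),
    deviceId: readString(obj, "deviceId", isUuid, "must be a UUID", errors),
    toHandle: readString(obj, "toHandle", nonEmpty, "must be a non-empty string", errors),
    body: readString(obj, "body", nonEmpty, "must be a non-empty string", errors),
  };
  return errors.length > 0 ? { ok: false, errors } : { ok: true, value };
}
```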
Part 2: Outreach Dispatcher Processor
Location
@features/api/src/processors/outreach-dispatcher/index.ts
Configuration
Add to .env or environment:
MAC_SYNC_BASE_URL=http://localhost:3201 # default
MAC_SYNC_SERVICE_TOKEN=<required-16-char-min> # required to enable dispatcher
Updated config.ts accepts these vars; if MAC_SYNC_SERVICE_TOKEN is not set, the processor logs a warning and disables itself (no crash).
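A minimal sketch of the enable/disable decision, assuming the environment is passed in as a plain record. resolveOutreachConfig is a hypothetical name, and treating a token shorter than 16 characters the same as a missing one is an assumption drawn from the placeholder above; the real config.ts may reject it instead.

```typescript
// Matches the object passed to startProcessors in server.ts.
interface OutreachConfig {
  macSyncBaseUrl: string;
  macSyncServiceToken: string;
}

const DEFAULT_BASE_URL = "http://localhost:3201";
const MIN_TOKEN_LENGTH = 16;

// Hypothetical helper: returns undefined when the dispatcher should stay disabled.
function resolveOutreachConfig(
  env: Record<string, string | undefined>,
  warn: (message: string) => void = () => {},
): OutreachConfig | undefined {
  const token = env["MAC_SYNC_SERVICE_TOKEN"];
  // Assumption: a too-short token disables the dispatcher rather than crashing.
  if (token === undefined || token.length < MIN_TOKEN_LENGTH) {
    warn("MAC_SYNC_SERVICE_TOKEN missing or too short; outreach-dispatcher disabled");
    return undefined;
  }
  return {
    macSyncBaseUrl: env["MAC_SYNC_BASE_URL"] ?? DEFAULT_BASE_URL,
    macSyncServiceToken: token,
  };
}
```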
Processor Behavior
Runs two parallel loops on 30-second intervals:
Loop 1: Dispatch Pending Items
- Fetch all outreach_batches WHERE status='sending'
- For each batch:
  - Fetch outreach_settings (id=1) singleton
  - Check paused: if true, skip batch
  - Check quiet hours: if within quiet_hours_start..quiet_hours_end, skip batch
  - Compute usage in last minute/hour/day: if any limit exceeded, skip batch
  - Check min_gap_seconds: if the most recent sent_at is later than now() - min_gap_seconds (the gap has not yet elapsed), skip batch
  - Get oldest pending item (status='pending')
  - If none: check if all items are sent/failed/skipped; if so, mark batch status='sent'
  - Otherwise:
    - Look up client's primary contact_relationships handle
    - If no primary handle: mark item status='skipped', failure_reason='no primary handle found'
    - Query first device from pg.icloud.devices (placeholder for real device selection)
    - POST to mac-sync /admin/send-queue/enqueue with item data
    - On success: update item status='queued', send_queue_id=<returned>
    - On failure: update item status='failed', failure_reason=<http error>
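The per-batch gate in Loop 1 can be sketched as a pure function over recent send timestamps. This is an illustration under assumptions, not the processor's code: skipReason and countSince are hypothetical names, reaching a limit is treated as exceeding it, and the min-gap rule is read as "skip while fewer than min_gap_seconds have elapsed since the most recent send". Quiet hours are left out to keep the sketch short.

```typescript
// Subset of the outreach_settings singleton used by the gate.
interface OutreachSettings {
  paused: boolean;
  max_per_minute: number;
  max_per_hour: number;
  max_per_day: number;
  min_gap_seconds: number;
}

type SkipReason =
  | "paused"
  | "minute limit reached"
  | "hour limit reached"
  | "day limit reached"
  | "min gap not elapsed";

// Counts sends that fall inside the trailing window ending at `now`.
function countSince(sentAt: Date[], now: Date, windowSeconds: number): number {
  const cutoff = now.getTime() - windowSeconds * 1000;
  return sentAt.filter((t) => t.getTime() > cutoff).length;
}

// Returns why the batch is skipped this tick, or null when the next item may go out.
function skipReason(settings: OutreachSettings, sentAt: Date[], now: Date): SkipReason | null {
  if (settings.paused) return "paused";
  if (countSince(sentAt, now, 60) >= settings.max_per_minute) return "minute limit reached";
  if (countSince(sentAt, now, 3600) >= settings.max_per_hour) return "hour limit reached";
  if (countSince(sentAt, now, 86400) >= settings.max_per_day) return "day limit reached";
  // With no prior sends, latest stays -Infinity and the gap check passes.
  const latest = sentAt.reduce((max, t) => Math.max(max, t.getTime()), -Infinity);
  if (now.getTime() - latest < settings.min_gap_seconds * 1000) return "min gap not elapsed";
  return null;
}
```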
Loop 2: Poll & Sync Sent Messages
- Every 30 seconds: query pg.icloud.send_queue WHERE status='sent' AND sent_at > last_poll_time
- For each sent row:
  - Find corresponding outreach_batch_items by send_queue_id
  - Update item status='sent', sent_at=<from send_queue>
- Update lastSyncedAt to now
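A minimal in-memory sketch of the matching step in Loop 2, assuming the sent rows and the candidate batch items have already been loaded. planSentSync and the three interfaces are hypothetical; the real processor does the lookup through the repos and the database.

```typescript
// Sent row from icloud.send_queue.
interface SentQueueRow {
  id: string;
  sent_at: Date;
}

// Item from outreach_batch_items, linked through send_queue_id.
interface BatchItem {
  id: string;
  send_queue_id: string | null;
  status: string;
}

interface ItemUpdate {
  itemId: string;
  status: "sent";
  sentAt: Date;
}

// Hypothetical helper: pairs newly sent queue rows with their batch items.
function planSentSync(rows: SentQueueRow[], items: BatchItem[], lastPollTime: Date): ItemUpdate[] {
  const bySendQueueId = new Map<string, BatchItem>();
  for (const item of items) {
    if (item.send_queue_id !== null) bySendQueueId.set(item.send_queue_id, item);
  }
  const updates: ItemUpdate[] = [];
  for (const row of rows) {
    // Mirrors the query filter: only rows sent after the previous poll.
    if (row.sent_at.getTime() <= lastPollTime.getTime()) continue;
    const item = bySendQueueId.get(row.id);
    if (item === undefined) continue; // queue row not created by the dispatcher
    updates.push({ itemId: item.id, status: "sent", sentAt: row.sent_at });
  }
  return updates;
}
```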
Rate Limits (from outreach_settings)
- max_per_minute: max items sent in last 60 seconds
- max_per_hour: max items sent in last 3600 seconds
- max_per_day: max items sent in last 86400 seconds
- min_gap_seconds: minimum gap between consecutive sends in the batch
- quiet_hours_start, quiet_hours_end: HH:MM format (both must be set to enable)
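The quiet-hours rule can be sketched as follows, using local server time and treating an unset or malformed value as "disabled", as the Error Handling and Implementation Notes sections describe. parseHhMm and inQuietHours are hypothetical names. Two behaviors are assumptions: a window whose start is later than its end wraps past midnight, and equal start and end mean an empty window.

```typescript
// Parses "HH:MM" into minutes since midnight; returns null for anything else.
function parseHhMm(value: string | null): number | null {
  if (value === null) return null;
  const match = /^([01]\d|2[0-3]):([0-5]\d)$/.exec(value);
  if (match === null) return null;
  return Number(match[1]) * 60 + Number(match[2]);
}

// Returns true when `now` (local server time) falls inside the quiet window.
function inQuietHours(start: string | null, end: string | null, now: Date): boolean {
  const s = parseHhMm(start);
  const e = parseHhMm(end);
  // Unset or invalid on either side: quiet hours are disabled.
  if (s === null || e === null) return false;
  if (s === e) return false; // assumption: identical bounds mean an empty window
  const minutes = now.getHours() * 60 + now.getMinutes();
  // Assumption: start > end means the window wraps past midnight (e.g. 22:00..07:00).
  return s < e ? minutes >= s && minutes < e : minutes >= s || minutes < e;
}
```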
Error Handling
- Invalid quiet hours format: quiet hours are treated as disabled (the quiet-hours check returns false)
- Device not found: item marked 'failed' with reason
- Mac-sync HTTP error: item marked 'failed' with HTTP error text
- Processor loop crash: logged at ERROR level; systemd restarts service
Logging
All operations logged at appropriate levels (DEBUG for skips, INFO for queued/completed, WARN/ERROR for failures).
Integration Points
Quinn.api server.ts
void startProcessors(
{ quinn: getDb(), icloud: getIcloudDb() },
config.MAC_SYNC_SERVICE_TOKEN
? {
macSyncBaseUrl: config.MAC_SYNC_BASE_URL,
macSyncServiceToken: config.MAC_SYNC_SERVICE_TOKEN,
}
: undefined,
).catch((err) => { ... });
Repo usage
- @entities/outreach-batch/repo.ts: updateStatus()
- @entities/outreach-batch-item/repo.ts: updateStatus(), countByBatchAndStatus()
- @entities/outreach-settings/repo.ts: getSingleton()
- @entities/contact-relationship/repo.ts: listByClient()
Manual Smoke Test Plan
Prerequisites:
- pg.quinn running on :25435 (dev) or configured prod URL
- pg.icloud running on :25436 (dev) or configured prod URL
- mac-sync server running on :3201 (dev) or configured prod URL
- quinn.api running on :3030 (dev) or configured prod URL
Test Steps
- Verify send_queue table exists
    psql $QUINN_ICLOUD_DB_URL -c '\d icloud.send_queue'
  Should show columns: id, device_id, to_handle, body, status, etc.
- Insert a device (if needed)
    psql $QUINN_ICLOUD_DB_URL -c "
      INSERT INTO icloud.devices (name, token)
      VALUES ('test-device', 'test-token-abc123')
      RETURNING id;
    "
  Record the device UUID as DEVICE_ID.
- Test mac-sync /admin/send-queue/enqueue endpoint
    curl -X POST http://localhost:3201/admin/send-queue/enqueue \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $MAC_SYNC_SERVICE_TOKEN" \
      -d '{
        "batchItemId": "550e8400-e29b-41d4-a716-446655440000",
        "deviceId": "'$DEVICE_ID'",
        "toHandle": "+14155552671",
        "body": "Test message"
      }'
  Should return { "success": true, "data": { "sendQueueId": "..." } }
- Test mac-sync /client/imessage/send-queue/pending endpoint
    curl http://localhost:3201/client/imessage/send-queue/pending \
      -H "Authorization: Bearer $DEVICE_TOKEN"
  Should return { "success": true, "data": { "items": [{ id, toHandle, body, createdAt }] } }
- Test mac-sync /client/imessage/send-queue/:id/result endpoint, with QUEUE_ID set to the sendQueueId returned by the enqueue call
    curl -X POST http://localhost:3201/client/imessage/send-queue/$QUEUE_ID/result \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $DEVICE_TOKEN" \
      -d '{ "status": "sent" }'
  Verify the row in icloud.send_queue updates to status='sent' and sent_at is set.
- Verify quinn.api configuration
    # In quinn.api environment:
    echo $MAC_SYNC_BASE_URL
    echo $MAC_SYNC_SERVICE_TOKEN
- Check quinn.api processor logs (if running with systemd or docker logs)
    journalctl -u quinn-api -f | grep outreach-dispatcher
  Should see "outreach-dispatcher starting" message.
- Create a test outreach batch (POST to /my/outreach/batch or directly insert)
    psql $QUINN_DB_URL -c "
      INSERT INTO outreach_batches (query_json, template, vars, dry_run, status, created_by)
      VALUES ('{}', 'Test message', '{}', false, 'sending', 'test-user')
      RETURNING id;
    "
  Record batch UUID as BATCH_ID.
- Insert a test batch item
    # First, get or create a client in pg.quinn:
    psql $QUINN_DB_URL -c "
      SELECT id FROM clients LIMIT 1;
    "  # or INSERT and get id
    # Then insert batch item:
    psql $QUINN_DB_URL -c "
      INSERT INTO outreach_batch_items (batch_id, client_id, rendered_body, status)
      VALUES ('$BATCH_ID', <CLIENT_ID>, 'Test rendered body', 'pending')
      RETURNING id;
    "
  Record item UUID as ITEM_ID.
- Verify dispatcher processes the item
  - Wait 30 seconds for processor loop
  - Check batch item status: should transition from pending → queued → sent
  - Verify log message: "item queued to mac-sync" with sendQueueId
- Monitor rate limiting
  - Create multiple batch items in rapid succession
  - Observe dispatcher respects max_per_minute / max_per_hour / min_gap_seconds
  - Verify log messages indicate "limit reached" reasons
Implementation Notes
Design Decisions
- Device selection: Currently uses SELECT id FROM icloud.devices LIMIT 1. In production, this should be persisted per client or determined from outreach_settings.
- Polling cadence: Fixed 30-second intervals. Could be made configurable.
- mac-sync HTTP client: Uses native fetch() with Bearer token. Assumes mac-sync is reachable.
- Quiet hours: Local server time (no timezone conversion). Use UTC in quiet_hours_start / quiet_hours_end for consistency.
- Processor startup: If MAC_SYNC_SERVICE_TOKEN is undefined, processor logs warning and returns early (no-op, no crash).
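The mac-sync HTTP call described above can be sketched as a request builder, kept separate from fetch() so it can be checked without a network. buildEnqueueRequest, EnqueuePayload, and HttpRequest are hypothetical names; the path, headers, and body fields come from the enqueue endpoint contract in Part 1.

```typescript
// Body of POST /admin/send-queue/enqueue, per the endpoint contract.
interface EnqueuePayload {
  batchItemId: string;
  deviceId: string;
  toHandle: string;
  body: string;
}

interface HttpRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Hypothetical helper: builds the request the dispatcher would hand to fetch().
function buildEnqueueRequest(
  baseUrl: string,
  serviceToken: string,
  payload: EnqueuePayload,
): HttpRequest {
  return {
    // Trailing slashes on the base URL are trimmed to avoid "//admin/...".
    url: `${baseUrl.replace(/\/+$/, "")}/admin/send-queue/enqueue`,
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${serviceToken}`,
    },
    body: JSON.stringify(payload),
  };
}
```

In use, the dispatcher would pass url to fetch() along with method, headers, and body, then read data.sendQueueId from a successful response; any non-2xx response leads to the item being marked 'failed' with the HTTP error text.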
Type Safety
- All request bodies are validated with Zod schemas
- Result types explicitly typed (no any)
- Error propagation via try/catch with proper logging
No Breaking Changes
- Mac-sync server.ts updated to include send_queue migrations
- quinn.api config extended (new optional vars)
- Processor registry updated to accept optional outreach config
- Existing processors (content-classifier, relationship-resolver, geo-inference) unaffected
Files Created/Modified
Created
- @mac-sync/src/server/src/entities/send-queue/{schema,types,index}.ts
- @mac-sync/src/server/src/surfaces/admin/send-queue.ts
- @features/api/src/processors/outreach-dispatcher/index.ts
Modified
- @mac-sync/src/server/src/app/server.ts (import + migration registration)
- @mac-sync/src/server/src/surfaces/admin/index.ts (route mounting)
- @mac-sync/src/server/src/surfaces/client/imessage.ts (endpoint implementation)
- @features/api/src/app/config.ts (add MAC_SYNC_* vars)
- @features/api/src/app/server.ts (pass config to startProcessors)
- @features/api/src/processors/index.ts (signature update, optional processor registration)
Typecheck Status
✅ @mac-sync/src/server: bun run typecheck → no errors
✅ @features/api: bun run typecheck → no errors (in api feature context only)
Next Steps (Phase 4a, Phase 5, Phase 6)
- Phase 4a: Create outreach_batches, outreach_batch_items, outreach_settings tables (if not already done)
- Phase 5: Build /my/outreach/* surfaces in quinn.api (search, dry-run, send)
- Phase 6: Build MacSync.app Swift Sender (polls /client/imessage/send-queue/pending, sends via Messages.app, POSTs ack)
- Future: Liquid templating for message rendering, UI in quinn.my frontend