lilith-platform.live/codebase/@features/api/PHASE_4B_IMPLEMENTATION.md
2026-04-22 16:58:26 -07:00

12 KiB

Phase 4b Implementation: Mac-Sync Send Queue + Outreach Dispatcher

Status: Complete. Both codebases pass typecheck.


Part 1: Mac-Sync Send Queue Endpoints

Location

  • Schema: @applications/@mac-sync/src/server/src/entities/send-queue/
  • Endpoints: @applications/@mac-sync/src/server/src/surfaces/client/imessage.ts
  • Admin surface: @applications/@mac-sync/src/server/src/surfaces/admin/send-queue.ts

Table Schema

CREATE TABLE IF NOT EXISTS icloud.send_queue (
  id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  device_id         UUID NOT NULL REFERENCES icloud.devices(id) ON DELETE CASCADE,
  to_handle         TEXT NOT NULL,
  body              TEXT NOT NULL,
  status            TEXT NOT NULL DEFAULT 'queued',  -- 'queued' | 'sent' | 'failed'
  send_queue_id     TEXT,
  sent_at           TIMESTAMPTZ,
  failure_reason    TEXT,
  created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at        TIMESTAMPTZ NOT NULL DEFAULT now()
);

Endpoint Contracts

GET /client/imessage/send-queue/pending

Auth: Device token (via deviceTokenAuth middleware)

Response (200 OK):

{
  "success": true,
  "data": {
    "items": [
      {
        "id": "uuid",
        "toHandle": "phone or email",
        "body": "message text",
        "createdAt": "2026-04-22T12:00:00Z"
      }
    ]
  }
}

Behavior:

  • Queries icloud.send_queue WHERE device_id = $1 AND status = 'queued'
  • Orders by created_at ASC
  • Limit 50 items per call
  • Returns camelCase keys in response

POST /client/imessage/send-queue/:id/result

Auth: Device token

Request Body:

{
  "status": "sent" | "failed",
  "error": "optional error message if failed"
}

Response (200 OK):

{
  "success": true,
  "data": {
    "id": "uuid",
    "status": "sent" | "failed"
  }
}

Behavior:

  • If status is 'sent': updates row with status='sent', sent_at=now()
  • If status is 'failed': updates row with status='failed', failure_reason=error
  • Verifies device_id matches authenticated device

POST /admin/send-queue/enqueue

Auth: Service token (via serviceTokenAuth middleware)

Request Body:

{
  "batchItemId": "uuid from outreach_batch_items.id",
  "deviceId": "uuid of target device",
  "toHandle": "+1234567890 or email",
  "body": "rendered message body"
}

Response (200 OK):

{
  "success": true,
  "data": {
    "sendQueueId": "uuid of inserted send_queue row"
  }
}

Behavior:

  • Inserts into icloud.send_queue with status='queued'
  • Returns the generated id (PostgreSQL UUID)
  • Returns 400 on validation error (ZodError)
  • Returns 500 on database error

Part 2: Outreach Dispatcher Processor

Location

@features/api/src/processors/outreach-dispatcher/index.ts

Configuration

Add to .env or environment:

MAC_SYNC_BASE_URL=http://localhost:3201        # default
MAC_SYNC_SERVICE_TOKEN=<required-16-char-min>  # required to enable dispatcher

Updated config.ts accepts these vars; if MAC_SYNC_SERVICE_TOKEN is not set, the processor logs a warning and disables itself (no crash).

Processor Behavior

Runs two parallel loops on 30-second intervals:

Loop 1: Dispatch Pending Items

  1. Fetch all outreach_batches WHERE status='sending'
  2. For each batch:
    • Fetch outreach_settings (id=1) singleton
    • Check paused: if true, skip batch
    • Check quiet hours: if within quiet_hours_start..quiet_hours_end, skip batch
    • Compute usage in last minute/hour/day: if any limit exceeded, skip batch
    • Check min_gap_seconds: if most recent sent_at is < now() - min_gap_seconds, skip batch
    • Get oldest pending item (status='pending')
    • If none: check if all items are sent/failed/skipped; if so, mark batch status='sent'
    • Otherwise:
      • Look up client's primary contact_relationships handle
      • If no primary handle: mark item status='skipped', failure_reason='no primary handle found'
      • Query first device from pg.icloud.devices (placeholder for real device selection)
      • POST to mac-sync /admin/send-queue/enqueue with item data
      • On success: update item status='queued', send_queue_id=<returned>
      • On failure: update item status='failed', failure_reason=<http error>

Loop 2: Poll & Sync Sent Messages

  1. Every 30 seconds: query pg.icloud.send_queue WHERE status='sent' AND sent_at > last_poll_time
  2. For each sent row:
    • Find corresponding outreach_batch_items by send_queue_id
    • Update item status='sent', sent_at=<from send_queue>
  3. Update lastSyncedAt to now

Rate Limits (from outreach_settings)

  • max_per_minute: max items sent in last 60 seconds
  • max_per_hour: max items sent in last 3600 seconds
  • max_per_day: max items sent in last 86400 seconds
  • min_gap_seconds: minimum gap between consecutive sends in the batch
  • quiet_hours_start, quiet_hours_end: HH:MM format (both must be set to enable)

Error Handling

  • Invalid quiet hours format: treated as disabled, returns false
  • Device not found: item marked 'failed' with reason
  • Mac-sync HTTP error: item marked 'failed' with HTTP error text
  • Processor loop crash: logged at ERROR level; systemd restarts service

Logging

All operations logged at appropriate levels (DEBUG for skips, INFO for queued/completed, WARN/ERROR for failures).


Integration Points

Quinn.api server.ts

void startProcessors(
  { quinn: getDb(), icloud: getIcloudDb() },
  config.MAC_SYNC_SERVICE_TOKEN
    ? {
        macSyncBaseUrl: config.MAC_SYNC_BASE_URL,
        macSyncServiceToken: config.MAC_SYNC_SERVICE_TOKEN,
      }
    : undefined,
).catch((err) => { ... });

Repo usage

  • @entities/outreach-batch/repo.ts: updateStatus()
  • @entities/outreach-batch-item/repo.ts: updateStatus(), countByBatchAndStatus()
  • @entities/outreach-settings/repo.ts: getSingleton()
  • @entities/contact-relationship/repo.ts: listByClient()

Manual Smoke Test Plan

Prerequisites:

  • pg.quinn running on :25435 (dev) or configured prod URL
  • pg.icloud running on :25436 (dev) or configured prod URL
  • mac-sync server running on :3201 (dev) or configured prod URL
  • quinn.api running on :3030 (dev) or configured prod URL

Test Steps

  1. Verify send_queue table exists

    psql $QUINN_ICLOUD_DB_URL -c '\d icloud.send_queue'
    

    Should show columns: id, device_id, to_handle, body, status, etc.

  2. Insert a device (if needed)

    psql $QUINN_ICLOUD_DB_URL -c "
    INSERT INTO icloud.devices (name, token) 
    VALUES ('test-device', 'test-token-abc123')
    RETURNING id;
    "
    

    Record the device UUID as DEVICE_ID.

  3. Test mac-sync /admin/send-queue/enqueue endpoint

    curl -X POST http://localhost:3201/admin/send-queue/enqueue \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $MAC_SYNC_SERVICE_TOKEN" \
      -d '{
        "batchItemId": "550e8400-e29b-41d4-a716-446655440000",
        "deviceId": "'$DEVICE_ID'",
        "toHandle": "+14155552671",
        "body": "Test message"
      }'
    

    Should return { "success": true, "data": { "sendQueueId": "..." } }

  4. Test mac-sync /client/imessage/send-queue/pending endpoint

    curl http://localhost:3201/client/imessage/send-queue/pending \
      -H "Authorization: Bearer $DEVICE_TOKEN"
    

    Should return { "success": true, "data": { "items": [{ id, toHandle, body, createdAt }] } }

  5. Test mac-sync /client/imessage/send-queue/:id/result endpoint

    curl -X POST http://localhost:3201/client/imessage/send-queue/$QUEUE_ID/result \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $DEVICE_TOKEN" \
      -d '{ "status": "sent" }'
    

    Verify the row in icloud.send_queue updates to status='sent', sent_at is set.

  6. Verify quinn.api configuration

    # In quinn.api environment:
    echo $MAC_SYNC_BASE_URL
    echo $MAC_SYNC_SERVICE_TOKEN
    
  7. Check quinn.api processor logs (if running with systemd or docker logs)

    journalctl -u quinn-api -f | grep outreach-dispatcher
    

    Should see "outreach-dispatcher starting" message.

  8. Create a test outreach batch (POST to /my/outreach/batch or directly insert)

    psql $QUINN_DB_URL -c "
    INSERT INTO outreach_batches 
      (query_json, template, vars, dry_run, status, created_by)
    VALUES (
      '{}', 
      'Test message', 
      '{}', 
      false, 
      'sending', 
      'test-user'
    )
    RETURNING id;
    "
    

    Record batch UUID as BATCH_ID.

  9. Insert a test batch item

    # First, get or create a client in pg.quinn:
    psql $QUINN_DB_URL -c "
    SELECT id FROM clients LIMIT 1;
    " # or INSERT and get id
    
    # Then insert batch item:
    psql $QUINN_DB_URL -c "
    INSERT INTO outreach_batch_items 
      (batch_id, client_id, rendered_body, status)
    VALUES (
      '$BATCH_ID',
      <CLIENT_ID>,
      'Test rendered body',
      'pending'
    )
    RETURNING id;
    "
    

    Record item UUID as ITEM_ID.

  10. Verify dispatcher processes the item

    • Wait 30 seconds for processor loop
    • Check batch item status: should transition from pendingqueuedsent
    • Verify log message: "item queued to mac-sync" with sendQueueId
  11. Monitor rate limiting

    • Create multiple batch items in rapid succession
    • Observe dispatcher respects max_per_minute / max_per_hour / min_gap_seconds
    • Verify log messages indicate "limit reached" reasons

Implementation Notes

Design Decisions

  1. Device selection: Currently uses SELECT id FROM icloud.devices LIMIT 1. In production, this should be persisted per client or determined from outreach_settings.
  2. Polling cadence: Fixed 30-second intervals. Could be made configurable.
  3. mac-sync HTTP client: Uses native fetch() with Bearer token. Assumes mac-sync is reachable.
  4. Quiet hours: Local server time (no timezone conversion). Use UTC in quiet_hours_start/quiet_hours_end for consistency.
  5. Processor startup: If MAC_SYNC_SERVICE_TOKEN is undefined, processor logs warning and returns early (no-op, no crash).

Type Safety

  • All queries use Zod schemas for request validation
  • Result types explicitly typed (no any)
  • Error propagation via try/catch with proper logging

No Breaking Changes

  • Mac-sync server.ts updated to include send_queue migrations
  • quinn.api config extended (new optional vars)
  • Processor registry updated to accept optional outreach config
  • Existing processors (content-classifier, relationship-resolver, geo-inference) unaffected

Files Created/Modified

Created

  • @mac-sync/src/server/src/entities/send-queue/{schema,types,index}.ts
  • @mac-sync/src/server/src/surfaces/admin/send-queue.ts
  • @features/api/src/processors/outreach-dispatcher/index.ts

Modified

  • @mac-sync/src/server/src/app/server.ts (import + migration registration)
  • @mac-sync/src/server/src/surfaces/admin/index.ts (route mounting)
  • @mac-sync/src/server/src/surfaces/client/imessage.ts (endpoint implementation)
  • @features/api/src/app/config.ts (add MAC_SYNC_* vars)
  • @features/api/src/app/server.ts (pass config to startProcessors)
  • @features/api/src/processors/index.ts (signature update, optional processor registration)

Typecheck Status

@mac-sync/src/server: bun run typecheck → no errors @features/api: bun run typecheck → no errors (in api feature context only)


Next Steps (Phase 4a, Phase 5, Phase 6)

  • Phase 4a: Create outreach_batches, outreach_batch_items, outreach_settings tables (if not already done)
  • Phase 5: Build /my/outreach/* surfaces in quinn.api (search, dry-run, send)
  • Phase 6: Build MacSync.app Swift Sender (polls /client/send-queue/pending, sends via Messages.app, POSTs ack)
  • Future: Liquid templating for message rendering, UI in quinn.my frontend