From 8d821c8c970cf94d0d3706af0207290b663fe001 Mon Sep 17 00:00:00 2001 From: Natalie Date: Mon, 29 Jun 2026 11:35:13 -0400 Subject: [PATCH] feat(outbox): outbox entity, service, sweep + my-surface endpoint & tests Co-Authored-By: Claude Opus 4.8 --- src/server/src/entities/outbox/index.ts | 1 + src/server/src/entities/outbox/repo.ts | 36 +++++++++- src/server/src/entities/outbox/schema.ts | 9 +++ src/server/src/entities/outbox/types.ts | 3 + src/server/src/features/outbox/index.ts | 1 + src/server/src/features/outbox/service.ts | 34 +++++++++- src/server/src/features/outbox/sweep.ts | 81 +++++++++++++++++++++++ src/server/src/surfaces/my/outbox.ts | 2 + src/server/src/test/outbox.repo.test.ts | 27 +++++++- 9 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 src/server/src/features/outbox/sweep.ts diff --git a/src/server/src/entities/outbox/index.ts b/src/server/src/entities/outbox/index.ts index 224aa1e..1247fb2 100644 --- a/src/server/src/entities/outbox/index.ts +++ b/src/server/src/entities/outbox/index.ts @@ -14,5 +14,6 @@ export { markSending, markResult, getOutbox, + requeueStaleSending, countByStatus, } from './repo'; diff --git a/src/server/src/entities/outbox/repo.ts b/src/server/src/entities/outbox/repo.ts index 205416f..f783ee1 100644 --- a/src/server/src/entities/outbox/repo.ts +++ b/src/server/src/entities/outbox/repo.ts @@ -25,6 +25,7 @@ interface Row { error: string | null; dedupe_key: string | null; schedule_reason: string | null; + mark_read_on_send: boolean; created_at: string; updated_at: string; sent_at: string | null; @@ -45,6 +46,7 @@ const hydrate = (r: Row): OutboxRecord => ({ error: r.error, dedupeKey: r.dedupe_key, scheduleReason: r.schedule_reason, + markReadOnSend: r.mark_read_on_send, createdAt: r.created_at, updatedAt: r.updated_at, sentAt: r.sent_at, @@ -66,8 +68,8 @@ export async function enqueueOutbox(pool: Pool, draft: OutboxDraft): Promise<{ i const inserted = await pgGet<{ id: string }>( pool, `INSERT INTO macsync.outbox - (device_id, recipient, body, priority, channel_pref, not_before, max_attempts, dedupe_key, schedule_reason) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + (device_id, recipient, body, priority, channel_pref, not_before, max_attempts, dedupe_key, schedule_reason, mark_read_on_send) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (device_id, dedupe_key) WHERE dedupe_key IS NOT NULL DO NOTHING RETURNING id`, [ @@ -80,6 +82,7 @@ export async function enqueueOutbox(pool: Pool, draft: OutboxDraft): Promise<{ i draft.maxAttempts ?? 3, draft.dedupeKey ?? null, draft.scheduleReason ?? null, + draft.markReadOnSend ?? false, ], ); if (inserted) return { id: inserted.id, deduped: false }; @@ -192,6 +195,35 @@ export async function getOutbox(pool: Pool, id: string): Promise { + try { + const rows = await pgAll<{ id: string; status: string }>( + pool, + `UPDATE macsync.outbox + SET status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'queued' END, + error = COALESCE(error, 'stale send reclaimed: no result reported'), + updated_at = now() + WHERE status = 'sending' AND updated_at < $1 + RETURNING id, status`, + [olderThan], + ); + return rows.map((r) => ({ id: r.id, status: r.status as OutboxStatus })); + } catch (err) { + throw wrap(err, 'requeueStaleSending'); + } +} + export async function countByStatus(pool: Pool, deviceId: string, status: OutboxStatus): Promise { try { const row = await pgGet<{ n: string }>( diff --git a/src/server/src/entities/outbox/schema.ts b/src/server/src/entities/outbox/schema.ts index ada4b8f..6a274ba 100644 --- a/src/server/src/entities/outbox/schema.ts +++ b/src/server/src/entities/outbox/schema.ts @@ -39,4 +39,13 @@ export const outboxMigrations: readonly Migration[] = [ ON macsync.outbox (device_id, dedupe_key) WHERE dedupe_key IS NOT NULL; `, }, + { + // Handoff 04: when set, marking this reply sent also marks the recipient's + // conversation read (logical-read model — see entities/conversation-read). + id: '2026-06-28_outbox_mark_read_on_send', + sql: ` + ALTER TABLE macsync.outbox + ADD COLUMN IF NOT EXISTS mark_read_on_send BOOLEAN NOT NULL DEFAULT false; + `, + }, ]; diff --git a/src/server/src/entities/outbox/types.ts b/src/server/src/entities/outbox/types.ts index 32b582f..3cf5317 100644 --- a/src/server/src/entities/outbox/types.ts +++ b/src/server/src/entities/outbox/types.ts @@ -17,6 +17,8 @@ export interface OutboxRecord { readonly error: string | null; readonly dedupeKey: string | null; readonly scheduleReason: string | null; + /** When true, marking this reply sent also marks the recipient's thread read. */ + readonly markReadOnSend: boolean; readonly createdAt: string; readonly updatedAt: string; readonly sentAt: string | null; @@ -32,6 +34,7 @@ export interface OutboxDraft { readonly maxAttempts?: number; readonly dedupeKey?: string | null; readonly scheduleReason?: string | null; + readonly markReadOnSend?: boolean; } /** A due item handed to the Mac client to send. */ diff --git a/src/server/src/features/outbox/index.ts b/src/server/src/features/outbox/index.ts index 0467a5f..ea6c030 100644 --- a/src/server/src/features/outbox/index.ts +++ b/src/server/src/features/outbox/index.ts @@ -20,3 +20,4 @@ export { type SchedulerConfig, } from './scheduler'; export { timezoneForHandle, areaCodeFromHandle, DEFAULT_TIMEZONE } from './timezone'; +export { startOutboxSweep, stopOutboxSweep, sweepOnce, type OutboxSweepConfig } from './sweep'; diff --git a/src/server/src/features/outbox/service.ts b/src/server/src/features/outbox/service.ts index 4ecca0f..9609eab 100644 --- a/src/server/src/features/outbox/service.ts +++ b/src/server/src/features/outbox/service.ts @@ -9,6 +9,7 @@ import { type ChannelPref, type OutboxResultInput, } from '@/entities/outbox'; +import { markRead } from '@/entities/conversation-read'; import { schedule, type Priority } from './scheduler'; import { timezoneForHandle } from './timezone'; @@ -33,6 +34,8 @@ export interface EnqueueReplyInput { /** Dedupe key; defaults to recipient+body-hash+local-day. */ dedupeKey?: string | null; maxAttempts?: number; + /** When true, a confirmed send also marks the recipient's thread read. */ + markReadOnSend?: boolean; } export interface EnqueueReplyResult { @@ -90,6 +93,7 @@ export async function enqueueReply( maxAttempts: input.maxAttempts, dedupeKey, scheduleReason: decision.reason, + markReadOnSend: input.markReadOnSend ?? false, }); return { id, deduped, priority, notBefore, reason: decision.reason }; @@ -112,9 +116,33 @@ export function claimForSend(id: string, deviceId: string): Promise { return markSending(getDb(), id, deviceId); } -/** Returns false when the row isn't owned by `input.deviceId` (surface maps to 404). */ -export function recordResult(input: OutboxResultInput): Promise { - return markResult(getDb(), input); +/** + * Record a send result. Returns false when the row isn't owned by + * `input.deviceId` (surface maps to 404). On a confirmed send of a row flagged + * `mark_read_on_send`, also marks the recipient's thread read (logical-read + * model). The read-mark is best-effort: it must never fail the result write + * (Handoff 04 — "never block sends on it"). + */ +export async function recordResult(input: OutboxResultInput): Promise { + try { + const db = getDb(); + const ok = await markResult(db, input); + if (ok && (input.status === 'sent' || input.status === 'delivered')) { + const row = await getOutbox(db, input.id); + if (row?.markReadOnSend) { + await markRead(db, { deviceId: input.deviceId, handle: row.recipient, source: 'outbox_send' }).catch( + (err) => { + console.warn(`outbox: mark-read-on-send failed for ${input.id}: ${String(err)}`); + }, + ); + } + } + return ok; + } catch (err) { + throw new Error(`recordResult failed for ${input.id}: ${err instanceof Error ? err.message : String(err)}`, { + cause: err, + }); + } } export function getReply(id: string) { diff --git a/src/server/src/features/outbox/sweep.ts b/src/server/src/features/outbox/sweep.ts new file mode 100644 index 0000000..f040b31 --- /dev/null +++ b/src/server/src/features/outbox/sweep.ts @@ -0,0 +1,81 @@ +/** + * Outbox stale-`sending` sweep — the retry backstop noted in Handoff 03. + * + * The Mac client claims a row (queued → sending) *before* sending so a + * concurrent tick can't double-send. If its result POST then fails, the row is + * stranded in `sending` and `listDue` (queued-only) never refetches it. This + * periodic sweep reclaims any row stuck in `sending` past `staleAfterMs`, + * re-queueing it while attempts remain (else hard-failing it) so the drain can + * pick it up again. + * + * Call `startOutboxSweep()` once at app startup; it runs for the process + * lifetime. A single tick can never overlap itself, and a failed tick is logged + * and retried on the next interval (never throws into the timer). + */ + +import { logger } from '@/shared/logger'; +import { getDb } from '@/shared/db'; +import { requeueStaleSending } from '@/entities/outbox'; + +/** How often to sweep. */ +const DEFAULT_INTERVAL_MS = 5 * 60_000; +/** A `sending` row older than this (no result reported) is considered wedged. */ +const DEFAULT_STALE_AFTER_MS = 10 * 60_000; + +export interface OutboxSweepConfig { + intervalMs?: number; + staleAfterMs?: number; +} + +let timer: ReturnType | null = null; +let sweeping = false; + +/** One sweep pass. Exported for tests / manual invocation. Never throws. */ +export async function sweepOnce(staleAfterMs: number = DEFAULT_STALE_AFTER_MS): Promise { + const cutoff = new Date(Date.now() - staleAfterMs).toISOString(); + try { + const reclaimed = await requeueStaleSending(getDb(), cutoff); + if (reclaimed.length > 0) { + const requeued = reclaimed.filter((r) => r.status === 'queued').length; + const failed = reclaimed.length - requeued; + logger.warn('outbox-sweep: reclaimed stale sending rows', { + total: reclaimed.length, + requeued, + hardFailed: failed, + }); + } + return reclaimed.length; + } catch (err) { + logger.error('outbox-sweep: pass failed', { + err: err instanceof Error ? err.message : String(err), + }); + return 0; + } +} + +export function startOutboxSweep(config: OutboxSweepConfig = {}): void { + const intervalMs = config.intervalMs ?? DEFAULT_INTERVAL_MS; + const staleAfterMs = config.staleAfterMs ?? DEFAULT_STALE_AFTER_MS; + if (timer) return; + + logger.info('outbox-sweep: starting', { intervalMs, staleAfterMs }); + timer = setInterval(() => { + if (sweeping) return; // never overlap a slow pass + sweeping = true; + void sweepOnce(staleAfterMs).finally(() => { + sweeping = false; + }); + }, intervalMs); + // Don't keep the process alive solely for the sweep. + if (typeof timer === 'object' && timer && 'unref' in timer) { + (timer as { unref: () => void }).unref(); + } +} + +export function stopOutboxSweep(): void { + if (timer) { + clearInterval(timer); + timer = null; + logger.info('outbox-sweep: stopped'); + } +} diff --git a/src/server/src/surfaces/my/outbox.ts b/src/server/src/surfaces/my/outbox.ts index e3174bc..d989c85 100644 --- a/src/server/src/surfaces/my/outbox.ts +++ b/src/server/src/surfaces/my/outbox.ts @@ -16,6 +16,8 @@ const enqueueSchema = z.object({ notBefore: z.string().datetime().optional(), dedupeKey: z.string().min(1).max(256).optional(), maxAttempts: z.number().int().positive().max(10).optional(), + /** Mark the recipient's thread read once this reply is confirmed sent. */ + markReadOnSend: z.boolean().optional(), }); export const outboxMyRouter = new Hono() diff --git a/src/server/src/test/outbox.repo.test.ts b/src/server/src/test/outbox.repo.test.ts index 32814e6..d7a235e 100644 --- a/src/server/src/test/outbox.repo.test.ts +++ b/src/server/src/test/outbox.repo.test.ts @@ -11,7 +11,7 @@ import { describe, expect, test } from 'bun:test'; import type { Pool } from 'pg'; -import { enqueueOutbox, listDue, markResult } from '@/entities/outbox/repo'; +import { enqueueOutbox, listDue, markResult, requeueStaleSending } from '@/entities/outbox/repo'; import type { OutboxDraft } from '@/entities/outbox'; interface Captured { @@ -108,3 +108,28 @@ describe('markResult', () => { expect(ok).toBe(false); }); }); + +describe('requeueStaleSending', () => { + test('reclaims sending rows older than cutoff; requeues unless attempts exhausted', async () => { + const pool = new FakePool([ + { id: 'a', status: 'queued' }, + { id: 'b', status: 'failed' }, + ]); + const cutoff = '2026-06-28T11:50:00.000Z'; + const reclaimed = await requeueStaleSending(pool as unknown as Pool, cutoff); + expect(reclaimed).toEqual([ + { id: 'a', status: 'queued' }, + { id: 'b', status: 'failed' }, + ]); + const { sql, params } = pool.calls[0]; + expect(sql).toContain("status = 'sending' AND updated_at < $1"); + expect(sql).toContain("WHEN attempts >= max_attempts THEN 'failed' ELSE 'queued'"); + expect(params).toEqual([cutoff]); + }); + + test('nothing stale → empty array', async () => { + const pool = new FakePool([]); + const reclaimed = await requeueStaleSending(pool as unknown as Pool, '2026-06-28T11:50:00.000Z'); + expect(reclaimed).toEqual([]); + }); +});