feat(outbox): outbox entity, service, sweep + my-surface endpoint & tests

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Natalie 2026-06-29 11:35:13 -04:00
parent 53c900b3ee
commit 8d821c8c97
9 changed files with 188 additions and 6 deletions

View file

@ -14,5 +14,6 @@ export {
markSending,
markResult,
getOutbox,
requeueStaleSending,
countByStatus,
} from './repo';

View file

@ -25,6 +25,7 @@ interface Row {
error: string | null;
dedupe_key: string | null;
schedule_reason: string | null;
mark_read_on_send: boolean;
created_at: string;
updated_at: string;
sent_at: string | null;
@ -45,6 +46,7 @@ const hydrate = (r: Row): OutboxRecord => ({
error: r.error,
dedupeKey: r.dedupe_key,
scheduleReason: r.schedule_reason,
markReadOnSend: r.mark_read_on_send,
createdAt: r.created_at,
updatedAt: r.updated_at,
sentAt: r.sent_at,
@ -66,8 +68,8 @@ export async function enqueueOutbox(pool: Pool, draft: OutboxDraft): Promise<{ i
const inserted = await pgGet<{ id: string }>(
pool,
`INSERT INTO macsync.outbox
(device_id, recipient, body, priority, channel_pref, not_before, max_attempts, dedupe_key, schedule_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
(device_id, recipient, body, priority, channel_pref, not_before, max_attempts, dedupe_key, schedule_reason, mark_read_on_send)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (device_id, dedupe_key) WHERE dedupe_key IS NOT NULL DO NOTHING
RETURNING id`,
[
@ -80,6 +82,7 @@ export async function enqueueOutbox(pool: Pool, draft: OutboxDraft): Promise<{ i
draft.maxAttempts ?? 3,
draft.dedupeKey ?? null,
draft.scheduleReason ?? null,
draft.markReadOnSend ?? false,
],
);
if (inserted) return { id: inserted.id, deduped: false };
@ -192,6 +195,35 @@ export async function getOutbox(pool: Pool, id: string): Promise<OutboxRecord |
}
}
/**
* Retry backstop: reclaim rows wedged in `sending` because the Mac client
* claimed them (queued sending, attempt counted) but never reported a result
* e.g. its result POST failed after the claim. `listDue` is queued-only so it
* never refetches them on its own. Re-queue while attempts remain, else
* hard-fail (same policy as `markResult`). `olderThan` is an ISO cutoff on
* `updated_at`. Returns the affected ids + their new status.
*/
export async function requeueStaleSending(
pool: Pool,
olderThan: string,
): Promise<readonly { id: string; status: OutboxStatus }[]> {
try {
const rows = await pgAll<{ id: string; status: string }>(
pool,
`UPDATE macsync.outbox
SET status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'queued' END,
error = COALESCE(error, 'stale send reclaimed: no result reported'),
updated_at = now()
WHERE status = 'sending' AND updated_at < $1
RETURNING id, status`,
[olderThan],
);
return rows.map((r) => ({ id: r.id, status: r.status as OutboxStatus }));
} catch (err) {
throw wrap(err, 'requeueStaleSending');
}
}
export async function countByStatus(pool: Pool, deviceId: string, status: OutboxStatus): Promise<number> {
try {
const row = await pgGet<{ n: string }>(

View file

@ -39,4 +39,13 @@ export const outboxMigrations: readonly Migration[] = [
ON macsync.outbox (device_id, dedupe_key) WHERE dedupe_key IS NOT NULL;
`,
},
{
// Handoff 04: when set, marking this reply sent also marks the recipient's
// conversation read (logical-read model — see entities/conversation-read).
id: '2026-06-28_outbox_mark_read_on_send',
sql: `
ALTER TABLE macsync.outbox
ADD COLUMN IF NOT EXISTS mark_read_on_send BOOLEAN NOT NULL DEFAULT false;
`,
},
];

View file

@ -17,6 +17,8 @@ export interface OutboxRecord {
readonly error: string | null;
readonly dedupeKey: string | null;
readonly scheduleReason: string | null;
/** When true, marking this reply sent also marks the recipient's thread read. */
readonly markReadOnSend: boolean;
readonly createdAt: string;
readonly updatedAt: string;
readonly sentAt: string | null;
@ -32,6 +34,7 @@ export interface OutboxDraft {
readonly maxAttempts?: number;
readonly dedupeKey?: string | null;
readonly scheduleReason?: string | null;
readonly markReadOnSend?: boolean;
}
/** A due item handed to the Mac client to send. */

View file

@ -20,3 +20,4 @@ export {
type SchedulerConfig,
} from './scheduler';
export { timezoneForHandle, areaCodeFromHandle, DEFAULT_TIMEZONE } from './timezone';
export { startOutboxSweep, stopOutboxSweep, sweepOnce, type OutboxSweepConfig } from './sweep';

View file

@ -9,6 +9,7 @@ import {
type ChannelPref,
type OutboxResultInput,
} from '@/entities/outbox';
import { markRead } from '@/entities/conversation-read';
import { schedule, type Priority } from './scheduler';
import { timezoneForHandle } from './timezone';
@ -33,6 +34,8 @@ export interface EnqueueReplyInput {
/** Dedupe key; defaults to recipient+body-hash+local-day. */
dedupeKey?: string | null;
maxAttempts?: number;
/** When true, a confirmed send also marks the recipient's thread read. */
markReadOnSend?: boolean;
}
export interface EnqueueReplyResult {
@ -90,6 +93,7 @@ export async function enqueueReply(
maxAttempts: input.maxAttempts,
dedupeKey,
scheduleReason: decision.reason,
markReadOnSend: input.markReadOnSend ?? false,
});
return { id, deduped, priority, notBefore, reason: decision.reason };
@ -112,9 +116,33 @@ export function claimForSend(id: string, deviceId: string): Promise<boolean> {
return markSending(getDb(), id, deviceId);
}
/** Returns false when the row isn't owned by `input.deviceId` (surface maps to 404). */
export function recordResult(input: OutboxResultInput): Promise<boolean> {
return markResult(getDb(), input);
/**
* Record a send result. Returns false when the row isn't owned by
* `input.deviceId` (surface maps to 404). On a confirmed send of a row flagged
* `mark_read_on_send`, also marks the recipient's thread read (logical-read
* model). The read-mark is best-effort: it must never fail the result write
* (Handoff 04 "never block sends on it").
*/
export async function recordResult(input: OutboxResultInput): Promise<boolean> {
try {
const db = getDb();
const ok = await markResult(db, input);
if (ok && (input.status === 'sent' || input.status === 'delivered')) {
const row = await getOutbox(db, input.id);
if (row?.markReadOnSend) {
await markRead(db, { deviceId: input.deviceId, handle: row.recipient, source: 'outbox_send' }).catch(
(err) => {
console.warn(`outbox: mark-read-on-send failed for ${input.id}: ${String(err)}`);
},
);
}
}
return ok;
} catch (err) {
throw new Error(`recordResult failed for ${input.id}: ${err instanceof Error ? err.message : String(err)}`, {
cause: err,
});
}
}
export function getReply(id: string) {

View file

@ -0,0 +1,81 @@
/**
* Outbox stale-`sending` sweep the retry backstop noted in Handoff 03.
*
* The Mac client claims a row (queued sending) *before* sending so a
* concurrent tick can't double-send. If its result POST then fails, the row is
* stranded in `sending` and `listDue` (queued-only) never refetches it. This
* periodic sweep reclaims any row stuck in `sending` past `staleAfterMs`,
* re-queueing it while attempts remain (else hard-failing it) so the drain can
* pick it up again.
*
* Call `startOutboxSweep()` once at app startup; it runs for the process
* lifetime. A single tick can never overlap itself, and a failed tick is logged
* and retried on the next interval (never throws into the timer).
*/
import { logger } from '@/shared/logger';
import { getDb } from '@/shared/db';
import { requeueStaleSending } from '@/entities/outbox';
/** How often to sweep. */
const DEFAULT_INTERVAL_MS = 5 * 60_000;
/** A `sending` row older than this (no result reported) is considered wedged. */
const DEFAULT_STALE_AFTER_MS = 10 * 60_000;
export interface OutboxSweepConfig {
intervalMs?: number;
staleAfterMs?: number;
}
let timer: ReturnType<typeof setInterval> | null = null;
let sweeping = false;
/** One sweep pass. Exported for tests / manual invocation. Never throws. */
export async function sweepOnce(staleAfterMs: number = DEFAULT_STALE_AFTER_MS): Promise<number> {
const cutoff = new Date(Date.now() - staleAfterMs).toISOString();
try {
const reclaimed = await requeueStaleSending(getDb(), cutoff);
if (reclaimed.length > 0) {
const requeued = reclaimed.filter((r) => r.status === 'queued').length;
const failed = reclaimed.length - requeued;
logger.warn('outbox-sweep: reclaimed stale sending rows', {
total: reclaimed.length,
requeued,
hardFailed: failed,
});
}
return reclaimed.length;
} catch (err) {
logger.error('outbox-sweep: pass failed', {
err: err instanceof Error ? err.message : String(err),
});
return 0;
}
}
export function startOutboxSweep(config: OutboxSweepConfig = {}): void {
const intervalMs = config.intervalMs ?? DEFAULT_INTERVAL_MS;
const staleAfterMs = config.staleAfterMs ?? DEFAULT_STALE_AFTER_MS;
if (timer) return;
logger.info('outbox-sweep: starting', { intervalMs, staleAfterMs });
timer = setInterval(() => {
if (sweeping) return; // never overlap a slow pass
sweeping = true;
void sweepOnce(staleAfterMs).finally(() => {
sweeping = false;
});
}, intervalMs);
// Don't keep the process alive solely for the sweep.
if (typeof timer === 'object' && timer && 'unref' in timer) {
(timer as { unref: () => void }).unref();
}
}
export function stopOutboxSweep(): void {
if (timer) {
clearInterval(timer);
timer = null;
logger.info('outbox-sweep: stopped');
}
}

View file

@ -16,6 +16,8 @@ const enqueueSchema = z.object({
notBefore: z.string().datetime().optional(),
dedupeKey: z.string().min(1).max(256).optional(),
maxAttempts: z.number().int().positive().max(10).optional(),
/** Mark the recipient's thread read once this reply is confirmed sent. */
markReadOnSend: z.boolean().optional(),
});
export const outboxMyRouter = new Hono()

View file

@ -11,7 +11,7 @@
import { describe, expect, test } from 'bun:test';
import type { Pool } from 'pg';
import { enqueueOutbox, listDue, markResult } from '@/entities/outbox/repo';
import { enqueueOutbox, listDue, markResult, requeueStaleSending } from '@/entities/outbox/repo';
import type { OutboxDraft } from '@/entities/outbox';
interface Captured {
@ -108,3 +108,28 @@ describe('markResult', () => {
expect(ok).toBe(false);
});
});
describe('requeueStaleSending', () => {
test('reclaims sending rows older than cutoff; requeues unless attempts exhausted', async () => {
const pool = new FakePool([
{ id: 'a', status: 'queued' },
{ id: 'b', status: 'failed' },
]);
const cutoff = '2026-06-28T11:50:00.000Z';
const reclaimed = await requeueStaleSending(pool as unknown as Pool, cutoff);
expect(reclaimed).toEqual([
{ id: 'a', status: 'queued' },
{ id: 'b', status: 'failed' },
]);
const { sql, params } = pool.calls[0];
expect(sql).toContain("status = 'sending' AND updated_at < $1");
expect(sql).toContain("WHEN attempts >= max_attempts THEN 'failed' ELSE 'queued'");
expect(params).toEqual([cutoff]);
});
test('nothing stale → empty array', async () => {
const pool = new FakePool([]);
const reclaimed = await requeueStaleSending(pool as unknown as Pool, '2026-06-28T11:50:00.000Z');
expect(reclaimed).toEqual([]);
});
});