From 9f7c9f4533083f477af48c96bfeb9ce31d7b414c Mon Sep 17 00:00:00 2001 From: Natalie Date: Mon, 29 Jun 2026 11:35:13 -0400 Subject: [PATCH] feat(read): conversation-read entity + read feature + my-surface endpoint & tests Co-Authored-By: Claude Opus 4.8 --- .../src/entities/conversation-read/index.ts | 3 + .../src/entities/conversation-read/repo.ts | 130 ++++++++++++++++++ .../src/entities/conversation-read/schema.ts | 36 +++++ .../src/entities/conversation-read/types.ts | 20 +++ src/server/src/features/read/index.ts | 6 + src/server/src/features/read/service.ts | 65 +++++++++ src/server/src/surfaces/my/read.ts | 44 ++++++ .../src/test/conversation-read.repo.test.ts | 123 +++++++++++++++++ 8 files changed, 427 insertions(+) create mode 100644 src/server/src/entities/conversation-read/index.ts create mode 100644 src/server/src/entities/conversation-read/repo.ts create mode 100644 src/server/src/entities/conversation-read/schema.ts create mode 100644 src/server/src/entities/conversation-read/types.ts create mode 100644 src/server/src/features/read/index.ts create mode 100644 src/server/src/features/read/service.ts create mode 100644 src/server/src/surfaces/my/read.ts create mode 100644 src/server/src/test/conversation-read.repo.test.ts diff --git a/src/server/src/entities/conversation-read/index.ts b/src/server/src/entities/conversation-read/index.ts new file mode 100644 index 0000000..050d12a --- /dev/null +++ b/src/server/src/entities/conversation-read/index.ts @@ -0,0 +1,3 @@ +export { conversationReadMigrations } from './schema'; +export type { ReadSource, MarkReadInput, ConversationReadState } from './types'; +export { markRead, markAllRepliedRead, getReadState } from './repo'; diff --git a/src/server/src/entities/conversation-read/repo.ts b/src/server/src/entities/conversation-read/repo.ts new file mode 100644 index 0000000..8199fcf --- /dev/null +++ b/src/server/src/entities/conversation-read/repo.ts @@ -0,0 +1,130 @@ +import type { Pool } from 'pg'; + +import { pgAll, pgGet } from '@/shared/db'; + +import type { ConversationReadState, MarkReadInput, ReadSource } from './types'; + +function wrap(err: unknown, op: string): Error { + return new Error( + `conversation-read/repo.${op} failed: ${err instanceof Error ? err.message : String(err)}`, + { cause: err }, + ); +} + +/** + * Mark a handle read through an instant (defaults to now). Upsert on + * `(device_id, handle)`; `read_through` only advances (GREATEST), so re-marking + * an already-read thread is an idempotent no-op. Returns the effective + * `read_through` after the upsert. + */ +export async function markRead(pool: Pool, input: MarkReadInput): Promise<{ readThrough: string }> { + try { + const row = await pgGet<{ read_through: string }>( + pool, + `INSERT INTO macsync.conversation_read (device_id, handle, read_through, source) + VALUES ($1, $2, COALESCE($3::timestamptz, now()), $4) + ON CONFLICT (device_id, handle) DO UPDATE + SET read_through = GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through), + source = EXCLUDED.source, + read_at = now(), + updated_at = now() + RETURNING read_through`, + [input.deviceId, input.handle, input.readThrough ?? null, input.source ?? 'manual'], + ); + if (!row) throw new Error('upsert returned no row'); + return { readThrough: row.read_through }; + } catch (err) { + throw wrap(err, 'markRead'); + } +} + +/** + * Mark read every thread whose latest message is outbound (i.e. Quinn has + * replied) — marking it read through that reply's time. Single statement: + * per conversation, take the newest message; if it's outbound, mark the lead's + * handle (newest inbound `from_handle`) read. Idempotent via the same GREATEST + * upsert. Returns the handles affected. + */ +export async function markAllRepliedRead(pool: Pool, deviceId: string): Promise { + try { + const rows = await pgAll<{ handle: string }>( + pool, + `INSERT INTO macsync.conversation_read (device_id, handle, read_through, source) + SELECT $1, h.handle, latest.last_at, 'all_replied' + FROM ( + SELECT DISTINCT ON (m.conversation_id) + m.conversation_id, m.is_from_me, m.sent_at AS last_at + FROM macsync.messages m + WHERE m.device_id = $1 + ORDER BY m.conversation_id, m.sent_at DESC + ) latest + JOIN LATERAL ( + SELECT m2.from_handle AS handle + FROM macsync.messages m2 + WHERE m2.conversation_id = latest.conversation_id + AND m2.is_from_me = false + AND m2.from_handle <> '' + ORDER BY m2.sent_at DESC + LIMIT 1 + ) h ON true + WHERE latest.is_from_me = true + ON CONFLICT (device_id, handle) DO UPDATE + SET read_through = GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through), + source = 'all_replied', + read_at = now(), + updated_at = now() + RETURNING handle`, + [deviceId], + ); + return rows.map((r) => r.handle); + } catch (err) { + throw wrap(err, 'markAllRepliedRead'); + } +} + +/** + * Read-state of one handle: the stored `read_through` (if any) plus the handle's + * latest inbound time, and whether it is currently unread (inbound newer than + * `read_through`, or never marked read while inbound exists). + */ +export async function getReadState( + pool: Pool, + deviceId: string, + handle: string, +): Promise { + try { + const row = await pgGet<{ + read_through: string | null; + read_at: string | null; + source: string | null; + last_inbound_at: string | null; + }>( + pool, + `SELECT + cr.read_through, + cr.read_at, + cr.source, + (SELECT MAX(m.sent_at) FROM macsync.messages m + WHERE m.device_id = $1 AND m.from_handle = $2 AND m.is_from_me = false) AS last_inbound_at + FROM (SELECT 1) one + LEFT JOIN macsync.conversation_read cr + ON cr.device_id = $1 AND cr.handle = $2`, + [deviceId, handle], + ); + const readThrough = row?.read_through ?? null; + const lastInboundAt = row?.last_inbound_at ?? null; + const unread = + lastInboundAt !== null && + (readThrough === null || new Date(lastInboundAt) > new Date(readThrough)); + return { + handle, + readThrough, + readAt: row?.read_at ?? null, + source: (row?.source as ReadSource | null) ?? null, + lastInboundAt, + unread, + }; + } catch (err) { + throw wrap(err, 'getReadState'); + } +} diff --git a/src/server/src/entities/conversation-read/schema.ts b/src/server/src/entities/conversation-read/schema.ts new file mode 100644 index 0000000..6246eef --- /dev/null +++ b/src/server/src/entities/conversation-read/schema.ts @@ -0,0 +1,36 @@ +import type { Migration } from '@/shared/db'; + +/** + * Logical read-state (Handoff 04). There is **no safe public macOS API** to mark + * a Messages conversation read, and writing `is_read`/`date_read` into Apple's + * live `chat.db` risks corruption — so macsync tracks "handled/read" in its OWN + * store and the inbox/app reads from here. Apple's database is never touched. + * + * A handle is "read through" a timestamp: it is considered read iff it has no + * inbound message newer than `read_through`. A later inbound naturally + * re-surfaces it as unread. `read_through` only ever advances (GREATEST on + * upsert), so re-marking is an idempotent no-op. + */ +export const conversationReadMigrations: readonly Migration[] = [ + { + id: '2026-06-28_conversation_read_initial', + sql: ` + CREATE TABLE IF NOT EXISTS macsync.conversation_read ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + device_id UUID NOT NULL REFERENCES macsync.devices(id) ON DELETE CASCADE, + -- The lead's handle (phone/email), matching outbox.recipient and the + -- inbound messages.from_handle — the consumer-facing thread identity. + handle TEXT NOT NULL, + read_through TIMESTAMPTZ NOT NULL, + -- manual | outbox_send | all_replied — provenance for tuning/audit. + source TEXT NOT NULL DEFAULT 'manual', + read_at TIMESTAMPTZ NOT NULL DEFAULT now(), + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(device_id, handle) + ); + CREATE INDEX IF NOT EXISTS idx_conv_read_device + ON macsync.conversation_read(device_id); + `, + }, +]; diff --git a/src/server/src/entities/conversation-read/types.ts b/src/server/src/entities/conversation-read/types.ts new file mode 100644 index 0000000..c038e76 --- /dev/null +++ b/src/server/src/entities/conversation-read/types.ts @@ -0,0 +1,20 @@ +export type ReadSource = 'manual' | 'outbox_send' | 'all_replied'; + +export interface MarkReadInput { + readonly deviceId: string; + readonly handle: string; + /** Mark read through this instant (ISO). Defaults to "now" at the repo layer. */ + readonly readThrough?: string | null; + readonly source?: ReadSource; +} + +/** The read-state of one handle, with whether it currently has unread inbound. */ +export interface ConversationReadState { + readonly handle: string; + readonly readThrough: string | null; + readonly readAt: string | null; + readonly source: ReadSource | null; + readonly lastInboundAt: string | null; + /** true when there is inbound newer than `read_through` (or never marked read). */ + readonly unread: boolean; +} diff --git a/src/server/src/features/read/index.ts b/src/server/src/features/read/index.ts new file mode 100644 index 0000000..e3ebc6b --- /dev/null +++ b/src/server/src/features/read/index.ts @@ -0,0 +1,6 @@ +export { + markConversationRead, + markAllReplied, + conversationReadState, + type MarkReadOptions, +} from './service'; diff --git a/src/server/src/features/read/service.ts b/src/server/src/features/read/service.ts new file mode 100644 index 0000000..3831e7c --- /dev/null +++ b/src/server/src/features/read/service.ts @@ -0,0 +1,65 @@ +import { getDb } from '@/shared/db'; +import { + markRead, + markAllRepliedRead, + getReadState, + type ConversationReadState, + type ReadSource, +} from '@/entities/conversation-read'; + +export interface MarkReadOptions { + /** Mark read through this instant (ISO); defaults to "now". */ + readThrough?: string | null; + source?: ReadSource; +} + +function wrap(err: unknown, op: string): Error { + return new Error(`read/${op} failed: ${err instanceof Error ? err.message : String(err)}`, { + cause: err, + }); +} + +/** + * Mark a single conversation (by the lead's handle) read. Logical-read model — + * nothing is written to Apple's chat.db; the inbox reads this state. Idempotent. + */ +export async function markConversationRead( + deviceId: string, + handle: string, + opts: MarkReadOptions = {}, +): Promise<{ handle: string; readThrough: string }> { + try { + const { readThrough } = await markRead(getDb(), { + deviceId, + handle, + readThrough: opts.readThrough ?? null, + source: opts.source ?? 'manual', + }); + return { handle, readThrough }; + } catch (err) { + throw wrap(err, 'markConversationRead'); + } +} + +/** Mark every replied-to thread read. Returns the handles affected. */ +export async function markAllReplied( + deviceId: string, +): Promise<{ marked: number; handles: readonly string[] }> { + try { + const handles = await markAllRepliedRead(getDb(), deviceId); + return { marked: handles.length, handles }; + } catch (err) { + throw wrap(err, 'markAllReplied'); + } +} + +export async function conversationReadState( + deviceId: string, + handle: string, +): Promise { + try { + return await getReadState(getDb(), deviceId, handle); + } catch (err) { + throw wrap(err, 'conversationReadState'); + } +} diff --git a/src/server/src/surfaces/my/read.ts b/src/server/src/surfaces/my/read.ts new file mode 100644 index 0000000..112d7d8 --- /dev/null +++ b/src/server/src/surfaces/my/read.ts @@ -0,0 +1,44 @@ +import { Hono } from 'hono'; +import { z } from 'zod'; + +import { markConversationRead, markAllReplied, conversationReadState } from '@/features/read'; + +const markSchema = z.object({ + deviceId: z.string().uuid(), + handle: z.string().min(1), + /** Mark read through this instant; defaults to now. */ + readThrough: z.string().datetime().optional(), + source: z.enum(['manual', 'outbox_send', 'all_replied']).optional(), +}); + +const allRepliedSchema = z.object({ deviceId: z.string().uuid() }); +const deviceIdParam = z.string().uuid(); + +/** + * Logical conversation read-state (Handoff 04). `markRead`/`markAllRepliedRead` + * for an app/UI to flag handled threads. Never touches Apple's chat.db. + */ +export const readMyRouter = new Hono() + .post('/', async (c) => { + const parsed = markSchema.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { + return c.json({ success: false, error: { code: 'validation_error', issues: parsed.error.flatten() } }, 400); + } + const { deviceId, handle, readThrough, source } = parsed.data; + const data = await markConversationRead(deviceId, handle, { readThrough, source }); + return c.json({ success: true, data }); + }) + .post('/all-replied', async (c) => { + const parsed = allRepliedSchema.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { + return c.json({ success: false, error: { code: 'validation_error', issues: parsed.error.flatten() } }, 400); + } + const data = await markAllReplied(parsed.data.deviceId); + return c.json({ success: true, data }); + }) + .get('/:handle', async (c) => { + const deviceId = deviceIdParam.parse(c.req.query('deviceId') ?? ''); + const handle = z.string().min(1).parse(c.req.param('handle')); + const data = await conversationReadState(deviceId, handle); + return c.json({ success: true, data }); + }); diff --git a/src/server/src/test/conversation-read.repo.test.ts b/src/server/src/test/conversation-read.repo.test.ts new file mode 100644 index 0000000..ccdcc0b --- /dev/null +++ b/src/server/src/test/conversation-read.repo.test.ts @@ -0,0 +1,123 @@ +/** + * Unit tests for the conversation-read repo SQL (Handoff 04 — logical-read + * model). Capturing FakePool, no Postgres. + * + * NOTE: like outbox.repo.test.ts / calls.repo.test.ts this imports + * `@/shared/db`, which pulls the private `@lilith/quinn-db-pg` package — run + * where that registry is reachable (`bun install && bun test + * src/test/conversation-read.repo.test.ts`). + */ + +import { describe, expect, test } from 'bun:test'; +import type { Pool } from 'pg'; + +import { markRead, markAllRepliedRead, getReadState } from '@/entities/conversation-read/repo'; + +interface Captured { + sql: string; + params: unknown[]; +} + +class FakePool { + calls: Captured[] = []; + private readonly rows: Record[]; + constructor(rows: Record[] = []) { + this.rows = rows; + } + async query>(sql: string, params: unknown[] = []) { + this.calls.push({ sql, params }); + return { rows: this.rows as unknown as T[] }; + } +} + +describe('markRead', () => { + test('upserts with monotonic GREATEST read_through and defaults to now', async () => { + const pool = new FakePool([{ read_through: '2026-06-28T12:00:00.000Z' }]); + const res = await markRead(pool as unknown as Pool, { + deviceId: 'dev-1', + handle: '+14155550000', + }); + expect(res).toEqual({ readThrough: '2026-06-28T12:00:00.000Z' }); + const { sql, params } = pool.calls[0]; + expect(sql).toContain('ON CONFLICT (device_id, handle) DO UPDATE'); + expect(sql).toContain('GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through)'); + expect(sql).toContain('COALESCE($3::timestamptz, now())'); + // readThrough omitted → null (COALESCE falls back to now()); source default manual. + expect(params).toEqual(['dev-1', '+14155550000', null, 'manual']); + }); + + test('passes explicit readThrough and source through', async () => { + const pool = new FakePool([{ read_through: '2026-06-28T09:00:00.000Z' }]); + await markRead(pool as unknown as Pool, { + deviceId: 'dev-1', + handle: 'a@b.com', + readThrough: '2026-06-28T09:00:00.000Z', + source: 'outbox_send', + }); + expect(pool.calls[0].params).toEqual(['dev-1', 'a@b.com', '2026-06-28T09:00:00.000Z', 'outbox_send']); + }); +}); + +describe('markAllRepliedRead', () => { + test('marks threads whose latest message is outbound and returns affected handles', async () => { + const pool = new FakePool([{ handle: '+14155550000' }, { handle: 'a@b.com' }]); + const handles = await markAllRepliedRead(pool as unknown as Pool, 'dev-1'); + expect(handles).toEqual(['+14155550000', 'a@b.com']); + const { sql, params } = pool.calls[0]; + expect(sql).toContain('WHERE latest.is_from_me = true'); + expect(sql).toContain("source = 'all_replied'"); + expect(sql).toContain('GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through)'); + expect(params).toEqual(['dev-1']); + }); +}); + +describe('getReadState', () => { + test('unread when inbound is newer than read_through', async () => { + const pool = new FakePool([ + { + read_through: '2026-06-28T10:00:00.000Z', + read_at: '2026-06-28T10:00:00.000Z', + source: 'manual', + last_inbound_at: '2026-06-28T11:00:00.000Z', + }, + ]); + const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1'); + expect(st.unread).toBe(true); + expect(st.readThrough).toBe('2026-06-28T10:00:00.000Z'); + }); + + test('read when read_through is at/after the last inbound', async () => { + const pool = new FakePool([ + { + read_through: '2026-06-28T12:00:00.000Z', + read_at: '2026-06-28T12:00:00.000Z', + source: 'manual', + last_inbound_at: '2026-06-28T11:00:00.000Z', + }, + ]); + const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1'); + expect(st.unread).toBe(false); + }); + + test('no inbound at all → read (nothing to handle)', async () => { + const pool = new FakePool([ + { read_through: null, read_at: null, source: null, last_inbound_at: null }, + ]); + const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1'); + expect(st.unread).toBe(false); + expect(st.readThrough).toBeNull(); + }); + + test('inbound exists but never marked read → unread', async () => { + const pool = new FakePool([ + { + read_through: null, + read_at: null, + source: null, + last_inbound_at: '2026-06-28T11:00:00.000Z', + }, + ]); + const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1'); + expect(st.unread).toBe(true); + }); +});