feat(read): conversation-read entity + read feature + my-surface endpoint & tests

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Natalie 2026-06-29 11:35:13 -04:00
parent 8d821c8c97
commit 9f7c9f4533
8 changed files with 427 additions and 0 deletions

View file

@ -0,0 +1,3 @@
export { conversationReadMigrations } from './schema';
export type { ReadSource, MarkReadInput, ConversationReadState } from './types';
export { markRead, markAllRepliedRead, getReadState } from './repo';

View file

@ -0,0 +1,130 @@
import type { Pool } from 'pg';
import { pgAll, pgGet } from '@/shared/db';
import type { ConversationReadState, MarkReadInput, ReadSource } from './types';
function wrap(err: unknown, op: string): Error {
return new Error(
`conversation-read/repo.${op} failed: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err },
);
}
/**
* Mark a handle read through an instant (defaults to now). Upsert on
* `(device_id, handle)`; `read_through` only advances (GREATEST), so re-marking
* an already-read thread is an idempotent no-op. Returns the effective
* `read_through` after the upsert.
*/
export async function markRead(pool: Pool, input: MarkReadInput): Promise<{ readThrough: string }> {
try {
const row = await pgGet<{ read_through: string }>(
pool,
`INSERT INTO macsync.conversation_read (device_id, handle, read_through, source)
VALUES ($1, $2, COALESCE($3::timestamptz, now()), $4)
ON CONFLICT (device_id, handle) DO UPDATE
SET read_through = GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through),
source = EXCLUDED.source,
read_at = now(),
updated_at = now()
RETURNING read_through`,
[input.deviceId, input.handle, input.readThrough ?? null, input.source ?? 'manual'],
);
if (!row) throw new Error('upsert returned no row');
return { readThrough: row.read_through };
} catch (err) {
throw wrap(err, 'markRead');
}
}
/**
* Mark read every thread whose latest message is outbound (i.e. Quinn has
* replied) marking it read through that reply's time. Single statement:
* per conversation, take the newest message; if it's outbound, mark the lead's
* handle (newest inbound `from_handle`) read. Idempotent via the same GREATEST
* upsert. Returns the handles affected.
*/
export async function markAllRepliedRead(pool: Pool, deviceId: string): Promise<readonly string[]> {
try {
const rows = await pgAll<{ handle: string }>(
pool,
`INSERT INTO macsync.conversation_read (device_id, handle, read_through, source)
SELECT $1, h.handle, latest.last_at, 'all_replied'
FROM (
SELECT DISTINCT ON (m.conversation_id)
m.conversation_id, m.is_from_me, m.sent_at AS last_at
FROM macsync.messages m
WHERE m.device_id = $1
ORDER BY m.conversation_id, m.sent_at DESC
) latest
JOIN LATERAL (
SELECT m2.from_handle AS handle
FROM macsync.messages m2
WHERE m2.conversation_id = latest.conversation_id
AND m2.is_from_me = false
AND m2.from_handle <> ''
ORDER BY m2.sent_at DESC
LIMIT 1
) h ON true
WHERE latest.is_from_me = true
ON CONFLICT (device_id, handle) DO UPDATE
SET read_through = GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through),
source = 'all_replied',
read_at = now(),
updated_at = now()
RETURNING handle`,
[deviceId],
);
return rows.map((r) => r.handle);
} catch (err) {
throw wrap(err, 'markAllRepliedRead');
}
}
/**
* Read-state of one handle: the stored `read_through` (if any) plus the handle's
* latest inbound time, and whether it is currently unread (inbound newer than
* `read_through`, or never marked read while inbound exists).
*/
export async function getReadState(
pool: Pool,
deviceId: string,
handle: string,
): Promise<ConversationReadState> {
try {
const row = await pgGet<{
read_through: string | null;
read_at: string | null;
source: string | null;
last_inbound_at: string | null;
}>(
pool,
`SELECT
cr.read_through,
cr.read_at,
cr.source,
(SELECT MAX(m.sent_at) FROM macsync.messages m
WHERE m.device_id = $1 AND m.from_handle = $2 AND m.is_from_me = false) AS last_inbound_at
FROM (SELECT 1) one
LEFT JOIN macsync.conversation_read cr
ON cr.device_id = $1 AND cr.handle = $2`,
[deviceId, handle],
);
const readThrough = row?.read_through ?? null;
const lastInboundAt = row?.last_inbound_at ?? null;
const unread =
lastInboundAt !== null &&
(readThrough === null || new Date(lastInboundAt) > new Date(readThrough));
return {
handle,
readThrough,
readAt: row?.read_at ?? null,
source: (row?.source as ReadSource | null) ?? null,
lastInboundAt,
unread,
};
} catch (err) {
throw wrap(err, 'getReadState');
}
}

View file

@ -0,0 +1,36 @@
import type { Migration } from '@/shared/db';
/**
* Logical read-state (Handoff 04). There is **no safe public macOS API** to mark
* a Messages conversation read, and writing `is_read`/`date_read` into Apple's
* live `chat.db` risks corruption so macsync tracks "handled/read" in its OWN
* store and the inbox/app reads from here. Apple's database is never touched.
*
* A handle is "read through" a timestamp: it is considered read iff it has no
* inbound message newer than `read_through`. A later inbound naturally
* re-surfaces it as unread. `read_through` only ever advances (GREATEST on
* upsert), so re-marking is an idempotent no-op.
*/
export const conversationReadMigrations: readonly Migration[] = [
{
id: '2026-06-28_conversation_read_initial',
sql: `
CREATE TABLE IF NOT EXISTS macsync.conversation_read (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
device_id UUID NOT NULL REFERENCES macsync.devices(id) ON DELETE CASCADE,
-- The lead's handle (phone/email), matching outbox.recipient and the
-- inbound messages.from_handle the consumer-facing thread identity.
handle TEXT NOT NULL,
read_through TIMESTAMPTZ NOT NULL,
-- manual | outbox_send | all_replied provenance for tuning/audit.
source TEXT NOT NULL DEFAULT 'manual',
read_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(device_id, handle)
);
CREATE INDEX IF NOT EXISTS idx_conv_read_device
ON macsync.conversation_read(device_id);
`,
},
];

View file

@ -0,0 +1,20 @@
export type ReadSource = 'manual' | 'outbox_send' | 'all_replied';
export interface MarkReadInput {
readonly deviceId: string;
readonly handle: string;
/** Mark read through this instant (ISO). Defaults to "now" at the repo layer. */
readonly readThrough?: string | null;
readonly source?: ReadSource;
}
/** The read-state of one handle, with whether it currently has unread inbound. */
export interface ConversationReadState {
readonly handle: string;
readonly readThrough: string | null;
readonly readAt: string | null;
readonly source: ReadSource | null;
readonly lastInboundAt: string | null;
/** true when there is inbound newer than `read_through` (or never marked read). */
readonly unread: boolean;
}

View file

@ -0,0 +1,6 @@
export {
markConversationRead,
markAllReplied,
conversationReadState,
type MarkReadOptions,
} from './service';

View file

@ -0,0 +1,65 @@
import { getDb } from '@/shared/db';
import {
markRead,
markAllRepliedRead,
getReadState,
type ConversationReadState,
type ReadSource,
} from '@/entities/conversation-read';
export interface MarkReadOptions {
/** Mark read through this instant (ISO); defaults to "now". */
readThrough?: string | null;
source?: ReadSource;
}
function wrap(err: unknown, op: string): Error {
return new Error(`read/${op} failed: ${err instanceof Error ? err.message : String(err)}`, {
cause: err,
});
}
/**
* Mark a single conversation (by the lead's handle) read. Logical-read model
* nothing is written to Apple's chat.db; the inbox reads this state. Idempotent.
*/
export async function markConversationRead(
deviceId: string,
handle: string,
opts: MarkReadOptions = {},
): Promise<{ handle: string; readThrough: string }> {
try {
const { readThrough } = await markRead(getDb(), {
deviceId,
handle,
readThrough: opts.readThrough ?? null,
source: opts.source ?? 'manual',
});
return { handle, readThrough };
} catch (err) {
throw wrap(err, 'markConversationRead');
}
}
/** Mark every replied-to thread read. Returns the handles affected. */
export async function markAllReplied(
deviceId: string,
): Promise<{ marked: number; handles: readonly string[] }> {
try {
const handles = await markAllRepliedRead(getDb(), deviceId);
return { marked: handles.length, handles };
} catch (err) {
throw wrap(err, 'markAllReplied');
}
}
export async function conversationReadState(
deviceId: string,
handle: string,
): Promise<ConversationReadState> {
try {
return await getReadState(getDb(), deviceId, handle);
} catch (err) {
throw wrap(err, 'conversationReadState');
}
}

View file

@ -0,0 +1,44 @@
import { Hono } from 'hono';
import { z } from 'zod';
import { markConversationRead, markAllReplied, conversationReadState } from '@/features/read';
const markSchema = z.object({
deviceId: z.string().uuid(),
handle: z.string().min(1),
/** Mark read through this instant; defaults to now. */
readThrough: z.string().datetime().optional(),
source: z.enum(['manual', 'outbox_send', 'all_replied']).optional(),
});
const allRepliedSchema = z.object({ deviceId: z.string().uuid() });
const deviceIdParam = z.string().uuid();
/**
* Logical conversation read-state (Handoff 04). `markRead`/`markAllRepliedRead`
* for an app/UI to flag handled threads. Never touches Apple's chat.db.
*/
export const readMyRouter = new Hono()
.post('/', async (c) => {
const parsed = markSchema.safeParse(await c.req.json().catch(() => null));
if (!parsed.success) {
return c.json({ success: false, error: { code: 'validation_error', issues: parsed.error.flatten() } }, 400);
}
const { deviceId, handle, readThrough, source } = parsed.data;
const data = await markConversationRead(deviceId, handle, { readThrough, source });
return c.json({ success: true, data });
})
.post('/all-replied', async (c) => {
const parsed = allRepliedSchema.safeParse(await c.req.json().catch(() => null));
if (!parsed.success) {
return c.json({ success: false, error: { code: 'validation_error', issues: parsed.error.flatten() } }, 400);
}
const data = await markAllReplied(parsed.data.deviceId);
return c.json({ success: true, data });
})
.get('/:handle', async (c) => {
const deviceId = deviceIdParam.parse(c.req.query('deviceId') ?? '');
const handle = z.string().min(1).parse(c.req.param('handle'));
const data = await conversationReadState(deviceId, handle);
return c.json({ success: true, data });
});

View file

@ -0,0 +1,123 @@
/**
* Unit tests for the conversation-read repo SQL (Handoff 04 logical-read
* model). Capturing FakePool, no Postgres.
*
* NOTE: like outbox.repo.test.ts / calls.repo.test.ts this imports
* `@/shared/db`, which pulls the private `@lilith/quinn-db-pg` package run
* where that registry is reachable (`bun install && bun test
* src/test/conversation-read.repo.test.ts`).
*/
import { describe, expect, test } from 'bun:test';
import type { Pool } from 'pg';
import { markRead, markAllRepliedRead, getReadState } from '@/entities/conversation-read/repo';
interface Captured {
sql: string;
params: unknown[];
}
class FakePool {
calls: Captured[] = [];
private readonly rows: Record<string, unknown>[];
constructor(rows: Record<string, unknown>[] = []) {
this.rows = rows;
}
async query<T = Record<string, unknown>>(sql: string, params: unknown[] = []) {
this.calls.push({ sql, params });
return { rows: this.rows as unknown as T[] };
}
}
describe('markRead', () => {
test('upserts with monotonic GREATEST read_through and defaults to now', async () => {
const pool = new FakePool([{ read_through: '2026-06-28T12:00:00.000Z' }]);
const res = await markRead(pool as unknown as Pool, {
deviceId: 'dev-1',
handle: '+14155550000',
});
expect(res).toEqual({ readThrough: '2026-06-28T12:00:00.000Z' });
const { sql, params } = pool.calls[0];
expect(sql).toContain('ON CONFLICT (device_id, handle) DO UPDATE');
expect(sql).toContain('GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through)');
expect(sql).toContain('COALESCE($3::timestamptz, now())');
// readThrough omitted → null (COALESCE falls back to now()); source default manual.
expect(params).toEqual(['dev-1', '+14155550000', null, 'manual']);
});
test('passes explicit readThrough and source through', async () => {
const pool = new FakePool([{ read_through: '2026-06-28T09:00:00.000Z' }]);
await markRead(pool as unknown as Pool, {
deviceId: 'dev-1',
handle: 'a@b.com',
readThrough: '2026-06-28T09:00:00.000Z',
source: 'outbox_send',
});
expect(pool.calls[0].params).toEqual(['dev-1', 'a@b.com', '2026-06-28T09:00:00.000Z', 'outbox_send']);
});
});
describe('markAllRepliedRead', () => {
test('marks threads whose latest message is outbound and returns affected handles', async () => {
const pool = new FakePool([{ handle: '+14155550000' }, { handle: 'a@b.com' }]);
const handles = await markAllRepliedRead(pool as unknown as Pool, 'dev-1');
expect(handles).toEqual(['+14155550000', 'a@b.com']);
const { sql, params } = pool.calls[0];
expect(sql).toContain('WHERE latest.is_from_me = true');
expect(sql).toContain("source = 'all_replied'");
expect(sql).toContain('GREATEST(macsync.conversation_read.read_through, EXCLUDED.read_through)');
expect(params).toEqual(['dev-1']);
});
});
describe('getReadState', () => {
test('unread when inbound is newer than read_through', async () => {
const pool = new FakePool([
{
read_through: '2026-06-28T10:00:00.000Z',
read_at: '2026-06-28T10:00:00.000Z',
source: 'manual',
last_inbound_at: '2026-06-28T11:00:00.000Z',
},
]);
const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1');
expect(st.unread).toBe(true);
expect(st.readThrough).toBe('2026-06-28T10:00:00.000Z');
});
test('read when read_through is at/after the last inbound', async () => {
const pool = new FakePool([
{
read_through: '2026-06-28T12:00:00.000Z',
read_at: '2026-06-28T12:00:00.000Z',
source: 'manual',
last_inbound_at: '2026-06-28T11:00:00.000Z',
},
]);
const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1');
expect(st.unread).toBe(false);
});
test('no inbound at all → read (nothing to handle)', async () => {
const pool = new FakePool([
{ read_through: null, read_at: null, source: null, last_inbound_at: null },
]);
const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1');
expect(st.unread).toBe(false);
expect(st.readThrough).toBeNull();
});
test('inbound exists but never marked read → unread', async () => {
const pool = new FakePool([
{
read_through: null,
read_at: null,
source: null,
last_inbound_at: '2026-06-28T11:00:00.000Z',
},
]);
const st = await getReadState(pool as unknown as Pool, 'dev-1', '+1');
expect(st.unread).toBe(true);
});
});