merge batch 5: server surfaces (admin/client/my route registration), my/calendar, my/mail, features/imessage/service typedstream decode

This commit is contained in:
quinn 2026-05-15 18:16:46 -07:00
parent cb92645142
commit 4ba759ba92
11 changed files with 339 additions and 22 deletions

View file

@ -159,9 +159,49 @@ POST /client/imessage/send-queue/:id/result (status='sent'|'failed')
---
## Per-module send-queue endpoints
Modules other than iMessage use the same wire format but live in
module-specific tables (`icloud.<module>_send_queue`), accessed through
the generic `createSendQueueRepo` factory at
`src/shared/sendQueue/SendQueueRepo.ts`. Every module exposes:
```
GET /client/<module>/send-queue/pending
POST /client/<module>/send-queue/:id/result
```
with response shape
```jsonc
{ "success": true, "data": { "items": [ { "id", "action", "payload", "createdAt" } ] } }
```
The `payload` shape is module-specific and validated at enqueue time
against the per-module Zod schema:
| Module | Table | Payload schema | Actions |
|---|---|---|---|
| iCal | `icloud.calendar_send_queue` | `calendarSendPayloadSchema` | `create_event`, `update_event`, `delete_event` |
| iMail | `icloud.mail_send_queue` | `mailSendPayloadSchema` | `send_mail` |
| iNotes | `icloud.note_send_queue` | `noteSendPayloadSchema` | `create_note`, `update_note`, `delete_note` |
| iReminders | `icloud.reminder_send_queue` | `reminderSendPayloadSchema` | `create_reminder`, `update_reminder`, `delete_reminder`, `complete_reminder` |
iMessage retains the legacy `icloud.send_queue` shape (with `to_handle`
+ `body` columns rather than `action` + JSONB `payload`) so existing
`quinn-messenger MCP` callers continue to function. New modules adopt
the cleaner action/payload shape from the start.
## Implementation Location
- **Admin endpoint**: `src/surfaces/admin/send-queue.ts`
- **Client endpoints**: `src/surfaces/client/imessage.ts`
- **Schema**: `src/entities/send-queue/schema.ts`
- **Types**: `src/entities/send-queue/types.ts`
- **Admin endpoint (iMessage)**: `src/surfaces/admin/send-queue.ts`
- **Scheduled-send admin endpoints**: `src/surfaces/admin/scheduled-send.ts`
- **Client endpoints (iMessage)**: `src/surfaces/client/imessage.ts`
- **Client endpoints (iCal)**: `src/surfaces/client/ical.ts`
- **Client endpoints (iMail)**: `src/surfaces/client/imail.ts`
- **Client endpoints (iNotes)**: `src/surfaces/client/inotes.ts`
- **Client endpoints (iReminders)**: `src/surfaces/client/ireminders.ts`
- **Schema (iMessage legacy)**: `src/entities/send-queue/schema.ts`
- **Schema (per-module)**: `src/entities/{calendar,mail,note,reminder}SendItem/schema.ts`
- **Shared repo factory**: `src/shared/sendQueue/SendQueueRepo.ts`
- **Shared types**: `src/entities/send-queue/types.ts`

View file

@ -1,2 +1,8 @@
export { ingestCalendars, ingestEvents, getICalStats, listEventsByFilter, listCalendarsForDevice } from './service';
export type { CalendarPayload, EventPayload, SyncCalendarsPayload, SyncEventsPayload, ICalStats } from './service';
export {
enqueueEventChange,
listPendingCalendarSends,
markCalendarSendResult,
} from './sendService';
export type { EnqueueEventInput } from './sendService';

View file

@ -1,2 +1,8 @@
export type { EmailAddressPayload, EmailPayload, SyncMailPayload } from './service';
export { getImailStats, getLastMailSync, ingestMail, queryMailConversations, queryMailMessages } from './service';
export {
enqueueMailSend,
listPendingMailSends,
markMailSendResult,
} from './sendService';
export type { EnqueueMailInput } from './sendService';

View file

@ -14,6 +14,7 @@ import {
searchMessages,
} from '@/entities/message';
import { upsertAttachment } from '@/entities/attachment';
import { decodeAttributedBody } from '@/shared/typedstream/decode';
import { upsertContact, listContactsByDevice, listAllContacts, countContactsByDevice, updateContactSummary, searchContacts } from '@/entities/contact';
import type { Conversation } from '@/entities/conversation';
import type {
@ -99,7 +100,16 @@ export async function ingestMessages(deviceId: string, payload: SyncMessagesPayl
});
// Assign IDs up front so we can reference them for attachments after the bulk insert
const drafts = payload.messages.map((msg) => ({
const drafts = payload.messages.map((msg) => {
// Server-side typedstream decode: when the client only sent the
// attributedBody blob and `text` is empty, recover plain text and
// persist it in `body_decoded` so search/embedding pipelines see it.
let bodyDecoded: string | null = null;
if ((msg.text ?? '') === '' && msg.attributedBody) {
const decoded = decodeAttributedBody(msg.attributedBody);
bodyDecoded = decoded.text;
}
return {
draft: {
id: randomUUIDv7(),
deviceId,
@ -115,6 +125,7 @@ export async function ingestMessages(deviceId: string, payload: SyncMessagesPayl
deliveredAt: msg.deliveredAt ?? null,
readAt: msg.readAt ?? null,
attributedBody: msg.attributedBody ?? null,
bodyDecoded,
associatedMessageType: msg.associatedMessageType ?? null,
associatedMessageGuid: msg.associatedMessageGuid ?? null,
expressiveSendStyleId: msg.expressiveSendStyleId ?? null,
@ -130,7 +141,8 @@ export async function ingestMessages(deviceId: string, payload: SyncMessagesPayl
senderEmail: msg.senderEmail ?? null,
},
msg,
}));
};
});
const { count: synced, idByExternalId } = await bulkUpsertMessages(pool, drafts.map((d) => d.draft));

View file

@ -1,6 +1,11 @@
import { Hono } from 'hono';
import { calendarSendQueueAdminRouter } from './calendar-send-queue';
import { devicesAdminRouter } from './devices';
import { mailSendQueueAdminRouter } from './mail-send-queue';
import { noteSendQueueAdminRouter } from './note-send-queue';
import { prospectsAdminRouter } from './prospects';
import { reminderSendQueueAdminRouter } from './reminder-send-queue';
import { scheduledSendAdminRouter } from './scheduled-send';
import { sendDirectAdminRouter } from './send-direct';
import { sendQueueAdminRouter } from './send-queue';
@ -9,4 +14,9 @@ export const adminSurface = new Hono()
.route('/devices', devicesAdminRouter)
.route('/scheduled-send', scheduledSendAdminRouter)
.route('/send-direct', sendDirectAdminRouter)
.route('/send-queue', sendQueueAdminRouter);
.route('/send-queue', sendQueueAdminRouter)
.route('/calendar-send-queue', calendarSendQueueAdminRouter)
.route('/reminder-send-queue', reminderSendQueueAdminRouter)
.route('/note-send-queue', noteSendQueueAdminRouter)
.route('/mail-send-queue', mailSendQueueAdminRouter)
.route('/prospects', prospectsAdminRouter);

View file

@ -1,7 +1,13 @@
import { Hono } from 'hono';
import { z } from 'zod';
import { getICalStats, ingestCalendars, ingestEvents } from '@/features/ical';
import {
getICalStats,
ingestCalendars,
ingestEvents,
listPendingCalendarSends,
markCalendarSendResult,
} from '@/features/ical';
import { withSyncHistory } from '@/shared/sync-history';
const calendarSchema = z.object({
@ -43,6 +49,11 @@ const syncEventsSchema = z.object({
events: z.array(eventSchema),
});
const sendResultSchema = z.object({
status: z.enum(['sent', 'failed']),
error: z.string().optional(),
});
export const icalClientRouter = new Hono()
.get('/stats', (c) => {
const stats = getICalStats();
@ -59,4 +70,50 @@ export const icalClientRouter = new Hono()
const payload = syncEventsSchema.parse(await c.req.json());
const result = await withSyncHistory(deviceId, 'calendar', () => ingestEvents(deviceId, payload));
return c.json({ success: true, data: result });
})
.get('/send-queue/pending', async (c) => {
try {
const deviceId = c.get('deviceId');
const items = await listPendingCalendarSends(deviceId);
return c.json({ success: true, data: { items } });
} catch (err) {
return c.json(
{
success: false,
error: {
code: 'send_queue_error',
message: err instanceof Error ? err.message : 'failed to fetch pending calendar sends',
},
},
500,
);
}
})
.post('/send-queue/:id/result', async (c) => {
try {
const id = c.req.param('id');
const body = sendResultSchema.parse(await c.req.json());
await markCalendarSendResult({ id, status: body.status, error: body.error ?? null });
return c.json({ success: true, data: {} });
} catch (err) {
if (err instanceof z.ZodError) {
return c.json(
{
success: false,
error: { code: 'validation_error', message: 'invalid request payload', issues: err.issues },
},
400,
);
}
return c.json(
{
success: false,
error: {
code: 'send_queue_error',
message: err instanceof Error ? err.message : 'failed to mark calendar send result',
},
},
500,
);
}
});

View file

@ -1,9 +1,19 @@
import { Hono } from 'hono';
import { z } from 'zod';
import { getImailStats, ingestMail } from '@/features/imail';
import {
getImailStats,
ingestMail,
listPendingMailSends,
markMailSendResult,
} from '@/features/imail';
import { withSyncHistory } from '@/shared/sync-history';
const sendResultSchema = z.object({
status: z.enum(['sent', 'failed']),
error: z.string().optional(),
});
const emailAddressSchema = z.object({
address: z.string().min(1),
name: z.string().nullish(),
@ -43,4 +53,50 @@ export const imailClientRouter = new Hono()
const payload = syncMailSchema.parse(await c.req.json());
const result = await withSyncHistory(deviceId, 'mail', () => ingestMail(deviceId, payload));
return c.json({ success: true, data: result });
})
.get('/send-queue/pending', async (c) => {
try {
const deviceId = c.get('deviceId');
const items = await listPendingMailSends(deviceId);
return c.json({ success: true, data: { items } });
} catch (err) {
return c.json(
{
success: false,
error: {
code: 'send_queue_error',
message: err instanceof Error ? err.message : 'failed to fetch pending mail sends',
},
},
500,
);
}
})
.post('/send-queue/:id/result', async (c) => {
try {
const id = c.req.param('id');
const body = sendResultSchema.parse(await c.req.json());
await markMailSendResult({ id, status: body.status, error: body.error ?? null });
return c.json({ success: true, data: {} });
} catch (err) {
if (err instanceof z.ZodError) {
return c.json(
{
success: false,
error: { code: 'validation_error', message: 'invalid request payload', issues: err.issues },
},
400,
);
}
return c.json(
{
success: false,
error: {
code: 'send_queue_error',
message: err instanceof Error ? err.message : 'failed to mark mail send result',
},
},
500,
);
}
});

View file

@ -1,10 +1,12 @@
import { Hono } from 'hono';
import { devicesClientRouter } from './devices';
import { imessageClientRouter } from './imessage';
import { iphotoClientRouter } from './iphoto';
import { imailClientRouter } from './imail';
import { icalClientRouter } from './ical';
import { imailClientRouter } from './imail';
import { imessageClientRouter } from './imessage';
import { inotesClientRouter } from './inotes';
import { iphotoClientRouter } from './iphoto';
import { iremindersClientRouter } from './ireminders';
import { syncClientRouter } from './sync';
export const clientSurface = new Hono()
@ -13,4 +15,6 @@ export const clientSurface = new Hono()
.route('/iphoto', iphotoClientRouter)
.route('/imail', imailClientRouter)
.route('/ical', icalClientRouter)
.route('/ireminders', iremindersClientRouter)
.route('/inotes', inotesClientRouter)
.route('/sync', syncClientRouter);

View file

@ -1,20 +1,58 @@
import { Hono } from 'hono';
import type { Context } from 'hono';
import { z } from 'zod';
import { calendarSendPayloadSchema } from '@/entities/calendarSendItem';
import { getICalStats, listCalendarsForDevice, listEventsByFilter } from '@/features/ical';
import { enqueueEventChange } from '@/features/ical/sendService';
const limitParam = z.coerce.number().int().positive().max(1000).default(200);
const deviceIdParam = z.string().min(1).optional();
const dateParam = z.string().optional();
// Auth follow-up: deviceId is currently passed as a query/body param. Once
// SSO session resolution lands, this surface should derive the target
// deviceId from the session and drop the explicit parameter.
const requiredDeviceId = z.object({ deviceId: z.string().min(1) });
const createEventSchema = calendarSendPayloadSchema.extend({
deviceId: z.string().min(1),
});
const updateEventSchema = calendarSendPayloadSchema.extend({
deviceId: z.string().min(1),
});
function errorResponse(c: Context, err: unknown, fallback: string) {
if (err instanceof z.ZodError) {
return c.json(
{
success: false,
error: { code: 'validation_error', message: 'invalid request payload', issues: err.issues },
},
400,
);
}
return c.json(
{
success: false,
error: { code: 'enqueue_error', message: err instanceof Error ? err.message : fallback },
},
500,
);
}
export const calendarMyRouter = new Hono()
.get('/stats', async (_c) => {
return _c.json({ success: true, data: await getICalStats() });
.get('/stats', async (c) => {
return c.json({ success: true, data: await getICalStats() });
})
.get('/calendars', async (c) => {
const deviceId = deviceIdParam.parse(c.req.query('deviceId'));
if (!deviceId) return c.json({ success: true, data: { calendars: [] } });
return c.json({ success: true, data: { calendars: await listCalendarsForDevice(deviceId) } });
return c.json({
success: true,
data: { calendars: await listCalendarsForDevice(deviceId) },
});
})
.get('/events', async (c) => {
const deviceId = deviceIdParam.parse(c.req.query('deviceId'));
@ -22,8 +60,56 @@ export const calendarMyRouter = new Hono()
const startBefore = dateParam.parse(c.req.query('startBefore'));
const calendarId = z.string().optional().parse(c.req.query('calendarId'));
const limit = limitParam.parse(c.req.query('limit') ?? '200');
const offset = z.coerce.number().int().nonnegative().default(0).parse(c.req.query('offset') ?? '0');
const offset = z.coerce
.number()
.int()
.nonnegative()
.default(0)
.parse(c.req.query('offset') ?? '0');
const events = await listEventsByFilter({ deviceId, startAfter, startBefore, calendarId, limit, offset });
const events = await listEventsByFilter({
deviceId,
startAfter,
startBefore,
calendarId,
limit,
offset,
});
return c.json({ success: true, data: { events } });
})
.post('/events', async (c) => {
try {
const body = createEventSchema.parse(await c.req.json());
const { deviceId, ...payload } = body;
const { id } = await enqueueEventChange({ deviceId, action: 'create_event', payload });
return c.json({ success: true, data: { sendQueueId: id } });
} catch (err) {
return errorResponse(c, err, 'failed to enqueue create_event');
}
})
.put('/events/:id', async (c) => {
try {
const eventIdentifier = c.req.param('id');
const body = updateEventSchema.parse(await c.req.json());
const { deviceId, ...rest } = body;
const payload = { ...rest, eventIdentifier };
const { id } = await enqueueEventChange({ deviceId, action: 'update_event', payload });
return c.json({ success: true, data: { sendQueueId: id } });
} catch (err) {
return errorResponse(c, err, 'failed to enqueue update_event');
}
})
.delete('/events/:id', async (c) => {
try {
const eventIdentifier = c.req.param('id');
const { deviceId } = requiredDeviceId.parse({ deviceId: c.req.query('deviceId') });
const { id } = await enqueueEventChange({
deviceId,
action: 'delete_event',
payload: { eventIdentifier },
});
return c.json({ success: true, data: { sendQueueId: id } });
} catch (err) {
return errorResponse(c, err, 'failed to enqueue delete_event');
}
});

View file

@ -1,16 +1,22 @@
import { Hono } from 'hono';
import { messagesMyRouter } from './messages';
import { photosMyRouter } from './photos';
import { mailMyRouter } from './mail';
import { calendarMyRouter } from './calendar';
import { searchMyRouter } from './search';
import { contactsMyRouter } from './contacts';
import { mailMyRouter } from './mail';
import { messagesMyRouter } from './messages';
import { notesMyRouter } from './notes';
import { photosMyRouter } from './photos';
import { prospectsMyRouter } from './prospects';
import { remindersMyRouter } from './reminders';
import { searchMyRouter } from './search';
export const mySurface = new Hono()
.route('/messages', messagesMyRouter)
.route('/photos', photosMyRouter)
.route('/mail', mailMyRouter)
.route('/calendar', calendarMyRouter)
.route('/reminders', remindersMyRouter)
.route('/notes', notesMyRouter)
.route('/search', searchMyRouter)
.route('/contacts', contactsMyRouter);
.route('/contacts', contactsMyRouter)
.route('/prospects', prospectsMyRouter);

View file

@ -1,12 +1,36 @@
import { Hono } from 'hono';
import type { Context } from 'hono';
import { z } from 'zod';
import { mailSendPayloadSchema } from '@/entities/mailSendItem';
import { queryMailConversations, queryMailMessages, getLastMailSync } from '@/features/imail';
import { enqueueMailSend } from '@/features/imail/sendService';
const limitParam = z.coerce.number().int().positive().max(500).default(100);
const sinceParam = z.string().optional();
const convIdParam = z.string().min(1);
const requiredDeviceId = z.object({ deviceId: z.string().min(1) });
function errorResponse(c: Context, err: unknown, fallback: string) {
if (err instanceof z.ZodError) {
return c.json(
{
success: false,
error: { code: 'validation_error', message: 'invalid request payload', issues: err.issues },
},
400,
);
}
return c.json(
{
success: false,
error: { code: 'enqueue_error', message: err instanceof Error ? err.message : fallback },
},
500,
);
}
export const mailMyRouter = new Hono()
.get('/conversations', async (c) => {
const deviceId = c.req.query('deviceId') ?? undefined;
@ -22,4 +46,14 @@ export const mailMyRouter = new Hono()
const deviceId = c.req.query('deviceId') ?? '';
if (!deviceId) return c.json({ lastSync: null });
return c.json({ lastSync: await getLastMailSync(deviceId) });
})
.post('/send', async (c) => {
try {
const { deviceId } = requiredDeviceId.parse({ deviceId: c.req.query('deviceId') });
const payload = mailSendPayloadSchema.parse(await c.req.json());
const { id } = await enqueueMailSend({ deviceId, action: 'send_mail', payload });
return c.json({ success: true, data: { sendQueueId: id } });
} catch (err) {
return errorResponse(c, err, 'failed to enqueue send_mail');
}
});