lilith-platform.live/codebase/@features/api/src/scripts/backfill-processors.ts
2026-06-08 05:42:55 -07:00

181 lines
6.6 KiB
TypeScript

#!/usr/bin/env bun
/**
 * Replays historical macsync.messages through the CRM processors
 * (relationship-resolver, geo-inference, interest-detector, content-classifier)
 * and reconciles per-client contact stats, so the pg.quinn CRM is populated
 * from data inserted before the processors were live.
 *
 * The per-message loop processes ALL inbound messages (including attachment-only
 * ones) so every sender gets a client + contact_relationship record regardless
 * of whether they sent text. Geo-inference skips messages with no body automatically.
 *
 * Usage:
 *   QUINN_DB_URL=... QUINN_MACSYNC_DB_URL=... bun run scripts/backfill-processors.ts
 *   bun run scripts/backfill-processors.ts --dry-run
 *   bun run scripts/backfill-processors.ts --processor=relationship
 *   bun run scripts/backfill-processors.ts --processor=geo
 *   bun run scripts/backfill-processors.ts --processor=interest
 *   CONTENT_MODERATOR_URL=... bun run scripts/backfill-processors.ts --processor=content   # classify each unique inbound handle
 *   bun run scripts/backfill-processors.ts --processor=stats   # reconcile inbound counts/recency from macsync
 */
import postgres from 'postgres';
import { logger } from '@/shared/logger';
import { processMessage as resolveRelationship } from '@/processors/relationship-resolver/index';
import { processMessage as inferGeo } from '@/processors/geo-inference/index';
import { processMessage as detectInterest } from '@/processors/interest-detector/index';
import { classifyHandle } from '@/processors/content-classifier/index';
import { reconcileContactStats } from '@/processors/contact-stats-reconciler/index';
/**
 * `--dry-run`: source rows are still read and counted, but processor and
 * classifier calls are skipped (the stats reconciler is passed `dryRun`).
 */
const DRY_RUN = process.argv.some((arg) => arg === '--dry-run');
/**
 * `--processor=<name>` — one of: relationship | geo | interest | content | stats | all.
 * Falls back to `all` when the flag is absent. Only the text between the first
 * and second `=` is taken as the name.
 */
const PROCESSOR =
  process.argv
    .find((arg) => arg.startsWith('--processor='))
    ?.split('=')[1] ?? 'all';
/** Page size for the macsync message loop (LIMIT per query). */
const BATCH_SIZE = 500;
/** pg.quinn (CRM) connection string; falls back to a hard-coded localhost URL with dev credentials. */
const QUINN_DB_URL = process.env['QUINN_DB_URL'] ?? 'postgres://quinn:devpassword@localhost:25435/quinn';
/** macsync source connection string; falls back to a hard-coded LAN host with dev credentials. */
const QUINN_MACSYNC_DB_URL =
  process.env['QUINN_MACSYNC_DB_URL'] ?? 'postgres://quinn_macsync:devpassword@black.lan:25436/quinn_macsync';
/** Base URL handed to `classifyHandle` by the content loop. */
const CONTENT_MODERATOR_URL = process.env['CONTENT_MODERATOR_URL'] ?? 'http://localhost:3501';
/**
 * One inbound row from macsync.messages as selected by the backfill message
 * loop; forwarded field-for-field to the per-message processors.
 */
interface MessageRow {
  /** Message id; also used to identify the row in failure logs. */
  id: string;
  /** Send time, rendered as text by the query (`sent_at::text`). */
  sent_at: string;
  /** Sender handle; also used in failure logs. */
  from_handle: string;
  conversation_id: string;
  device_id: string;
  /** Always false for rows from this loop: the query filters `NOT is_from_me`. */
  is_from_me: boolean;
  /** `body` when non-empty, otherwise `body_decoded`; null when neither is present. */
  body: string | null;
}
/** Every value `--processor=` accepts; anything else is rejected before connecting. */
const KNOWN_PROCESSORS: ReadonlySet<string> = new Set(['relationship', 'geo', 'interest', 'content', 'stats', 'all']);

/**
 * Backfill entry point: runs the processor(s) selected by `--processor`
 * (default `all`) over historical macsync data, writing into pg.quinn.
 *
 * With `all`, the order is:
 *   1. per-message loop over every inbound message (relationship → geo → interest),
 *   2. per-handle content classification of every distinct inbound sender,
 *   3. contact-stats reconciliation.
 *
 * With `--dry-run`, source rows are still read and counted, but no processor or
 * classifier is called; the stats reconciler is passed `dryRun: true`.
 *
 * Per-message and per-handle failures are logged and counted, not rethrown.
 *
 * @throws Error when `--processor` names an unknown processor, or when a query
 *   outside the per-row try blocks (counts, paging, handle list, stats) fails.
 */
async function main(): Promise<void> {
  // Reject typos up front; previously an unknown name ran nothing and still
  // logged "complete" with exit code 0.
  if (!KNOWN_PROCESSORS.has(PROCESSOR)) {
    throw new Error(`unknown --processor=${PROCESSOR}; expected one of: ${[...KNOWN_PROCESSORS].join(', ')}`);
  }
  logger.info('backfill-processors starting', { processor: PROCESSOR, dry_run: DRY_RUN });
  const quinnSql = postgres(QUINN_DB_URL, { max: 5 });
  const icloudSql = postgres(QUINN_MACSYNC_DB_URL, { max: 5 });
  /** True when `name` was requested explicitly or via `all`. */
  const selected = (name: string): boolean => PROCESSOR === 'all' || PROCESSOR === name;
  try {
    if (selected('relationship') || selected('geo') || selected('interest')) {
      const [{ total }] = await icloudSql<[{ total: number }]>`
        SELECT COUNT(*)::int as total
        FROM macsync.messages
        WHERE NOT is_from_me
      `;
      logger.info('backfill-processors: total inbound messages', { total });
      let processed = 0;
      let errors = 0;
      let offset = 0;
      while (offset < total) {
        // `id` breaks ties between messages that share a sent_at. Without a
        // unique ORDER BY, LIMIT/OFFSET pages are not stable and rows can be
        // skipped or replayed across batch boundaries.
        const rows = await icloudSql<MessageRow[]>`
          SELECT id, from_handle, conversation_id, device_id, is_from_me, sent_at::text, COALESCE(NULLIF(body, ''), body_decoded) AS body
          FROM macsync.messages
          WHERE NOT is_from_me
          ORDER BY sent_at ASC, id ASC
          LIMIT ${BATCH_SIZE} OFFSET ${offset}
        `;
        // Guards against the table shrinking after `total` was counted.
        if (rows.length === 0) break;
        for (const row of rows) {
          const payload = {
            id: row.id,
            from_handle: row.from_handle,
            conversation_id: row.conversation_id,
            device_id: row.device_id,
            is_from_me: row.is_from_me,
            sent_at: row.sent_at,
            body: row.body,
          };
          try {
            if (!DRY_RUN) {
              // Sequential on purpose: in `all` mode a relationship failure
              // skips geo/interest for that message, as before.
              if (selected('relationship')) {
                await resolveRelationship(quinnSql, icloudSql, payload);
              }
              if (selected('geo')) {
                await inferGeo(quinnSql, icloudSql, payload);
              }
              if (selected('interest')) {
                await detectInterest(quinnSql, icloudSql, payload);
              }
            }
            processed++;
          } catch (err) {
            errors++;
            logger.error('backfill: message failed', {
              id: row.id,
              handle: row.from_handle,
              error: err instanceof Error ? err.message : String(err),
            });
          }
        }
        offset += rows.length;
        logger.info('backfill-processors progress', {
          offset,
          total,
          pct: Math.round((offset / total) * 100),
          processed,
          errors,
        });
      }
      logger.info('backfill-processors message-loop complete', { processed, errors });
    }

    if (selected('content')) {
      const handles = await icloudSql<Array<{ from_handle: string }>>`
        SELECT DISTINCT from_handle
        FROM macsync.messages
        WHERE NOT is_from_me AND from_handle IS NOT NULL AND from_handle != ''
        ORDER BY from_handle ASC
      `;
      logger.info('backfill-processors content: unique handles to classify', { count: handles.length });
      let cProcessed = 0;
      let cErrors = 0;
      for (const { from_handle } of handles) {
        try {
          if (!DRY_RUN) {
            await classifyHandle(quinnSql, icloudSql, from_handle, 'imessage', { url: CONTENT_MODERATOR_URL });
          }
          cProcessed++;
          if (cProcessed % 50 === 0) {
            logger.info('backfill-processors content progress', {
              processed: cProcessed,
              total: handles.length,
              errors: cErrors,
            });
          }
        } catch (err) {
          cErrors++;
          logger.error('backfill content: handle failed', {
            handle: from_handle,
            error: err instanceof Error ? err.message : String(err),
          });
        }
      }
      logger.info('backfill-processors content complete', { processed: cProcessed, errors: cErrors });
    }

    if (selected('stats')) {
      // Reconcile per-client inbound count / first / last contact straight from
      // macsync — the historical-replay half the live LISTEN/NOTIFY path lacks.
      // Monotonic + idempotent; safe to run after the relationship loop above.
      const result = await reconcileContactStats(quinnSql, icloudSql, { dryRun: DRY_RUN });
      logger.info('backfill-processors stats complete', { ...result });
    }
    logger.info('backfill-processors complete');
  } finally {
    // Start both shutdowns together so a failure closing one pool does not
    // prevent the other from being closed.
    await Promise.all([quinnSql.end(), icloudSql.end()]);
  }
}
// Script entry: any error escaping main() is logged once, then the process
// exits with status 1 so callers (shell, CI) see the failure.
main().catch((err: unknown) => {
  const reason = err instanceof Error ? err.message : String(err);
  logger.error('backfill-processors fatal', { error: reason });
  process.exit(1);
});