330 lines
11 KiB
TypeScript
330 lines
11 KiB
TypeScript
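#!/usr/bin/env bun
// Backfill PII extractions for historical inbound messages.
// Runs the regex tier on every qualifying message; the LLM tier respects the
// same throttle as the real-time processor unless --force is passed.
//
// Required env: QUINN_DB_URL, QUINN_ICLOUD_DB_URL
// Optional env: MODEL_BOSS_URL, MODEL_BOSS_API_KEY, MODEL_BOSS_MODEL
//   (the LLM tier is skipped entirely when MODEL_BOSS_URL is unset)
//
// Usage:
//   bun run scripts/backfill-pii-extraction.ts [--limit N | --limit=N] [--dry-run] [--force]
//
//   --limit    number of most-recent inbound messages to scan (default 500)
//   --dry-run  log what would be extracted without writing to the database
//              (LLM requests are still sent)
//   --force    run the LLM tier even when the throttle would skip it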
|
|
|
|
import { openDb, openIcloudDb } from '@/shared/db';
|
|
import * as contactRelationshipRepo from '@/entities/contact-relationship/repo';
|
|
import * as clientPiiRepo from '@/entities/client-pii-extraction/repo';
|
|
import { logger } from '@/shared/logger';
|
|
import { extractFromBody } from '@/processors/pii-extractor/regex-tier';
|
|
import { runLlmTier, createModelBossClient, type LlmExtractionMessage } from '@/processors/pii-extractor/llm-tier';
|
|
import { applyHysteresis } from '@/processors/pii-extractor/relationship-kind';
|
|
import { shouldRunLlmTier } from '@/processors/pii-extractor/throttle';
|
|
import type { RelationshipKind } from '@/entities/client-pii-extraction/types';
|
|
|
|
/** Number of messages scanned when `--limit` is not supplied. */
const DEFAULT_LIMIT = 500;

/**
 * Resolves the `--limit` option from a CLI argument list.
 *
 * Both `--limit=N` and `--limit N` are accepted; the `=` form wins when both
 * are present. A missing flag, or a flag with no value, yields `fallback`.
 *
 * @param argv     CLI arguments without the runtime/script prefix.
 * @param fallback Value returned when no limit was supplied.
 * @returns The parsed, non-negative limit.
 * @throws Error when a value is supplied but does not parse to a non-negative integer.
 */
function parseLimitArg(argv: readonly string[], fallback: number = DEFAULT_LIMIT): number {
  const inlineForm = argv.find((a) => a.startsWith('--limit='));

  let raw: string | undefined;
  if (inlineForm !== undefined) {
    raw = inlineForm.split('=')[1];
  } else {
    // indexOf returns -1 when the flag is absent; without this guard the
    // lookup would read argv[0] and treat an unrelated argument as the limit.
    const flagIndex = argv.indexOf('--limit');
    raw = flagIndex === -1 ? undefined : argv[flagIndex + 1];
  }

  if (!raw) return fallback;

  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < 0) {
    throw new Error(`--limit expects a non-negative integer, got "${raw}"`);
  }
  return parsed;
}

const args = process.argv.slice(2);
const limit = parseLimitArg(args);
const dryRun = args.includes('--dry-run');
const force = args.includes('--force');

/** Extractions scoring below this confidence are ignored by both tiers. */
const CONFIDENCE_THRESHOLD = 0.5;
|
|
|
|
/** One row of the inbound-message query that `main` runs against the iCloud database. */
interface InboundMessageRow {
  /** `message.rowid`; stringified and recorded as the extraction's source message id. */
  rowid: number;

  /** Message text; the query excludes NULL and whitespace-only values. */
  text: string;

  /** `chat.chat_identifier` of the owning chat; used as the contact handle. */
  chat_identifier: string;

  /** `message.date`; this script treats it as nanoseconds since 2001-01-01 UTC. */
  date: number;
}
|
|
|
|
/** Columns of a `clients` row that `main` reads before deciding whether to run the LLM tier. */
interface ClientPiiRow {
  /** Primary key of the client. */
  id: number;

  /** Passed to the LLM throttle as `displayName`. */
  display_name: string | null;

  /** Passed to the LLM throttle as `displayNameOverride`. */
  display_name_override: string | null;

  /** Currently stored relationship kind; input to both the throttle and the hysteresis step. */
  relationship_kind: string | null;

  /** Confidence stored with `relationship_kind`; treated as 0 when NULL. */
  relationship_kind_confidence: number | null;

  /** Streak counter stored with `relationship_kind`; treated as 0 when NULL. */
  relationship_kind_streak: number | null;

  /** Time of the last PII extraction for this client; passed to the throttle as `piiExtractionAt`. */
  pii_extraction_at: string | null;

  /** Selected by the query but not read anywhere else in the visible code. */
  introduced_as_name: string | null;
}
|
|
|
|
/** Query handle for the main (quinn) database, as returned by `openDb`. */
type QuinnSql = ReturnType<typeof openDb>;

/** Query handle for the iCloud message database, as returned by `openIcloudDb`. */
type IcloudSql = ReturnType<typeof openIcloudDb>;

/** Resolved value of `runLlmTier`. */
type LlmTierResult = Awaited<ReturnType<typeof runLlmTier>>;

/** Type of `clientId` on a contact-relationship record. */
type ClientId = NonNullable<
  Awaited<ReturnType<typeof contactRelationshipRepo.findByHandleChannel>>
>['clientId'];

/** Messages shorter than this never reach the LLM tier. */
const MIN_LLM_BODY_LENGTH = 30;

/** Fixed confidence recorded for `references_to_others` extractions. */
const REFERENCES_CONFIDENCE = 0.8;

/** Milliseconds between the Unix epoch and 2001-01-01T00:00:00Z. */
const EPOCH_2001_OFFSET_MS = 978307200000;

/**
 * Runs the regex tier on one message and, unless this is a dry run, persists
 * every extraction that meets `CONFIDENCE_THRESHOLD`.
 *
 * @returns Number of extractions that met the threshold (counted in dry runs too).
 */
async function applyRegexTier(
  quinnSql: QuinnSql,
  msg: InboundMessageRow,
  clientId: ClientId,
  messageId: string,
): Promise<number> {
  let hits = 0;

  for (const extraction of extractFromBody(msg.text)) {
    if (extraction.confidence < CONFIDENCE_THRESHOLD) continue;
    hits++;

    if (!dryRun) {
      await clientPiiRepo.supersedeField(quinnSql, clientId, extraction.field);
      await clientPiiRepo.insert(quinnSql, {
        clientId,
        field: extraction.field,
        value: extraction.value,
        confidence: extraction.confidence,
        sourceMessageId: messageId,
        source: 'regex',
      });

      // NOTE(review): this writes introduced_as_name for every regex
      // extraction regardless of extraction.field. Confirm that the regex
      // tier only ever yields name extractions; otherwise a field guard is needed.
      // msg.date is converted from nanoseconds since 2001-01-01 to a timestamp.
      await quinnSql`
        UPDATE clients
        SET introduced_as_name = ${extraction.value},
            introduced_at = to_timestamp(978307200 + ${msg.date}::bigint / 1000000000),
            introduction_source_message_id = ${messageId},
            pii_extraction_at = now(),
            contact_render_dirty = true,
            updated_at = now()
        WHERE id = ${clientId}
          AND (introduced_as_name IS NULL OR introduced_as_name != ${extraction.value})
      `;
    }

    logger.info('backfill-pii-extraction: regex hit', {
      clientId,
      field: extraction.field,
      value: extraction.value,
      confidence: extraction.confidence,
      dryRun,
    });
  }

  return hits;
}

/**
 * Loads the 20 most recent non-blank messages of a chat (both directions) and
 * returns them oldest-first in the shape expected by the LLM tier.
 */
async function fetchConversationContext(
  icloudSql: IcloudSql,
  handle: string,
): Promise<LlmExtractionMessage[]> {
  // NOTE(review): the window is the chat's latest messages, not the messages
  // surrounding the one being backfilled, so every message of the same chat
  // gets identical context. Confirm this is intended.
  const rows = await icloudSql<Array<{ text: string; is_from_me: number; date: number }>>`
    SELECT m.text, m.is_from_me, m.date
    FROM message m
    JOIN chat_message_join cmj ON cmj.message_id = m.rowid
    JOIN chat c ON c.rowid = cmj.chat_id
    WHERE c.chat_identifier = ${handle}
      AND m.text IS NOT NULL
      AND length(trim(m.text)) > 0
    ORDER BY m.date DESC
    LIMIT 20
  `;

  return rows.reverse().map((r) => ({
    body: r.text,
    isFromMe: r.is_from_me === 1,
    // r.date is nanoseconds since 2001-01-01; convert to Unix milliseconds.
    sentAt: new Date(EPOCH_2001_OFFSET_MS + Math.floor(r.date / 1e6)).toISOString(),
  }));
}

/**
 * Persists one LLM-tier result: extraction history rows first, then the
 * denormalised columns on `clients`.
 *
 * @param clientRow Snapshot of the client taken before the LLM call; supplies
 *                  the current relationship-kind state for hysteresis.
 */
async function persistLlmResult(
  quinnSql: QuinnSql,
  clientId: ClientId,
  messageId: string,
  clientRow: ClientPiiRow,
  llmResult: LlmTierResult,
): Promise<void> {
  const llmFields = [
    { field: 'name' as const, value: llmResult.name, confidence: llmResult.nameConfidence },
    { field: 'location' as const, value: llmResult.location, confidence: llmResult.locationConfidence },
    { field: 'organization' as const, value: llmResult.organization, confidence: llmResult.organizationConfidence },
    { field: 'role' as const, value: llmResult.role, confidence: llmResult.roleConfidence },
  ];

  for (const { field, value, confidence } of llmFields) {
    if (value === null || confidence < CONFIDENCE_THRESHOLD) continue;
    await clientPiiRepo.supersedeField(quinnSql, clientId, field);
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field,
      value,
      confidence,
      sourceMessageId: messageId,
      source: 'llm',
    });
  }

  if (llmResult.referencesToOthers.length > 0) {
    await clientPiiRepo.supersedeField(quinnSql, clientId, 'references_to_others');
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field: 'references_to_others',
      value: JSON.stringify(llmResult.referencesToOthers),
      confidence: REFERENCES_CONFIDENCE,
      sourceMessageId: messageId,
      source: 'llm',
    });
  }

  if (llmResult.relationshipKindConfidence >= CONFIDENCE_THRESHOLD) {
    await clientPiiRepo.supersedeField(quinnSql, clientId, 'relationship_kind');
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field: 'relationship_kind',
      value: llmResult.relationshipKind,
      confidence: llmResult.relationshipKindConfidence,
      sourceMessageId: messageId,
      source: 'llm',
    });

    const hysteresis = applyHysteresis(
      {
        kind: (clientRow.relationship_kind as RelationshipKind | null) ?? null,
        confidence: clientRow.relationship_kind_confidence ?? 0,
        streak: clientRow.relationship_kind_streak ?? 0,
      },
      llmResult.relationshipKind,
      llmResult.relationshipKindConfidence,
    );

    if (hysteresis.shouldUpdate) {
      await quinnSql`
        UPDATE clients
        SET relationship_kind = ${hysteresis.nextKind},
            relationship_kind_confidence = ${hysteresis.nextConfidence},
            relationship_kind_streak = ${hysteresis.nextStreak},
            pii_extraction_at = now(),
            contact_render_dirty = true,
            updated_at = now()
        WHERE id = ${clientId}
      `;
    } else {
      // Kind is left unchanged; only the streak counter is stored.
      await quinnSql`
        UPDATE clients
        SET relationship_kind_streak = ${hysteresis.nextStreak},
            pii_extraction_at = now(),
            updated_at = now()
        WHERE id = ${clientId}
      `;
    }
  }

  if (llmResult.name && llmResult.nameConfidence >= CONFIDENCE_THRESHOLD) {
    // Unlike the regex tier, the LLM name only fills an empty introduced_as_name.
    await quinnSql`
      UPDATE clients
      SET introduced_as_name = ${llmResult.name},
          pii_extraction_at = now(),
          contact_render_dirty = true,
          updated_at = now()
      WHERE id = ${clientId}
        AND introduced_as_name IS NULL
    `;
  }

  if (llmResult.location && llmResult.locationConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_location = ${llmResult.location}, updated_at = now() WHERE id = ${clientId}
    `;
  }

  if (llmResult.organization && llmResult.organizationConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_organization = ${llmResult.organization}, updated_at = now() WHERE id = ${clientId}
    `;
  }

  if (llmResult.role && llmResult.roleConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_role = ${llmResult.role}, updated_at = now() WHERE id = ${clientId}
    `;
  }

  if (llmResult.referencesToOthers.length > 0) {
    await quinnSql`
      UPDATE clients
      SET extracted_references = ${JSON.stringify(llmResult.referencesToOthers)}::jsonb,
          updated_at = now()
      WHERE id = ${clientId}
    `;
  }
}

/**
 * Backfill entry point.
 *
 * Loads the `limit` most recent inbound messages from the iCloud database and,
 * for each one whose chat identifier maps to a known iMessage contact
 * relationship, runs the regex tier and (when a model client is configured and
 * the throttle or `--force` allows it) the LLM tier.
 *
 * Both database handles are closed on the failure path as well as on success.
 *
 * @throws Error when `QUINN_DB_URL` or `QUINN_ICLOUD_DB_URL` is missing, and
 *         on any database failure; LLM failures are logged and counted instead.
 */
async function main(): Promise<void> {
  const quinnUrl = process.env['QUINN_DB_URL'];
  const icloudUrl = process.env['QUINN_ICLOUD_DB_URL'];
  if (!quinnUrl) throw new Error('QUINN_DB_URL required');
  if (!icloudUrl) throw new Error('QUINN_ICLOUD_DB_URL required');

  const quinnSql = openDb(quinnUrl);
  const icloudSql = openIcloudDb(icloudUrl);

  try {
    logger.info('backfill-pii-extraction starting', { limit, dryRun, force });

    // Fetch recent inbound messages from iCloud DB.
    // NOTE(review): rows are processed newest-first, so for any given field
    // the oldest message in the batch is the last to supersede. Confirm that
    // this ordering is intended.
    const messages = await icloudSql<InboundMessageRow[]>`
      SELECT m.rowid, m.text, c.chat_identifier, m.date
      FROM message m
      JOIN chat_message_join cmj ON cmj.message_id = m.rowid
      JOIN chat c ON c.rowid = cmj.chat_id
      WHERE m.is_from_me = 0
        AND m.text IS NOT NULL
        AND length(trim(m.text)) > 0
      ORDER BY m.date DESC
      LIMIT ${limit}
    `;

    logger.info('backfill-pii-extraction: fetched messages', { count: messages.length });

    let regexHits = 0;
    let llmRuns = 0;
    let skipped = 0;
    let failed = 0;

    // The LLM tier is optional: without MODEL_BOSS_URL only the regex tier runs.
    const modelBossUrl = process.env['MODEL_BOSS_URL'];
    const modelBoss = modelBossUrl
      ? createModelBossClient({
          MODEL_BOSS_URL: modelBossUrl,
          MODEL_BOSS_API_KEY: process.env['MODEL_BOSS_API_KEY'],
          MODEL_BOSS_MODEL: process.env['MODEL_BOSS_MODEL'] ?? 'auto',
        })
      : null;

    for (const msg of messages) {
      const handle = msg.chat_identifier;

      const relationship = await contactRelationshipRepo.findByHandleChannel(quinnSql, handle, 'imessage');
      if (!relationship) {
        skipped++;
        continue;
      }

      const clientId = relationship.clientId;
      const messageId = String(msg.rowid);

      // Regex tier
      regexHits += await applyRegexTier(quinnSql, msg, clientId, messageId);

      // LLM tier. Short bodies are rejected here, before the client lookup,
      // so they do not cost a query.
      if (!modelBoss || msg.text.length < MIN_LLM_BODY_LENGTH) continue;

      const clientRows = await quinnSql<ClientPiiRow[]>`
        SELECT id, display_name, display_name_override, relationship_kind,
               relationship_kind_confidence, relationship_kind_streak, pii_extraction_at,
               introduced_as_name
        FROM clients WHERE id = ${clientId}
      `;
      const clientRow = clientRows[0];
      if (!clientRow) continue;

      const needsLlm =
        force ||
        shouldRunLlmTier({
          displayName: clientRow.display_name,
          displayNameOverride: clientRow.display_name_override,
          relationshipKind: clientRow.relationship_kind,
          piiExtractionAt: clientRow.pii_extraction_at,
        } as Parameters<typeof shouldRunLlmTier>[0]);
      if (!needsLlm) continue;

      const llmMessages = await fetchConversationContext(icloudSql, handle);

      // Counted before the call, so failed attempts are included in llmRuns.
      llmRuns++;

      let llmResult: LlmTierResult;
      try {
        llmResult = await runLlmTier(modelBoss, llmMessages);
      } catch (err) {
        logger.error('backfill-pii-extraction: LLM failed', { clientId, error: String(err) });
        failed++;
        continue;
      }

      logger.info('backfill-pii-extraction: LLM complete', {
        clientId,
        relationshipKind: llmResult.relationshipKind,
        dryRun,
      });

      if (dryRun) continue;

      await persistLlmResult(quinnSql, clientId, messageId, clientRow, llmResult);
    }

    logger.info('backfill-pii-extraction done', {
      regexHits,
      llmRuns,
      skipped,
      failed,
      dryRun,
    });
  } finally {
    // Close both handles even if the first close rejects.
    try {
      await quinnSql.end();
    } finally {
      await icloudSql.end();
    }
  }
}
|
|
|
|
// Script entry point: a rejection escaping main() is logged and turned into
// a non-zero exit status.
main().catch((cause: unknown) => {
  const error = String(cause);
  logger.error('backfill-pii-extraction fatal', { error });
  process.exit(1);
});
|