lilith-platform.live/codebase/@features/api/scripts/backfill-pii-extraction.ts
autocommit 19ea7a6c29 chore(api): 🔧 Update API security headers to enforce stricter CORS and HSTS policies
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-16 21:13:26 -07:00

330 lines
11 KiB
TypeScript

#!/usr/bin/env bun
// Backfill PII extractions for historical inbound messages.
// Runs regex tier on every qualifying message; LLM tier respects the same
// throttle as the real-time processor unless --force is passed.
//
// Required env: QUINN_DB_URL, QUINN_MACSYNC_DB_URL
// Optional env: MODEL_BOSS_URL, MODEL_BOSS_API_KEY, MODEL_BOSS_MODEL
//
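// Usage:
// bun run scripts/backfill-pii-extraction.ts [--limit N | --limit=N] [--dry-run] [--force]
//
// --limit N  number of most recent inbound messages to scan (default 500)
// --dry-run  log extractions without writing to the Quinn DB (the LLM tier is still called)
// --force    run the LLM tier even when the throttle would skip it
//
// The LLM tier is skipped entirely when MODEL_BOSS_URL is unset.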
import { openDb, openIcloudDb } from '@/shared/db';
import * as contactRelationshipRepo from '@/entities/contact-relationship/repo';
import * as clientPiiRepo from '@/entities/client-pii-extraction/repo';
import { logger } from '@/shared/logger';
import { extractFromBody } from '@/processors/pii-extractor/regex-tier';
import { runLlmTier, createModelBossClient, type LlmExtractionMessage } from '@/processors/pii-extractor/llm-tier';
import { applyHysteresis } from '@/processors/pii-extractor/relationship-kind';
import { shouldRunLlmTier } from '@/processors/pii-extractor/throttle';
import type { RelationshipKind } from '@/entities/client-pii-extraction/types';
const args = process.argv.slice(2);

/**
 * Returns the raw `--limit` value from `argv`, accepting both `--limit=N` and
 * `--limit N`. Returns undefined when the flag is absent or has no value.
 */
function readLimitArg(argv: readonly string[]): string | undefined {
  const inline = argv.find((a) => a.startsWith('--limit='));
  if (inline !== undefined) return inline.slice('--limit='.length);
  const idx = argv.indexOf('--limit');
  // indexOf yields -1 when the flag is absent; without this guard argv[0]
  // (e.g. "--dry-run") would be read as the limit and parse to NaN.
  return idx === -1 ? undefined : argv[idx + 1];
}

/**
 * Parses a `--limit` value. Absent or empty input yields `fallback`; anything
 * that is not a positive base-10 integer throws rather than reaching SQL as NaN.
 */
function parseLimit(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw === '') return fallback;
  if (!/^\d+$/.test(raw) || Number(raw) <= 0) {
    throw new Error(`--limit must be a positive integer, got "${raw}"`);
  }
  return Number.parseInt(raw, 10);
}

const limit = parseLimit(readLimitArg(args), 500);
const dryRun = args.includes('--dry-run');
const force = args.includes('--force');
/** Minimum confidence for a regex or scored LLM extraction to be recorded. */
const CONFIDENCE_THRESHOLD = 0.5;
/**
 * One inbound message as selected by the macsync query in `main`
 * (`m.rowid`, `m.text`, `c.chat_identifier`, `m.date`).
 */
interface InboundMessageRow {
  /** Chat identifier; looked up as an `imessage` handle in contact relationships. */
  chat_identifier: string;
  /** Message row id; stringified and stored as the extraction's source message id. */
  rowid: number;
  /** Message body; the query excludes NULL and whitespace-only text. */
  text: string;
  /** Raw `message.date`; the script converts it as nanoseconds since 2001-01-01 UTC. */
  date: number;
}
/**
 * Columns read from `clients` in `main`: the throttle inputs for
 * `shouldRunLlmTier` and the prior state fed to `applyHysteresis`.
 */
interface ClientPiiRow {
  id: number;
  // Name-related columns.
  display_name: string | null;
  display_name_override: string | null;
  /** Selected alongside the rest; not currently read by the script. */
  introduced_as_name: string | null;
  // Throttle timestamp.
  pii_extraction_at: string | null;
  // Relationship-kind hysteresis state.
  relationship_kind: string | null;
  relationship_kind_confidence: number | null;
  relationship_kind_streak: number | null;
}
/**
 * Backfill entry point.
 *
 * Fetches the `limit` most recent inbound, non-empty messages from the macsync
 * DB (QUINN_MACSYNC_DB_URL). Each message's chat identifier is looked up as an
 * `imessage` contact relationship in the Quinn DB (QUINN_DB_URL). Messages with
 * no match are counted as `skipped`. For each matched message:
 *   1. Regex tier: every extraction with confidence >= CONFIDENCE_THRESHOLD is
 *      recorded, superseding the previous value for that field. Name hits also
 *      update `clients.introduced_as_name` when the stored value differs.
 *   2. LLM tier: runs only when MODEL_BOSS_URL is set and the body is at least
 *      30 chars. The LLM sees the chat's 20 most recent messages, and only when
 *      `--force` is passed or `shouldRunLlmTier` allows it. The script then
 *      records the scored fields and applies relationship-kind hysteresis.
 * With `--dry-run` nothing is written to the Quinn DB. Regex hits are still
 * logged and the LLM is still called.
 *
 * Both DB connections are closed on every exit path, including errors.
 *
 * @throws Error if QUINN_DB_URL or QUINN_MACSYNC_DB_URL is unset. Database
 *   errors propagate. LLM call failures are logged and counted as `failed`.
 */
async function main(): Promise<void> {
  const quinnUrl = process.env['QUINN_DB_URL'];
  const icloudUrl = process.env['QUINN_MACSYNC_DB_URL'];
  if (!quinnUrl) throw new Error('QUINN_DB_URL required');
  if (!icloudUrl) throw new Error('QUINN_MACSYNC_DB_URL required');
  const quinnSql = openDb(quinnUrl);
  const icloudSql = openIcloudDb(icloudUrl);
  try {
    logger.info('backfill-pii-extraction starting', { limit, dryRun, force });

    // Newest first so that LIMIT keeps the most recent inbound messages.
    // NOTE(review): messages are also *processed* newest first. When a client has
    // several regex hits for one field, the oldest message's value is inserted last
    // and ends up current. Confirm that this precedence is intended.
    const messages = await icloudSql<InboundMessageRow[]>`
      SELECT m.rowid, m.text, c.chat_identifier, m.date
      FROM message m
      JOIN chat_message_join cmj ON cmj.message_id = m.rowid
      JOIN chat c ON c.rowid = cmj.chat_id
      WHERE m.is_from_me = 0
        AND m.text IS NOT NULL
        AND length(trim(m.text)) > 0
      ORDER BY m.date DESC
      LIMIT ${limit}
    `;
    logger.info('backfill-pii-extraction: fetched messages', { count: messages.length });

    let regexHits = 0;
    let llmRuns = 0;
    let skipped = 0;
    let failed = 0;

    const modelBossUrl = process.env['MODEL_BOSS_URL'];
    const modelBoss = modelBossUrl
      ? createModelBossClient({
          MODEL_BOSS_URL: modelBossUrl,
          MODEL_BOSS_API_KEY: process.env['MODEL_BOSS_API_KEY'],
          MODEL_BOSS_MODEL: process.env['MODEL_BOSS_MODEL'] ?? 'auto',
        })
      : null;

    for (const msg of messages) {
      const handle = msg.chat_identifier;
      const relationship = await contactRelationshipRepo.findByHandleChannel(quinnSql, handle, 'imessage');
      if (!relationship) {
        skipped++;
        continue;
      }
      const clientId = relationship.clientId;
      const messageId = String(msg.rowid);
      const body = msg.text;

      // ---- Regex tier ----
      for (const extraction of extractFromBody(body)) {
        if (extraction.confidence < CONFIDENCE_THRESHOLD) continue;
        regexHits++;
        if (!dryRun) {
          await clientPiiRepo.supersedeField(quinnSql, clientId, extraction.field);
          await clientPiiRepo.insert(quinnSql, {
            clientId,
            field: extraction.field,
            value: extraction.value,
            confidence: extraction.confidence,
            sourceMessageId: messageId,
            source: 'regex',
          });
          // Only a name extraction may become introduced_as_name. Without this guard,
          // any other regex field would overwrite the client's introduced name.
          if (extraction.field === 'name') {
            // 978307200 is the Unix time (seconds) of 2001-01-01T00:00:00Z. Dividing
            // message.date by 1e9 treats it as nanoseconds since that epoch.
            await quinnSql`
              UPDATE clients
              SET introduced_as_name = ${extraction.value},
                  introduced_at = to_timestamp(978307200 + ${msg.date}::bigint / 1000000000),
                  introduction_source_message_id = ${messageId},
                  pii_extraction_at = now(),
                  contact_render_dirty = true,
                  updated_at = now()
              WHERE id = ${clientId}
                AND (introduced_as_name IS NULL OR introduced_as_name != ${extraction.value})
            `;
          }
        }
        // NOTE(review): this logs the extracted PII value at info level.
        logger.info('backfill-pii-extraction: regex hit', {
          clientId,
          field: extraction.field,
          value: extraction.value,
          confidence: extraction.confidence,
          dryRun,
        });
      }

      // ---- LLM tier ----
      // Checked before any DB work: short bodies never reach the LLM.
      if (!modelBoss || body.length < 30) continue;
      const clientRows = await quinnSql<ClientPiiRow[]>`
        SELECT id, display_name, display_name_override, relationship_kind,
               relationship_kind_confidence, relationship_kind_streak, pii_extraction_at,
               introduced_as_name
        FROM clients WHERE id = ${clientId}
      `;
      const clientRow = clientRows[0];
      if (!clientRow) continue;
      // NOTE(review): because of the `as` cast, any field that shouldRunLlmTier expects
      // but this object lacks is not reported by the compiler. Confirm the shape.
      const needsLlm =
        force ||
        shouldRunLlmTier({
          displayName: clientRow.display_name,
          displayNameOverride: clientRow.display_name_override,
          relationshipKind: clientRow.relationship_kind,
          piiExtractionAt: clientRow.pii_extraction_at,
        } as Parameters<typeof shouldRunLlmTier>[0]);
      if (!needsLlm) continue;

      // NOTE(review): the context is always the chat's 20 most recent messages, not the
      // messages around `msg`. Messages from the same chat therefore send the LLM the
      // same input. Under --force this repeats identical calls, each feeding
      // applyHysteresis again, which may inflate the streak.
      const contextMessages = await icloudSql<Array<{ text: string; is_from_me: number; date: number }>>`
        SELECT m.text, m.is_from_me, m.date
        FROM message m
        JOIN chat_message_join cmj ON cmj.message_id = m.rowid
        JOIN chat c ON c.rowid = cmj.chat_id
        WHERE c.chat_identifier = ${handle}
          AND m.text IS NOT NULL
          AND length(trim(m.text)) > 0
        ORDER BY m.date DESC
        LIMIT 20
      `;
      const llmMessages: LlmExtractionMessage[] = contextMessages
        .reverse() // oldest first
        .map((r) => ({
          body: r.text,
          isFromMe: r.is_from_me === 1,
          // Nanoseconds since 2001-01-01 UTC -> milliseconds since the Unix epoch.
          sentAt: new Date(978307200000 + Math.floor(r.date / 1e6)).toISOString(),
        }));

      llmRuns++;
      let llmResult;
      try {
        llmResult = await runLlmTier(modelBoss, llmMessages);
      } catch (err) {
        logger.error('backfill-pii-extraction: LLM failed', { clientId, error: String(err) });
        failed++;
        continue;
      }
      logger.info('backfill-pii-extraction: LLM complete', {
        clientId,
        relationshipKind: llmResult.relationshipKind,
        dryRun,
      });
      if (dryRun) continue;

      // Record each scored field that clears the threshold, superseding the prior value.
      const llmFields = [
        { field: 'name' as const, value: llmResult.name, confidence: llmResult.nameConfidence },
        { field: 'location' as const, value: llmResult.location, confidence: llmResult.locationConfidence },
        { field: 'organization' as const, value: llmResult.organization, confidence: llmResult.organizationConfidence },
        { field: 'role' as const, value: llmResult.role, confidence: llmResult.roleConfidence },
      ];
      for (const { field, value, confidence } of llmFields) {
        if (value === null || confidence < CONFIDENCE_THRESHOLD) continue;
        await clientPiiRepo.supersedeField(quinnSql, clientId, field);
        await clientPiiRepo.insert(quinnSql, {
          clientId,
          field,
          value,
          confidence,
          sourceMessageId: messageId,
          source: 'llm',
        });
      }
      if (llmResult.referencesToOthers.length > 0) {
        await clientPiiRepo.supersedeField(quinnSql, clientId, 'references_to_others');
        await clientPiiRepo.insert(quinnSql, {
          clientId,
          field: 'references_to_others',
          value: JSON.stringify(llmResult.referencesToOthers),
          // Fixed value: the script reads no LLM confidence for references.
          confidence: 0.8,
          sourceMessageId: messageId,
          source: 'llm',
        });
      }
      if (llmResult.relationshipKindConfidence >= CONFIDENCE_THRESHOLD) {
        await clientPiiRepo.supersedeField(quinnSql, clientId, 'relationship_kind');
        await clientPiiRepo.insert(quinnSql, {
          clientId,
          field: 'relationship_kind',
          value: llmResult.relationshipKind,
          confidence: llmResult.relationshipKindConfidence,
          sourceMessageId: messageId,
          source: 'llm',
        });
        // Hysteresis decides whether the stored kind flips. Otherwise only the streak
        // and the extraction timestamp move.
        const hysteresis = applyHysteresis(
          {
            kind: (clientRow.relationship_kind as RelationshipKind | null) ?? null,
            confidence: clientRow.relationship_kind_confidence ?? 0,
            streak: clientRow.relationship_kind_streak ?? 0,
          },
          llmResult.relationshipKind,
          llmResult.relationshipKindConfidence,
        );
        if (hysteresis.shouldUpdate) {
          await quinnSql`
            UPDATE clients
            SET relationship_kind = ${hysteresis.nextKind},
                relationship_kind_confidence = ${hysteresis.nextConfidence},
                relationship_kind_streak = ${hysteresis.nextStreak},
                pii_extraction_at = now(),
                contact_render_dirty = true,
                updated_at = now()
            WHERE id = ${clientId}
          `;
        } else {
          await quinnSql`
            UPDATE clients
            SET relationship_kind_streak = ${hysteresis.nextStreak},
                pii_extraction_at = now(),
                updated_at = now()
            WHERE id = ${clientId}
          `;
        }
      }
      // The LLM name only fills introduced_as_name when none is stored yet.
      if (llmResult.name && llmResult.nameConfidence >= CONFIDENCE_THRESHOLD) {
        await quinnSql`
          UPDATE clients
          SET introduced_as_name = ${llmResult.name},
              pii_extraction_at = now(),
              contact_render_dirty = true,
              updated_at = now()
          WHERE id = ${clientId}
            AND introduced_as_name IS NULL
        `;
      }
      if (llmResult.location && llmResult.locationConfidence >= CONFIDENCE_THRESHOLD) {
        await quinnSql`
          UPDATE clients SET extracted_location = ${llmResult.location}, updated_at = now() WHERE id = ${clientId}
        `;
      }
      if (llmResult.organization && llmResult.organizationConfidence >= CONFIDENCE_THRESHOLD) {
        await quinnSql`
          UPDATE clients SET extracted_organization = ${llmResult.organization}, updated_at = now() WHERE id = ${clientId}
        `;
      }
      if (llmResult.role && llmResult.roleConfidence >= CONFIDENCE_THRESHOLD) {
        await quinnSql`
          UPDATE clients SET extracted_role = ${llmResult.role}, updated_at = now() WHERE id = ${clientId}
        `;
      }
      if (llmResult.referencesToOthers.length > 0) {
        await quinnSql`
          UPDATE clients
          SET extracted_references = ${JSON.stringify(llmResult.referencesToOthers)}::jsonb,
              updated_at = now()
          WHERE id = ${clientId}
        `;
      }
    }

    logger.info('backfill-pii-extraction done', {
      regexHits,
      llmRuns,
      skipped,
      failed,
      dryRun,
    });
  } finally {
    // Close both connections even if processing threw. The nested finally ensures
    // a failure while closing the first does not skip closing the second.
    try {
      await quinnSql.end();
    } finally {
      await icloudSql.end();
    }
  }
}
// Script entry: any error escaping main is logged as fatal and exits with status 1.
main().catch((err: unknown) => {
  logger.error('backfill-pii-extraction fatal', {
    // `error` keeps its original string form for existing log consumers.
    error: String(err),
    // String(err) drops the stack trace, so attach it separately when one exists.
    ...(err instanceof Error && err.stack ? { stack: err.stack } : {}),
  });
  process.exit(1);
});