// lilith-platform.live/codebase/@features/quinn-ai/engine/scripts/parity-replay.ts
//
// 310 lines
// 11 KiB
// TypeScript

/**
* parity-replay.ts — offline harness for the draft pipeline.
*
* For each thread in the corpus, walks messages chronologically and at every
* Quinn-outbound point, builds a DraftRequest from the prior-only prefix and
* asks the live pipeline (model-boss on apricot:8210) for the draft it would
* have produced. Writes one JSONL line per pair to a timestamped file under
* scripts/parity-runs/ (or the --out directory); progress is logged to stdout.
*
* Companion: parity-score.ts (heuristics + LLM judge) reads the JSONL.
*
* Usage:
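* bun run scripts/parity-replay.ts # default corpus, full
* bun run scripts/parity-replay.ts --limit 5 # cap pairs per thread
* bun run scripts/parity-replay.ts --threads <id,id> # specific threads
* bun run scripts/parity-replay.ts --prospect-handles <h,h> # threads whose participants include any of these handles
* bun run scripts/parity-replay.ts --since-days 30 # corpus lookback (default 120)
* bun run scripts/parity-replay.ts --out <dir> # output directory (default: parity-runs/ next to this script)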
*
* Env:
* QUINN_OUTREACH_DB_URL postgres://... (required; same as live engine;
* falls back to QUINN_MACSYNC_DB_URL when unset)
* MODEL_BOSS_URL http://apricot.lan:8210
* QUINN_API_BASE http://apricot.lan:3030 (for tour-context)
* QUINN_API_SERVICE_TOKEN
* QUINN_SELF_HANDLE +14244663669 (filter self-thread)
*
* Output schema (one line per pair):
* {
* thread_id: string,
* prospect_handle: string,
* ts: ISO,
* turn_index: number, // 0-based across is_from_me==true within the thread
* actual: string, // Quinn's real outbound body
* simulated: string, // pipeline draft ('' when error is set)
* meta: MetaEvaluation | null, // null when error is set
* brief: DraftBrief | null, // null when error is set
* elapsed_ms: number,
* error: string | null
* }
*/
import { mkdirSync, createWriteStream } from 'node:fs';
import { join } from 'node:path';
import { Pool } from 'pg';
import { callDraftPipeline, type DraftRequest } from '../src/shared/draft-pipeline-ts';
import { fetchTourPlannedLocations } from '../src/shared/tour-context';
import { fetchQuinnVoiceSamples } from '../src/shared/quinn-voice-samples';
import { fetchQuinnReplyExemplars, type QuinnExemplar } from '../src/shared/quinn-exemplars';
/** One message row as selected by `loadThread` from `macsync.messages`. */
interface ThreadMsgRow {
/** Send time; `loadThread` orders rows by this column, ascending. */
sent_at: Date;
/** True for outbound messages; mapped to `'me'` (vs `'them'`) when building the draft request. */
is_from_me: boolean;
/** Sender handle column; selected by the query but not read by the code visible in this file. */
from_handle: string;
/** Message text; `loadThread` only returns rows whose body is non-empty. */
body: string;
}
/** One conversation selected for replay, as returned by the `loadCorpus` query. */
interface CorpusThread {
/** Conversation id, cast to text by the query. */
conversation_id: string;
/** First element of the conversation's `participants` array (the query takes `LIMIT 1`). */
prospect_handle: string;
/** Count of messages in the lookback window with `is_from_me` set. */
quinn_out: number;
/** Count of messages in the lookback window with `is_from_me` not set. */
them_in: number;
}
/** Parsed command-line options for the harness (see `parseArgs`). */
interface CliArgs {
/** `--limit`: maximum number of replay pairs taken per thread; `null` means no cap. */
limitPerThread: number | null;
/** `--threads`: comma-separated conversation ids to restrict the corpus to; `null` means no restriction. */
threadIds: readonly string[] | null;
/** `--prospect-handles`: comma-separated handles matched against conversation participants; `null` means no restriction. */
prospectHandles: readonly string[] | null;
/** `--since-days`: corpus lookback in days (defaults to 120). */
sinceDays: number;
/** `--out`: directory the JSONL run file is written into (defaults to `parity-runs` next to this script). */
outDir: string;
}
/**
 * Parses the harness's command-line flags into a `CliArgs` record.
 *
 * Recognised flags, each taking exactly one value:
 *   --limit <n>                 cap on pairs per thread (non-negative integer)
 *   --threads <id,id>           comma-separated conversation ids
 *   --prospect-handles <h,h>    comma-separated participant handles
 *   --since-days <n>            corpus lookback in days (non-negative integer)
 *   --out <dir>                 output directory
 *
 * @param argv Arguments after the script name (e.g. `process.argv.slice(2)`).
 * @returns The parsed options; flags that were not supplied keep their defaults.
 * @throws Error when a flag is unknown, is missing its value, has a numeric
 *   value that is not a plain non-negative integer, or has a list value that
 *   contains no non-empty entries.
 */
function parseArgs(argv: readonly string[]): CliArgs {
  const args: CliArgs = {
    limitPerThread: null,
    threadIds: null,
    prospectHandles: null,
    sinceDays: 120,
    outDir: join(import.meta.dirname ?? '.', 'parity-runs'),
  };

  // Returns the value following a flag, or throws when it is absent/empty.
  const takeValue = (flag: string, index: number): string => {
    const value = argv[index];
    if (!value) throw new Error(`${flag} requires value`);
    return value;
  };

  // Strict integer parse: `Number.parseInt` alone would accept "5abc" and turn
  // "abc" into NaN, which silently yields an empty slice or a broken SQL bind.
  const toCount = (flag: string, raw: string): number => {
    const parsed = /^\d+$/.test(raw) ? Number.parseInt(raw, 10) : Number.NaN;
    if (!Number.isSafeInteger(parsed)) {
      throw new Error(`${flag} requires a non-negative integer, got: ${raw}`);
    }
    return parsed;
  };

  // An empty list would be treated downstream as "no filter" and replay the
  // whole corpus, which is never what an explicit filter flag means.
  const toList = (flag: string, raw: string): string[] => {
    const items = raw
      .split(',')
      .map((s) => s.trim())
      .filter(Boolean);
    if (items.length === 0) {
      throw new Error(`${flag} requires at least one non-empty value`);
    }
    return items;
  };

  for (let i = 0; i < argv.length; i++) {
    const flag = argv[i];
    switch (flag) {
      case '--limit':
        args.limitPerThread = toCount(flag, takeValue(flag, ++i));
        break;
      case '--threads':
        args.threadIds = toList(flag, takeValue(flag, ++i));
        break;
      case '--prospect-handles':
        args.prospectHandles = toList(flag, takeValue(flag, ++i));
        break;
      case '--since-days':
        args.sinceDays = toCount(flag, takeValue(flag, ++i));
        break;
      case '--out':
        args.outDir = takeValue(flag, ++i);
        break;
      default:
        throw new Error(`unknown arg: ${flag}`);
    }
  }
  return args;
}
/**
 * Selects the conversations to replay.
 *
 * The query keeps conversations that, within the lookback window, have at
 * least three messages flagged `is_from_me` and at least three that are not,
 * skipping messages whose body matches 'QuinnAiBot' or starts with
 * 'Reply suggestion', empty bodies, and any conversation whose participants
 * contain `selfHandle`. Results are ordered by outbound count, descending.
 *
 * @param pool Postgres pool used to run the query.
 * @param selfHandle Handle whose conversations are excluded (bound as $1).
 * @param sinceDays Lookback window in days (bound as $2).
 * @param threadIds Optional conversation-id allow-list; ignored when null or empty.
 * @param prospectHandles Optional participant handles; ignored when null or empty.
 * @returns One row per qualifying conversation.
 */
async function loadCorpus(
  pool: Pool,
  selfHandle: string,
  sinceDays: number,
  threadIds: readonly string[] | null,
  prospectHandles: readonly string[] | null,
): Promise<readonly CorpusThread[]> {
  const bindings: unknown[] = [selfHandle, sinceDays];
  // Appends a bind value and returns its positional placeholder ($3, $4, …);
  // Array.prototype.push returns the new length, which is the 1-based position.
  const bind = (value: unknown): string => `$${bindings.push(value)}`;

  const extraClauses: string[] = [];
  if ((threadIds?.length ?? 0) > 0) {
    extraClauses.push(`AND m.conversation_id::text = ANY(${bind(threadIds)}::text[])`);
  }
  if ((prospectHandles?.length ?? 0) > 0) {
    extraClauses.push(`AND c.participants ?| ${bind(prospectHandles)}::text[]`);
  }
  const optionalFilters = extraClauses.join(' ');

  // The statement text is kept flush-left so the SQL sent to the server is unchanged.
  const sql = `SELECT m.conversation_id::text AS conversation_id,
(SELECT (jsonb_array_elements_text(c.participants))
FROM macsync.conversations c
WHERE c.id = m.conversation_id
LIMIT 1) AS prospect_handle,
COUNT(*) FILTER (WHERE m.is_from_me)::int AS quinn_out,
COUNT(*) FILTER (WHERE NOT m.is_from_me)::int AS them_in
FROM macsync.messages m
JOIN macsync.conversations c ON c.id = m.conversation_id
WHERE m.body !~ 'QuinnAiBot'
AND m.body !~ '^Reply suggestion'
AND length(m.body) > 0
AND m.sent_at > now() - ($2::int || ' days')::interval
AND NOT (c.participants @> jsonb_build_array($1::text))
${optionalFilters}
GROUP BY m.conversation_id
HAVING COUNT(*) FILTER (WHERE m.is_from_me) >= 3
AND COUNT(*) FILTER (WHERE NOT m.is_from_me) >= 3
ORDER BY COUNT(*) FILTER (WHERE m.is_from_me) DESC`;

  const { rows } = await pool.query<CorpusThread>(sql, bindings);
  return rows;
}
/**
 * Fetches every replayable message of one conversation, oldest first.
 *
 * Rows whose body matches 'QuinnAiBot', starts with 'Reply suggestion', or is
 * empty are excluded by the query.
 *
 * @param pool Postgres pool used to run the query.
 * @param conversationId Conversation id, bound as $1 and cast to uuid.
 * @returns The conversation's messages ordered by `sent_at` ascending.
 */
async function loadThread(pool: Pool, conversationId: string): Promise<readonly ThreadMsgRow[]> {
  // The statement text is kept flush-left so the SQL sent to the server is unchanged.
  const sql = `SELECT sent_at, is_from_me, from_handle, body
FROM macsync.messages
WHERE conversation_id = $1::uuid
AND body !~ 'QuinnAiBot'
AND body !~ '^Reply suggestion'
AND length(body) > 0
ORDER BY sent_at ASC`;
  const bindings = [conversationId];

  const { rows } = await pool.query<ThreadMsgRow>(sql, bindings);
  return rows;
}
/**
 * Assembles the DraftRequest sent to the draft pipeline for one replay point.
 *
 * Thread facts are all left null (register fixed to 'professional'),
 * `off_hours_active` is false and `last_outbound_in_thread_ts` is null: the
 * harness supplies only the message prefix, the planned locations and the
 * shared voice samples / exemplars.
 *
 * @param prospectHandle Handle copied into `prospect_handle`.
 * @param prefix Messages preceding the outbound message being replayed.
 * @param voiceSamples Passed through as `recent_quinn_outbound_samples`.
 * @param exemplars Passed through as `recent_quinn_reply_exemplars`.
 * @param plannedLocations Tour locations; the first confirmed one supplies
 *   `current_city` (falling back to 'San Francisco').
 * @param nowIso ISO timestamp used as `quinn_state.now_ts`.
 * @returns The request object for `callDraftPipeline`.
 */
function buildRequest(
  prospectHandle: string,
  prefix: readonly ThreadMsgRow[],
  voiceSamples: readonly string[],
  exemplars: readonly QuinnExemplar[],
  plannedLocations: readonly Awaited<ReturnType<typeof fetchTourPlannedLocations>>[number][],
  nowIso: string,
): DraftRequest {
  // Conversation history in pipeline shape; timestamps are normalised to ISO strings.
  const history: DraftRequest['thread_messages'] = prefix.map((msg) => {
    const sentAt = msg.sent_at instanceof Date ? msg.sent_at : new Date(msg.sent_at);
    return {
      ts: sentAt.toISOString(),
      from: msg.is_from_me ? 'me' : 'them',
      text: msg.body,
    };
  });

  // Optional location fields are emitted only when truthy, under snake_case keys.
  const upcoming: DraftRequest['quinn_state']['planned_locations_next_14d'] = plannedLocations.map(
    (stop) => ({
      city: stop.city,
      confirmed: stop.confirmed,
      window: stop.window,
      ...(stop.hotelName ? { hotel_name: stop.hotelName } : {}),
      ...(stop.checkIn ? { check_in: stop.checkIn } : {}),
      ...(stop.checkOut ? { check_out: stop.checkOut } : {}),
      ...(stop.eventName ? { event_name: stop.eventName } : {}),
      ...(stop.audienceTag ? { audience_tag: stop.audienceTag } : {}),
      ...(stop.venue ? { venue: stop.venue } : {}),
      ...(stop.reasoning ? { reasoning: stop.reasoning } : {}),
    }),
  );

  const firstConfirmed = plannedLocations.find((stop) => stop.confirmed);
  const currentCity = firstConfirmed?.city ?? 'San Francisco';

  return {
    prospect_handle: prospectHandle,
    thread_messages: history,
    thread_facts: {
      name: null,
      city: null,
      role: null,
      rate_paid: null,
      last_session_ts: null,
      explicit_ask: null,
      emotional_register: 'professional',
    },
    quinn_state: {
      now_ts: nowIso,
      current_city: currentCity,
      planned_locations_next_14d: upcoming,
      off_hours_active: false,
      last_outbound_in_thread_ts: null,
    },
    recent_quinn_outbound_samples: voiceSamples,
    recent_quinn_reply_exemplars: exemplars,
    playbook_state: 'active',
    engine_draft_id: null,
  };
}
/**
 * Entry point of the replay harness.
 *
 * Loads the corpus, then for every outbound message that has at least one
 * preceding message builds a DraftRequest from that prefix, calls the draft
 * pipeline, and appends one JSONL row (success or error) to a timestamped
 * file in `args.outDir`. Progress and the final summary go to stdout.
 *
 * `turn_index` is the 0-based position of the replayed message among the
 * thread's outbound (`is_from_me`) messages, so it is stable per thread and
 * identical whether the pipeline call succeeds or fails.
 *
 * The output stream and the Postgres pool are closed in `finally` blocks so
 * that rows already written are flushed even when a later step throws and the
 * top-level handler exits the process.
 *
 * @throws Error when neither QUINN_OUTREACH_DB_URL nor QUINN_MACSYNC_DB_URL is
 *   set, and propagates argument-parsing, database and file-system errors.
 *   Failures of individual pipeline calls are recorded as error rows instead.
 */
async function main(): Promise<void> {
  const args = parseArgs(process.argv.slice(2));
  const dbUrl = process.env['QUINN_OUTREACH_DB_URL'] ?? process.env['QUINN_MACSYNC_DB_URL'];
  if (!dbUrl) throw new Error('QUINN_OUTREACH_DB_URL not set');
  const selfHandle = process.env['QUINN_SELF_HANDLE'] ?? '+14244663669';

  // Defensive: tolerates a driver that returns timestamps as strings.
  const toIso = (value: Date): string =>
    (value instanceof Date ? value : new Date(value)).toISOString();

  const pool = new Pool({ connectionString: dbUrl, max: 4 });
  try {
    process.stdout.write(`[parity-replay] connected; corpus query…\n`);
    const corpus = await loadCorpus(pool, selfHandle, args.sinceDays, args.threadIds, args.prospectHandles);
    process.stdout.write(`[parity-replay] corpus: ${corpus.length} threads\n`);
    const voiceSamples = await fetchQuinnVoiceSamples(pool);
    const exemplars = await fetchQuinnReplyExemplars(pool);
    process.stdout.write(`[parity-replay] voice-samples=${voiceSamples.length} exemplars=${exemplars.length}\n`);
    const plannedLocations = await fetchTourPlannedLocations();

    mkdirSync(args.outDir, { recursive: true });
    const stamp = new Date().toISOString().replace(/[:.]/g, '-');
    const outPath = join(args.outDir, `${stamp}.jsonl`);
    const out = createWriteStream(outPath, { flags: 'a' });
    process.stdout.write(`[parity-replay] writing ${outPath}\n`);

    let pairs = 0;
    let failures = 0;
    try {
      for (const thread of corpus) {
        const msgs = await loadThread(pool, thread.conversation_id);

        // Number every outbound message, but only replay those with a
        // non-empty prefix (an outbound first message has nothing to reply to).
        const targets: { m: (typeof msgs)[number]; i: number; turnIndex: number }[] = [];
        let outboundSeen = 0;
        msgs.forEach((m, i) => {
          if (!m.is_from_me) return;
          const turnIndex = outboundSeen++;
          if (i > 0) targets.push({ m, i, turnIndex });
        });

        const slice = args.limitPerThread != null ? targets.slice(0, args.limitPerThread) : targets;
        process.stdout.write(
          `[parity-replay] thread ${thread.conversation_id} (${thread.prospect_handle}): ${slice.length}/${targets.length} pairs\n`,
        );

        for (const { m, i, turnIndex } of slice) {
          const sentIso = toIso(m.sent_at);
          const req = buildRequest(
            thread.prospect_handle,
            msgs.slice(0, i),
            voiceSamples,
            exemplars,
            // NOTE(review): `as never` silences the type checker here; confirm
            // whether the cast is still needed and remove it if not.
            plannedLocations as never,
            sentIso,
          );

          // Fields shared by success and error rows, in the documented order.
          const base = {
            thread_id: thread.conversation_id,
            prospect_handle: thread.prospect_handle,
            ts: sentIso,
            turn_index: turnIndex,
            actual: m.body,
          };

          const t0 = Date.now();
          try {
            const result = await callDraftPipeline(req);
            const row = {
              ...base,
              simulated: result.draft,
              meta: result.meta,
              brief: result.brief,
              elapsed_ms: Date.now() - t0,
              error: null,
            };
            out.write(`${JSON.stringify(row)}\n`);
            pairs++;
          } catch (err) {
            // A failed pipeline call is data, not a fatal error: record it and continue.
            const row = {
              ...base,
              simulated: '',
              meta: null,
              brief: null,
              elapsed_ms: Date.now() - t0,
              error: err instanceof Error ? err.message : String(err),
            };
            out.write(`${JSON.stringify(row)}\n`);
            failures++;
          }
        }
      }
    } finally {
      // Flush buffered rows before anything can call process.exit().
      await new Promise<void>((resolve) => out.end(resolve));
    }

    process.stdout.write(
      `[parity-replay] done: ${pairs} pairs, ${failures} failures → ${outPath}\n`,
    );
  } finally {
    await pool.end();
  }
}
// Run the harness; any escaping error is reported on stderr (stack when
// available) and the process exits with status 1.
main().catch((reason: unknown) => {
  let detail: string;
  if (reason instanceof Error) {
    detail = reason.stack ?? reason.message;
  } else {
    detail = String(reason);
  }
  process.stderr.write(`[parity-replay] fatal: ${detail}\n`);
  process.exit(1);
});