310 lines
11 KiB
TypeScript
310 lines
11 KiB
TypeScript
/**
 * parity-replay.ts — offline harness for the draft pipeline.
 *
 * For each thread in the corpus, walks messages chronologically and at every
 * Quinn-outbound point, builds a DraftRequest from the prior-only prefix and
 * asks the live pipeline (model-boss on apricot:8210) for the draft it would
 * have produced. Writes one JSONL line per pair to a timestamped file under
 * scripts/parity-runs/ (or the --out directory); progress logs go to stdout.
 *
 * Companion: parity-score.ts (heuristics + LLM judge) reads the JSONL.
 *
 * Usage:
 *   bun run scripts/parity-replay.ts                            # default corpus, full
 *   bun run scripts/parity-replay.ts --limit 5                  # cap pairs per thread
 *   bun run scripts/parity-replay.ts --threads <id,id>          # specific threads
 *   bun run scripts/parity-replay.ts --prospect-handles <h,h>   # threads with these participants
 *   bun run scripts/parity-replay.ts --since-days 30            # corpus lookback (default 120)
 *   bun run scripts/parity-replay.ts --out <dir>                # output directory
 *
 * Env:
 *   QUINN_OUTREACH_DB_URL    postgres://... (required; same as live engine;
 *                            falls back to QUINN_MACSYNC_DB_URL)
 *   MODEL_BOSS_URL           http://apricot.lan:8210
 *   QUINN_API_BASE           http://apricot.lan:3030 (for tour-context)
 *   QUINN_API_SERVICE_TOKEN
 *   QUINN_SELF_HANDLE        +14244663669 (filter self-thread)
 *
 * Output schema (one line per pair):
 *   {
 *     thread_id: string,
 *     prospect_handle: string,
 *     ts: ISO,
 *     turn_index: number,           // 0-based across is_from_me==true within the thread
 *     actual: string,               // Quinn's real outbound body
 *     simulated: string,            // pipeline draft ('' when the call failed)
 *     meta: MetaEvaluation | null,  // null when the call failed
 *     brief: DraftBrief | null,     // null when the call failed
 *     elapsed_ms: number,
 *     error: string | null
 *   }
 */
|
|
|
|
import { mkdirSync, createWriteStream } from 'node:fs';
|
|
import { join } from 'node:path';
|
|
import { Pool } from 'pg';
|
|
|
|
import { callDraftPipeline, type DraftRequest } from '../src/shared/draft-pipeline-ts';
|
|
import { fetchTourPlannedLocations } from '../src/shared/tour-context';
|
|
import { fetchQuinnVoiceSamples } from '../src/shared/quinn-voice-samples';
|
|
import { fetchQuinnReplyExemplars, type QuinnExemplar } from '../src/shared/quinn-exemplars';
|
|
|
|
/** One message row, in the column shape selected by `loadThread`. */
interface ThreadMsgRow {
  /** Send time of the message; `loadThread` orders rows by this, ascending. */
  sent_at: Date;
  /** True for Quinn's own outbound messages, false for inbound ones. */
  is_from_me: boolean;
  /** Value of the `from_handle` column for this message. */
  from_handle: string;
  /** Message text (the queries only select rows with a non-empty body). */
  body: string;
}
|
|
|
|
/** Summary of one conversation selected for replay, as returned by `loadCorpus`. */
interface CorpusThread {
  /** Conversation id, cast to text by the corpus query. */
  conversation_id: string;
  /** One entry taken from the conversation's `participants` array. */
  prospect_handle: string;
  /** Number of counted messages in the conversation with `is_from_me` set. */
  quinn_out: number;
  /** Number of counted messages in the conversation without `is_from_me` set. */
  them_in: number;
}
|
|
|
|
/** Parsed command-line options for the replay run. */
interface CliArgs {
  /** `--limit`: maximum pairs replayed per thread; `null` means no cap. */
  limitPerThread: number | null;
  /** `--threads`: conversation ids to restrict the corpus to; `null` means no restriction. */
  threadIds: readonly string[] | null;
  /** `--prospect-handles`: participant handles to restrict the corpus to; `null` means no restriction. */
  prospectHandles: readonly string[] | null;
  /** `--since-days`: corpus lookback window, in days. */
  sinceDays: number;
  /** `--out`: directory the JSONL output file is created in. */
  outDir: string;
}
|
|
|
|
/**
 * Parses the command-line flags accepted by the replay harness.
 *
 * Recognised flags (each takes exactly one value):
 *   --limit <n>                cap on pairs replayed per thread
 *   --threads <id,id>          comma-separated conversation ids
 *   --prospect-handles <h,h>   comma-separated participant handles
 *   --since-days <n>           corpus lookback in days (default 120)
 *   --out <dir>                output directory (default: `parity-runs` next to this script)
 *
 * @param argv Arguments after the script name (e.g. `process.argv.slice(2)`).
 * @returns The parsed options; flags that were not supplied keep their defaults.
 * @throws Error when a flag is unknown, is missing its value, or a numeric
 *   flag is not a non-negative base-10 integer.
 */
function parseArgs(argv: readonly string[]): CliArgs {
  const args: CliArgs = {
    limitPerThread: null,
    threadIds: null,
    prospectHandles: null,
    sinceDays: 120,
    outDir: join(import.meta.dirname ?? '.', 'parity-runs'),
  };

  const knownFlags = new Set(['--limit', '--threads', '--prospect-handles', '--since-days', '--out']);

  // "a, b,,c" -> ['a', 'b', 'c']
  const splitList = (raw: string): string[] =>
    raw
      .split(',')
      .map((s) => s.trim())
      .filter(Boolean);

  // Number.parseInt alone would turn "abc" into NaN and "5x" into 5, and would
  // accept negatives; all of those silently corrupt the run further down
  // (empty slices, invalid SQL interval), so reject them up front.
  const toCount = (flag: string, raw: string): number => {
    const text = raw.trim();
    if (!/^\d+$/.test(text)) {
      throw new Error(`${flag} requires a non-negative integer, got "${raw}"`);
    }
    return Number.parseInt(text, 10);
  };

  for (let i = 0; i < argv.length; i++) {
    const flag = argv[i];
    if (flag === undefined || !knownFlags.has(flag)) {
      throw new Error(`unknown arg: ${flag}`);
    }

    // Every known flag consumes the following argument as its value.
    const value = argv[++i];
    if (!value) throw new Error(`${flag} requires value`);

    if (flag === '--limit') {
      args.limitPerThread = toCount(flag, value);
    } else if (flag === '--threads') {
      args.threadIds = splitList(value);
    } else if (flag === '--prospect-handles') {
      args.prospectHandles = splitList(value);
    } else if (flag === '--since-days') {
      args.sinceDays = toCount(flag, value);
    } else {
      args.outDir = value;
    }
  }

  return args;
}
|
|
|
|
/**
 * Selects the conversations that make up the replay corpus.
 *
 * Messages are counted only when their body is non-empty, does not match
 * 'QuinnAiBot', does not start with 'Reply suggestion', and was sent within
 * the last `sinceDays` days. Conversations whose participants include
 * `selfHandle` are excluded. A conversation qualifies when at least three
 * counted messages are outbound and at least three are inbound; results are
 * ordered by outbound count, highest first.
 *
 * @param pool Postgres pool used to run the query.
 * @param selfHandle Handle whose conversations are excluded.
 * @param sinceDays Lookback window in days.
 * @param threadIds When non-empty, restricts results to these conversation ids.
 * @param prospectHandles When non-empty, restricts results to conversations
 *   whose participants include any of these handles.
 * @returns One summary row per qualifying conversation.
 */
async function loadCorpus(
  pool: Pool,
  selfHandle: string,
  sinceDays: number,
  threadIds: readonly string[] | null,
  prospectHandles: readonly string[] | null,
): Promise<readonly CorpusThread[]> {
  // $1 and $2 are fixed; optional filters take the next free placeholder.
  const bindings: unknown[] = [selfHandle, sinceDays];
  const extraClauses: string[] = [];

  if ((threadIds?.length ?? 0) > 0) {
    // Array.prototype.push returns the new length, i.e. the 1-based placeholder index.
    const slot = bindings.push(threadIds);
    extraClauses.push(`AND m.conversation_id::text = ANY($${slot}::text[])`);
  }

  if ((prospectHandles?.length ?? 0) > 0) {
    const slot = bindings.push(prospectHandles);
    extraClauses.push(`AND c.participants ?| $${slot}::text[]`);
  }

  const filter = extraClauses.join(' ');

  const sql = `SELECT m.conversation_id::text AS conversation_id,
            (SELECT (jsonb_array_elements_text(c.participants))
               FROM macsync.conversations c
              WHERE c.id = m.conversation_id
              LIMIT 1) AS prospect_handle,
            COUNT(*) FILTER (WHERE m.is_from_me)::int AS quinn_out,
            COUNT(*) FILTER (WHERE NOT m.is_from_me)::int AS them_in
       FROM macsync.messages m
       JOIN macsync.conversations c ON c.id = m.conversation_id
      WHERE m.body !~ 'QuinnAiBot'
        AND m.body !~ '^Reply suggestion'
        AND length(m.body) > 0
        AND m.sent_at > now() - ($2::int || ' days')::interval
        AND NOT (c.participants @> jsonb_build_array($1::text))
        ${filter}
      GROUP BY m.conversation_id
     HAVING COUNT(*) FILTER (WHERE m.is_from_me) >= 3
        AND COUNT(*) FILTER (WHERE NOT m.is_from_me) >= 3
      ORDER BY COUNT(*) FILTER (WHERE m.is_from_me) DESC`;

  const { rows } = await pool.query<CorpusThread>(sql, bindings);
  return rows;
}
|
|
|
|
/**
 * Loads the replayable messages of one conversation, oldest first.
 *
 * Applies the same body filters as the corpus query: non-empty bodies that do
 * not match 'QuinnAiBot' and do not start with 'Reply suggestion'.
 *
 * @param pool Postgres pool used to run the query.
 * @param conversationId Conversation id; bound as `$1` and cast to uuid.
 * @returns The message rows ordered by `sent_at` ascending.
 */
async function loadThread(pool: Pool, conversationId: string): Promise<readonly ThreadMsgRow[]> {
  const sql = `SELECT sent_at, is_from_me, from_handle, body
       FROM macsync.messages
      WHERE conversation_id = $1::uuid
        AND body !~ 'QuinnAiBot'
        AND body !~ '^Reply suggestion'
        AND length(body) > 0
      ORDER BY sent_at ASC`;

  const { rows } = await pool.query<ThreadMsgRow>(sql, [conversationId]);
  return rows;
}
|
|
|
|
/**
 * Assembles the DraftRequest sent to the pipeline for one replay point.
 *
 * Only the thread prefix, the tour locations, the voice samples and the
 * exemplars vary between calls. The thread facts are all null apart from a
 * fixed 'professional' register, and the remaining state fields are constants.
 *
 * @param prospectHandle Handle of the prospect the thread belongs to.
 * @param prefix Messages preceding the outbound message being replayed.
 * @param voiceSamples Passed through as `recent_quinn_outbound_samples`.
 * @param exemplars Passed through as `recent_quinn_reply_exemplars`.
 * @param plannedLocations Tour locations; the first confirmed one supplies
 *   `current_city`, falling back to 'San Francisco' when none is confirmed.
 * @param nowIso ISO timestamp used as `quinn_state.now_ts`.
 * @returns The request object for `callDraftPipeline`.
 */
function buildRequest(
  prospectHandle: string,
  prefix: readonly ThreadMsgRow[],
  voiceSamples: readonly string[],
  exemplars: readonly QuinnExemplar[],
  plannedLocations: readonly Awaited<ReturnType<typeof fetchTourPlannedLocations>>[number][],
  nowIso: string,
): DraftRequest {
  // sent_at is typed as Date, but values that arrive in another form are coerced.
  const isoOf = (when: Date): string =>
    when instanceof Date ? when.toISOString() : new Date(when).toISOString();

  const firstConfirmed = plannedLocations.find((loc) => loc.confirmed);

  return {
    prospect_handle: prospectHandle,
    thread_messages: prefix.map((msg) => ({
      ts: isoOf(msg.sent_at),
      from: msg.is_from_me ? 'me' : 'them',
      text: msg.body,
    })),
    thread_facts: {
      name: null,
      city: null,
      role: null,
      rate_paid: null,
      last_session_ts: null,
      explicit_ask: null,
      emotional_register: 'professional',
    },
    quinn_state: {
      now_ts: nowIso,
      current_city: firstConfirmed?.city ?? 'San Francisco',
      // Optional fields are emitted only when truthy, renamed to snake_case.
      planned_locations_next_14d: plannedLocations.map((loc) => ({
        city: loc.city,
        confirmed: loc.confirmed,
        window: loc.window,
        ...(loc.hotelName ? { hotel_name: loc.hotelName } : {}),
        ...(loc.checkIn ? { check_in: loc.checkIn } : {}),
        ...(loc.checkOut ? { check_out: loc.checkOut } : {}),
        ...(loc.eventName ? { event_name: loc.eventName } : {}),
        ...(loc.audienceTag ? { audience_tag: loc.audienceTag } : {}),
        ...(loc.venue ? { venue: loc.venue } : {}),
        ...(loc.reasoning ? { reasoning: loc.reasoning } : {}),
      })),
      off_hours_active: false,
      last_outbound_in_thread_ts: null,
    },
    recent_quinn_outbound_samples: voiceSamples,
    recent_quinn_reply_exemplars: exemplars,
    playbook_state: 'active',
    engine_draft_id: null,
  };
}
|
|
|
|
/**
 * Runs the replay: loads the corpus, and for every Quinn-outbound message that
 * has at least one preceding message asks the draft pipeline what it would
 * have written, appending one JSONL row per attempt to a timestamped file in
 * `args.outDir`. Progress is logged to stdout.
 *
 * A pipeline failure for one pair is recorded as a row with `error` set and
 * does not stop the run. Any other error propagates to the caller after the
 * output file has been flushed and the pool closed.
 *
 * @throws Error when no database URL is configured, or on argument, database,
 *   or file-system failures.
 */
async function main(): Promise<void> {
  const args = parseArgs(process.argv.slice(2));
  const dbUrl = process.env['QUINN_OUTREACH_DB_URL'] ?? process.env['QUINN_MACSYNC_DB_URL'];
  if (!dbUrl) throw new Error('QUINN_OUTREACH_DB_URL not set');
  const selfHandle = process.env['QUINN_SELF_HANDLE'] ?? '+14244663669';

  const pool = new Pool({ connectionString: dbUrl, max: 4 });
  process.stdout.write(`[parity-replay] connected; corpus query…\n`);

  const corpus = await loadCorpus(pool, selfHandle, args.sinceDays, args.threadIds, args.prospectHandles);
  process.stdout.write(`[parity-replay] corpus: ${corpus.length} threads\n`);

  const voiceSamples = await fetchQuinnVoiceSamples(pool);
  const exemplars = await fetchQuinnReplyExemplars(pool);
  process.stdout.write(`[parity-replay] voice-samples=${voiceSamples.length} exemplars=${exemplars.length}\n`);
  const plannedLocations = await fetchTourPlannedLocations();

  mkdirSync(args.outDir, { recursive: true });
  // ':' and '.' are replaced so the ISO timestamp is a portable file name.
  const stamp = new Date().toISOString().replace(/[:.]/g, '-');
  const outPath = join(args.outDir, `${stamp}.jsonl`);
  const out = createWriteStream(outPath, { flags: 'a' });
  process.stdout.write(`[parity-replay] writing ${outPath}\n`);

  const toIso = (when: Date): string => (when instanceof Date ? when : new Date(when)).toISOString();
  const emit = (row: Record<string, unknown>): void => {
    out.write(`${JSON.stringify(row)}\n`);
  };

  let pairs = 0;
  let failures = 0;
  try {
    for (const thread of corpus) {
      const msgs = await loadThread(pool, thread.conversation_id);

      // Number every outbound message in the thread (0-based) so turn_index is
      // per-thread and stable regardless of --limit or earlier failures. An
      // outbound message at position 0 has no prefix to draft from, so it is
      // counted but not replayed.
      const targets: { m: ThreadMsgRow; i: number; turnIndex: number }[] = [];
      let outboundSeen = 0;
      msgs.forEach((m, i) => {
        if (!m.is_from_me) return;
        const turnIndex = outboundSeen++;
        if (i > 0) targets.push({ m, i, turnIndex });
      });

      const slice = args.limitPerThread != null ? targets.slice(0, args.limitPerThread) : targets;
      process.stdout.write(
        `[parity-replay] thread ${thread.conversation_id} (${thread.prospect_handle}): ${slice.length}/${targets.length} pairs\n`,
      );

      for (const { m, i, turnIndex } of slice) {
        const sentIso = toIso(m.sent_at);
        // The request sees only messages strictly before the one being replayed,
        // and "now" is pinned to the moment the real message was sent.
        const req = buildRequest(
          thread.prospect_handle,
          msgs.slice(0, i),
          voiceSamples,
          exemplars,
          plannedLocations as never,
          sentIso,
        );

        // Fields shared by success and failure rows, in output key order.
        const base = {
          thread_id: thread.conversation_id,
          prospect_handle: thread.prospect_handle,
          ts: sentIso,
          turn_index: turnIndex,
          actual: m.body,
        };

        const t0 = Date.now();
        try {
          const result = await callDraftPipeline(req);
          emit({
            ...base,
            simulated: result.draft,
            meta: result.meta,
            brief: result.brief,
            elapsed_ms: Date.now() - t0,
            error: null,
          });
          pairs++;
        } catch (err) {
          emit({
            ...base,
            simulated: '',
            meta: null,
            brief: null,
            elapsed_ms: Date.now() - t0,
            error: err instanceof Error ? err.message : String(err),
          });
          failures++;
        }
      }
    }
  } finally {
    // Flush rows already produced and release connections even when a thread
    // query fails mid-run; the caller exits the process on error.
    await new Promise<void>((resolve) => out.end(resolve));
    await pool.end();
  }

  process.stdout.write(
    `[parity-replay] done: ${pairs} pairs, ${failures} failures → ${outPath}\n`,
  );
}
|
|
|
|
// Script entry point: run the replay, and on any uncaught failure print the
// stack (or message, or stringified value) to stderr and exit with status 1.
main().catch((err) => {
  let detail: string;
  if (err instanceof Error) {
    detail = err.stack ?? err.message;
  } else {
    detail = String(err);
  }
  process.stderr.write(`[parity-replay] fatal: ${detail}\n`);
  process.exit(1);
});
|