prospector/tooling/eval/lib.py
Natalie 62288fe48b
Some checks are pending
CI / verify (push) Waiting to run
fix(prospector): burst-aware, 1:1-only extraction (shared lib.py)
Real convos aren't clean alternating turns: ~38% of message-runs are bursts
(one sender, up to 132 in a row), and 5 group chats mix senders under
is_from_me=0. New lib.py collapses bursts into turns, excludes group chats
(chat.style=45 only), and yields CLIENT->QUINN decision points with a
per-conversation cap (avoids verbose threads flooding the set). Corrected
corpus: 1623 1:1 work-era conversations, 16095 decision points (8129 at
max_per_handle=20). sweep.py now uses lib + WORKERS for vertical scaling.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 04:03:46 -04:00

81 lines
3.5 KiB
Python

"""Shared chat.db extraction — burst-aware, 1:1-only, work-era.
Real conversations are not clean alternating turns: one person often sends a
BURST of consecutive messages before the other replies (~38% of runs, up to 132
in a row), and the operator juggles several threads at once. Group chats (style
43) mix multiple senders under is_from_me=0 and must be excluded. This module is
the single correct extraction every eval script uses.
A "turn" = one sender's full consecutive burst, collapsed. A "decision point" =
a CLIENT turn immediately followed by a QUINN turn (what she replied to, and her
reply = the gold label).
"""
import sqlite3, os, datetime
# Start of the "work era" expressed as nanoseconds elapsed since 2001-01-01,
# so it can be compared directly against message.date in the query below.
_REFERENCE_DAY = datetime.datetime(2001, 1, 1)
_WORK_START_DAY = datetime.datetime(2025, 11, 1)
_WORK_START_SECONDS = (_WORK_START_DAY - _REFERENCE_DAY).total_seconds()
WORK_START_NS = int(_WORK_START_SECONDS * 1e9)

# Messages database under the current user's home directory.
CHATDB = os.path.expanduser("~/Library/Messages/chat.db")
def decode_attr(data):
    """Best-effort extraction of the plain text from an attributedBody blob.

    The blob is scanned rather than fully parsed: locate the ``NSString``
    class name, then the first ``0x2b`` byte after it, then read a
    length-prefixed UTF-8 string. (Presumably the blob is a serialized
    NSAttributedString -- this function only relies on the byte pattern above.)

    Args:
        data: raw bytes of the attributedBody column, or None / empty.

    Returns:
        The decoded, whitespace-stripped text, or None when the blob is
        missing, does not contain the expected markers, is truncated right
        after the marker, or decodes to an empty string.
    """
    if not data:
        return None
    try:
        marker = data.index(b"NSString")
        pos = data.index(b"\x2b", marker) + 1
        # A blob that ends right after the 0x2b marker has no length byte;
        # treat it as undecodable instead of raising IndexError.
        length = data[pos]
    except (ValueError, IndexError):
        return None
    if length == 0x81:
        # Tag byte: the real length follows as a 2-byte little-endian integer.
        length = int.from_bytes(data[pos + 1:pos + 3], "little")
        pos += 3
    elif length == 0x82:
        # Tag byte: the real length follows as a 4-byte little-endian integer
        # (needed for strings longer than 65535 bytes).
        length = int.from_bytes(data[pos + 1:pos + 5], "little")
        pos += 5
    else:
        # Short string: the byte itself is the length.
        pos += 1
    # Invalid UTF-8 is replaced rather than raised; empty result -> None.
    return data[pos:pos + length].decode("utf-8", "replace").strip() or None
def _connect():
    """Open the chat database at CHATDB through a read-only SQLite URI."""
    read_only_uri = f"file:{CHATDB}?mode=ro"
    return sqlite3.connect(read_only_uri, uri=True)
def load_threads(work_era_only=True):
    """All 1:1 conversations as {handle, turns:[{who,text}]}, bursts collapsed.

    Args:
        work_era_only: when true, only messages with ``date >= WORK_START_NS``
            are read.

    Returns:
        A list with one dict per handle that has at least one usable message:
        ``{"handle": <handle id>, "turns": [{"who": "quinn"|"client",
        "text": str}, ...]}``. Consecutive messages from the same sender are
        merged into a single turn, joined with a single space. Messages with
        neither a non-blank ``text`` nor a decodable ``attributedBody`` are
        skipped.

    Errors raised by sqlite3 while opening or querying the database propagate
    to the caller; the connection is closed either way.
    """
    from collections import defaultdict

    q = ("SELECT hd.id, m.is_from_me, m.date, m.text, m.attributedBody "
         "FROM message m JOIN handle hd ON m.handle_id = hd.ROWID "
         "JOIN chat_message_join cmj ON cmj.message_id = m.ROWID "
         "JOIN chat c ON c.ROWID = cmj.chat_id "
         "WHERE c.style = 45")  # 45 = 1:1, 43 = group (excluded)
    args = []
    if work_era_only:
        q += " AND m.date >= ?"
        args.append(WORK_START_NS)
    # Sorting by handle first, then time, gives each handle's messages in
    # chronological order.
    q += " ORDER BY hd.id, m.date ASC"

    raw = defaultdict(list)
    db = _connect()
    try:
        for hid, is_me, _, text, ab in db.execute(q, args):
            # Prefer the plain text column; fall back to the attributedBody blob.
            body = (text or "").strip() or decode_attr(ab)
            if body:
                raw[hid].append((is_me, body))
    finally:
        # Using a connection as a context manager does not close it, so close
        # it explicitly to avoid leaking the handle.
        db.close()

    threads = []
    for hid, msgs in raw.items():
        bursts = []  # [(who, [body, ...]), ...] in chronological order
        for is_me, body in msgs:
            who = "quinn" if is_me else "client"
            if bursts and bursts[-1][0] == who:  # same sender -> same burst
                bursts[-1][1].append(body)
            else:
                bursts.append((who, [body]))
        turns = [{"who": who, "text": " ".join(parts)} for who, parts in bursts]
        threads.append({"handle": hid, "turns": turns})
    return threads
def decision_points(turns, max_per_handle=1, ctx_turns=8):
    """Yield (context_str, gold_reply) for CLIENT->QUINN transitions in a thread.

    A decision point is an index ``i`` where ``turns[i]`` is a client turn and
    ``turns[i + 1]`` is a quinn turn.

    Args:
        turns: list of ``{"who": ..., "text": ...}`` dicts in order.
        max_per_handle: cap on the number of decision points yielded. When the
            thread has more than this, an evenly spaced subset is taken and
            its final slot is always the thread's last decision point (so
            ``max_per_handle=1`` yields only the last one). This keeps one
            long thread from flooding the set with near-duplicates. Values
            below 1 yield nothing.
        ctx_turns: maximum number of turns included in the context, counting
            back from and including the client turn being replied to.

    Yields:
        ``(context_str, gold_reply)`` where ``context_str`` has one
        ``"WHO: text"`` line per context turn and ``gold_reply`` is the text of
        the quinn turn that followed.
    """
    if max_per_handle < 1:
        # A cap of zero (or less) means "no points"; without this guard the
        # subsampling below divides by zero or indexes an empty list.
        return
    all_idxs = [i for i in range(len(turns) - 1)
                if turns[i]["who"] == "client" and turns[i + 1]["who"] == "quinn"]
    if not all_idxs:
        return
    idxs = all_idxs
    if len(all_idxs) > max_per_handle:
        step = len(all_idxs) / max_per_handle
        idxs = [all_idxs[int(k * step)] for k in range(max_per_handle)]
        # Even spacing starts at the first point and never reaches the end;
        # pin the final slot to the most recent decision point.
        idxs[-1] = all_idxs[-1]
    for i in idxs:
        ctx = turns[max(0, i - ctx_turns + 1):i + 1]
        ctx_str = "\n".join(f"{t['who'].upper()}: {t['text']}" for t in ctx)
        yield ctx_str, turns[i + 1]["text"]