prospector/tooling/eval/lib.py
Natalie e04135acaa
Some checks failed
CI / verify (push) Failing after 49s
feat(eval): identity gate layer 2 — AddressBook known-contact exclusion
Strong gate (operator-authorized, local-only, fail-soft): saved AddressBook
contacts = existing relationships/friends/vendors, excluded from the cold-
prospect corpus (the matcher's 'unknown numbers only' rule). Removes 189/1631
(11%) known contacts vs the proxy's 68 (4%). Combined cold_prospect_handles =
new-contact AND not-saved -> 1390 candidates (85%); the semantic not-a-prospect
classes in the re-sweep clean the remaining unsaved existing-clients/banter.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 12:12:23 -04:00

120 lines
5.4 KiB
Python

"""Shared chat.db extraction — burst-aware, 1:1-only, work-era.
Real conversations are not clean alternating turns: one person often sends a
BURST of consecutive messages before the other replies (~38% of runs, up to 132
in a row), and the operator juggles several threads at once. Group chats (style
43) mix multiple senders under is_from_me=0 and must be excluded. This module is
the single correct extraction every eval script uses.
A "turn" = one sender's full consecutive burst, collapsed. A "decision point" =
a CLIENT turn immediately followed by a QUINN turn (what she replied to, and her
reply = the gold label).
"""
import sqlite3, os, datetime, re, glob
WORK_START_NS = int((datetime.datetime(2025, 11, 1) - datetime.datetime(2001, 1, 1)).total_seconds() * 1e9)
CHATDB = os.path.expanduser("~/Library/Messages/chat.db")
def decode_attr(data):
if not data: return None
try:
idx = data.index(b"NSString"); p = data.index(b"\x2b", idx) + 1
except ValueError: return None
n = data[p]
if n == 0x81: n = int.from_bytes(data[p + 1:p + 3], "little"); p += 3
else: p += 1
return (data[p:p + n].decode("utf-8", "replace").strip()) or None
def _connect():
return sqlite3.connect(f"file:{CHATDB}?mode=ro", uri=True)
def _norm(num):
return re.sub(r"\D", "", num)[-10:]
def _saved_contact_numbers():
"""10-digit numbers saved in the local AddressBook. Empty set on any failure
(gate degrades gracefully to first-contact-only). Local-only; never sent."""
saved = set()
for f in glob.glob(os.path.expanduser(
"~/Library/Application Support/AddressBook/Sources/*/AddressBook-v22.abcddb")):
try:
ab = sqlite3.connect(f"file:{f}?mode=ro", uri=True)
for (v,) in ab.execute("SELECT ZFULLNUMBER FROM ZABCDPHONENUMBER WHERE ZFULLNUMBER IS NOT NULL"):
d = _norm(v)
if len(d) == 10:
saved.add(d)
except Exception:
pass
return saved
def cold_prospect_handles():
"""Identity gate: a cold prospect is (a) a NEW contact — first-ever message in
the work era — AND (b) NOT a saved AddressBook contact (saved = existing
relationship/friend/vendor). Removes the contamination that made the
move-classifier mislabel ~half the corpus. The semantic not-a-prospect classes
in the re-sweep catch the remainder (unsaved existing-clients, logistics, banter)."""
db = _connect()
rows = db.execute(
"SELECT hd.id FROM message m JOIN handle hd ON m.handle_id = hd.ROWID "
"JOIN chat_message_join cmj ON cmj.message_id = m.ROWID "
"JOIN chat c ON c.ROWID = cmj.chat_id WHERE c.style = 45 "
"GROUP BY hd.id HAVING MIN(m.date) >= ?", (WORK_START_NS,)).fetchall()
first_contact = {r[0] for r in rows}
saved = _saved_contact_numbers()
return {h for h in first_contact if _norm(h) not in saved}
def load_threads(work_era_only=True, cold_only=False):
"""All 1:1 conversations as {handle, turns:[{who,text}]}, bursts collapsed.
cold_only=True applies the identity gate (cold_prospect_handles)."""
db = _connect()
q = ("SELECT hd.id, m.is_from_me, m.date, m.text, m.attributedBody "
"FROM message m JOIN handle hd ON m.handle_id = hd.ROWID "
"JOIN chat_message_join cmj ON cmj.message_id = m.ROWID "
"JOIN chat c ON c.ROWID = cmj.chat_id "
"WHERE c.style = 45") # 45 = 1:1, 43 = group (excluded)
args = []
if work_era_only:
q += " AND m.date >= ?"; args.append(WORK_START_NS)
q += " ORDER BY hd.id, m.date ASC"
from collections import defaultdict
raw = defaultdict(list)
for hid, is_me, _, text, ab in db.execute(q, args):
body = (text or "").strip() or decode_attr(ab)
if body:
raw[hid].append((is_me, body))
threads = []
for hid, msgs in raw.items():
turns = []
for is_me, body in msgs:
who = "quinn" if is_me else "client"
if turns and turns[-1]["who"] == who: # collapse burst
turns[-1]["text"] += " " + body
else:
turns.append({"who": who, "text": body})
threads.append({"handle": hid, "turns": turns})
if cold_only:
cold = cold_prospect_handles()
threads = [t for t in threads if t["handle"] in cold]
return threads
def decision_points(turns, max_per_handle=1, ctx_turns=8):
"""Yield (context_str, gold_reply) for CLIENT->QUINN transitions in a thread.
max_per_handle=1 -> only the last decision point; higher -> the most recent N
(evenly spaced) to avoid one long thread flooding the set with near-dupes.
"""
idxs = [i for i in range(len(turns) - 1)
if turns[i]["who"] == "client" and turns[i + 1]["who"] == "quinn"]
if not idxs:
return
if len(idxs) > max_per_handle:
step = len(idxs) / max_per_handle
idxs = [idxs[int(k * step)] for k in range(max_per_handle)]
if idxs[-1] != [i for i in range(len(turns) - 1)
if turns[i]["who"] == "client" and turns[i + 1]["who"] == "quinn"][-1]:
idxs[-1] = [i for i in range(len(turns) - 1)
if turns[i]["who"] == "client" and turns[i + 1]["who"] == "quinn"][-1]
for i in idxs:
ctx = turns[max(0, i - ctx_turns + 1):i + 1]
ctx_str = "\n".join(f"{t['who'].upper()}: {t['text']}" for t in ctx)
yield ctx_str, turns[i + 1]["text"]