redroid-mrnumber/client/mr_lookup.py
Natalie 15e7348413 feat(box): OCR extraction + GPU (OpenAI-compatible) rating backend, env-selectable
Wire the on-box (Claude-API-less) path decided with the operator: EXTRACT_BACKEND=ocr
sends each screenshot to the on-box mrnumber-ocr service (raw text, no per-shot
structuring); build_rating_profile uses an OpenAI-compatible LLM on a DO GPU droplet
(RATING_LLM_URL) which extracts the reports from the raw OCR text AND produces the
multi-axis verdict. Reports are folded back into the history so the people-signal +
counts + safety flags reflect them; safety detection also scans the raw OCR lines so a
LE term forces cop_flag even before structuring. vision/Claude stays the plum-dev
default. +5 tests incl. full OCR→GPU→cop_flag flow. 33/33.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 00:39:06 -04:00

890 lines
38 KiB
Python
Executable file

#!/usr/bin/env python3
"""
mr-number-lookup
Drive an Android device (USB phone or the redroid droplet) running the Mr. Number
app (com.mrnumber.blocker), perform a phone lookup, expand + scroll-capture the
*full* community-report history, vision-extract every report, consolidate them with
the lilith claude-code-batch-sdk into a multi-axis **rating profile** (0-100 + letter
grade) for the caller, decide a screening result, save the full history, and record
the verdict as a `screening_mrnumber` person signal in the cocotte **people service**
(persons DB), keyed by the phone number. Prospector and the rest of the cocotte
ecosystem consume that signal; there is no quinn coupling.
The device automation, vision harness, and signal recorder are shared with the
sibling @whatsapp via the `redroid_client` package (pip: lilith-redroid-client); only
the Mr. Number-specific navigation, rating profile, and verdict logic live here.
Usage:
python3 mr_lookup.py --phone "+15551234567" [--ref <correlation-id>] [--dry-run]
Requires:
- adb in PATH; a device connected (USB serial, or `adb connect <host>:5555` for redroid)
with the paid Mr. Number app installed + signed in.
- PEOPLE_BASE_URL + PEOPLE_SERVICE_TOKEN in env (for recording the signal; mesh-only).
- The claude batch SDK on disk (for vision + rating consolidation).
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import re
import unicodedata
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
from redroid_client import (
RedroidDevice,
clean_phone as _clean_phone,
extract_backend,
extract_screenshot,
json_mode,
load_sdk,
log,
ocr_extract,
ocr_url,
openai_chat,
people_base_url,
people_service_token,
rating_llm_model,
rating_llm_url,
record_people_signal,
set_json_mode,
)
# --- Config / env
# Verdicts are recorded as person signals in the cocotte people service (persons DB),
# keyed by the phone number. Prospector and the rest of the ecosystem consume these
# signals — there is no quinn coupling. The person is auto-upserted by (handle, channel).
# People-service endpoint + credential, resolved once at import time through the
# shared redroid_client helpers.
PEOPLE_BASE_URL = people_base_url()
PEOPLE_SERVICE_TOKEN = people_service_token()
# Phone numbers map to the 'sms' people-channel (no dedicated 'phone' channel exists).
PEOPLE_CHANNEL = "sms"
SCREENING_SIGNAL_TYPE = "screening_mrnumber"
SOURCE_FEATURE = "mr-number"
# adb serial (or host:port) of the target device; main() overwrites this from --device.
DEVICE = os.environ.get("MR_NUMBER_DEVICE", "emulator-5554")
PACKAGE = "com.mrnumber.blocker"
# OUTPUT_DIR is handed to the device controller and used as the batch-SDK cwd;
# HISTORY_DIR receives the per-caller JSON files written by save_history().
# NOTE: both directories are created as an import-time side effect.
OUTPUT_DIR = Path(__file__).parent / "output"
HISTORY_DIR = OUTPUT_DIR / "history"
OUTPUT_DIR.mkdir(exist_ok=True)
HISTORY_DIR.mkdir(exist_ok=True)
# Vision = fast/cheap text-from-image. Rating = reasoning over the consolidated
# history, so it defaults to a stronger model (override via env).
VISION_MODEL = os.environ.get("MR_NUMBER_VISION_MODEL", "haiku")
RATING_MODEL = os.environ.get("MR_NUMBER_RATING_MODEL", "sonnet")
# Upper bound on swipe + screenshot iterations in capture_full_history().
MAX_SCROLL_CAPTURES = int(os.environ.get("MR_NUMBER_MAX_SCROLLS", "10"))
# ----------------------------------------------------------------------------
# Vision extraction (per screenshot)
# ----------------------------------------------------------------------------
# System prompt for the per-screenshot vision extraction; passed as `system=` by
# _extract_from_screenshot() when the vision backend is selected.
MR_NUMBER_SYSTEM = (
"You are looking at a screenshot from the Mr. Number (caller ID + community reports) Android app. "
"Extract the information shown for the looked-up phone number. Respond ONLY with a single JSON object, no markdown."
)
def _build_vision_prompt(screenshot_path: str, phone: str) -> str:
schema = {
"phone": "the exact phone number that was searched (string)",
"report_count": "integer or null — the total number of reports the app says exist (e.g. 'View all 7 reports' -> 7), not just visible",
"reports": "array of strings — every report/comment text VISIBLE in this screenshot, verbatim (the valuable paid content)",
"classification": "string or null — the label at the top (e.g. 'Personal Line', 'Business', 'Suspected Spam')",
"red_flags": "array of strings — negative signals mentioned (no-show, ghosting, rude, cop/law-enforcement, timewaster, boundary issues, etc.)",
"summary": "short one-sentence impression from the reports visible here",
"suggested_result": "one of: approved, denied, not_found — your best guess from what's visible",
}
return (
f"Read the image file at: {screenshot_path}\n\n"
f"This is a screenshot after looking up {phone} in the Mr. Number app.\n"
"Extract the community reports and any top-level caller info VISIBLE in this image. "
"Transcribe report text verbatim — do not paraphrase. "
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
)
async def _extract_from_screenshot(screenshot_path: str, phone: str) -> dict[str, Any]:
    """Extract one screenshot with whichever backend `extract_backend()` selects.

    * ``ocr``: the OCR service is called and only its raw ``text`` is kept (under
      ``raw_ocr``); the structured fields are returned empty because structuring
      is deferred to the rating step.
    * anything else: the shared vision extractor is awaited with the Mr. Number
      system prompt and the per-screenshot user prompt, and its result is
      returned unchanged.
    """
    shot = str(screenshot_path)
    if extract_backend() != "ocr":
        return await extract_screenshot(
            screenshot_path=shot,
            system=MR_NUMBER_SYSTEM,
            prompt=_build_vision_prompt(shot, phone),
            model=VISION_MODEL,
        )
    ocr_payload = ocr_extract(shot, base_url=ocr_url())
    return {
        "reports": [],
        "red_flags": [],
        "classification": None,
        "report_count": None,
        "raw_ocr": ocr_payload.get("text", ""),
    }
def merge_reports(extractions: list[dict[str, Any]], phone: str) -> dict[str, Any]:
    """Fold the per-screenshot extractions into a single report history.

    Reports and red flags are de-duplicated on a case-folded, whitespace-collapsed
    key while the first-seen (stripped) spelling is kept. The first truthy
    classification wins, the declared report count is the maximum integer seen,
    and raw OCR chunks are joined with newlines. Non-dict extractions are ignored.
    """

    def _absorb(items: Any, known: set[str], target: list[str]) -> None:
        # Append each not-yet-seen item (compared on its normalised form).
        for item in items or []:
            text = str(item).strip()
            fingerprint = re.sub(r"\s+", " ", text.lower())
            if fingerprint and fingerprint not in known:
                known.add(fingerprint)
                target.append(text)

    reports: list[str] = []
    red_flags: list[str] = []
    report_keys: set[str] = set()
    flag_keys: set[str] = set()
    ocr_chunks: list[str] = []
    classification: str | None = None
    declared_count = 0

    for extraction in extractions:
        if not isinstance(extraction, dict):
            continue
        raw = (extraction.get("raw_ocr") or "").strip()
        if raw:
            ocr_chunks.append(raw)
        if not classification and extraction.get("classification"):
            classification = extraction.get("classification")
        count = extraction.get("report_count")
        if isinstance(count, int):
            declared_count = max(declared_count, count)
        _absorb(extraction.get("reports"), report_keys, reports)
        _absorb(extraction.get("red_flags"), flag_keys, red_flags)

    ocr_text = "\n".join(ocr_chunks)
    # Safety detection is plain pattern matching, so the raw OCR lines are scanned
    # alongside the structured reports: with the OCR backend there are no
    # structured reports yet at this point.
    ocr_lines = [line.strip() for line in ocr_text.splitlines() if line.strip()]
    safety_flags = detect_safety_flags(reports + ocr_lines, red_flags)
    captured = len(reports)
    return {
        "phone": phone,
        "reports": reports,
        "red_flags": red_flags,
        "classification": classification,
        "ocr_text": ocr_text,
        # The larger of what the app declared and what was actually captured.
        "report_count": max(declared_count, captured),
        "captured_count": captured,
        "declared_count": declared_count,
        # Critical categories promoted out of the flat report/flag lists.
        "safety_flags": safety_flags,
    }
# ----------------------------------------------------------------------------
# Safety-flag promotion — deterministic, surfaced ABOVE the rating profile.
#
# The rating LLM folds the single most dangerous signal (a law-enforcement sting,
# violence, robbery, coercion) into a flat `red_flags` list and can under-weight
# it — e.g. it scored "Es policía" as just one of fourteen flags. These categories
# are a direct threat to the worker's safety and liberty, so we detect them
# DETERMINISTICALLY from the report text (not trusting the model to resurface what
# it just buried), promote them to a top-level `safety_flags` array with an icon +
# the matched evidence line, and force a 'denied' regardless of the model's score.
# Matching is accent- and language-folded so the Spanish "Es policía" matches too.
# ----------------------------------------------------------------------------
# Ordered taxonomy consumed by detect_safety_flags(): flags are emitted in this
# order, one per category that has at least one matching line.
# Patterns are applied with re.search to _fold()-ed text (lowercase, accents
# stripped, every non-alphanumeric run turned into one space) — hence the
# apostrophe-free spelling "wouldn t". Patterns without a trailing \b are
# deliberate prefixes (e.g. "violen" covers violent/violence).
SAFETY_TAXONOMY: tuple[dict[str, Any], ...] = (
{
"category": "law_enforcement",
"icon": "🚔",
"label": "Law enforcement / sting",
"severity": "critical",
"patterns": (
r"\bpolice\b", r"\bpolicia\b", r"\bpoli\b", r"\bcops?\b", r"\bleo\b",
r"\blaw enforcement\b", r"\bsting\b", r"\bundercover\b", r"\bofficer\b",
r"\bfeds?\b", r"\bdetective\b", r"\bvice\b", r"\bentrapment\b",
),
},
{
"category": "violence",
"icon": "⚠️",
"label": "Violence / weapon",
"severity": "critical",
"patterns": (
r"\bviolen", r"\bassault", r"\bweapon", r"\bgun\b", r"\bknife\b",
r"\bchok", r"\bstrangl", r"\brape", r"\bbeat me", r"\bhit me",
r"\battacked\b", r"\bhurt me", r"\bforced himself",
),
},
{
"category": "robbery",
"icon": "🚨",
"label": "Robbery / theft",
"severity": "critical",
"patterns": (
r"\brobbed\b", r"\brobbery\b", r"\bmugg", r"\bstole\b", r"\bstolen\b",
r"\btheft\b", r"\bheld up\b",
),
},
{
"category": "coercion",
"icon": "🛑",
"label": "Coercion / threat",
"severity": "critical",
"patterns": (
r"\bthreaten", r"\bthreat\b", r"\bblackmail", r"\bextort", r"\bcoerce",
r"\bheld me\b", r"\bwouldn t let me\b", r"\bwould not let me\b",
),
},
)
def _fold(text: str) -> str:
"""Lowercase + strip accents (NFKD) + collapse non-alnum to single spaces, so
'Es policía' folds to 'es policia' and word-boundary patterns match across
punctuation, emoji, and accented Spanish."""
decomposed = unicodedata.normalize("NFKD", text)
stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
return re.sub(r"\s+", " ", re.sub(r"[^a-z0-9]+", " ", stripped.lower())).strip()
def detect_safety_flags(reports: list[str], red_flags: list[str]) -> list[dict[str, Any]]:
    """Return one flag per SAFETY_TAXONOMY category that any input line trips.

    Every non-blank report/flag line is folded with `_fold` and tested against
    the category's patterns. Each returned flag carries the category metadata
    plus ``evidence``: the original (unfolded) matching lines, in input order,
    de-duplicated case-insensitively within the category. Flags follow the
    taxonomy order.
    """
    candidates: list[tuple[str, str]] = []
    for item in (*reports, *red_flags):
        text = str(item).strip()
        if text:
            candidates.append((text, _fold(text)))

    found: list[dict[str, Any]] = []
    for entry in SAFETY_TAXONOMY:
        # lowercased line -> original line; dict order preserves first-seen order.
        matched: dict[str, str] = {}
        for original, folded in candidates:
            key = original.lower()
            if key in matched:
                continue
            if any(re.search(pattern, folded) for pattern in entry["patterns"]):
                matched[key] = original
        if not matched:
            continue
        found.append({
            "category": entry["category"],
            "icon": entry["icon"],
            "label": entry["label"],
            "severity": entry["severity"],
            "evidence": list(matched.values()),
        })
    return found
def has_critical_safety_flag(safety_flags: list[dict[str, Any]] | None) -> bool:
return any((f or {}).get("severity") == "critical" for f in (safety_flags or []))
def apply_safety_override(result: str, safety_flags: list[dict[str, Any]] | None) -> str:
    """Hard floor on the verdict: any critical safety flag turns it into 'denied'.

    Without a critical flag *result* is passed through untouched, so the floor
    never depends on how the rating model scored the signal.
    """
    if has_critical_safety_flag(safety_flags):
        return "denied"
    return result
# ----------------------------------------------------------------------------
# Rating profile (consolidation via the batch SDK)
# ----------------------------------------------------------------------------
# System prompt for the rating step; build_rating_profile() sends it to whichever
# rating backend is selected. It carries the domain rules (deposits are positive,
# safety signals force 'denied') and the score -> grade bands, which mirror
# grade_from_score().
RATING_SYSTEM = (
"You are a trust-and-safety analyst for an independent adult-industry provider (legal, "
"regulated). You read crowdsourced caller reports from Mr. Number and produce a structured "
"rating profile for the caller — how safe and worthwhile they are as a potential client. "
"Respond ONLY with a single JSON object, no markdown.\n\n"
"DOMAIN NUANCE — read signals like an insider, not literally:\n"
"- DEPOSITS ARE GOOD. A report mentioning the caller 'paid a deposit', 'sent a deposit', "
"'offered/asked to send a deposit', or 'always deposits' is a STRONG POSITIVE — deposit-payers "
"are serious, vetted, low-risk clients. Weight this heavily toward A/B. Only 'refused/won't pay "
"a deposit' or 'chargeback' is negative.\n"
"- 'Get a deposit' / 'make him deposit' written as advice from another provider means the caller "
"is known to follow through once a deposit is taken — treat as a manageable/positive signal, NOT a red flag.\n"
"- RELIABILITY: no-show, ghosting, flaking, cancelling last-minute → negative.\n"
"- SAFETY (critical): law enforcement / cop / sting / 'asks weird LE questions', violence, coercion, "
"robbery, attempts to remove agency → severe negative; if present, recommend denied regardless of other axes.\n"
"- RESPECT: rude, pushy, haggling, boundary-pushing → negative.\n"
"- MIXED REVIEWS: when reports conflict, do NOT average blindly — score each axis on its own evidence "
"and explain the split.\n\n"
"SCORING: 0-100 overall (higher = safer/better client). Grade A>=85, B 70-84, C 55-69, D 40-54, F<40."
)
def _build_rating_prompt(history: dict[str, Any]) -> str:
schema = {
"score": "integer 0-100 — overall safety/desirability as a client",
"grade": "one of A,B,C,D,F (A>=85, B 70-84, C 55-69, D 40-54, F<40)",
"is_mixed": "boolean — true if the reports conflict / are genuinely mixed",
"axes": {
"reliability": {"score": "0-100", "note": "shows up vs no-shows/ghosting/flaking"},
"payment": {"score": "0-100", "note": "deposits (GOOD), pays agreed rate, no haggling/chargebacks"},
"respect": {"score": "0-100", "note": "politeness, respects boundaries, not pushy"},
"safety": {"score": "0-100", "note": "no law-enforcement/violence/coercion signals"},
},
"reports": "array of strings — the verbatim community report texts (extract them from the raw OCR when given raw text)",
"positive_signals": "array of strings — concrete positives found (quote/paraphrase the report)",
"negative_signals": "array of strings — concrete negatives found",
"nuanced_notes": "array of strings — where you read a signal NON-literally (e.g. deposit mentions as positive)",
"summary": "2-3 sentence consolidated profile of this caller",
"recommended_result": "one of: approved, denied, pending, not_found",
}
safety_flags = history.get("safety_flags") or []
safety_block = ""
if safety_flags:
promoted = "; ".join(f"{f['label']} ({'; '.join(f['evidence'])})" for f in safety_flags)
safety_block = (
f"PROMOTED CRITICAL SAFETY FLAGS (already detected — score the safety axis "
f"at/near 0 and recommend denied): {promoted}\n\n"
)
# OCR backend gives raw screen text (no pre-structured reports) — ask the model to
# extract the reports first; the vision backend already provides a clean report list.
ocr_text = (history.get("ocr_text") or "").strip()
if not (history.get("reports")) and ocr_text:
content_block = (
"Raw OCR text of the Mr. Number report screen(s) — noisy (UI chrome, partial "
"lines). FIRST extract the genuine community report texts (ignore buttons, "
"headers, nav), then rate:\n"
f"<<<OCR\n{ocr_text}\nOCR>>>\n\n"
)
else:
reports_block = "\n".join(f"- {r}" for r in history.get("reports") or []) or "(no report text captured)"
content_block = f"All captured community reports:\n{reports_block}\n\n"
return (
f"Caller: {history.get('phone')}\n"
f"App classification: {history.get('classification')}\n"
f"Reports the app says exist: {history.get('report_count')} "
f"(captured {history.get('captured_count')})\n\n"
f"{content_block}"
f"{safety_block}"
f"Flagged terms: {', '.join(history.get('red_flags') or []) or '(none)'}\n\n"
"Produce the caller's rating profile. Apply the domain nuance from the system prompt "
"(especially: deposits are a positive signal; law-enforcement signals force denied). "
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
)
def _extract_json(text: str) -> dict[str, Any] | None:
"""Pull the first JSON object out of an LLM response (handles ```json fences / prose)."""
if not text:
return None
fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
candidate = fence.group(1) if fence else None
if candidate is None:
start = text.find("{")
end = text.rfind("}")
candidate = text[start:end + 1] if start != -1 and end > start else None
if not candidate:
return None
try:
obj = json.loads(candidate)
return obj if isinstance(obj, dict) else None
except json.JSONDecodeError:
return None
def _normalize_profile(parsed: dict[str, Any] | None) -> dict[str, Any] | None:
if not isinstance(parsed, dict):
return None
score = parsed.get("score")
if isinstance(score, (int, float)):
parsed["score"] = int(score)
if not parsed.get("grade"):
parsed["grade"] = grade_from_score(parsed["score"])
return parsed
async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None:
    """Ask the configured rating backend for the caller's multi-axis profile.

    Returns ``None`` when the history has neither structured reports nor OCR
    text. When `rating_llm_url()` yields a URL, the OpenAI-compatible endpoint
    is queried and its reply is parsed with `_extract_json`; otherwise the
    Claude batch SDK is used and its reply parsed with the SDK's own parser
    (an empty SDK reply yields ``None``). Either way the parsed object goes
    through `_normalize_profile`.
    """
    has_material = bool(history.get("reports")) or bool(history.get("ocr_text"))
    if not has_material:
        return None

    user_prompt = _build_rating_prompt(history)
    endpoint = rating_llm_url()
    if not endpoint:
        ClaudeClient, parse_json_response = load_sdk()
        sdk_client = ClaudeClient(model=RATING_MODEL, max_concurrent=1)
        reply = await sdk_client.generate(
            system=RATING_SYSTEM, user=user_prompt, cwd=str(OUTPUT_DIR), allowed_tools=[],
        )
        return _normalize_profile(parse_json_response(reply)) if reply else None

    raw_reply = openai_chat(
        base_url=endpoint, model=rating_llm_model(), system=RATING_SYSTEM, user=user_prompt,
    )
    return _normalize_profile(_extract_json(raw_reply))
def grade_from_score(score: int | float | None) -> str:
if score is None:
return "?"
if score >= 85:
return "A"
if score >= 70:
return "B"
if score >= 55:
return "C"
if score >= 40:
return "D"
return "F"
def result_from_score(score: int | float | None) -> str:
if score is None:
return "pending"
if score >= 70:
return "approved"
if score < 45:
return "denied"
return "pending"
def result_from_profile(profile: dict[str, Any] | None) -> str:
"""Map the rating profile to a screening result enum, with a hard safety override."""
if not profile:
return "pending"
axes = profile.get("axes") or {}
safety = axes.get("safety") or {}
s_score = safety.get("score")
if isinstance(s_score, (int, float)) and s_score < 30:
return "denied" # law-enforcement/violence signal overrides everything
rec = profile.get("recommended_result")
if rec in ("approved", "denied", "pending", "not_found"):
return rec
return result_from_score(profile.get("score"))
# ----------------------------------------------------------------------------
# adb device control — Mr. Number specifics over the shared RedroidDevice base
# ----------------------------------------------------------------------------
class MrNumberEmulator(RedroidDevice):
    """Device controller for the Mr. Number app.

    All adb/UI primitives come from `RedroidDevice`; this subclass only pins the
    Mr. Number file names and defaults. App-specific navigation is implemented
    by the module-level functions, which compose the inherited primitives.
    """

    # Scratch paths / screenshot naming read by the base class — presumably;
    # confirm against redroid_client.
    ui_dump_remote = "/sdcard/mr_ui.xml"
    ui_dump_local = "/tmp/mr_ui.xml"
    screenshot_remote = "/sdcard/mr_result.png"
    screenshot_prefix = "mr-number"

    def __init__(self, device: str | None = None, package: str | None = None):
        # Fall back to the module-level DEVICE / PACKAGE, read at construction
        # time so a --device override applied before first use is honoured.
        target_device = device if device else DEVICE
        target_package = package if package else PACKAGE
        super().__init__(target_device, target_package, OUTPUT_DIR)
# Module-level shims (existing call sites + patchability in tests)
_emulator: MrNumberEmulator | None = None


def _get_emulator() -> MrNumberEmulator:
    """Return the process-wide controller, constructing it on first use."""
    global _emulator
    controller = _emulator
    if controller is None:
        controller = _emulator = MrNumberEmulator()
    return controller
def adb(args: list[str], check: bool = True) -> str:
    """Module-level shim: forward an adb invocation to the shared controller."""
    controller = _get_emulator()
    return controller.adb(args, check)
def adb_text(text: str) -> None:
    """Module-level shim: forward *text* to the shared controller's `adb_text`."""
    controller = _get_emulator()
    controller.adb_text(text)
def adb_keyevent(code: int) -> None:
    """Module-level shim: forward key event *code* to the shared controller."""
    controller = _get_emulator()
    controller.adb_keyevent(code)
def get_ui_dump() -> str:
    """Module-level shim: return the shared controller's current UI dump."""
    controller = _get_emulator()
    return controller.get_ui_dump()
def find_and_tap_text(target_texts: list[str]) -> bool:
    """Module-level shim: forward *target_texts* to the controller's `find_and_tap_text`."""
    controller = _get_emulator()
    return controller.find_and_tap_text(target_texts)
def find_edit_text_and_input(phone: str) -> bool:
    """Module-level shim: forward *phone* to the controller's `find_edit_text_and_input`."""
    controller = _get_emulator()
    return controller.find_edit_text_and_input(phone)
def launch_app() -> None:
    """Module-level shim: call `launch_app` on the shared controller."""
    controller = _get_emulator()
    controller.launch_app()
def take_screenshot(phone: str, tag: str = "") -> Path:
    """Module-level shim: take a screenshot via the shared controller and return its path."""
    controller = _get_emulator()
    return controller.take_screenshot(phone, tag)
# Lowercase substrings; detail_state() treats the screen as a caller detail page
# when any of them occurs anywhere in the lowercased UI dump.
_DETAIL_MARKERS = ("recent reports", "report caller", "user reports", "view all", "block number", "block caller")
def _has_search_field() -> bool:
    """Report whether the current screen exposes a search input.

    A node counts when its class ends with ``EditText`` or its resource-id
    contains "search" (case-insensitive). Any failure to obtain or parse the UI
    dump is treated as "no search field".
    """
    try:
        tree = ET.fromstring(get_ui_dump())
    except Exception:
        return False
    return any(
        node.get("class", "").endswith("EditText")
        or "search" in (node.get("resource-id") or "").lower()
        for node in tree.iter("node")
    )
def go_to_search(max_back: int = 5) -> bool:
    """Bring the app back to a screen that shows the search field.

    Presses BACK up to *max_back* times, checking for the search field before
    each press; if it still is not visible the app is relaunched and checked
    one final time. This clears a detail/list page left over from a previous
    lookup, which would otherwise be re-read as if it were the new caller.
    """
    controller = _get_emulator()
    presses = 0
    while presses < max_back:
        if _has_search_field():
            return True
        controller.adb_keyevent(4)  # BACK
        time.sleep(1.0)
        presses += 1
    # Last resort: relaunch to the home screen.
    launch_app()
    time.sleep(1.5)
    return _has_search_field()
def _visible_digit_runs(dump: str) -> list[str]:
    """Digits of each node's ``text`` / ``content-desc`` in a UI dump, one entry per attribute.

    Keeping the attributes separate stops unrelated numbers (notably the
    ``bounds`` coordinates) from concatenating into a fake phone number. If the
    dump is not parseable XML the whole dump is reduced to one digit string.
    """
    try:
        root = ET.fromstring(dump)
    except (ET.ParseError, ValueError):
        return [re.sub(r"\D", "", dump)]
    runs: list[str] = []
    for node in root.iter("node"):
        for attr in ("text", "content-desc"):
            digits = re.sub(r"\D", "", node.get(attr) or "")
            if digits:
                runs.append(digits)
    return runs


def detail_state(input_phone: str) -> str:
    """Classify the current screen for the lookup of *input_phone*.

    Returns one of:
      'match'          — a caller detail page AND the last 10 digits of our number
                         are shown in some node's text / content description.
      'unknown_detail' — a detail page on which our number is not printed (the app
                         can show e.g. 'Personal Line' with no number). Callers
                         accept it because they always arrive via go_to_search()
                         followed by a fresh search.
      'no_detail'      — not a report detail page (search / home / list / empty),
                         or the UI dump could not be obtained.

    A 'wrong' state (detail page of a different number) is reserved for callers
    but is not produced: other numbers on a page cannot be told apart reliably.

    The number only counts on a detail page. The search box and the recent-lookups
    list also contain the typed number, and treating those as a match would rate a
    non-detail screen and skip the row tap in open_report_detail().
    """
    try:
        dump = get_ui_dump()
    except Exception:
        return "no_detail"

    lowered = dump.lower()
    if not any(marker in lowered for marker in _DETAIL_MARKERS):
        return "no_detail"

    national = re.sub(r"\D", "", input_phone)[-10:]
    if len(national) == 10 and any(national in run for run in _visible_digit_runs(dump)):
        return "match"
    return "unknown_detail"
def open_report_detail(input_phone: str) -> bool:
    """Make sure the requested caller's report detail is on screen.

    If `detail_state` already reports 'match' or 'unknown_detail' nothing is
    done. Otherwise the number's row is tapped — trying the common US display
    formats and the bare digits — and the screen is re-classified after a short
    wait. Returns ``False`` when no row could be tapped or the tap did not lead
    to a detail page.
    """
    accepted = ("match", "unknown_detail")
    if detail_state(input_phone) in accepted:
        return True

    # Not on a detail page: look for this number's row and tap it.
    all_digits = re.sub(r"\D", "", input_phone)
    national = all_digits[-10:] if len(all_digits) >= 10 else all_digits
    row_labels: list[str] = []
    if len(national) == 10:
        area, exchange, line = national[0:3], national[3:6], national[6:]
        row_labels.extend([
            f"({area}) {exchange}-{line}",
            f"{area}-{exchange}-{line}",
            f"{exchange}-{line}",
        ])
    row_labels.append(all_digits)

    if not find_and_tap_text(row_labels):
        return False
    time.sleep(3.0)
    return detail_state(input_phone) in accepted
def expand_all_reports() -> bool:
    """Tap the 'View all N reports' style row; return whether a matching label was tapped."""
    expand_labels = ["view all", "see all reports", "view all reports", "all reports", "see all"]
    return find_and_tap_text(expand_labels)
def capture_full_history(phone: str, max_swipes: int = MAX_SCROLL_CAPTURES) -> list[Path]:
    """Screenshot the reports view, scrolling down until it stops moving (bottom).

    Args:
        phone: Number being looked up; forwarded to the screenshot helper.
        max_swipes: Upper bound on swipe + screenshot iterations.

    Returns:
        The screenshot paths, top -> bottom. Always contains the initial shot.
    """
    emu = _get_emulator()

    def _dump_or_none() -> str | None:
        # A failed dump must not abort the capture; it only disables the
        # "did the screen move?" comparison for that step.
        try:
            return emu.get_ui_dump()
        except Exception:
            return None

    w, h = emu.screen_size()
    x, y_from, y_to = w // 2, int(h * 0.78), int(h * 0.28)
    shots = [emu.take_screenshot(phone, tag="0")]
    # Baseline taken BEFORE the first swipe: without it the first comparison is
    # against None, so a list that fits on one screen is always captured twice.
    prev_dump = _dump_or_none()
    for i in range(1, max_swipes + 1):
        emu.adb_swipe(x, y_from, x, y_to, 450)
        time.sleep(0.9)
        dump = _dump_or_none()
        if dump is not None and dump == prev_dump:
            break  # nothing changed after a swipe = reached the bottom
        prev_dump = dump
        shots.append(emu.take_screenshot(phone, tag=str(i)))
    return shots
# ----------------------------------------------------------------------------
# Verdict (deterministic fallback when the SDK profile is unavailable)
# ----------------------------------------------------------------------------
# Negative keywords for the deterministic fallback in decide_result(). Each is
# passed through _normalize() and matched as a plain SUBSTRING of the normalised
# report text, so short entries such as "cop" or "leo" also match inside longer
# words.
_NEG_KEYWORDS = (
"no show", "no-show", "noshow", "ghost", "flake", "flaked", "stood me up",
"rude", "aggressive", "harass", "boundary", "pushy", "haggl",
"cop", "leo", "police", "law enforcement", "sting", "officer",
"time waster", "timewaster", "timewaste", "scam", "robbery", "violent", "unsafe", "danger",
"chargeback", "refused deposit", "wouldn't pay", "wont pay",
)
def _normalize(text: str) -> str:
return re.sub(r"[^a-z0-9 ]+", " ", text.lower())
def decide_result(extracted: dict[str, Any]) -> str:
    """Keyword heuristic used when no rating profile is available.

    Decision order:
      1. 'denied' if the model suggested it or any negative keyword occurs in the
         normalised reports/red flags (normalising makes 'no-show' == 'no show');
      2. otherwise the model's own 'approved' / 'not_found' suggestion;
      3. otherwise 'approved' when a report count is set AND report text was
         captured (none of it negative), else 'pending'.
    """
    texts = (extracted.get("reports") or []) + (extracted.get("red_flags") or [])
    haystack = _normalize(" ".join(texts))
    model_guess = extracted.get("suggested_result")

    if model_guess == "denied":
        return "denied"
    for keyword in _NEG_KEYWORDS:
        if _normalize(keyword) in haystack:
            return "denied"

    if model_guess in ("approved", "not_found"):
        return model_guess

    # A declared count with no captured text stays with the human gate.
    if extracted.get("report_count") and extracted.get("reports"):
        return "approved"
    return "pending"
def clean_phone(p: str) -> str:
    r"""Delegate to the shared `redroid_client.clean_phone`; intended output shape is ^\+?\d+$."""
    cleaned = _clean_phone(p)
    return cleaned
def save_history(phone: str, history_obj: dict[str, Any]) -> Path:
    """Write *history_obj* as indented JSON under HISTORY_DIR and return the file path.

    The file is named ``<cleaned phone without '+'>-<unix seconds>.json``.
    """
    stamp = int(time.time())
    bare_number = clean_phone(phone).replace("+", "")
    target = HISTORY_DIR / f"{bare_number}-{stamp}.json"
    target.write_text(json.dumps(history_obj, indent=2))
    return target
def screening_signal_value(result: str, safety_flags: list[dict[str, Any]] | None) -> str | None:
"""Map the verdict to the bare ``valueText`` that consumers (Prospector's
``verdictFromSignal``) switch on: ``denied | cop_flag | approved | error``. A
critical **law-enforcement** flag is the distinct ``cop_flag`` (a cop overrides
approval); other critical flags ride the 'denied' result. 'pending'/'not_found'
map to ``None`` → consumers read that as 'not_screened'."""
for f in safety_flags or []:
if f.get("category") == "law_enforcement" and f.get("severity") == "critical":
return "cop_flag"
if result in ("denied", "approved", "error"):
return result
return None
def record_screening(
    phone: str, result: str, raw_obj: dict[str, Any], safety_flags: list[dict[str, Any]] | None, ref: str | None,
) -> dict[str, Any]:
    """Send the verdict to the people service as a ``screening_mrnumber`` signal.

    The signal is keyed by *phone* on the 'sms' channel. ``valueText`` is the
    bare verdict from `screening_signal_value`, ``valueJsonb`` the full
    *raw_obj*, ``confidence`` the profile score scaled to 0-1 (``None`` without
    a numeric score), ``sourceHandle`` the caller's *ref*, and ``occurredAt``
    the record's ``decided_at``. Returns whatever `record_people_signal` returns.
    """
    verdict = screening_signal_value(result, safety_flags)
    profile = raw_obj.get("rating_profile") or {}
    overall = profile.get("score")
    if isinstance(overall, (int, float)):
        confidence = round(overall / 100, 2)
    else:
        confidence = None
    return record_people_signal(
        base_url=PEOPLE_BASE_URL,
        token=PEOPLE_SERVICE_TOKEN,
        handle=phone,
        channel=PEOPLE_CHANNEL,
        signal_type=SCREENING_SIGNAL_TYPE,
        source_feature=SOURCE_FEATURE,
        value_text=verdict,
        value_jsonb=raw_obj,
        confidence=confidence,
        source_handle=ref,
        occurred_at=raw_obj.get("decided_at"),
    )
async def main_async(phone: str, ref: str | None, dry_run: bool, dump_ui: bool = False) -> dict[str, Any]:
    """Run one full lookup: drive the app, capture, extract, rate, decide, record.

    Args:
        phone: Number to look up, in any format (cleaned before typing).
        ref: Optional requester correlation id, carried into the recorded signal.
        dry_run: When true, everything runs except recording the people signal.
        dump_ui: When true, log a truncated UI dump after reaching the search screen.

    Returns:
        The result payload (verdict, score/grade, profile, reports, safety flags,
        screenshots, history file, raw record, recording outcome). If the report
        detail page cannot be reached, a short ``result == "error"`` payload is
        returned instead and nothing is rated or recorded.
    """
    log(f"[mr-number] Starting lookup for {phone} on {DEVICE} (ref={ref}, dry_run={dry_run})")
    input_phone = clean_phone(phone)
    if input_phone != phone:
        log(f"[mr-number] Cleaned phone for input: {input_phone} (from {phone})")

    # 1. Launch, then return to a CLEAN search screen (dismiss any leftover page from a
    #    previous lookup — otherwise a fresh search silently re-reads the old caller).
    launch_app()
    time.sleep(1.5)
    if not go_to_search():
        log("[mr-number] WARNING: could not find the search field; proceeding best-effort.")
    if dump_ui:
        log("[mr-number] UI dump after reaching search:")
        log(get_ui_dump()[:1500])
    if not find_edit_text_and_input(input_phone):
        adb_text(input_phone)
    time.sleep(1.5)
    if not find_and_tap_text([f"look up {input_phone}", "look up"]):
        adb_keyevent(66)  # ENTER
    time.sleep(9.0)  # let the paid reports load

    # 1b. Confirm we're on the requested caller's detail page.
    #     Abort rather than rate a stale/wrong page.
    if not open_report_detail(input_phone):
        shot = take_screenshot(input_phone, tag="nomatch")
        msg = (f"could not load the report detail for {input_phone} "
               f"(wrong/empty screen — not rating to avoid stale data); screenshot {shot}")
        log(f"[mr-number] ERROR: {msg}")
        return {
            "phone": phone, "inputPhone": input_phone, "result": "error",
            "error": "detail_not_loaded", "message": msg,
            "screenshots": [str(shot)], "recorded": {"skipped": "error"},
        }
    log("[mr-number] On the correct report detail page (number verified).")

    # 2. Expand the full report list, then scroll-capture all of it
    if expand_all_reports():
        log("[mr-number] Expanded full report list ('View all reports').")
        time.sleep(2.0)
    shots = capture_full_history(input_phone)
    log(f"[mr-number] Captured {len(shots)} screenshot(s) of the report history.")

    # 3. Extract each screenshot, then consolidate + dedupe
    extractions: list[dict[str, Any]] = []
    for shot in shots:
        ex = await _extract_from_screenshot(str(shot), phone)
        extractions.append(ex)
    history = merge_reports(extractions, phone)
    if history.get("ocr_text") and not history.get("reports"):
        log(f"[mr-number] OCR backend: {len(history['ocr_text'])} chars of raw text "
            f"from {len(shots)} screenshot(s) — reports extracted during rating.")
    else:
        log(f"[mr-number] Consolidated {history['captured_count']} unique reports "
            f"(app declares {history['declared_count']}).")

    # 3b. Surface critical safety flags ABOVE everything else — a human reading the
    #     log (and the verdict) must see 'law enforcement' before any axis score.
    safety_flags = history.get("safety_flags") or []
    for f in safety_flags:
        log(f"[mr-number] {f['icon']} SAFETY FLAG [{f['severity'].upper()}]: {f['label']} "
            f"— evidence: {'; '.join(f['evidence'])}")

    # 4. Build the multi-axis rating profile via the selected rating backend.
    #    A backend failure (network, SDK, malformed reply) must not abort the run:
    #    the deterministic fallback and — above all — the safety override below
    #    still have to produce and record a verdict.
    log("[mr-number] Building rating profile (consolidation via batch SDK)...")
    try:
        profile = await build_rating_profile(history)
    except Exception as e:
        profile = None
        log(f"[mr-number] Rating backend failed: {e}")
    if profile:
        # OCR path: the rating LLM extracted the reports — fold them back so the signal
        # record + counts reflect them (and re-run safety detection over the real reports).
        if not history.get("reports") and isinstance(profile.get("reports"), list):
            history["reports"] = [str(r).strip() for r in profile["reports"] if str(r).strip()]
            history["captured_count"] = len(history["reports"])
            history["report_count"] = max(history.get("report_count") or 0, len(history["reports"]))
            history["safety_flags"] = detect_safety_flags(
                history["reports"] + [ln.strip() for ln in (history.get("ocr_text") or "").splitlines() if ln.strip()],
                history.get("red_flags") or [],
            )
            safety_flags = history["safety_flags"]
        result = result_from_profile(profile)
        log(f"[mr-number] Rating: {profile.get('score')}/100 grade {profile.get('grade')} "
            f"→ result '{result}' ({profile.get('summary', '')})")
    else:
        result = decide_result(history)
        log(f"[mr-number] Rating profile unavailable; fallback heuristic → '{result}'")

    # 4b. Hard floor: a critical safety flag forces 'denied' regardless of the
    #     model's score — the verdict cannot depend on the LLM weighting an LE/
    #     violence/robbery/coercion signal correctly.
    overridden = apply_safety_override(result, safety_flags)
    if overridden != result:
        log(f"[mr-number] Safety override: '{result}' → 'denied' "
            f"(critical flag: {', '.join(f['category'] for f in safety_flags if f['severity'] == 'critical')})")
        result = overridden

    # 5. Save full history + profile, build the raw record
    raw_obj = {
        "source": "mr-number",
        "phone": phone,
        "classification": history.get("classification"),
        "reports": history.get("reports"),
        "red_flags": history.get("red_flags"),
        "safety_flags": safety_flags,
        "report_count": history.get("report_count"),
        "captured_count": history.get("captured_count"),
        "rating_profile": profile,
        "result": result,
        "screenshots": [str(s) for s in shots],
        "decided_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    history_path = save_history(phone, raw_obj)
    log(f"[mr-number] Saved full history → {history_path}")
    raw_response = json.dumps(raw_obj, indent=2)

    # 6. Record the verdict as a people-service signal (keyed by phone). A real lookup
    #    always records — there is no per-client gate; the person is auto-upserted.
    recorded: dict[str, Any] | None
    if not dry_run:
        try:
            recorded = record_screening(input_phone, result, raw_obj, safety_flags, ref)
            log(f"[mr-number] Recorded screening_mrnumber signal "
                f"(verdict={screening_signal_value(result, safety_flags)}):", recorded)
        except Exception as e:
            # Recording is best-effort: the verdict is still returned to the caller.
            recorded = {"error": str(e)}
            log(f"[mr-number] Recording to people service failed: {e}")
    else:
        recorded = {"skipped": "dry_run"}
        log("[mr-number] Dry run — not recording the people signal.")
        if not json_mode():
            log("Raw record (would be the signal's valueJsonb):")
            log(raw_response)

    log("[mr-number] Done.")
    return {
        "phone": phone,
        "inputPhone": input_phone,
        "result": result,
        "score": (profile or {}).get("score"),
        "grade": (profile or {}).get("grade"),
        "ratingProfile": profile,
        "reports": history.get("reports"),
        "safetyFlags": safety_flags,
        "classification": history.get("classification"),
        "reportCount": history.get("report_count"),
        "capturedCount": history.get("captured_count"),
        "screenshots": [str(s) for s in shots],
        "historyFile": str(history_path),
        "decidedAt": raw_obj["decided_at"],
        "rawResponse": raw_response,
        "recorded": recorded,
    }
def main() -> None:
    """CLI entry point: parse arguments, verify adb connectivity, run the lookup.

    With ``--json`` a single JSON object is printed on stdout (the result, or an
    ``adb_unavailable`` error object). If the device cannot be reached the
    process exits with status 1.
    """
    global DEVICE
    cli = argparse.ArgumentParser()
    cli.add_argument("--phone", required=True, help="Phone number to look up (any format)")
    cli.add_argument("--ref", help="Optional requester correlation id (carried into the signal's sourceHandle).")
    cli.add_argument("--dry-run", action="store_true", help="Do lookup + vision + rating but do not record the people-service signal")
    cli.add_argument("--device", default=DEVICE, help="adb serial or host:port (default emulator-5554)")
    cli.add_argument("--dump-ui", action="store_true", help="Dump the current UI hierarchy before actions (calibration)")
    cli.add_argument("--json", action="store_true", help="Emit one JSON result object on stdout (progress to stderr). Used by the MCP.")
    opts = cli.parse_args()

    # Must be set before the first adb call: the controller is built lazily and
    # reads DEVICE at construction time.
    DEVICE = opts.device
    set_json_mode(opts.json)

    # Connectivity probe: fail fast with a clear message when adb cannot reach the device.
    try:
        adb(["shell", "echo", "ok"], check=True)
    except Exception as exc:
        problem = f"Cannot talk to device via adb on {DEVICE}. Is it connected/authorized? {exc}"
        if opts.json:
            print(json.dumps({"error": "adb_unavailable", "message": problem}))
        print(f"ERROR: {problem}", file=sys.stderr)
        sys.exit(1)

    if opts.dump_ui:
        log("[mr-number] --dump-ui requested. Current UI hierarchy:")
        log(get_ui_dump()[:2000] + "\n... (truncated)")

    import asyncio

    outcome = asyncio.run(main_async(opts.phone, opts.ref, opts.dry_run, opts.dump_ui))
    if opts.json:
        print(json.dumps(outcome))


if __name__ == "__main__":
    main()