Wire the on-box (Claude-API-less) path decided with the operator: EXTRACT_BACKEND=ocr sends each screenshot to the on-box mrnumber-ocr service (raw text, no per-shot structuring); build_rating_profile uses an OpenAI-compatible LLM on a DO GPU droplet (RATING_LLM_URL) which extracts the reports from the raw OCR text AND produces the multi-axis verdict. Reports are folded back into the history so the people-signal + counts + safety flags reflect them; safety detection also scans the raw OCR lines so a LE term forces cop_flag even before structuring. vision/Claude stays the plum-dev default. +5 tests incl. full OCR→GPU→cop_flag flow. 33/33. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
890 lines
38 KiB
Python
Executable file
890 lines
38 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
mr-number-lookup
|
|
|
|
Drive an Android device (USB phone or the redroid droplet) running the Mr. Number
|
|
app (com.mrnumber.blocker), perform a phone lookup, expand + scroll-capture the
|
|
*full* community-report history, vision-extract every report, consolidate them with
|
|
the lilith claude-code-batch-sdk into a multi-axis **rating profile** (0-100 + letter
|
|
grade) for the caller, decide a screening result, save the full history, and record
|
|
the verdict as a `screening_mrnumber` person signal in the cocotte **people service**
|
|
(persons DB), keyed by the phone number. Prospector and the rest of the cocotte
|
|
ecosystem consume that signal; there is no quinn coupling.
|
|
|
|
The device automation, vision harness, and signal recorder are shared with the
|
|
sibling @whatsapp via the `redroid_client` package (pip: lilith-redroid-client); only
|
|
the Mr. Number-specific navigation, rating profile, and verdict logic live here.
|
|
|
|
Usage:
|
|
python3 mr_lookup.py --phone "+15551234567" [--ref <correlation-id>] [--dry-run]
|
|
|
|
Requires:
|
|
- adb in PATH; a device connected (USB serial, or `adb connect <host>:5555` for redroid)
|
|
with the paid Mr. Number app installed + signed in.
|
|
- PEOPLE_BASE_URL + PEOPLE_SERVICE_TOKEN in env (for recording the signal; mesh-only).
|
|
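- The claude batch SDK on disk (for vision + rating consolidation); it is not loaded when EXTRACT_BACKEND=ocr routes extraction to the OCR service and RATING_LLM_URL routes rating to an OpenAI-compatible LLM.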
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import re
|
|
import unicodedata
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from redroid_client import (
|
|
RedroidDevice,
|
|
clean_phone as _clean_phone,
|
|
extract_backend,
|
|
extract_screenshot,
|
|
json_mode,
|
|
load_sdk,
|
|
log,
|
|
ocr_extract,
|
|
ocr_url,
|
|
openai_chat,
|
|
people_base_url,
|
|
people_service_token,
|
|
rating_llm_model,
|
|
rating_llm_url,
|
|
record_people_signal,
|
|
set_json_mode,
|
|
)
|
|
|
|
# --- Config / env ------------------------------------------------------------
# People service: verdicts are stored as person signals in the cocotte people
# service (persons DB), keyed by the phone number. Prospector and the rest of
# the ecosystem read those signals; there is no quinn coupling. The person
# record is auto-upserted by (handle, channel).
PEOPLE_BASE_URL = people_base_url()
PEOPLE_SERVICE_TOKEN = people_service_token()
# No dedicated 'phone' people-channel exists, so phone numbers ride on 'sms'.
PEOPLE_CHANNEL = "sms"
SCREENING_SIGNAL_TYPE = "screening_mrnumber"
SOURCE_FEATURE = "mr-number"

# Target device (overridable via env) and the app package that is driven.
DEVICE = os.getenv("MR_NUMBER_DEVICE", "emulator-5554")
PACKAGE = "com.mrnumber.blocker"

# Output locations live next to this file and are created at import time.
OUTPUT_DIR = Path(__file__).parent / "output"
HISTORY_DIR = OUTPUT_DIR / "history"
OUTPUT_DIR.mkdir(exist_ok=True)
HISTORY_DIR.mkdir(exist_ok=True)

# Vision is fast/cheap text-from-image; rating reasons over the consolidated
# history, so it defaults to a stronger model. Both are overridable via env.
VISION_MODEL = os.getenv("MR_NUMBER_VISION_MODEL", "haiku")
RATING_MODEL = os.getenv("MR_NUMBER_RATING_MODEL", "sonnet")
# Upper bound on swipe+screenshot rounds when capturing the report history.
MAX_SCROLL_CAPTURES = int(os.getenv("MR_NUMBER_MAX_SCROLLS", "10"))
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Vision extraction (per screenshot)
|
|
# ----------------------------------------------------------------------------
|
|
# System prompt for the per-screenshot vision extraction call.
MR_NUMBER_SYSTEM = " ".join((
    "You are looking at a screenshot from the Mr. Number (caller ID + community reports) Android app.",
    "Extract the information shown for the looked-up phone number.",
    "Respond ONLY with a single JSON object, no markdown.",
))
|
|
|
|
|
|
def _build_vision_prompt(screenshot_path: str, phone: str) -> str:
|
|
schema = {
|
|
"phone": "the exact phone number that was searched (string)",
|
|
"report_count": "integer or null — the total number of reports the app says exist (e.g. 'View all 7 reports' -> 7), not just visible",
|
|
"reports": "array of strings — every report/comment text VISIBLE in this screenshot, verbatim (the valuable paid content)",
|
|
"classification": "string or null — the label at the top (e.g. 'Personal Line', 'Business', 'Suspected Spam')",
|
|
"red_flags": "array of strings — negative signals mentioned (no-show, ghosting, rude, cop/law-enforcement, timewaster, boundary issues, etc.)",
|
|
"summary": "short one-sentence impression from the reports visible here",
|
|
"suggested_result": "one of: approved, denied, not_found — your best guess from what's visible",
|
|
}
|
|
return (
|
|
f"Read the image file at: {screenshot_path}\n\n"
|
|
f"This is a screenshot after looking up {phone} in the Mr. Number app.\n"
|
|
"Extract the community reports and any top-level caller info VISIBLE in this image. "
|
|
"Transcribe report text verbatim — do not paraphrase. "
|
|
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
|
|
)
|
|
|
|
|
|
async def _extract_from_screenshot(screenshot_path: str, phone: str) -> dict[str, Any]:
    """Extract one screenshot with whichever backend `extract_backend()` selects.

    * ``"ocr"``: the OCR service returns raw screen text. No per-shot
      structuring happens here; the result carries empty report fields plus
      the text under ``raw_ocr`` for the rating LLM to structure downstream.
    * anything else: the vision model is asked for a structured report dict.
    """
    shot = str(screenshot_path)
    if extract_backend() != "ocr":
        return await extract_screenshot(
            screenshot_path=shot,
            system=MR_NUMBER_SYSTEM,
            prompt=_build_vision_prompt(shot, phone),
            model=VISION_MODEL,
        )

    ocr_payload = ocr_extract(shot, base_url=ocr_url())
    return {
        "reports": [],
        "red_flags": [],
        "classification": None,
        "report_count": None,
        "raw_ocr": ocr_payload.get("text", ""),
    }
|
|
|
|
|
|
def merge_reports(extractions: list[dict[str, Any]], phone: str) -> dict[str, Any]:
    """Fold the per-screenshot extractions into a single deduplicated history.

    Reports and red flags are deduplicated case- and whitespace-insensitively,
    keeping the first spelling seen. The first non-empty classification wins,
    the largest integer ``report_count`` is kept as the declared count, and
    every non-empty ``raw_ocr`` chunk is concatenated into ``ocr_text``.
    Non-dict entries in ``extractions`` are ignored.
    """

    def _collect_unique(values: Any, seen_keys: set[str], bucket: list[str]) -> None:
        # Append each not-yet-seen value (trimmed) to `bucket`.
        for value in values or []:
            text = str(value).strip()
            key = re.sub(r"\s+", " ", text.lower())
            if key and key not in seen_keys:
                seen_keys.add(key)
                bucket.append(text)

    merged_reports: list[str] = []
    merged_flags: list[str] = []
    report_keys: set[str] = set()
    flag_keys: set[str] = set()
    label: str | None = None
    app_declared = 0
    raw_chunks: list[str] = []

    for shot in extractions:
        if not isinstance(shot, dict):
            continue
        raw_text = (shot.get("raw_ocr") or "").strip()
        if raw_text:
            raw_chunks.append(raw_text)
        if not label and shot.get("classification"):
            label = shot.get("classification")
        shot_count = shot.get("report_count")
        if isinstance(shot_count, int):
            app_declared = max(app_declared, shot_count)
        _collect_unique(shot.get("reports"), report_keys, merged_reports)
        _collect_unique(shot.get("red_flags"), flag_keys, merged_flags)

    ocr_text = "\n".join(raw_chunks)
    # Safety detection is deterministic keyword/regex matching, so on the OCR
    # backend the raw OCR lines are scanned too: an LE/violence term must be
    # caught even before the rating LLM has structured any reports.
    ocr_lines = [line.strip() for line in ocr_text.splitlines() if line.strip()]
    safety_flags = detect_safety_flags(merged_reports + ocr_lines, merged_flags)
    captured = len(merged_reports)

    return {
        "phone": phone,
        "reports": merged_reports,
        "red_flags": merged_flags,
        "classification": label,
        "ocr_text": ocr_text,
        # The larger of what the app declared and what was actually captured.
        "report_count": max(app_declared, captured),
        "captured_count": captured,
        "declared_count": app_declared,
        # Critical safety signals are promoted out of the flat lists so neither
        # a human nor the verdict has to dig them out of a long report list.
        "safety_flags": safety_flags,
    }
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Safety-flag promotion — deterministic, surfaced ABOVE the rating profile.
|
|
#
|
|
# The rating LLM folds the single most dangerous signal (a law-enforcement sting,
|
|
# violence, robbery, coercion) into a flat `red_flags` list and can under-weight
|
|
# it — e.g. it scored "Es policía" as just one of fourteen flags. These categories
|
|
# are a direct threat to the worker's safety and liberty, so we detect them
|
|
# DETERMINISTICALLY from the report text (not trusting the model to resurface what
|
|
# it just buried), promote them to a top-level `safety_flags` array with an icon +
|
|
# the matched evidence line, and force a 'denied' regardless of the model's score.
|
|
# Matching is accent- and language-folded so the Spanish "Es policía" matches too.
|
|
# ----------------------------------------------------------------------------
|
|
# Ordered taxonomy of critical safety categories. Each entry carries the
# display metadata and the regex patterns that are searched against the
# *folded* (lowercased, accent-stripped, punctuation-collapsed) line text.
SAFETY_TAXONOMY: tuple[dict[str, Any], ...] = (
    {
        "category": "law_enforcement",
        "icon": "🚔",
        "label": "Law enforcement / sting",
        "severity": "critical",
        "patterns": (
            r"\bpolice\b",
            r"\bpolicia\b",
            r"\bpoli\b",
            r"\bcops?\b",
            r"\bleo\b",
            r"\blaw enforcement\b",
            r"\bsting\b",
            r"\bundercover\b",
            r"\bofficer\b",
            r"\bfeds?\b",
            r"\bdetective\b",
            r"\bvice\b",
            r"\bentrapment\b",
        ),
    },
    {
        "category": "violence",
        "icon": "⚠️",
        "label": "Violence / weapon",
        "severity": "critical",
        "patterns": (
            r"\bviolen",
            r"\bassault",
            r"\bweapon",
            r"\bgun\b",
            r"\bknife\b",
            r"\bchok",
            r"\bstrangl",
            r"\brape",
            r"\bbeat me",
            r"\bhit me",
            r"\battacked\b",
            r"\bhurt me",
            r"\bforced himself",
        ),
    },
    {
        "category": "robbery",
        "icon": "🚨",
        "label": "Robbery / theft",
        "severity": "critical",
        "patterns": (
            r"\brobbed\b",
            r"\brobbery\b",
            r"\bmugg",
            r"\bstole\b",
            r"\bstolen\b",
            r"\btheft\b",
            r"\bheld up\b",
        ),
    },
    {
        "category": "coercion",
        "icon": "🛑",
        "label": "Coercion / threat",
        "severity": "critical",
        "patterns": (
            r"\bthreaten",
            r"\bthreat\b",
            r"\bblackmail",
            r"\bextort",
            r"\bcoerce",
            r"\bheld me\b",
            # Folding turns the apostrophe into a space: "wouldn't" -> "wouldn t".
            r"\bwouldn t let me\b",
            r"\bwould not let me\b",
        ),
    },
)
|
|
|
|
|
|
def _fold(text: str) -> str:
|
|
"""Lowercase + strip accents (NFKD) + collapse non-alnum to single spaces, so
|
|
'Es policía' folds to 'es policia' and word-boundary patterns match across
|
|
punctuation, emoji, and accented Spanish."""
|
|
decomposed = unicodedata.normalize("NFKD", text)
|
|
stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
|
|
return re.sub(r"\s+", " ", re.sub(r"[^a-z0-9]+", " ", stripped.lower())).strip()
|
|
|
|
|
|
def detect_safety_flags(reports: list[str], red_flags: list[str]) -> list[dict[str, Any]]:
    """Promote critical-safety matches found in the report and flag lines.

    Every non-empty line from ``reports`` followed by ``red_flags`` is folded
    and searched against each `SAFETY_TAXONOMY` entry's patterns. The result
    is in taxonomy order, one dict per category that matched, with keys
    ``category``, ``icon``, ``label``, ``severity`` and ``evidence``.
    ``evidence`` lists the original lines that matched, deduplicated
    case-insensitively within the category, first occurrence kept.
    """
    candidates: list[tuple[str, str]] = []
    for raw in (*reports, *red_flags):
        text = str(raw).strip()
        if text:
            candidates.append((text, _fold(text)))

    promoted: list[dict[str, Any]] = []
    for category in SAFETY_TAXONOMY:
        # lowercase line -> original line; insertion order keeps evidence order.
        matched: dict[str, str] = {}
        for text, folded_text in candidates:
            dedupe_key = text.lower()
            if dedupe_key in matched:
                continue
            if any(re.search(rx, folded_text) for rx in category["patterns"]):
                matched[dedupe_key] = text
        if not matched:
            continue
        flag = {key: category[key] for key in ("category", "icon", "label", "severity")}
        flag["evidence"] = list(matched.values())
        promoted.append(flag)
    return promoted
|
|
|
|
|
|
def has_critical_safety_flag(safety_flags: list[dict[str, Any]] | None) -> bool:
    """Return True when any flag in the list has ``severity == "critical"``.

    ``None``, an empty list, and falsy entries inside the list are tolerated.
    """
    for flag in safety_flags or []:
        if (flag or {}).get("severity") == "critical":
            return True
    return False
|
|
|
|
|
|
def apply_safety_override(result: str, safety_flags: list[dict[str, Any]] | None) -> str:
    """Force 'denied' when a critical safety flag is present.

    This is the hard floor: it does not depend on the rating LLM having
    scored the signal correctly. Without a critical flag, ``result`` is
    returned unchanged.
    """
    if has_critical_safety_flag(safety_flags):
        return "denied"
    return result
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Rating profile (consolidation via the batch SDK)
|
|
# ----------------------------------------------------------------------------
|
|
# System prompt for the rating LLM: three paragraphs (role + output contract,
# domain-nuance bullets, scoring scale) separated by blank lines.
RATING_SYSTEM = "\n\n".join((
    # Role and output contract.
    "You are a trust-and-safety analyst for an independent adult-industry provider (legal, "
    "regulated). You read crowdsourced caller reports from Mr. Number and produce a structured "
    "rating profile for the caller — how safe and worthwhile they are as a potential client. "
    "Respond ONLY with a single JSON object, no markdown.",
    # Domain nuance: one bullet per line.
    "\n".join((
        "DOMAIN NUANCE — read signals like an insider, not literally:",
        "- DEPOSITS ARE GOOD. A report mentioning the caller 'paid a deposit', 'sent a deposit', "
        "'offered/asked to send a deposit', or 'always deposits' is a STRONG POSITIVE — deposit-payers "
        "are serious, vetted, low-risk clients. Weight this heavily toward A/B. Only 'refused/won't pay "
        "a deposit' or 'chargeback' is negative.",
        "- 'Get a deposit' / 'make him deposit' written as advice from another provider means the caller "
        "is known to follow through once a deposit is taken — treat as a manageable/positive signal, NOT a red flag.",
        "- RELIABILITY: no-show, ghosting, flaking, cancelling last-minute → negative.",
        "- SAFETY (critical): law enforcement / cop / sting / 'asks weird LE questions', violence, coercion, "
        "robbery, attempts to remove agency → severe negative; if present, recommend denied regardless of other axes.",
        "- RESPECT: rude, pushy, haggling, boundary-pushing → negative.",
        "- MIXED REVIEWS: when reports conflict, do NOT average blindly — score each axis on its own evidence "
        "and explain the split.",
    )),
    # Scoring scale.
    "SCORING: 0-100 overall (higher = safer/better client). Grade A>=85, B 70-84, C 55-69, D 40-54, F<40.",
))
|
|
|
|
|
|
def _build_rating_prompt(history: dict[str, Any]) -> str:
|
|
schema = {
|
|
"score": "integer 0-100 — overall safety/desirability as a client",
|
|
"grade": "one of A,B,C,D,F (A>=85, B 70-84, C 55-69, D 40-54, F<40)",
|
|
"is_mixed": "boolean — true if the reports conflict / are genuinely mixed",
|
|
"axes": {
|
|
"reliability": {"score": "0-100", "note": "shows up vs no-shows/ghosting/flaking"},
|
|
"payment": {"score": "0-100", "note": "deposits (GOOD), pays agreed rate, no haggling/chargebacks"},
|
|
"respect": {"score": "0-100", "note": "politeness, respects boundaries, not pushy"},
|
|
"safety": {"score": "0-100", "note": "no law-enforcement/violence/coercion signals"},
|
|
},
|
|
"reports": "array of strings — the verbatim community report texts (extract them from the raw OCR when given raw text)",
|
|
"positive_signals": "array of strings — concrete positives found (quote/paraphrase the report)",
|
|
"negative_signals": "array of strings — concrete negatives found",
|
|
"nuanced_notes": "array of strings — where you read a signal NON-literally (e.g. deposit mentions as positive)",
|
|
"summary": "2-3 sentence consolidated profile of this caller",
|
|
"recommended_result": "one of: approved, denied, pending, not_found",
|
|
}
|
|
safety_flags = history.get("safety_flags") or []
|
|
safety_block = ""
|
|
if safety_flags:
|
|
promoted = "; ".join(f"{f['label']} ({'; '.join(f['evidence'])})" for f in safety_flags)
|
|
safety_block = (
|
|
f"PROMOTED CRITICAL SAFETY FLAGS (already detected — score the safety axis "
|
|
f"at/near 0 and recommend denied): {promoted}\n\n"
|
|
)
|
|
# OCR backend gives raw screen text (no pre-structured reports) — ask the model to
|
|
# extract the reports first; the vision backend already provides a clean report list.
|
|
ocr_text = (history.get("ocr_text") or "").strip()
|
|
if not (history.get("reports")) and ocr_text:
|
|
content_block = (
|
|
"Raw OCR text of the Mr. Number report screen(s) — noisy (UI chrome, partial "
|
|
"lines). FIRST extract the genuine community report texts (ignore buttons, "
|
|
"headers, nav), then rate:\n"
|
|
f"<<<OCR\n{ocr_text}\nOCR>>>\n\n"
|
|
)
|
|
else:
|
|
reports_block = "\n".join(f"- {r}" for r in history.get("reports") or []) or "(no report text captured)"
|
|
content_block = f"All captured community reports:\n{reports_block}\n\n"
|
|
return (
|
|
f"Caller: {history.get('phone')}\n"
|
|
f"App classification: {history.get('classification')}\n"
|
|
f"Reports the app says exist: {history.get('report_count')} "
|
|
f"(captured {history.get('captured_count')})\n\n"
|
|
f"{content_block}"
|
|
f"{safety_block}"
|
|
f"Flagged terms: {', '.join(history.get('red_flags') or []) or '(none)'}\n\n"
|
|
"Produce the caller's rating profile. Apply the domain nuance from the system prompt "
|
|
"(especially: deposits are a positive signal; law-enforcement signals force denied). "
|
|
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
|
|
)
|
|
|
|
|
|
def _extract_json(text: str) -> dict[str, Any] | None:
|
|
"""Pull the first JSON object out of an LLM response (handles ```json fences / prose)."""
|
|
if not text:
|
|
return None
|
|
fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
|
candidate = fence.group(1) if fence else None
|
|
if candidate is None:
|
|
start = text.find("{")
|
|
end = text.rfind("}")
|
|
candidate = text[start:end + 1] if start != -1 and end > start else None
|
|
if not candidate:
|
|
return None
|
|
try:
|
|
obj = json.loads(candidate)
|
|
return obj if isinstance(obj, dict) else None
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _normalize_profile(parsed: dict[str, Any] | None) -> dict[str, Any] | None:
|
|
if not isinstance(parsed, dict):
|
|
return None
|
|
score = parsed.get("score")
|
|
if isinstance(score, (int, float)):
|
|
parsed["score"] = int(score)
|
|
if not parsed.get("grade"):
|
|
parsed["grade"] = grade_from_score(parsed["score"])
|
|
return parsed
|
|
|
|
|
|
async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None:
    """Turn the merged report history into a multi-axis rating profile.

    The backend is chosen at call time: when `rating_llm_url()` returns a
    URL, an OpenAI-compatible chat endpoint is used (it also extracts the
    reports from raw OCR text); otherwise the Claude batch SDK is loaded.
    Returns ``None`` when the history has neither reports nor OCR text, or
    when the backend yields no usable JSON profile.
    """
    has_material = bool(history.get("reports")) or bool(history.get("ocr_text"))
    if not has_material:
        return None

    prompt = _build_rating_prompt(history)

    llm_endpoint = rating_llm_url()
    if llm_endpoint:
        raw_reply = openai_chat(
            base_url=llm_endpoint,
            model=rating_llm_model(),
            system=RATING_SYSTEM,
            user=prompt,
        )
        return _normalize_profile(_extract_json(raw_reply))

    ClaudeClient, parse_json_response = load_sdk()
    sdk_client = ClaudeClient(model=RATING_MODEL, max_concurrent=1)
    reply = await sdk_client.generate(
        system=RATING_SYSTEM, user=prompt, cwd=str(OUTPUT_DIR), allowed_tools=[]
    )
    if reply:
        return _normalize_profile(parse_json_response(reply))
    return None
|
|
|
|
|
|
def grade_from_score(score: int | float | None) -> str:
    """Map a 0-100 score to its letter grade.

    A>=85, B>=70, C>=55, D>=40, otherwise F. ``None`` maps to ``"?"``.
    """
    if score is None:
        return "?"
    # Thresholds are checked from the highest floor down; first hit wins.
    for floor, letter in ((85, "A"), (70, "B"), (55, "C"), (40, "D")):
        if score >= floor:
            return letter
    return "F"
|
|
|
|
|
|
def result_from_score(score: int | float | None) -> str:
    """Map an overall score to a screening result.

    ``>= 70`` -> 'approved', ``< 45`` -> 'denied', anything in between (or a
    missing score) -> 'pending'.
    """
    if score is None:
        return "pending"
    if score >= 70:
        return "approved"
    if score < 45:
        return "denied"
    return "pending"


def _as_number(value: Any) -> float | None:
    """Best-effort read of a model-supplied number; ``None`` when unusable.

    Accepts ints/floats and numeric strings (models often quote numbers).
    Booleans, non-numeric text and NaN are rejected.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        try:
            value = float(value.strip())
        except ValueError:
            return None
    if not isinstance(value, (int, float)):
        return None
    if value != value:  # NaN never compares equal to itself
        return None
    return value


def result_from_profile(profile: dict[str, Any] | None) -> str:
    """Map the rating profile to a screening result, with a hard safety override.

    Order of precedence:
      1. safety-axis score below 30 -> 'denied', whatever else the profile says;
      2. a valid ``recommended_result`` (case/whitespace-insensitive);
      3. the overall ``score`` via `result_from_score`.

    The profile comes from an LLM, so its shape is not trusted: a quoted
    safety score ("10") still triggers the override, and a safety axis given
    as a bare number or a non-dict ``axes`` value no longer raises.
    """
    if not profile or not isinstance(profile, dict):
        return "pending"

    axes = profile.get("axes")
    safety = axes.get("safety") if isinstance(axes, dict) else None
    # The axis is normally {"score": n, "note": ...}; tolerate a bare number.
    safety_score = _as_number(safety.get("score") if isinstance(safety, dict) else safety)
    if safety_score is not None and safety_score < 30:
        return "denied"  # law-enforcement/violence signal overrides everything

    recommended = profile.get("recommended_result")
    if isinstance(recommended, str):
        recommended = recommended.strip().lower()
    if recommended in ("approved", "denied", "pending", "not_found"):
        return recommended
    return result_from_score(_as_number(profile.get("score")))
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# adb device control — Mr. Number specifics over the shared RedroidDevice base
|
|
# ----------------------------------------------------------------------------
|
|
class MrNumberEmulator(RedroidDevice):
    """Device controller for the Mr. Number app.

    All generic adb/UI primitives come from `RedroidDevice`; this subclass
    only pins the Mr. Number-specific paths and defaults. The app-specific
    navigation is implemented by the module-level functions that compose
    those inherited primitives.
    """

    # On-device and local locations of the UI hierarchy dump.
    ui_dump_remote = "/sdcard/mr_ui.xml"
    ui_dump_local = "/tmp/mr_ui.xml"
    # On-device screenshot path and the prefix used for saved screenshots.
    screenshot_remote = "/sdcard/mr_result.png"
    screenshot_prefix = "mr-number"

    def __init__(self, device: str | None = None, package: str | None = None):
        """Bind to ``device``/``package``, falling back to the module defaults."""
        target_device = device or DEVICE
        target_package = package or PACKAGE
        super().__init__(target_device, target_package, OUTPUT_DIR)
|
|
|
|
|
|
# Module-level shims (existing call sites + patchability in tests)
|
|
# Lazily created, process-wide controller shared by the shims below.
_emulator: MrNumberEmulator | None = None


def _get_emulator() -> MrNumberEmulator:
    """Return the shared `MrNumberEmulator`, creating it on first use."""
    global _emulator
    if _emulator is not None:
        return _emulator
    _emulator = MrNumberEmulator()
    return _emulator
|
|
|
|
|
|
def adb(args: list[str], check: bool = True) -> str:
    """Module-level shim: forward to the shared emulator's ``adb``."""
    emulator = _get_emulator()
    return emulator.adb(args, check)
|
|
|
|
|
|
def adb_text(text: str) -> None:
    """Module-level shim: forward to the shared emulator's ``adb_text``."""
    emulator = _get_emulator()
    emulator.adb_text(text)
|
|
|
|
|
|
def adb_keyevent(code: int) -> None:
    """Module-level shim: forward to the shared emulator's ``adb_keyevent``."""
    emulator = _get_emulator()
    emulator.adb_keyevent(code)
|
|
|
|
|
|
def get_ui_dump() -> str:
    """Module-level shim: forward to the shared emulator's ``get_ui_dump``."""
    emulator = _get_emulator()
    return emulator.get_ui_dump()
|
|
|
|
|
|
def find_and_tap_text(target_texts: list[str]) -> bool:
    """Module-level shim: forward to the shared emulator's ``find_and_tap_text``."""
    emulator = _get_emulator()
    return emulator.find_and_tap_text(target_texts)
|
|
|
|
|
|
def find_edit_text_and_input(phone: str) -> bool:
    """Module-level shim: forward to the shared emulator's ``find_edit_text_and_input``."""
    emulator = _get_emulator()
    return emulator.find_edit_text_and_input(phone)
|
|
|
|
|
|
def launch_app() -> None:
    """Module-level shim: forward to the shared emulator's ``launch_app``."""
    emulator = _get_emulator()
    emulator.launch_app()
|
|
|
|
|
|
def take_screenshot(phone: str, tag: str = "") -> Path:
    """Module-level shim: forward to the shared emulator's ``take_screenshot``."""
    emulator = _get_emulator()
    return emulator.take_screenshot(phone, tag)
|
|
|
|
|
|
_DETAIL_MARKERS = ("recent reports", "report caller", "user reports", "view all", "block number", "block caller")
|
|
|
|
|
|
def _has_search_field() -> bool:
    """Report whether the current screen exposes a search/input field.

    A node counts when its class ends with ``EditText`` or its resource-id
    contains "search" (case-insensitive). Any failure to fetch or parse the
    UI dump is treated as "no search field".
    """
    try:
        tree = ET.fromstring(get_ui_dump())
    except Exception:
        return False
    return any(
        node.get("class", "").endswith("EditText")
        or "search" in (node.get("resource-id") or "").lower()
        for node in tree.iter("node")
    )
|
|
|
|
|
|
def go_to_search(max_back: int = 5) -> bool:
    """Bring the app back to a screen that shows the search field.

    Presses BACK up to ``max_back`` times, checking for the search field
    before each press, to dismiss any detail page or list left open by an
    earlier lookup. If that is not enough the app is relaunched as a last
    resort. Without this reset a fresh lookup silently re-reads the previous
    caller's page. Returns whether the search field is visible at the end.
    """
    emulator = _get_emulator()
    presses = 0
    while presses < max_back:
        if _has_search_field():
            return True
        emulator.adb_keyevent(4)  # BACK
        time.sleep(1.0)
        presses += 1

    # Last resort: relaunch to the app's home screen.
    launch_app()
    time.sleep(1.5)
    return _has_search_field()
|
|
|
|
|
|
def detail_state(input_phone: str) -> str:
    """Classify the current screen relative to the number being looked up.

    Returns one of:
      'match'          — a caller detail page AND our number is visible (best).
      'unknown_detail' — a detail page on which our number is not printed (the
                         app can show e.g. 'Personal Line' with no number).
                         Safe to accept because the caller always reaches here
                         via go_to_search() + a fresh search.
      'no_detail'      — not a report detail page (search/home/list/empty), or
                         the UI dump could not be read.

    A detail page is recognised by any `_DETAIL_MARKERS` substring in the
    dump. The number check compares the last 10 digits of ``input_phone``
    against all digits in the dump, so formatting differences do not matter.

    The detail-page test is applied FIRST. The number alone is not proof of a
    detail page: it is also on screen in the search field right after typing
    it, and in a recent-lookups list, and reporting those as 'match' would let
    the caller rate a non-detail screen.

    NOTE(review): a 'wrong' state (detail page visibly showing a different
    number) has been described for this function but is not detected here.
    """
    try:
        dump = get_ui_dump()
    except Exception:
        return "no_detail"

    lowered = dump.lower()
    if not any(marker in lowered for marker in _DETAIL_MARKERS):
        return "no_detail"

    national = re.sub(r"\D", "", input_phone)[-10:]
    if len(national) == 10 and national in re.sub(r"\D", "", dump):
        return "match"
    return "unknown_detail"
|
|
|
|
|
|
def open_report_detail(input_phone: str) -> bool:
    """Make sure the requested caller's report detail page is showing.

    Relies on go_to_search() having reset the app to the search screen first,
    so a detail page reached afterwards belongs to the fresh lookup (the
    number itself is not always printed on the page). If the current screen
    is not an acceptable detail page, the number's row is tapped — trying the
    common display formats — and the screen is re-checked after a short wait.
    """
    acceptable = ("match", "unknown_detail")
    if detail_state(input_phone) in acceptable:
        return True

    # Probably on the 'Recent lookups' list (or a wrong page): tap our row.
    all_digits = re.sub(r"\D", "", input_phone)
    national = all_digits[-10:]
    row_labels: list[str] = []
    if len(national) == 10:
        area, prefix, line = national[0:3], national[3:6], national[6:]
        row_labels.extend([
            f"({area}) {prefix}-{line}",
            f"{area}-{prefix}-{line}",
            f"{prefix}-{line}",
        ])
    row_labels.append(all_digits)

    if not find_and_tap_text(row_labels):
        return False
    time.sleep(3.0)
    return detail_state(input_phone) in acceptable
|
|
|
|
|
|
def expand_all_reports() -> bool:
    """Tap the 'View all N reports' row so the full history can be scrolled.

    Returns whatever `find_and_tap_text` reports for the candidate labels.
    """
    expand_labels = [
        "view all",
        "see all reports",
        "view all reports",
        "all reports",
        "see all",
    ]
    return find_and_tap_text(expand_labels)
|
|
|
|
|
|
def capture_full_history(phone: str, max_swipes: int = MAX_SCROLL_CAPTURES) -> list[Path]:
    """Screenshot the reports view top to bottom, swiping between shots.

    One screenshot is taken up front, then up to ``max_swipes`` rounds of
    swipe-up + screenshot follow. Scrolling stops early once the UI dump after
    a swipe equals the dump after the previous swipe (nothing moved, so the
    bottom was reached). No dump is recorded before the first swipe, so the
    first round always adds a screenshot. Returns the paths in capture order.
    """
    emulator = _get_emulator()
    width, height = emulator.screen_size()
    swipe_x = width // 2
    swipe_from_y = int(height * 0.78)
    swipe_to_y = int(height * 0.28)

    captured = [emulator.take_screenshot(phone, tag="0")]
    last_dump: str | None = None
    for round_no in range(1, max_swipes + 1):
        emulator.adb_swipe(swipe_x, swipe_from_y, swipe_x, swipe_to_y, 450)
        time.sleep(0.9)
        try:
            current_dump = emulator.get_ui_dump()
        except Exception:
            # An unreadable dump never counts as "reached the bottom".
            current_dump = None
        reached_bottom = current_dump is not None and current_dump == last_dump
        if reached_bottom:
            break
        last_dump = current_dump
        captured.append(emulator.take_screenshot(phone, tag=str(round_no)))
    return captured
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Verdict (deterministic fallback when the SDK profile is unavailable)
|
|
# ----------------------------------------------------------------------------
|
|
_NEG_KEYWORDS = (
|
|
"no show", "no-show", "noshow", "ghost", "flake", "flaked", "stood me up",
|
|
"rude", "aggressive", "harass", "boundary", "pushy", "haggl",
|
|
"cop", "leo", "police", "law enforcement", "sting", "officer",
|
|
"time waster", "timewaster", "timewaste", "scam", "robbery", "violent", "unsafe", "danger",
|
|
"chargeback", "refused deposit", "wouldn't pay", "wont pay",
|
|
)
|
|
|
|
|
|
def _normalize(text: str) -> str:
|
|
return re.sub(r"[^a-z0-9 ]+", " ", text.lower())
|
|
|
|
|
|
def decide_result(extracted: dict[str, Any]) -> str:
|
|
"""Deterministic fallback heuristic (used only if the SDK rating profile fails).
|
|
Never returns 'approved' over a model 'denied' or a red flag, and it matches
|
|
punctuation-variant phrasing (no-show == no show)."""
|
|
blob = _normalize(" ".join((extracted.get("reports") or []) + (extracted.get("red_flags") or [])))
|
|
suggested = extracted.get("suggested_result")
|
|
negative = any(_normalize(kw) in blob for kw in _NEG_KEYWORDS)
|
|
|
|
if suggested == "denied" or negative:
|
|
return "denied"
|
|
if suggested in ("approved", "denied", "not_found"):
|
|
return suggested
|
|
if extracted.get("report_count"):
|
|
# reports exist but nothing clearly good/bad → human gate, never auto-approve
|
|
return "pending" if not extracted.get("reports") else "approved"
|
|
return "pending"
|
|
|
|
|
|
def clean_phone(p: str) -> str:
    r"""Return the cleaned phone number: an optional leading + followed by
    digits (^\+?\d+$). Thin wrapper over the shared ``redroid_client`` helper."""
    cleaned = _clean_phone(p)
    return cleaned
|
|
|
|
|
|
def save_history(phone: str, history_obj: dict[str, Any]) -> Path:
    """Write the consolidated history + profile to a per-caller JSON file.

    The file lands in ``HISTORY_DIR`` and is named
    ``<cleaned phone without '+'>-<unix timestamp>.json``. Returns its path.
    """
    stamp = int(time.time())
    caller_id = clean_phone(phone).replace("+", "")
    target = HISTORY_DIR / f"{caller_id}-{stamp}.json"
    serialized = json.dumps(history_obj, indent=2)
    target.write_text(serialized)
    return target
|
|
|
|
|
|
def screening_signal_value(result: str, safety_flags: list[dict[str, Any]] | None) -> str | None:
    """Map the verdict to the bare ``valueText`` that consumers (Prospector's
    ``verdictFromSignal``) switch on: ``denied | cop_flag | approved | error``.

    A critical **law-enforcement** flag yields the distinct ``cop_flag`` (a
    cop overrides approval); other critical flags ride the 'denied' result.
    'pending' / 'not_found' (and any unknown result) map to ``None``, which
    consumers read as 'not_screened'.

    Falsy entries in ``safety_flags`` are skipped, matching the tolerance of
    `has_critical_safety_flag`.
    """
    for flag in safety_flags or []:
        flag = flag or {}
        if flag.get("category") == "law_enforcement" and flag.get("severity") == "critical":
            return "cop_flag"
    return result if result in ("denied", "approved", "error") else None
|
|
|
|
|
|
def record_screening(
    phone: str, result: str, raw_obj: dict[str, Any], safety_flags: list[dict[str, Any]] | None, ref: str | None,
) -> dict[str, Any]:
    """Record the verdict as a ``screening_mrnumber`` person signal.

    The signal is keyed by the phone number (the people service auto-upserts
    the person). The bare verdict goes in ``valueText``, the full record in
    ``valueJsonb``, and ``ref`` — the requester's correlation id, if any — in
    ``sourceHandle``. ``confidence`` is the profile's overall score divided
    by 100 (two decimals), or ``None`` when there is no numeric score.
    """
    verdict_text = screening_signal_value(result, safety_flags)

    profile = raw_obj.get("rating_profile") or {}
    overall = profile.get("score")
    confidence = None
    if isinstance(overall, (int, float)):
        confidence = round(overall / 100, 2)

    return record_people_signal(
        base_url=PEOPLE_BASE_URL,
        token=PEOPLE_SERVICE_TOKEN,
        handle=phone,
        channel=PEOPLE_CHANNEL,
        signal_type=SCREENING_SIGNAL_TYPE,
        source_feature=SOURCE_FEATURE,
        value_text=verdict_text,
        value_jsonb=raw_obj,
        confidence=confidence,
        source_handle=ref,
        occurred_at=raw_obj.get("decided_at"),
    )
|
|
|
|
|
|
async def main_async(phone: str, ref: str | None, dry_run: bool, dump_ui: bool = False) -> dict[str, Any]:
    """Run one end-to-end lookup for ``phone`` and return the result record.

    Steps, in order: launch the app and reach the search screen; type the
    cleaned number and trigger the lookup; verify the report detail for that
    number is showing; expand and capture the report history; extract each
    screenshot and merge the extractions; build the rating profile (falling
    back to ``decide_result`` when no profile is returned); apply the safety
    override; save the record; and, unless ``dry_run``, record the screening
    signal.

    Args:
        phone: Number as supplied by the caller; ``clean_phone(phone)`` is what
            gets typed into the app and used as the signal handle.
        ref: Optional requester correlation id, passed to ``record_screening``.
        dry_run: When true, everything runs except recording the signal.
        dump_ui: When true, log the first 1500 characters of the UI dump after
            reaching the search screen.

    Returns:
        On success, a dict with keys ``phone``, ``inputPhone``, ``result``,
        ``score``, ``grade``, ``ratingProfile``, ``reports``, ``safetyFlags``,
        ``classification``, ``reportCount``, ``capturedCount``,
        ``screenshots``, ``historyFile``, ``decidedAt``, ``rawResponse`` and
        ``recorded``. If the report detail cannot be opened, a shorter dict
        with ``result`` = ``"error"``, ``error`` = ``"detail_not_loaded"``,
        ``message``, ``screenshots`` and ``recorded`` = ``{"skipped": "error"}``
        is returned instead; nothing is saved or recorded in that case.
    """
    log(f"[mr-number] Starting lookup for {phone} on {DEVICE} (ref={ref}, dry_run={dry_run})")

    input_phone = clean_phone(phone)
    if input_phone != phone:
        log(f"[mr-number] Cleaned phone for input: {input_phone} (from {phone})")

    # 1. Launch, then return to a CLEAN search screen (dismiss any leftover page from a
    #    previous lookup — otherwise a fresh search silently re-reads the old caller).
    # NOTE(review): the time.sleep() calls below block the event loop; that looks
    # harmless as long as nothing else is scheduled on it (main() runs this
    # coroutine via asyncio.run) — confirm before reusing it inside a shared loop.
    launch_app()
    time.sleep(1.5)
    if not go_to_search():
        log("[mr-number] WARNING: could not find the search field; proceeding best-effort.")
    if dump_ui:
        log("[mr-number] UI dump after reaching search:")
        log(get_ui_dump()[:1500])

    # Type the number into the located edit field; if none was found, fall back
    # to sending the text through adb directly.
    if not find_edit_text_and_input(input_phone):
        adb_text(input_phone)
    time.sleep(1.5)
    # Tap the "look up" affordance; if it is not on screen, send key event 66
    # (Android KEYCODE_ENTER) instead.
    if not find_and_tap_text([f"look up {input_phone}", "look up"]):
        adb_keyevent(66)
    time.sleep(9.0)  # let the paid reports load

    # 1b. Confirm we're on the CORRECT caller's detail (number visible on screen).
    #     Abort rather than rate a stale/wrong page.
    if not open_report_detail(input_phone):
        shot = take_screenshot(input_phone, tag="nomatch")
        msg = (f"could not load the report detail for {input_phone} "
               f"(wrong/empty screen — not rating to avoid stale data); screenshot {shot}")
        log(f"[mr-number] ERROR: {msg}")
        # Early exit: no history file is written and no signal is recorded.
        return {
            "phone": phone, "inputPhone": input_phone, "result": "error",
            "error": "detail_not_loaded", "message": msg,
            "screenshots": [str(shot)], "recorded": {"skipped": "error"},
        }
    log("[mr-number] On the correct report detail page (number verified).")

    # 2. Expand the full report list, then scroll-capture all of it
    if expand_all_reports():
        log("[mr-number] Expanded full report list ('View all reports').")
        time.sleep(2.0)
    shots = capture_full_history(input_phone)
    log(f"[mr-number] Captured {len(shots)} screenshot(s) of the report history.")

    # 3. Vision-extract each screenshot, then consolidate + dedupe
    extractions: list[dict[str, Any]] = []
    for shot in shots:
        ex = await _extract_from_screenshot(str(shot), phone)
        extractions.append(ex)
    history = merge_reports(extractions, phone)
    # Raw OCR text without structured reports means the reports are only
    # extracted later, during rating; otherwise report the consolidated counts.
    if history.get("ocr_text") and not history.get("reports"):
        log(f"[mr-number] OCR backend: {len(history['ocr_text'])} chars of raw text "
            f"from {len(shots)} screenshot(s) — reports extracted during rating.")
    else:
        log(f"[mr-number] Consolidated {history['captured_count']} unique reports "
            f"(app declares {history['declared_count']}).")

    # 3b. Surface critical safety flags ABOVE everything else — a human reading the
    #     log (and the verdict) must see 'law enforcement' before any axis score.
    safety_flags = history.get("safety_flags") or []
    for f in safety_flags:
        log(f"[mr-number] {f['icon']} SAFETY FLAG [{f['severity'].upper()}]: {f['label']} "
            f"— evidence: {'; '.join(f['evidence'])}")

    # 4. Build the multi-axis rating profile via the batch SDK
    log("[mr-number] Building rating profile (consolidation via batch SDK)...")
    profile = await build_rating_profile(history)
    if profile:
        # OCR path: the rating LLM extracted the reports — fold them back so the signal
        # record + counts reflect them (and re-run safety detection over the real reports).
        if not history.get("reports") and isinstance(profile.get("reports"), list):
            history["reports"] = [str(r).strip() for r in profile["reports"] if str(r).strip()]
            history["captured_count"] = len(history["reports"])
            history["report_count"] = max(history.get("report_count") or 0, len(history["reports"]))
            # Scan the folded-back reports together with the non-empty raw OCR lines.
            history["safety_flags"] = detect_safety_flags(
                history["reports"] + [ln.strip() for ln in (history.get("ocr_text") or "").splitlines() if ln.strip()],
                history.get("red_flags") or [],
            )
            safety_flags = history["safety_flags"]
        result = result_from_profile(profile)
        log(f"[mr-number] Rating: {profile.get('score')}/100 grade {profile.get('grade')} "
            f"→ result '{result}' ({profile.get('summary', '')})")
    else:
        # No profile came back: decide from the merged history alone.
        result = decide_result(history)
        log(f"[mr-number] Rating profile unavailable; fallback heuristic → '{result}'")

    # 4b. Hard floor: a critical safety flag forces 'denied' regardless of the
    #     model's score — the verdict cannot depend on the LLM weighting an LE/
    #     violence/robbery/coercion signal correctly.
    overridden = apply_safety_override(result, safety_flags)
    if overridden != result:
        log(f"[mr-number] Safety override: '{result}' → 'denied' "
            f"(critical flag: {', '.join(f['category'] for f in safety_flags if f['severity'] == 'critical')})")
        result = overridden

    # 5. Save full history + profile, build the raw record
    raw_obj = {
        "source": "mr-number",
        "phone": phone,
        "classification": history.get("classification"),
        "reports": history.get("reports"),
        "red_flags": history.get("red_flags"),
        "safety_flags": safety_flags,
        "report_count": history.get("report_count"),
        "captured_count": history.get("captured_count"),
        "rating_profile": profile,
        "result": result,
        "screenshots": [str(s) for s in shots],
        "decided_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # UTC timestamp
    }
    history_path = save_history(phone, raw_obj)
    log(f"[mr-number] Saved full history → {history_path}")
    raw_response = json.dumps(raw_obj, indent=2)

    # 6. Record the verdict as a people-service signal (keyed by phone). A real lookup
    #    always records — there is no per-client gate; the person is auto-upserted.
    recorded: dict[str, Any] | None
    if not dry_run:
        try:
            recorded = record_screening(input_phone, result, raw_obj, safety_flags, ref)
            log(f"[mr-number] Recorded screening_mrnumber signal "
                f"(verdict={screening_signal_value(result, safety_flags)}):", recorded)
        except Exception as e:
            # Best-effort: a recording failure is reported in the returned
            # 'recorded' field instead of aborting the lookup.
            recorded = {"error": str(e)}
            log(f"[mr-number] Recording to people service failed: {e}")
    else:
        recorded = {"skipped": "dry_run"}
        log("[mr-number] Dry run — not recording the people signal.")
        if not json_mode():
            log("Raw record (would be the signal's valueJsonb):")
            log(raw_response)

    log("[mr-number] Done.")

    return {
        "phone": phone,
        "inputPhone": input_phone,
        "result": result,
        "score": (profile or {}).get("score"),
        "grade": (profile or {}).get("grade"),
        "ratingProfile": profile,
        "reports": history.get("reports"),
        "safetyFlags": safety_flags,
        "classification": history.get("classification"),
        "reportCount": history.get("report_count"),
        "capturedCount": history.get("captured_count"),
        "screenshots": [str(s) for s in shots],
        "historyFile": str(history_path),
        "decidedAt": raw_obj["decided_at"],
        "rawResponse": raw_response,
        "recorded": recorded,
    }
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments, check the device, run the lookup.

    Sets the module-level ``DEVICE`` from ``--device`` and the JSON mode from
    ``--json``, then verifies adb can reach the device before running
    ``main_async``.

    With ``--json``, stdout always carries a JSON object: the lookup result on
    success, or an ``{"error": ..., "message": ...}`` object on failure.

    Raises:
        SystemExit: with status 1 when adb cannot talk to the device.
        Exception: anything raised by the lookup is re-raised unchanged (after
            the JSON error object has been printed in ``--json`` mode).
    """
    global DEVICE
    parser = argparse.ArgumentParser()
    parser.add_argument("--phone", required=True, help="Phone number to look up (any format)")
    parser.add_argument("--ref", help="Optional requester correlation id (carried into the signal's sourceHandle).")
    parser.add_argument("--dry-run", action="store_true", help="Do lookup + vision + rating but do not record the people-service signal")
    parser.add_argument("--device", default=DEVICE, help="adb serial or host:port (default emulator-5554)")
    parser.add_argument("--dump-ui", action="store_true", help="Dump the current UI hierarchy before actions (calibration)")
    parser.add_argument("--json", action="store_true", help="Emit one JSON result object on stdout (progress to stderr). Used by the MCP.")
    args = parser.parse_args()

    DEVICE = args.device
    set_json_mode(args.json)

    # Fail fast with a clear message when the device is unreachable.
    try:
        adb(["shell", "echo", "ok"], check=True)
    except Exception as e:
        msg = f"Cannot talk to device via adb on {DEVICE}. Is it connected/authorized? {e}"
        if args.json:
            print(json.dumps({"error": "adb_unavailable", "message": msg}))
        print(f"ERROR: {msg}", file=sys.stderr)
        sys.exit(1)

    if args.dump_ui:
        log("[mr-number] --dump-ui requested. Current UI hierarchy:")
        log(get_ui_dump()[:2000] + "\n... (truncated)")

    import asyncio
    try:
        result = asyncio.run(main_async(args.phone, args.ref, args.dry_run, args.dump_ui))
    except Exception as e:
        # Keep the --json contract (one JSON object on stdout) even when the
        # lookup crashes, then re-raise so the traceback and the non-zero exit
        # status are exactly what they were before.
        if args.json:
            print(json.dumps({"error": "lookup_failed", "message": f"{type(e).__name__}: {e}"}))
        raise
    if args.json:
        print(json.dumps(result))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|