Replace the brittle keyword verdict with an LLM-consolidated rating profile per caller, and capture the COMPLETE report history instead of the first screen. - open_report_detail(): land on the caller detail page (taps the Recent-lookups row when the number was searched before) — fixes the 0-reports regression - expand_all_reports() + capture_full_history(): tap "View all N", scroll-capture every page until the UI dump stops changing; merge_reports() dedupes across pages - build_rating_profile() (batch SDK, sonnet): 0-100 score + A–F grade + per-axis sub-scores (reliability/payment/respect/safety) + signals + nuanced_notes. Domain nuance: deposit mentions weight POSITIVE; law-enforcement forces denied - result_from_profile(): honors recommendation, score fallback, hard safety override - decide_result(): kept as deterministic fallback, fixed to never approve over a model 'denied' / red flag and to match punctuation variants (no-show == no show) - save_history(): persist full consolidated history + profile per caller - tests: 18/18 (mapping, dedupe, safety override, full flow); DESIGN.md updated Verified live against the redroid droplet (45.55.191.82): 15166687821 → 3 reports consolidated → 18/100 grade F → denied, with multi-axis breakdown. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
708 lines
29 KiB
Python
Executable file
708 lines
29 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
mr-number-lookup
|
|
|
|
Drive an Android device (USB phone or the redroid droplet) running the Mr. Number
|
|
app (com.mrnumber.blocker), perform a phone lookup, expand + scroll-capture the
|
|
*full* community-report history, vision-extract every report, consolidate them with
|
|
the lilith claude-code-batch-sdk into a multi-axis **rating profile** (0-100 + letter
|
|
grade) for the caller, decide a screening result, save the full history, and record
|
|
it through the existing mr-number screening service (so it feeds reputation events +
|
|
all client filters).
|
|
|
|
Usage:
|
|
python3 mr_lookup.py --phone "+15551234567" --client-id 12345 [--dry-run]
|
|
|
|
Requires:
|
|
- adb in PATH; a device connected (USB serial, or `adb connect <host>:5555` for redroid)
|
|
with the paid Mr. Number app installed + signed in.
|
|
- QUINN_MY_URL + QUINN_MY_SERVICE_TOKEN in env (for recording).
|
|
- The claude batch SDK on disk (for vision + rating consolidation).
|
|
|
|
The manual path in quinn.my (Screening tab) remains the fallback / review surface.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# requests is only needed for the final recording step (guarded import so unit tests
|
|
# can run in environments without it; the device path itself is fully testable).
|
|
|
|
# --- Vision SDK (exact same pattern as codebase/@features/ad-watch/scripts/classify_photos.py)
|
|
_SDK_SRC = os.environ.get(
|
|
"CLAUDE_CODE_BATCH_SDK_PATH",
|
|
str(Path.home() / "Code/@applications/@ml/@packages/@py/claude-code-batch-sdk/src"),
|
|
)
|
|
if _SDK_SRC not in sys.path:
|
|
sys.path.insert(0, _SDK_SRC)
|
|
|
|
try:
|
|
from claude_code_batch_sdk import ClaudeClient, parse_json_response # noqa: E402
|
|
except ImportError:
|
|
print("ERROR: claude-code-batch-sdk not found. Set CLAUDE_CODE_BATCH_SDK_PATH or clone it to the expected location.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# --- Config / env
|
|
QUINN_MY_URL = (os.environ.get("QUINN_MY_URL") or "https://my.transquinnftw.com").rstrip("/")
|
|
QUINN_MY_SERVICE_TOKEN = os.environ.get("QUINN_MY_SERVICE_TOKEN", "")
|
|
DEVICE = os.environ.get("MR_NUMBER_DEVICE", "emulator-5554")
|
|
PACKAGE = "com.mrnumber.blocker"
|
|
OUTPUT_DIR = Path(__file__).parent / "output"
|
|
HISTORY_DIR = OUTPUT_DIR / "history"
|
|
OUTPUT_DIR.mkdir(exist_ok=True)
|
|
HISTORY_DIR.mkdir(exist_ok=True)
|
|
|
|
# Vision = fast/cheap text-from-image. Rating = reasoning over the consolidated
|
|
# history, so it defaults to a stronger model (override via env).
|
|
VISION_MODEL = os.environ.get("MR_NUMBER_VISION_MODEL", "haiku")
|
|
RATING_MODEL = os.environ.get("MR_NUMBER_RATING_MODEL", "sonnet")
|
|
MAX_SCROLL_CAPTURES = int(os.environ.get("MR_NUMBER_MAX_SCROLLS", "10"))
|
|
|
|
# --json mode: progress goes to stderr, a single result JSON object goes to stdout.
|
|
# Lets the mr-number MCP (mcp/index.ts) drive the lookup and consume a clean result.
|
|
JSON_MODE = False
|
|
|
|
|
|
def log(*args: Any) -> None:
|
|
"""Progress line — stderr in --json mode so stdout stays a clean JSON object."""
|
|
print(*args, file=sys.stderr if JSON_MODE else sys.stdout)
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Vision extraction (per screenshot)
|
|
# ----------------------------------------------------------------------------
|
|
MR_NUMBER_SYSTEM = (
|
|
"You are looking at a screenshot from the Mr. Number (caller ID + community reports) Android app. "
|
|
"Extract the information shown for the looked-up phone number. Respond ONLY with a single JSON object, no markdown."
|
|
)
|
|
|
|
|
|
def _build_vision_prompt(screenshot_path: str, phone: str) -> str:
|
|
schema = {
|
|
"phone": "the exact phone number that was searched (string)",
|
|
"report_count": "integer or null — the total number of reports the app says exist (e.g. 'View all 7 reports' -> 7), not just visible",
|
|
"reports": "array of strings — every report/comment text VISIBLE in this screenshot, verbatim (the valuable paid content)",
|
|
"classification": "string or null — the label at the top (e.g. 'Personal Line', 'Business', 'Suspected Spam')",
|
|
"red_flags": "array of strings — negative signals mentioned (no-show, ghosting, rude, cop/law-enforcement, timewaster, boundary issues, etc.)",
|
|
"summary": "short one-sentence impression from the reports visible here",
|
|
"suggested_result": "one of: approved, denied, not_found — your best guess from what's visible",
|
|
}
|
|
return (
|
|
f"Read the image file at: {screenshot_path}\n\n"
|
|
f"This is a screenshot after looking up {phone} in the Mr. Number app.\n"
|
|
"Extract the community reports and any top-level caller info VISIBLE in this image. "
|
|
"Transcribe report text verbatim — do not paraphrase. "
|
|
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
|
|
)
|
|
|
|
|
|
async def _extract_from_screenshot(screenshot_path: str, phone: str) -> dict[str, Any]:
|
|
client = ClaudeClient(model=VISION_MODEL, max_concurrent=1)
|
|
prompt = _build_vision_prompt(str(screenshot_path), phone)
|
|
|
|
resp = await client.generate(
|
|
system=MR_NUMBER_SYSTEM,
|
|
user=prompt,
|
|
cwd=str(Path(screenshot_path).parent),
|
|
allowed_tools=["Read"],
|
|
)
|
|
if not resp:
|
|
return {"error": "empty vision response"}
|
|
|
|
parsed = parse_json_response(resp)
|
|
if not isinstance(parsed, dict):
|
|
return {"error": "non-JSON vision reply", "raw": (resp or "")[:300]}
|
|
|
|
return parsed
|
|
|
|
|
|
def merge_reports(extractions: list[dict[str, Any]], phone: str) -> dict[str, Any]:
|
|
"""Consolidate per-screenshot extractions into one deduped report history."""
|
|
reports: list[str] = []
|
|
seen: set[str] = set()
|
|
red_flags: list[str] = []
|
|
red_seen: set[str] = set()
|
|
classification: str | None = None
|
|
declared_count = 0
|
|
|
|
for ex in extractions:
|
|
if not isinstance(ex, dict):
|
|
continue
|
|
if not classification and ex.get("classification"):
|
|
classification = ex.get("classification")
|
|
rc = ex.get("report_count")
|
|
if isinstance(rc, int):
|
|
declared_count = max(declared_count, rc)
|
|
for r in ex.get("reports") or []:
|
|
key = re.sub(r"\s+", " ", str(r).strip().lower())
|
|
if key and key not in seen:
|
|
seen.add(key)
|
|
reports.append(str(r).strip())
|
|
for f in ex.get("red_flags") or []:
|
|
key = re.sub(r"\s+", " ", str(f).strip().lower())
|
|
if key and key not in red_seen:
|
|
red_seen.add(key)
|
|
red_flags.append(str(f).strip())
|
|
|
|
return {
|
|
"phone": phone,
|
|
"reports": reports,
|
|
"red_flags": red_flags,
|
|
"classification": classification,
|
|
# report_count = the larger of what the app declared vs. how many we captured
|
|
"report_count": max(declared_count, len(reports)),
|
|
"captured_count": len(reports),
|
|
"declared_count": declared_count,
|
|
}
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Rating profile (consolidation via the batch SDK)
|
|
# ----------------------------------------------------------------------------
|
|
RATING_SYSTEM = (
|
|
"You are a trust-and-safety analyst for an independent adult-industry provider (legal, "
|
|
"regulated). You read crowdsourced caller reports from Mr. Number and produce a structured "
|
|
"rating profile for the caller — how safe and worthwhile they are as a potential client. "
|
|
"Respond ONLY with a single JSON object, no markdown.\n\n"
|
|
"DOMAIN NUANCE — read signals like an insider, not literally:\n"
|
|
"- DEPOSITS ARE GOOD. A report mentioning the caller 'paid a deposit', 'sent a deposit', "
|
|
"'offered/asked to send a deposit', or 'always deposits' is a STRONG POSITIVE — deposit-payers "
|
|
"are serious, vetted, low-risk clients. Weight this heavily toward A/B. Only 'refused/won't pay "
|
|
"a deposit' or 'chargeback' is negative.\n"
|
|
"- 'Get a deposit' / 'make him deposit' written as advice from another provider means the caller "
|
|
"is known to follow through once a deposit is taken — treat as a manageable/positive signal, NOT a red flag.\n"
|
|
"- RELIABILITY: no-show, ghosting, flaking, cancelling last-minute → negative.\n"
|
|
"- SAFETY (critical): law enforcement / cop / sting / 'asks weird LE questions', violence, coercion, "
|
|
"robbery, attempts to remove agency → severe negative; if present, recommend denied regardless of other axes.\n"
|
|
"- RESPECT: rude, pushy, haggling, boundary-pushing → negative.\n"
|
|
"- MIXED REVIEWS: when reports conflict, do NOT average blindly — score each axis on its own evidence "
|
|
"and explain the split.\n\n"
|
|
"SCORING: 0-100 overall (higher = safer/better client). Grade A>=85, B 70-84, C 55-69, D 40-54, F<40."
|
|
)
|
|
|
|
|
|
def _build_rating_prompt(history: dict[str, Any]) -> str:
|
|
schema = {
|
|
"score": "integer 0-100 — overall safety/desirability as a client",
|
|
"grade": "one of A,B,C,D,F (A>=85, B 70-84, C 55-69, D 40-54, F<40)",
|
|
"is_mixed": "boolean — true if the reports conflict / are genuinely mixed",
|
|
"axes": {
|
|
"reliability": {"score": "0-100", "note": "shows up vs no-shows/ghosting/flaking"},
|
|
"payment": {"score": "0-100", "note": "deposits (GOOD), pays agreed rate, no haggling/chargebacks"},
|
|
"respect": {"score": "0-100", "note": "politeness, respects boundaries, not pushy"},
|
|
"safety": {"score": "0-100", "note": "no law-enforcement/violence/coercion signals"},
|
|
},
|
|
"positive_signals": "array of strings — concrete positives found (quote/paraphrase the report)",
|
|
"negative_signals": "array of strings — concrete negatives found",
|
|
"nuanced_notes": "array of strings — where you read a signal NON-literally (e.g. deposit mentions as positive)",
|
|
"summary": "2-3 sentence consolidated profile of this caller",
|
|
"recommended_result": "one of: approved, denied, pending, not_found",
|
|
}
|
|
reports_block = "\n".join(f"- {r}" for r in history.get("reports") or []) or "(no report text captured)"
|
|
return (
|
|
f"Caller: {history.get('phone')}\n"
|
|
f"App classification: {history.get('classification')}\n"
|
|
f"Reports the app says exist: {history.get('report_count')} "
|
|
f"(captured {history.get('captured_count')})\n\n"
|
|
f"All captured community reports:\n{reports_block}\n\n"
|
|
f"Vision-flagged terms: {', '.join(history.get('red_flags') or []) or '(none)'}\n\n"
|
|
"Produce the caller's rating profile. Apply the domain nuance from the system prompt "
|
|
"(especially: deposits are a positive signal; law-enforcement signals force denied). "
|
|
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
|
|
)
|
|
|
|
|
|
async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None:
|
|
"""Consolidate the full report history into a multi-axis rating profile via the SDK."""
|
|
if not (history.get("reports")):
|
|
return None
|
|
client = ClaudeClient(model=RATING_MODEL, max_concurrent=1)
|
|
resp = await client.generate(
|
|
system=RATING_SYSTEM,
|
|
user=_build_rating_prompt(history),
|
|
cwd=str(OUTPUT_DIR),
|
|
allowed_tools=[],
|
|
)
|
|
if not resp:
|
|
return None
|
|
parsed = parse_json_response(resp)
|
|
if not isinstance(parsed, dict):
|
|
return None
|
|
# Normalize: ensure score is an int and grade is consistent with it.
|
|
score = parsed.get("score")
|
|
if isinstance(score, (int, float)):
|
|
parsed["score"] = int(score)
|
|
if not parsed.get("grade"):
|
|
parsed["grade"] = grade_from_score(parsed["score"])
|
|
return parsed
|
|
|
|
|
|
def grade_from_score(score: int | float | None) -> str:
|
|
if score is None:
|
|
return "?"
|
|
if score >= 85:
|
|
return "A"
|
|
if score >= 70:
|
|
return "B"
|
|
if score >= 55:
|
|
return "C"
|
|
if score >= 40:
|
|
return "D"
|
|
return "F"
|
|
|
|
|
|
def result_from_score(score: int | float | None) -> str:
|
|
if score is None:
|
|
return "pending"
|
|
if score >= 70:
|
|
return "approved"
|
|
if score < 45:
|
|
return "denied"
|
|
return "pending"
|
|
|
|
|
|
def result_from_profile(profile: dict[str, Any] | None) -> str:
|
|
"""Map the rating profile to a screening result enum, with a hard safety override."""
|
|
if not profile:
|
|
return "pending"
|
|
axes = profile.get("axes") or {}
|
|
safety = axes.get("safety") or {}
|
|
s_score = safety.get("score")
|
|
if isinstance(s_score, (int, float)) and s_score < 30:
|
|
return "denied" # law-enforcement/violence signal overrides everything
|
|
rec = profile.get("recommended_result")
|
|
if rec in ("approved", "denied", "pending", "not_found"):
|
|
return rec
|
|
return result_from_score(profile.get("score"))
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# adb device control (class for testability)
|
|
# ----------------------------------------------------------------------------
|
|
class MrNumberEmulator:
|
|
"""Encapsulates adb interactions with the Mr. Number app. Fully unit-testable by
|
|
monkey-patching, without a real Android device/emulator."""
|
|
|
|
def __init__(self, device: str | None = None, package: str | None = None):
|
|
self.device = device or DEVICE
|
|
self.package = package or PACKAGE
|
|
|
|
def adb(self, args: list[str], check: bool = True) -> str:
|
|
cmd = ["adb", "-s", self.device] + args
|
|
try:
|
|
return subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT)
|
|
except subprocess.CalledProcessError as e:
|
|
if check:
|
|
raise
|
|
return e.output or ""
|
|
|
|
def adb_tap(self, x: int, y: int) -> None:
|
|
self.adb(["shell", "input", "tap", str(x), str(y)])
|
|
|
|
def adb_text(self, text: str) -> None:
|
|
safe = text.replace(" ", "%s")
|
|
self.adb(["shell", "input", "text", safe])
|
|
|
|
def adb_keyevent(self, code: int) -> None:
|
|
self.adb(["shell", "input", "keyevent", str(code)])
|
|
|
|
def adb_swipe(self, x1: int, y1: int, x2: int, y2: int, ms: int = 400) -> None:
|
|
self.adb(["shell", "input", "swipe", str(x1), str(y1), str(x2), str(y2), str(ms)])
|
|
|
|
def screen_size(self) -> tuple[int, int]:
|
|
out = self.adb(["shell", "wm", "size"], check=False)
|
|
m = re.search(r"(\d+)x(\d+)", out or "")
|
|
if m:
|
|
return int(m.group(1)), int(m.group(2))
|
|
return 720, 1280
|
|
|
|
def get_ui_dump(self) -> str:
|
|
self.adb(["shell", "uiautomator", "dump", "/sdcard/mr_ui.xml"])
|
|
self.adb(["pull", "/sdcard/mr_ui.xml", "/tmp/mr_ui.xml"])
|
|
return Path("/tmp/mr_ui.xml").read_text()
|
|
|
|
def parse_bounds(self, bounds: str) -> tuple[int, int, int, int]:
|
|
b = bounds.strip("[]").replace("][", ",").split(",")
|
|
return int(b[0]), int(b[1]), int(b[2]), int(b[3])
|
|
|
|
def find_and_tap_text(self, target_texts: list[str]) -> bool:
|
|
try:
|
|
dump = self.get_ui_dump()
|
|
root = ET.fromstring(dump)
|
|
for node in root.iter("node"):
|
|
text = ((node.get("text") or "") + " " + (node.get("content-desc") or "")).lower()
|
|
for t in target_texts:
|
|
if t.lower() in text:
|
|
bounds = node.get("bounds")
|
|
if bounds:
|
|
x1, y1, x2, y2 = self.parse_bounds(bounds)
|
|
self.adb_tap((x1 + x2) // 2, (y1 + y2) // 2)
|
|
time.sleep(0.8)
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def find_edit_text_and_input(self, phone: str) -> bool:
|
|
try:
|
|
dump = self.get_ui_dump()
|
|
root = ET.fromstring(dump)
|
|
for node in root.iter("node"):
|
|
if node.get("class", "").endswith("EditText") or "search" in (node.get("resource-id") or "").lower():
|
|
bounds = node.get("bounds")
|
|
if bounds:
|
|
x1, y1, x2, y2 = self.parse_bounds(bounds)
|
|
self.adb_tap((x1 + x2) // 2, (y1 + y2) // 2)
|
|
time.sleep(0.5)
|
|
self.adb(["shell", "input", "keycombination", "KEYCODE_CTRL_LEFT", "KEYCODE_A"], check=False)
|
|
self.adb(["shell", "input", "keyevent", "67"], check=False)
|
|
self.adb_keyevent(123) # MOVE_END
|
|
for _ in range(20):
|
|
self.adb_keyevent(67) # DEL (backspace)
|
|
time.sleep(0.2)
|
|
self.adb_text(phone)
|
|
time.sleep(0.3)
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def launch_app(self) -> None:
|
|
self.adb(["shell", "monkey", "-p", self.package, "-c", "android.intent.category.LAUNCHER", "1"], check=False)
|
|
time.sleep(2.5)
|
|
|
|
def take_screenshot(self, phone: str, tag: str = "") -> Path:
|
|
ts = int(time.time())
|
|
digits = phone.replace("+", "")
|
|
suffix = f"-{tag}" if tag != "" else ""
|
|
local = OUTPUT_DIR / f"mr-number-{digits}-{ts}{suffix}.png"
|
|
self.adb(["shell", "screencap", "-p", "/sdcard/mr_result.png"])
|
|
self.adb(["pull", "/sdcard/mr_result.png", str(local)])
|
|
return local
|
|
|
|
|
|
# Module-level shims (existing call sites + patchability in tests)
|
|
_emulator: MrNumberEmulator | None = None
|
|
|
|
|
|
def _get_emulator() -> MrNumberEmulator:
|
|
global _emulator
|
|
if _emulator is None:
|
|
_emulator = MrNumberEmulator()
|
|
return _emulator
|
|
|
|
|
|
def adb(args: list[str], check: bool = True) -> str:
|
|
return _get_emulator().adb(args, check)
|
|
|
|
|
|
def adb_text(text: str) -> None:
|
|
_get_emulator().adb_text(text)
|
|
|
|
|
|
def adb_keyevent(code: int) -> None:
|
|
_get_emulator().adb_keyevent(code)
|
|
|
|
|
|
def get_ui_dump() -> str:
|
|
return _get_emulator().get_ui_dump()
|
|
|
|
|
|
def find_and_tap_text(target_texts: list[str]) -> bool:
|
|
return _get_emulator().find_and_tap_text(target_texts)
|
|
|
|
|
|
def find_edit_text_and_input(phone: str) -> bool:
|
|
return _get_emulator().find_edit_text_and_input(phone)
|
|
|
|
|
|
def launch_app() -> None:
|
|
_get_emulator().launch_app()
|
|
|
|
|
|
def take_screenshot(phone: str, tag: str = "") -> Path:
|
|
return _get_emulator().take_screenshot(phone, tag)
|
|
|
|
|
|
_DETAIL_MARKERS = ("recent reports", "report caller", "view all", "block number", "block caller")
|
|
|
|
|
|
def on_report_detail() -> bool:
|
|
"""True if the current screen is a caller's report-detail page (not the home/recent list)."""
|
|
try:
|
|
dump = get_ui_dump().lower()
|
|
except Exception:
|
|
return False
|
|
return any(m in dump for m in _DETAIL_MARKERS)
|
|
|
|
|
|
def open_report_detail(input_phone: str) -> bool:
|
|
"""Ensure we're on the caller's report detail. If we landed on the 'Recent lookups'
|
|
list (e.g. the number was searched before), tap its row to open the detail."""
|
|
if on_report_detail():
|
|
return True
|
|
digits = re.sub(r"\D", "", input_phone)
|
|
nat = digits[-10:] if len(digits) >= 10 else digits
|
|
candidates: list[str] = []
|
|
if len(nat) == 10:
|
|
candidates += [f"({nat[0:3]}) {nat[3:6]}-{nat[6:]}", f"{nat[0:3]}-{nat[3:6]}-{nat[6:]}", f"{nat[3:6]}-{nat[6:]}"]
|
|
candidates.append(digits)
|
|
if find_and_tap_text(candidates):
|
|
time.sleep(3.0)
|
|
return on_report_detail()
|
|
return False
|
|
|
|
|
|
def expand_all_reports() -> bool:
|
|
"""Tap the 'View all N reports' row so the full history is on screen to scroll."""
|
|
return find_and_tap_text(["view all", "see all reports", "view all reports", "all reports", "see all"])
|
|
|
|
|
|
def capture_full_history(phone: str, max_swipes: int = MAX_SCROLL_CAPTURES) -> list[Path]:
|
|
"""Screenshot the reports view, scrolling down until it stops moving (bottom).
|
|
Returns the list of screenshot paths (top → bottom)."""
|
|
emu = _get_emulator()
|
|
w, h = emu.screen_size()
|
|
x, y_from, y_to = w // 2, int(h * 0.78), int(h * 0.28)
|
|
shots = [emu.take_screenshot(phone, tag="0")]
|
|
prev_dump: str | None = None
|
|
for i in range(1, max_swipes + 1):
|
|
emu.adb_swipe(x, y_from, x, y_to, 450)
|
|
time.sleep(0.9)
|
|
try:
|
|
dump = emu.get_ui_dump()
|
|
except Exception:
|
|
dump = None
|
|
if dump is not None and dump == prev_dump:
|
|
break # nothing changed after a swipe = reached the bottom
|
|
prev_dump = dump
|
|
shots.append(emu.take_screenshot(phone, tag=str(i)))
|
|
return shots
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Verdict (deterministic fallback when the SDK profile is unavailable)
|
|
# ----------------------------------------------------------------------------
|
|
_NEG_KEYWORDS = (
|
|
"no show", "no-show", "noshow", "ghost", "flake", "flaked", "stood me up",
|
|
"rude", "aggressive", "harass", "boundary", "pushy", "haggl",
|
|
"cop", "leo", "police", "law enforcement", "sting", "officer",
|
|
"time waster", "timewaster", "timewaste", "scam", "robbery", "violent", "unsafe", "danger",
|
|
"chargeback", "refused deposit", "wouldn't pay", "wont pay",
|
|
)
|
|
|
|
|
|
def _normalize(text: str) -> str:
|
|
return re.sub(r"[^a-z0-9 ]+", " ", text.lower())
|
|
|
|
|
|
def decide_result(extracted: dict[str, Any]) -> str:
|
|
"""Deterministic fallback heuristic (used only if the SDK rating profile fails).
|
|
Fixes the historical bug: it never returns 'approved' over a model 'denied' or a
|
|
red flag, and it matches punctuation-variant phrasing (no-show == no show)."""
|
|
blob = _normalize(" ".join((extracted.get("reports") or []) + (extracted.get("red_flags") or [])))
|
|
suggested = extracted.get("suggested_result")
|
|
negative = any(_normalize(kw) in blob for kw in _NEG_KEYWORDS)
|
|
|
|
if suggested == "denied" or negative:
|
|
return "denied"
|
|
if suggested in ("approved", "denied", "not_found"):
|
|
return suggested
|
|
if extracted.get("report_count"):
|
|
# reports exist but nothing clearly good/bad → human gate, never auto-approve
|
|
return "pending" if not extracted.get("reports") else "approved"
|
|
return "pending"
|
|
|
|
|
|
def clean_phone(p: str) -> str:
|
|
r"""Return only a leading + (if present) followed by digits (^\+?\d+$)."""
|
|
has_plus = p.strip().startswith("+")
|
|
digits = re.sub(r"\D", "", p)
|
|
return ("+" + digits) if has_plus else digits
|
|
|
|
|
|
def save_history(phone: str, history_obj: dict[str, Any]) -> Path:
|
|
"""Persist the full consolidated history + profile to a per-caller JSON file."""
|
|
ts = int(time.time())
|
|
path = HISTORY_DIR / f"{clean_phone(phone).replace('+', '')}-{ts}.json"
|
|
path.write_text(json.dumps(history_obj, indent=2))
|
|
return path
|
|
|
|
|
|
def record_screening(client_id: int, phone: str, result: str, raw: str) -> dict[str, Any]:
|
|
if not QUINN_MY_SERVICE_TOKEN:
|
|
return {"skipped": "no QUINN_MY_SERVICE_TOKEN"}
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
return {"error": "requests not available; cannot record (pip install requests)"}
|
|
|
|
url = f"{QUINN_MY_URL}/api/clients/{client_id}/screening"
|
|
body = {
|
|
"clientId": client_id,
|
|
"service": "mr-number",
|
|
"lookupValue": phone,
|
|
"result": result,
|
|
"rawResponse": raw,
|
|
}
|
|
headers = {"Authorization": f"Bearer {QUINN_MY_SERVICE_TOKEN}"}
|
|
resp = requests.post(url, json=body, headers=headers, timeout=15)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
async def main_async(phone: str, client_id: int | None, dry_run: bool, dump_ui: bool = False) -> dict[str, Any]:
|
|
log(f"[mr-number] Starting lookup for {phone} on {DEVICE} (client_id={client_id}, dry_run={dry_run})")
|
|
|
|
input_phone = clean_phone(phone)
|
|
if input_phone != phone:
|
|
log(f"[mr-number] Cleaned phone for input: {input_phone} (from {phone})")
|
|
|
|
# 1. Launch + search
|
|
launch_app()
|
|
time.sleep(1.5)
|
|
if dump_ui:
|
|
log("[mr-number] UI dump after launch:")
|
|
log(get_ui_dump()[:1500])
|
|
|
|
if not find_edit_text_and_input(input_phone):
|
|
adb_text(input_phone)
|
|
time.sleep(1.5)
|
|
if not find_and_tap_text([f"look up {input_phone}", "look up"]):
|
|
adb_keyevent(66)
|
|
time.sleep(9.0) # let the paid reports load
|
|
|
|
# 1b. Make sure we're on the caller's report detail (not the recent-lookups list).
|
|
if open_report_detail(input_phone):
|
|
log("[mr-number] On report detail page.")
|
|
else:
|
|
log("[mr-number] WARNING: could not confirm the report detail page; capturing what's shown.")
|
|
|
|
# 2. Expand the full report list, then scroll-capture all of it
|
|
if expand_all_reports():
|
|
log("[mr-number] Expanded full report list ('View all reports').")
|
|
time.sleep(2.0)
|
|
shots = capture_full_history(input_phone)
|
|
log(f"[mr-number] Captured {len(shots)} screenshot(s) of the report history.")
|
|
|
|
# 3. Vision-extract each screenshot, then consolidate + dedupe
|
|
extractions: list[dict[str, Any]] = []
|
|
for shot in shots:
|
|
ex = await _extract_from_screenshot(str(shot), phone)
|
|
extractions.append(ex)
|
|
history = merge_reports(extractions, phone)
|
|
log(f"[mr-number] Consolidated {history['captured_count']} unique reports "
|
|
f"(app declares {history['declared_count']}).")
|
|
|
|
# 4. Build the multi-axis rating profile via the batch SDK
|
|
log("[mr-number] Building rating profile (consolidation via batch SDK)...")
|
|
profile = await build_rating_profile(history)
|
|
if profile:
|
|
result = result_from_profile(profile)
|
|
log(f"[mr-number] Rating: {profile.get('score')}/100 grade {profile.get('grade')} "
|
|
f"→ result '{result}' ({profile.get('summary', '')})")
|
|
else:
|
|
result = decide_result(history)
|
|
log(f"[mr-number] Rating profile unavailable; fallback heuristic → '{result}'")
|
|
|
|
# 5. Save full history + profile, build the raw record
|
|
raw_obj = {
|
|
"source": "mr-number",
|
|
"phone": phone,
|
|
"classification": history.get("classification"),
|
|
"reports": history.get("reports"),
|
|
"red_flags": history.get("red_flags"),
|
|
"report_count": history.get("report_count"),
|
|
"captured_count": history.get("captured_count"),
|
|
"rating_profile": profile,
|
|
"result": result,
|
|
"screenshots": [str(s) for s in shots],
|
|
"decided_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
}
|
|
history_path = save_history(phone, raw_obj)
|
|
log(f"[mr-number] Saved full history → {history_path}")
|
|
raw_response = json.dumps(raw_obj, indent=2)
|
|
|
|
# 6. Record (if we have everything)
|
|
recorded: dict[str, Any] | None
|
|
if client_id and not dry_run:
|
|
try:
|
|
recorded = record_screening(client_id, phone, result, raw_response)
|
|
log("[mr-number] Recorded screening check:", recorded)
|
|
except Exception as e:
|
|
recorded = {"error": str(e)}
|
|
log(f"[mr-number] Recording failed: {e}")
|
|
log("You can still paste the raw data manually in the quinn.my Screening tab.")
|
|
else:
|
|
recorded = {"skipped": "dry_run" if dry_run else "no_client_id"}
|
|
log("[mr-number] Dry run or missing client_id — not recording.")
|
|
if not JSON_MODE:
|
|
log("Raw record (paste into quinn.my Screening tab if recording manually):")
|
|
log(raw_response)
|
|
|
|
log("[mr-number] Done.")
|
|
|
|
return {
|
|
"phone": phone,
|
|
"inputPhone": input_phone,
|
|
"result": result,
|
|
"score": (profile or {}).get("score"),
|
|
"grade": (profile or {}).get("grade"),
|
|
"ratingProfile": profile,
|
|
"reports": history.get("reports"),
|
|
"classification": history.get("classification"),
|
|
"reportCount": history.get("report_count"),
|
|
"capturedCount": history.get("captured_count"),
|
|
"screenshots": [str(s) for s in shots],
|
|
"historyFile": str(history_path),
|
|
"decidedAt": raw_obj["decided_at"],
|
|
"rawResponse": raw_response,
|
|
"recorded": recorded,
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
global DEVICE, JSON_MODE
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--phone", required=True, help="Phone number to look up (any format)")
|
|
parser.add_argument("--client-id", type=int, help="quinn client id (from /clients/12345 URL). Required to auto-record.")
|
|
parser.add_argument("--dry-run", action="store_true", help="Do lookup + vision + rating but do not POST the screening record")
|
|
parser.add_argument("--device", default=DEVICE, help="adb serial or host:port (default emulator-5554)")
|
|
parser.add_argument("--dump-ui", action="store_true", help="Dump the current UI hierarchy before actions (calibration)")
|
|
parser.add_argument("--json", action="store_true", help="Emit one JSON result object on stdout (progress to stderr). Used by the MCP.")
|
|
args = parser.parse_args()
|
|
|
|
DEVICE = args.device
|
|
JSON_MODE = args.json
|
|
|
|
try:
|
|
adb(["shell", "echo", "ok"], check=True)
|
|
except Exception as e:
|
|
msg = f"Cannot talk to device via adb on {DEVICE}. Is it connected/authorized? {e}"
|
|
if JSON_MODE:
|
|
print(json.dumps({"error": "adb_unavailable", "message": msg}))
|
|
print(f"ERROR: {msg}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if args.dump_ui:
|
|
log("[mr-number] --dump-ui requested. Current UI hierarchy:")
|
|
log(get_ui_dump()[:2000] + "\n... (truncated)")
|
|
|
|
import asyncio
|
|
result = asyncio.run(main_async(args.phone, args.client_id, args.dry_run, args.dump_ui))
|
|
if JSON_MODE:
|
|
print(json.dumps(result))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|