feat(box): OCR extraction + GPU (OpenAI-compatible) rating backend, env-selectable

Wire the on-box (Claude-API-less) path decided with the operator: EXTRACT_BACKEND=ocr
sends each screenshot to the on-box mrnumber-ocr service (raw text, no per-shot
structuring); build_rating_profile uses an OpenAI-compatible LLM on a DO GPU droplet
(RATING_LLM_URL) which extracts the reports from the raw OCR text AND produces the
multi-axis verdict. Reports are folded back into the history so the people-signal +
counts + safety flags reflect them; safety detection also scans the raw OCR lines so a
LE term forces cop_flag even before structuring. vision/Claude stays the plum-dev
default. +5 tests incl. full OCR→GPU→cop_flag flow. 33/33.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Natalie 2026-06-30 00:39:06 -04:00
parent 9029f3789c
commit 15e7348413
2 changed files with 173 additions and 20 deletions

View file

@ -41,12 +41,18 @@ from typing import Any
from redroid_client import (
RedroidDevice,
clean_phone as _clean_phone,
extract_backend,
extract_screenshot,
json_mode,
load_sdk,
log,
ocr_extract,
ocr_url,
openai_chat,
people_base_url,
people_service_token,
rating_llm_model,
rating_llm_url,
record_people_signal,
set_json_mode,
)
@ -104,6 +110,14 @@ def _build_vision_prompt(screenshot_path: str, phone: str) -> str:
async def _extract_from_screenshot(screenshot_path: str, phone: str) -> dict[str, Any]:
"""Per-screenshot extraction. With EXTRACT_BACKEND=vision (plum dev) Claude returns a
structured report dict. With EXTRACT_BACKEND=ocr (the box) the on-box tesseract service
returns raw screen text there is no per-shot structuring; the rating LLM does the
extraction + reasoning from the concatenated OCR text downstream."""
if extract_backend() == "ocr":
payload = ocr_extract(str(screenshot_path), base_url=ocr_url())
return {"reports": [], "red_flags": [], "classification": None,
"report_count": None, "raw_ocr": payload.get("text", "")}
return await extract_screenshot(
screenshot_path=str(screenshot_path),
system=MR_NUMBER_SYSTEM,
@ -120,10 +134,14 @@ def merge_reports(extractions: list[dict[str, Any]], phone: str) -> dict[str, An
red_seen: set[str] = set()
classification: str | None = None
declared_count = 0
ocr_chunks: list[str] = []
for ex in extractions:
if not isinstance(ex, dict):
continue
chunk = (ex.get("raw_ocr") or "").strip()
if chunk:
ocr_chunks.append(chunk)
if not classification and ex.get("classification"):
classification = ex.get("classification")
rc = ex.get("report_count")
@ -140,18 +158,24 @@ def merge_reports(extractions: list[dict[str, Any]], phone: str) -> dict[str, An
red_seen.add(key)
red_flags.append(str(f).strip())
ocr_text = "\n".join(ocr_chunks)
# Safety flags are deterministic keyword/regex over text, so for the OCR backend we
# scan the raw OCR lines too (the LE/violence signal must be caught even before the
# rating LLM structures the reports).
safety_inputs = reports + [ln.strip() for ln in ocr_text.splitlines() if ln.strip()]
return {
"phone": phone,
"reports": reports,
"red_flags": red_flags,
"classification": classification,
"ocr_text": ocr_text,
# report_count = the larger of what the app declared vs. how many we captured
"report_count": max(declared_count, len(reports)),
"captured_count": len(reports),
"declared_count": declared_count,
# Critical safety signals, promoted OUT of the flat report/flag lists so a
# human (and the verdict) never has to find them buried in row 7 of 14.
"safety_flags": detect_safety_flags(reports, red_flags),
"safety_flags": detect_safety_flags(safety_inputs, red_flags),
}
@ -295,13 +319,13 @@ def _build_rating_prompt(history: dict[str, Any]) -> str:
"respect": {"score": "0-100", "note": "politeness, respects boundaries, not pushy"},
"safety": {"score": "0-100", "note": "no law-enforcement/violence/coercion signals"},
},
"reports": "array of strings — the verbatim community report texts (extract them from the raw OCR when given raw text)",
"positive_signals": "array of strings — concrete positives found (quote/paraphrase the report)",
"negative_signals": "array of strings — concrete negatives found",
"nuanced_notes": "array of strings — where you read a signal NON-literally (e.g. deposit mentions as positive)",
"summary": "2-3 sentence consolidated profile of this caller",
"recommended_result": "one of: approved, denied, pending, not_found",
}
reports_block = "\n".join(f"- {r}" for r in history.get("reports") or []) or "(no report text captured)"
safety_flags = history.get("safety_flags") or []
safety_block = ""
if safety_flags:
@ -310,38 +334,55 @@ def _build_rating_prompt(history: dict[str, Any]) -> str:
f"PROMOTED CRITICAL SAFETY FLAGS (already detected — score the safety axis "
f"at/near 0 and recommend denied): {promoted}\n\n"
)
# OCR backend gives raw screen text (no pre-structured reports) — ask the model to
# extract the reports first; the vision backend already provides a clean report list.
ocr_text = (history.get("ocr_text") or "").strip()
if not (history.get("reports")) and ocr_text:
content_block = (
"Raw OCR text of the Mr. Number report screen(s) — noisy (UI chrome, partial "
"lines). FIRST extract the genuine community report texts (ignore buttons, "
"headers, nav), then rate:\n"
f"<<<OCR\n{ocr_text}\nOCR>>>\n\n"
)
else:
reports_block = "\n".join(f"- {r}" for r in history.get("reports") or []) or "(no report text captured)"
content_block = f"All captured community reports:\n{reports_block}\n\n"
return (
f"Caller: {history.get('phone')}\n"
f"App classification: {history.get('classification')}\n"
f"Reports the app says exist: {history.get('report_count')} "
f"(captured {history.get('captured_count')})\n\n"
f"All captured community reports:\n{reports_block}\n\n"
f"{content_block}"
f"{safety_block}"
f"Vision-flagged terms: {', '.join(history.get('red_flags') or []) or '(none)'}\n\n"
f"Flagged terms: {', '.join(history.get('red_flags') or []) or '(none)'}\n\n"
"Produce the caller's rating profile. Apply the domain nuance from the system prompt "
"(especially: deposits are a positive signal; law-enforcement signals force denied). "
f"Respond with ONLY one JSON object:\n{json.dumps(schema, indent=2)}"
)
async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None:
"""Consolidate the full report history into a multi-axis rating profile via the SDK."""
if not (history.get("reports")):
def _extract_json(text: str) -> dict[str, Any] | None:
"""Pull the first JSON object out of an LLM response (handles ```json fences / prose)."""
if not text:
return None
ClaudeClient, parse_json_response = load_sdk()
client = ClaudeClient(model=RATING_MODEL, max_concurrent=1)
resp = await client.generate(
system=RATING_SYSTEM,
user=_build_rating_prompt(history),
cwd=str(OUTPUT_DIR),
allowed_tools=[],
)
if not resp:
fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
candidate = fence.group(1) if fence else None
if candidate is None:
start = text.find("{")
end = text.rfind("}")
candidate = text[start:end + 1] if start != -1 and end > start else None
if not candidate:
return None
parsed = parse_json_response(resp)
try:
obj = json.loads(candidate)
return obj if isinstance(obj, dict) else None
except json.JSONDecodeError:
return None
def _normalize_profile(parsed: dict[str, Any] | None) -> dict[str, Any] | None:
if not isinstance(parsed, dict):
return None
# Normalize: ensure score is an int and grade is consistent with it.
score = parsed.get("score")
if isinstance(score, (int, float)):
parsed["score"] = int(score)
@ -350,6 +391,28 @@ async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None
return parsed
async def build_rating_profile(history: dict[str, Any]) -> dict[str, Any] | None:
"""Consolidate the report history into a multi-axis rating profile. Backend is
env-selected: RATING_LLM_URL set an OpenAI-compatible LLM on the DO GPU droplet
(also does report extraction from raw OCR); else the Claude batch SDK (plum dev)."""
if not (history.get("reports") or history.get("ocr_text")):
return None
prompt = _build_rating_prompt(history)
gpu_url = rating_llm_url()
if gpu_url:
content = openai_chat(base_url=gpu_url, model=rating_llm_model(),
system=RATING_SYSTEM, user=prompt)
return _normalize_profile(_extract_json(content))
ClaudeClient, parse_json_response = load_sdk()
client = ClaudeClient(model=RATING_MODEL, max_concurrent=1)
resp = await client.generate(system=RATING_SYSTEM, user=prompt, cwd=str(OUTPUT_DIR), allowed_tools=[])
if not resp:
return None
return _normalize_profile(parse_json_response(resp))
def grade_from_score(score: int | float | None) -> str:
if score is None:
return "?"
@ -686,6 +749,10 @@ async def main_async(phone: str, ref: str | None, dry_run: bool, dump_ui: bool =
ex = await _extract_from_screenshot(str(shot), phone)
extractions.append(ex)
history = merge_reports(extractions, phone)
if history.get("ocr_text") and not history.get("reports"):
log(f"[mr-number] OCR backend: {len(history['ocr_text'])} chars of raw text "
f"from {len(shots)} screenshot(s) — reports extracted during rating.")
else:
log(f"[mr-number] Consolidated {history['captured_count']} unique reports "
f"(app declares {history['declared_count']}).")
@ -700,6 +767,17 @@ async def main_async(phone: str, ref: str | None, dry_run: bool, dump_ui: bool =
log("[mr-number] Building rating profile (consolidation via batch SDK)...")
profile = await build_rating_profile(history)
if profile:
# OCR path: the rating LLM extracted the reports — fold them back so the signal
# record + counts reflect them (and re-run safety detection over the real reports).
if not history.get("reports") and isinstance(profile.get("reports"), list):
history["reports"] = [str(r).strip() for r in profile["reports"] if str(r).strip()]
history["captured_count"] = len(history["reports"])
history["report_count"] = max(history.get("report_count") or 0, len(history["reports"]))
history["safety_flags"] = detect_safety_flags(
history["reports"] + [ln.strip() for ln in (history.get("ocr_text") or "").splitlines() if ln.strip()],
history.get("red_flags") or [],
)
safety_flags = history["safety_flags"]
result = result_from_profile(profile)
log(f"[mr-number] Rating: {profile.get('score')}/100 grade {profile.get('grade')} "
f"→ result '{result}' ({profile.get('summary', '')})")

View file

@ -160,6 +160,43 @@ class TestScreeningSignalValue(unittest.TestCase):
self.assertIsNone(mr_lookup.screening_signal_value("not_found", []))
class TestOcrGpuBackends(unittest.IsolatedAsyncioTestCase):
"""The on-box (Claude-API-less) path: OCR extraction + GPU OpenAI-compatible rating."""
def test_extract_json_variants(self):
self.assertEqual(mr_lookup._extract_json('```json\n{"a": 1}\n```'), {"a": 1})
self.assertEqual(mr_lookup._extract_json('blah {"score": 8} trailing'), {"score": 8})
self.assertIsNone(mr_lookup._extract_json("no json here"))
def test_merge_collects_ocr_and_flags_from_raw_text(self):
# OCR extractions carry raw_ocr (no structured reports). A LE term in the raw text
# must still trip the safety flag before the rating LLM structures anything.
ex = [{"reports": [], "red_flags": [], "raw_ocr": "User reports\nEs policía\nasks for address"}]
h = mr_lookup.merge_reports(ex, "+16315304426")
self.assertEqual(h["reports"], [])
self.assertIn("Es policía", h["ocr_text"])
self.assertTrue(any(f["category"] == "law_enforcement" for f in h["safety_flags"]))
async def test_extract_from_screenshot_uses_ocr_backend(self):
with patch("mr_lookup.extract_backend", return_value="ocr"), \
patch("mr_lookup.ocr_extract", return_value={"ok": True, "text": "raw screen text", "lines": ["raw screen text"]}) as m:
out = await mr_lookup._extract_from_screenshot("/tmp/s.png", "+15551234567")
m.assert_called_once()
self.assertEqual(out["raw_ocr"], "raw screen text")
self.assertEqual(out["reports"], [])
async def test_rating_uses_gpu_when_url_set(self):
gpu_json = '{"score": 8, "recommended_result": "denied", "reports": ["no-show, ghosted"], "axes": {"safety": {"score": 0}}}'
with patch("mr_lookup.rating_llm_url", return_value="http://10.20.0.9:8000"), \
patch("mr_lookup.rating_llm_model", return_value="qwen"), \
patch("mr_lookup.openai_chat", return_value=gpu_json) as chat:
prof = await mr_lookup.build_rating_profile({"ocr_text": "Es policia\nno show", "reports": []})
chat.assert_called_once()
self.assertEqual(prof["score"], 8)
self.assertEqual(prof["grade"], "F") # normalized from score
self.assertEqual(prof["reports"], ["no-show, ghosted"])
class TestFullFlow(unittest.IsolatedAsyncioTestCase):
"""End-to-end device path with the expensive parts mocked."""
@ -260,6 +297,44 @@ class TestFullFlow(unittest.IsolatedAsyncioTestCase):
mock_rate.assert_not_called()
mock_record.assert_not_called()
async def test_box_path_ocr_extract_plus_gpu_rating(self):
# The real on-box flow: OCR backend (no structured reports) → GPU rating extracts
# the reports; a LE term in the OCR text forces cop_flag, folded into the signal.
phone = "+16315304426"
gpu_json = ('{"score": 3, "recommended_result": "denied", '
'"reports": ["Es policia", "no show after booking"], '
'"axes": {"safety": {"score": 0}}, "summary": "LE-adjacent; deny."}')
mock_requests = MagicMock()
mock_post = mock_requests.post
mock_post.return_value.json.return_value = {"id": 7, "status": "created"}
mock_post.return_value.raise_for_status = MagicMock()
with patch("mr_lookup.launch_app"), \
patch("mr_lookup.find_and_tap_text", return_value=True), \
patch("mr_lookup.find_edit_text_and_input", return_value=True), \
patch("mr_lookup.go_to_search", return_value=True), \
patch("mr_lookup.open_report_detail", return_value=True), \
patch("mr_lookup.expand_all_reports", return_value=True), \
patch("mr_lookup.capture_full_history", return_value=[Path("/tmp/s0.png")]), \
patch("mr_lookup.extract_backend", return_value="ocr"), \
patch("mr_lookup.ocr_extract", return_value={"ok": True, "text": "User reports\nEs policia\nno show after booking", "lines": []}), \
patch("mr_lookup.rating_llm_url", return_value="http://10.20.0.9:8000"), \
patch("mr_lookup.rating_llm_model", return_value="qwen"), \
patch("mr_lookup.openai_chat", return_value=gpu_json), \
patch("mr_lookup.save_history", return_value=Path("/tmp/hist.json")), \
patch.dict("sys.modules", {"requests": mock_requests}), \
patch("mr_lookup.PEOPLE_SERVICE_TOKEN", "fake-token"), \
patch("mr_lookup.time.sleep"):
out = await mr_lookup.main_async(phone=phone, ref="prospect-9", dry_run=False)
self.assertEqual(out["result"], "denied") # LE flag forces denied
body = mock_post.call_args[1].get("json", {})
self.assertEqual(body.get("valueText"), "cop_flag") # LE → cop_flag verdict
# GPU-extracted reports folded into the recorded signal.
self.assertIn("no show after booking", body["valueJsonb"]["reports"])
self.assertTrue(any(f["category"] == "law_enforcement" for f in body["valueJsonb"]["safety_flags"]))
class TestEmulatorControl(unittest.TestCase):
"""adb controller in isolation."""