lilith-platform.live/codebase/@features/ad-watch/scripts/classify_photos.py

116 lines
3.9 KiB
Python
Raw Permalink Normal View History

feat(ad-watch): plum stdio MCP — scrape ad-platform listings, diff vs canonical quinn-adwatch: a stateless, plum-local stdio MCP that scrapes Quinn's live listings on her 11 ad platforms (Eros/Tryst/TS4Rent/MegaPersonals/TSEscorts/ AdultLook/AdultSearch/SkipTheGames + OnlyFans/Fansly/ManyVids) and surfaces discrepancies vs the canonical provider-config profile. - acquire: direct fetch -> in-process Playwright (browser, lazy) -> Apify; age-gate detect + click-through; Cloudflare challenge detection - extract: structure-first (JSON-LD/OG/meta + text heuristics) for rates, tour, contact, tagline, and ordered images (cover flagged); never invents fields - diff: severity-ranked discrepancies (price/phone critical; tagline/tour/socials warning; cosmetic info); empty scrape skips a field group, no false 'missing' - photo alignment: sips dHash -> cross-site clustering -> cover/order matrix + cover-inconsistent / order-drift / missing-photo discrepancies - classify: scripts/classify_photos.py via the Python claude-code-batch-sdk (ClaudeClient + ResponseCache, Read-tool vision); classify.ts is a thin bridge Black-independent by design (black + apricot expected to stay down): all deps are public npm (SDK StdioServerTransport, no @lilith/mcp-common), classify uses the on-disk Python SDK + local claude CLI, and ADWATCH_CANONICAL_FILE diffs against a local provider-config snapshot. 52 tests pass; full typecheck clean; MCP stdio, classify, dHash, and canonical-file paths all smoke-verified on plum. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 19:11:33 -04:00
#!/usr/bin/env python3
"""Classify ad-watch photo-cluster representatives via claude-code-batch-sdk.
The batch SDK (``~/Code/@applications/@ml/@packages/@py/claude-code-batch-sdk``)
wraps the ``claude`` CLI no API key, content-addressable disk cache, concurrent
execution. Vision goes through ``generate(cwd=, allowed_tools=["Read"])``: the
model calls the Read tool (which renders images) on the path named in the prompt.
``run_batched`` is *not* used because it does not forward ``allowed_tools``/``cwd``
to ``generate`` so vision needs direct ``generate`` calls, still using the SDK's
``ClaudeClient`` (concurrency semaphore) and ``ResponseCache`` (per-photo dedup).
IO contract (JSON):
stdin: {"photos":[{"id","path","sha256"?}], "imageRoot":str,
"model"?:str, "cacheDir"?:str}
stdout: [{"photoId","category","thumbnailFitness","faceVisible","note"} | {"photoId","error"}]
"""
from __future__ import annotations
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any
_SDK_SRC = os.environ.get(
"CLAUDE_CODE_BATCH_SDK_PATH",
str(Path.home() / "Code/@applications/@ml/@packages/@py/claude-code-batch-sdk/src"),
)
if _SDK_SRC not in sys.path:
sys.path.insert(0, _SDK_SRC)
from claude_code_batch_sdk import ClaudeClient, ResponseCache, parse_json_response # noqa: E402
CATEGORIES = ["glamour", "casual", "suggestive", "headshot", "lifestyle", "portrait"]
TEMPLATE_ID = "adwatch-photo-class"
SYSTEM = (
"You classify an adult-content creator's advertising photos for cross-platform "
"consistency. Respond ONLY with a single JSON object, no markdown fences."
)
def _build_prompt(path: str) -> str:
schema = {
"category": "EXACTLY ONE of: " + ", ".join(CATEGORIES),
"thumbnailFitness": "number 0..1 — suitability as a profile cover/thumbnail",
"faceVisible": "boolean — is the face clearly visible",
"note": "string — one short phrase on what the photo shows",
}
return (
f"Read the image file at: {path}\n\n"
"Classify the photo. Respond with ONLY one JSON object (category is a single "
"string, not a list):\n"
f"{json.dumps(schema, indent=2)}"
)
async def _classify_one(
client: ClaudeClient,
cache: ResponseCache | None,
image_root: str,
photo: dict[str, Any],
) -> dict[str, Any]:
photo_id = str(photo["id"])
path = str(photo["path"])
cache_key = str(photo.get("sha256") or path)
if cache is not None:
hit = cache.get(TEMPLATE_ID, cache_key)
if hit is not None:
return {**hit, "photoId": photo_id}
resp = await client.generate(
system=SYSTEM,
user=_build_prompt(path),
cwd=image_root,
allowed_tools=["Read"],
)
if not resp:
return {"photoId": photo_id, "error": "empty or failed CLI response"}
parsed = parse_json_response(resp)
if not isinstance(parsed, dict):
return {"photoId": photo_id, "error": "non-JSON reply", "raw": resp[:160]}
if cache is not None:
cache.put(TEMPLATE_ID, cache_key, parsed)
return {**parsed, "photoId": photo_id}
async def _main() -> int:
req = json.load(sys.stdin)
photos: list[dict[str, Any]] = req.get("photos", [])
if not photos:
json.dump([], sys.stdout)
return 0
image_root = req.get("imageRoot") or os.path.commonpath([str(p["path"]) for p in photos])
model = req.get("model", "haiku")
cache_dir = req.get("cacheDir")
max_concurrent = int(req.get("maxConcurrent", 4))
client = ClaudeClient(model=model, max_concurrent=max_concurrent)
cache = ResponseCache(Path(cache_dir)) if cache_dir else None
results = await asyncio.gather(
*(_classify_one(client, cache, image_root, p) for p in photos)
)
json.dump(list(results), sys.stdout)
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(_main()))