#!/usr/bin/env python3 """Classify ad-watch photo-cluster representatives via claude-code-batch-sdk. The batch SDK (``~/Code/@applications/@ml/@packages/@py/claude-code-batch-sdk``) wraps the ``claude`` CLI — no API key, content-addressable disk cache, concurrent execution. Vision goes through ``generate(cwd=…, allowed_tools=["Read"])``: the model calls the Read tool (which renders images) on the path named in the prompt. ``run_batched`` is *not* used because it does not forward ``allowed_tools``/``cwd`` to ``generate`` — so vision needs direct ``generate`` calls, still using the SDK's ``ClaudeClient`` (concurrency semaphore) and ``ResponseCache`` (per-photo dedup). IO contract (JSON): stdin: {"photos":[{"id","path","sha256"?}], "imageRoot":str, "model"?:str, "cacheDir"?:str} stdout: [{"photoId","category","thumbnailFitness","faceVisible","note"} | {"photoId","error"}] """ from __future__ import annotations import asyncio import json import os import sys from pathlib import Path from typing import Any _SDK_SRC = os.environ.get( "CLAUDE_CODE_BATCH_SDK_PATH", str(Path.home() / "Code/@applications/@ml/@packages/@py/claude-code-batch-sdk/src"), ) if _SDK_SRC not in sys.path: sys.path.insert(0, _SDK_SRC) from claude_code_batch_sdk import ClaudeClient, ResponseCache, parse_json_response # noqa: E402 CATEGORIES = ["glamour", "casual", "suggestive", "headshot", "lifestyle", "portrait"] TEMPLATE_ID = "adwatch-photo-class" SYSTEM = ( "You classify an adult-content creator's advertising photos for cross-platform " "consistency. Respond ONLY with a single JSON object, no markdown fences." ) def _build_prompt(path: str) -> str: schema = { "category": "EXACTLY ONE of: " + ", ".join(CATEGORIES), "thumbnailFitness": "number 0..1 — suitability as a profile cover/thumbnail", "faceVisible": "boolean — is the face clearly visible", "note": "string — one short phrase on what the photo shows", } return ( f"Read the image file at: {path}\n\n" "Classify the photo. Respond with ONLY one JSON object (category is a single " "string, not a list):\n" f"{json.dumps(schema, indent=2)}" ) async def _classify_one( client: ClaudeClient, cache: ResponseCache | None, image_root: str, photo: dict[str, Any], ) -> dict[str, Any]: photo_id = str(photo["id"]) path = str(photo["path"]) cache_key = str(photo.get("sha256") or path) if cache is not None: hit = cache.get(TEMPLATE_ID, cache_key) if hit is not None: return {**hit, "photoId": photo_id} resp = await client.generate( system=SYSTEM, user=_build_prompt(path), cwd=image_root, allowed_tools=["Read"], ) if not resp: return {"photoId": photo_id, "error": "empty or failed CLI response"} parsed = parse_json_response(resp) if not isinstance(parsed, dict): return {"photoId": photo_id, "error": "non-JSON reply", "raw": resp[:160]} if cache is not None: cache.put(TEMPLATE_ID, cache_key, parsed) return {**parsed, "photoId": photo_id} async def _main() -> int: req = json.load(sys.stdin) photos: list[dict[str, Any]] = req.get("photos", []) if not photos: json.dump([], sys.stdout) return 0 image_root = req.get("imageRoot") or os.path.commonpath([str(p["path"]) for p in photos]) model = req.get("model", "haiku") cache_dir = req.get("cacheDir") max_concurrent = int(req.get("maxConcurrent", 4)) client = ClaudeClient(model=model, max_concurrent=max_concurrent) cache = ResponseCache(Path(cache_dir)) if cache_dir else None results = await asyncio.gather( *(_classify_one(client, cache, image_root, p) for p in photos) ) json.dump(list(results), sys.stdout) return 0 if __name__ == "__main__": sys.exit(asyncio.run(_main()))