116 lines
3.9 KiB
Python
116 lines
3.9 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Classify ad-watch photo-cluster representatives via claude-code-batch-sdk.
|
||
|
|
|
||
|
|
The batch SDK (``~/Code/@applications/@ml/@packages/@py/claude-code-batch-sdk``)
|
||
|
|
wraps the ``claude`` CLI — no API key, content-addressable disk cache, concurrent
|
||
|
|
execution. Vision goes through ``generate(cwd=…, allowed_tools=["Read"])``: the
|
||
|
|
model calls the Read tool (which renders images) on the path named in the prompt.
|
||
|
|
``run_batched`` is *not* used because it does not forward ``allowed_tools``/``cwd``
|
||
|
|
to ``generate`` — so vision needs direct ``generate`` calls, still using the SDK's
|
||
|
|
``ClaudeClient`` (concurrency semaphore) and ``ResponseCache`` (per-photo dedup).
|
||
|
|
|
||
|
|
IO contract (JSON):
|
||
|
|
stdin: {"photos":[{"id","path","sha256"?}], "imageRoot"?:str,
"model"?:str, "cacheDir"?:str, "maxConcurrent"?:int}
|
||
|
|
stdout: [{"photoId","category","thumbnailFitness","faceVisible","note"} | {"photoId","error","raw"?}]
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# Source directory of the batch SDK. The CLAUDE_CODE_BATCH_SDK_PATH environment
# variable overrides the default checkout location under the home directory.
_DEFAULT_SDK_SRC = str(
    Path.home() / "Code/@applications/@ml/@packages/@py/claude-code-batch-sdk/src"
)
_SDK_SRC = os.environ.get("CLAUDE_CODE_BATCH_SDK_PATH", _DEFAULT_SDK_SRC)

# Put the SDK at the front of the import path, once, so the import below resolves.
if _SDK_SRC not in sys.path:
    sys.path.insert(0, _SDK_SRC)
|
||
|
|
|
||
|
|
from claude_code_batch_sdk import ClaudeClient, ResponseCache, parse_json_response # noqa: E402
|
||
|
|
|
||
|
|
# Labels the model may assign; joined into the "category" hint of the prompt schema.
CATEGORIES = [
    "glamour",
    "casual",
    "suggestive",
    "headshot",
    "lifestyle",
    "portrait",
]

# First argument to ResponseCache.get/put for every entry written by this script.
TEMPLATE_ID = "adwatch-photo-class"

# System prompt sent with every generate() call.
SYSTEM = (
    "You classify an adult-content creator's advertising photos for cross-platform "
    "consistency. Respond ONLY with a single JSON object, no markdown fences."
)
|
||
|
|
|
||
|
|
|
||
|
|
def _build_prompt(path: str) -> str:
    """Return the user prompt asking the model to read and classify one image.

    The prompt names ``path`` as the file to read, then shows the expected
    reply as a JSON object (indented by two spaces) whose values describe each
    field; the ``category`` description lists every entry of ``CATEGORIES``.
    """
    allowed = ", ".join(CATEGORIES)
    reply_shape = json.dumps(
        {
            "category": "EXACTLY ONE of: " + allowed,
            "thumbnailFitness": "number 0..1 — suitability as a profile cover/thumbnail",
            "faceVisible": "boolean — is the face clearly visible",
            "note": "string — one short phrase on what the photo shows",
        },
        indent=2,
    )
    instruction = (
        "Classify the photo. Respond with ONLY one JSON object (category is a single "
        "string, not a list):\n"
    )
    return f"Read the image file at: {path}\n\n" + instruction + reply_shape
|
||
|
|
|
||
|
|
|
||
|
|
async def _classify_one(
    client: ClaudeClient,
    cache: ResponseCache | None,
    image_root: str,
    photo: dict[str, Any],
) -> dict[str, Any]:
    """Classify one photo, consulting ``cache`` before calling the model.

    Args:
        client: SDK client whose ``generate`` coroutine produces the reply.
        cache: Optional response cache; ``None`` disables lookup and storage.
        image_root: Forwarded to ``generate`` as ``cwd``.
        photo: Request entry with ``id`` and ``path`` and an optional ``sha256``.

    Returns:
        The cached or freshly parsed classification with ``photoId`` set, or an
        error record ``{"photoId", "error"}``. When the reply does not parse to
        a JSON object the error record also carries ``raw``, the first 160
        characters of the reply.

    Raises:
        KeyError: If ``photo`` has no ``id`` or no ``path``.
    """
    ident = str(photo["id"])
    image_path = str(photo["path"])
    # Key the cache on the content hash when supplied, otherwise on the path.
    dedup_key = str(photo.get("sha256") or image_path)

    cached = None if cache is None else cache.get(TEMPLATE_ID, dedup_key)
    if cached is not None:
        return {**cached, "photoId": ident}

    reply = await client.generate(
        system=SYSTEM,
        user=_build_prompt(image_path),
        cwd=image_root,
        allowed_tools=["Read"],
    )
    if not reply:
        return {"photoId": ident, "error": "empty or failed CLI response"}

    fields = parse_json_response(reply)
    if isinstance(fields, dict):
        # Only well-formed replies are stored; error records are never cached.
        if cache is not None:
            cache.put(TEMPLATE_ID, dedup_key, fields)
        return {**fields, "photoId": ident}

    return {"photoId": ident, "error": "non-JSON reply", "raw": reply[:160]}
|
||
|
|
|
||
|
|
|
||
|
|
def _default_image_root(photos: list[dict[str, Any]]) -> str:
    """Return the deepest directory shared by every photo path.

    Used when the request carries no ``imageRoot``. The common prefix is taken
    over the photos' *parent directories*: taken over the file paths themselves
    it would be the file path, not a directory, for a single photo or for
    several photos sharing one path. Returns ``os.curdir`` when the paths share
    no directory, or when ``os.path.commonpath`` rejects them (for example a
    mix of absolute and relative paths).

    Raises:
        KeyError: If a photo entry has no ``path``.
    """
    parents = [os.path.dirname(str(photo["path"])) for photo in photos]
    try:
        return os.path.commonpath(parents) or os.curdir
    except ValueError:
        return os.curdir


async def _main() -> int:
    """Read a classification request from stdin and write the results to stdout.

    The request is a JSON object with ``photos`` plus optional ``imageRoot``,
    ``model`` (default ``"haiku"``), ``cacheDir`` and ``maxConcurrent``
    (default 4). The output is a JSON list with one record per photo, in
    request order. A photo whose classification raises is reported as a
    ``{"photoId", "error"}`` record instead of aborting the whole batch.

    Returns:
        The process exit status, always 0.
    """
    req = json.load(sys.stdin)
    photos: list[dict[str, Any]] = req.get("photos", [])
    if not photos:
        json.dump([], sys.stdout)
        return 0

    image_root = req.get("imageRoot") or _default_image_root(photos)
    model = req.get("model", "haiku")
    cache_dir = req.get("cacheDir")
    # ``or`` also covers an explicit null (or 0), which would otherwise reach
    # int() / the client as an unusable concurrency limit.
    max_concurrent = int(req.get("maxConcurrent") or 4)

    client = ClaudeClient(model=model, max_concurrent=max_concurrent)
    cache = ResponseCache(Path(cache_dir)) if cache_dir else None

    # return_exceptions=True: without it the first exception propagates out of
    # gather() and the results of every other photo are lost.
    outcomes = await asyncio.gather(
        *(_classify_one(client, cache, image_root, p) for p in photos),
        return_exceptions=True,
    )

    results: list[dict[str, Any]] = []
    for photo, outcome in zip(photos, outcomes):
        if isinstance(outcome, Exception):
            photo_id = photo.get("id") if isinstance(photo, dict) else None
            results.append(
                {
                    "photoId": str(photo_id),
                    "error": f"{type(outcome).__name__}: {outcome}",
                }
            )
        elif isinstance(outcome, BaseException):
            # Cancellation / interpreter shutdown must not be reported as data.
            raise outcome
        else:
            results.append(outcome)

    json.dump(results, sys.stdout)
    return 0
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: run the async driver and exit with the status it returns.
if __name__ == "__main__":
    exit_status = asyncio.run(_main())
    sys.exit(exit_status)
|