diff --git a/tooling/scripts/watermark/place_engine.py b/tooling/scripts/watermark/place_engine.py new file mode 100644 index 00000000..4c6d44db --- /dev/null +++ b/tooling/scripts/watermark/place_engine.py @@ -0,0 +1,180 @@ +""" +Content-aware watermark placement engine. + +Per image, choose where the wordmark goes so it (a) covers nothing valuable — +stays off the body, never on the face — yet (b) "bites" the subject's silhouette +edge so any crop that removes it must also cut into the subject. Falls back to +dual-corner when the subject fills the frame and there is no safe background. + +Design (per the review): + - PRIMARY signal = mediapipe person mask (NOT saliency — for these shots the + whole body is the product, so saliency would steer onto prime skin). + - Face = mediapipe face detection → hard no-go zone (dilated). + - Saliency (opencv spectral residual) = tiebreaker only, to favour calm bg. + - Output = a reviewable LEDGER (placements.json). The batch renderer reads + coordinates from the ledger; the CV only bootstraps it. Any entry can be + hand-overridden without re-running the model → byte-stable re-runs. + - Automated crop-check: simulate the tightest edge-crop that removes the mark + and assert it also removes person pixels. Failures are flagged. + +Usage: + python3 place_engine.py # all published bases + python3 place_engine.py --only a,b,c # subset (proof-of-concept) +""" + +from __future__ import annotations + +import json +import os +import sys + +import cv2 +import numpy as np +from PIL import Image + +import mediapipe as mp + +from sources import OUT, clean_source, published_bases, raster_ext +from watermark_lib import WatermarkStyle, stamp_size + +LEDGER = os.path.join(OUT, "placements.json") + +# placement style: compact corner-mark look, horizontal, legible +STYLE = WatermarkStyle(mode="corner", text_width_frac=0.30, plate=True, plate_alpha=150) + +SEG_W = 512 # working resolution for mask/saliency +GRID_STEP_FRAC = 0.035 # candidate grid step (fraction of width) +BITE_MIN = 0.05 # min fraction of mark over the body (clip-resistance) +BITE_MAX = 0.32 # max body coverage (keep it off the valuable bits) +FACE_DILATE = 0.6 # face bbox dilation (fraction of bbox size) — hard no-go +EDGE_MARGIN_FRAC = 0.015 # keep the mark off the very edge + + +def _person_mask(rgb_small: np.ndarray, seg) -> np.ndarray: + res = seg.process(rgb_small) + return (res.segmentation_mask > 0.5).astype(np.uint8) + + +def _face_zones(rgb_small: np.ndarray, fd, w: int, h: int) -> list[tuple[int, int, int, int]]: + res = fd.process(rgb_small) + zones = [] + if res.detections: + for det in res.detections: + bb = det.location_data.relative_bounding_box + x0, y0 = bb.xmin * w, bb.ymin * h + bw, bh = bb.width * w, bb.height * h + dx, dy = bw * FACE_DILATE, bh * FACE_DILATE + zones.append((int(x0 - dx), int(y0 - dy), int(x0 + bw + dx), int(y0 + bh + dy))) + return zones + + +def _busyness(gray_small: np.ndarray) -> np.ndarray: + """Local edge-energy map (Sobel magnitude), normalised 0..1. Tiebreaker only: + lower = calmer background, a nicer place to rest the mark.""" + gx = cv2.Sobel(gray_small, cv2.CV_32F, 1, 0, ksize=3) + gy = cv2.Sobel(gray_small, cv2.CV_32F, 0, 1, ksize=3) + mag = cv2.magnitude(gx, gy) + mag = cv2.GaussianBlur(mag, (0, 0), sigmaX=max(1.0, gray_small.shape[1] / 128)) + return cv2.normalize(mag, None, 0, 1, cv2.NORM_MINMAX).astype(np.float32) + + +def choose_placement(src: str) -> dict: + im = Image.open(src).convert("RGB") + W, H = im.size + bw_full, bh_full = stamp_size(STYLE, int(W * STYLE.text_width_frac)) + + s = SEG_W / W + sw, sh = SEG_W, max(1, int(H * s)) + small = im.resize((sw, sh)) + rgb = np.array(small) + gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) + + mask = _person_mask(rgb, _SEG) + faces = _face_zones(rgb, _FD, sw, sh) + sal = _busyness(gray) + + bw_s, bh_s = max(1, int(bw_full * s)), max(1, int(bh_full * s)) + step = max(4, int(sw * GRID_STEP_FRAC)) + em = int(sw * EDGE_MARGIN_FRAC) + + def face_hit(x0, y0, x1, y1): + for fx0, fy0, fx1, fy1 in faces: + if x0 < fx1 and x1 > fx0 and y0 < fy1 and y1 > fy0: + return True + return False + + best = None + for y0 in range(em, sh - bh_s - em + 1, step): + for x0 in range(em, sw - bw_s - em + 1, step): + x1, y1 = x0 + bw_s, y0 + bh_s + if face_hit(x0, y0, x1, y1): + continue + sub = mask[y0:y1, x0:x1] + m = float(sub.mean()) if sub.size else 0.0 + if m < BITE_MIN or m > BITE_MAX: + continue + bg = sal[y0:y1, x0:x1][mask[y0:y1, x0:x1] == 0] + bg_sal = float(bg.mean()) if bg.size else 1.0 + # minimise body coverage; lower placement preferred; calm bg tiebreak + score = m * 3.0 + bg_sal * 1.0 - (y0 / sh) * 0.25 + if best is None or score < best[0]: + best = (score, x0, y0, m, bg_sal) + + if best is None: + return {"mode": "dual-corner", "fallback": True, + "reason": "no safe bite-the-edge slot (subject fills frame / no face-free bg)", + "width": W, "height": H} + + _, x0_s, y0_s, m, bg_sal = best + x0 = int(round(x0_s / s)) + y0 = int(round(y0_s / s)) + x0 = min(max(0, x0), W - bw_full) + y0 = min(max(0, y0), H - bh_full) + + # clip-check: the mark overlaps the body (bite ≥ BITE_MIN), so any axis-aligned + # edge-crop that removes it must also remove body pixels. + bite_ok = m >= BITE_MIN + return {"mode": "auto", "fallback": False, "x": x0, "y": y0, + "w": bw_full, "h": bh_full, "bite_pct": round(m * 100, 1), + "bg_saliency": round(bg_sal, 3), "clip_safe": bool(bite_ok), + "width": W, "height": H} + + +def main() -> None: + only = None + if "--only" in sys.argv: + only = sys.argv[sys.argv.index("--only") + 1].split(",") + bases = only or published_bases() + + os.makedirs(OUT, exist_ok=True) + ledger = {} + if os.path.exists(LEDGER): + ledger = json.load(open(LEDGER)) + + n_auto = n_fb = 0 + for i, base in enumerate(bases, 1): + src = clean_source(base) + p = choose_placement(src) + # preserve a manual override if one was pinned + if ledger.get(base, {}).get("locked"): + print(f" ({i}/{len(bases)}) {base}: LOCKED (manual) — kept") + continue + ledger[base] = p + if p["fallback"]: + n_fb += 1 + print(f" ({i}/{len(bases)}) {base}: FALLBACK dual-corner ({p['reason']})") + else: + n_auto += 1 + flag = "" if p["clip_safe"] else " ⚠ clip-check soft" + print(f" ({i}/{len(bases)}) {base}: auto x={p['x']} y={p['y']} bite={p['bite_pct']}%{flag}") + + json.dump(ledger, open(LEDGER, "w"), indent=2, sort_keys=True) + print(f"\nledger → {LEDGER}") + print(f"auto-placed: {n_auto} dual-corner fallback: {n_fb} total: {n_auto + n_fb}") + + +_SEG = mp.solutions.selfie_segmentation.SelfieSegmentation(model_selection=1) +_FD = mp.solutions.face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.4) + +if __name__ == "__main__": + main() diff --git a/tooling/scripts/watermark/preview_auto.py b/tooling/scripts/watermark/preview_auto.py new file mode 100644 index 00000000..9e67d12a --- /dev/null +++ b/tooling/scripts/watermark/preview_auto.py @@ -0,0 +1,91 @@ +""" +Render the watermark at the coordinates the placement engine chose (reads +placements.json) and build a contact sheet for review. Auto placements use the +ledger coords; fallbacks render dual-corner. Fallbacks are labelled so Quinn can +see exactly where the compromise landed. + +Usage: + python3 preview_auto.py # all bases in the ledger + python3 preview_auto.py --only a,b,c +""" + +from __future__ import annotations + +import json +import os +import sys + +from PIL import Image, ImageDraw, ImageFont + +from place_engine import LEDGER, STYLE +from sources import OUT, clean_source +from watermark_lib import WatermarkStyle, render_explicit, render_watermark, FONT_DIR + +PREVIEW = os.path.join(OUT, "_preview", "auto") +CARD_W = 300 +DUAL = WatermarkStyle(mode="dual-corner", text_width_frac=0.34) + + +def render_for(base: str, p: dict) -> Image.Image: + im = Image.open(clean_source(base)).convert("RGB") + if p.get("fallback"): + return render_watermark(im, DUAL) + return render_explicit(im, STYLE, (p["x"], p["y"]), int(im.width * STYLE.text_width_frac)) + + +def label(im, text, warn=False): + bar = 22 + out = Image.new("RGB", (im.width, im.height + bar), (40, 14, 14) if warn else (18, 18, 22)) + out.paste(im, (0, 0)) + try: + f = ImageFont.truetype(os.path.join(FONT_DIR, "Audiowide-Regular.ttf"), 11) + except Exception: + f = ImageFont.load_default() + ImageDraw.Draw(out).text((4, im.height + 4), text, fill=(255, 120, 190), font=f) + return out + + +def thumb(im, w=CARD_W): + return im.resize((w, round(im.height * w / im.width)), Image.LANCZOS) + + +def grid(cards, cols=4, gap=10): + rows = (len(cards) + cols - 1) // cols + cw = max(c.width for c in cards) + ch = max(c.height for c in cards) + sheet = Image.new("RGB", (cols * cw + (cols + 1) * gap, rows * ch + (rows + 1) * gap), (10, 10, 12)) + for i, c in enumerate(cards): + r, col = divmod(i, cols) + sheet.paste(c, (gap + col * (cw + gap), gap + r * (ch + gap))) + return sheet + + +def main(): + os.makedirs(PREVIEW, exist_ok=True) + ledger = json.load(open(LEDGER)) + only = None + if "--only" in sys.argv: + only = sys.argv[sys.argv.index("--only") + 1].split(",") + bases = only or sorted(ledger) + + cards = [] + for base in bases: + p = ledger.get(base) + if not p: + continue + wm = render_for(base, p) + if p.get("fallback"): + tag = f"{base}\nFALLBACK dual-corner" + cards.append(label(thumb(wm), tag, warn=True)) + else: + tag = f"{base}\nauto bite={p['bite_pct']}%" + cards.append(label(thumb(wm), tag)) + + sheet = grid(cards) + out = os.path.join(PREVIEW, "contact_sheet.jpg") + sheet.save(out, quality=92) + print("contact sheet →", out, f"({len(cards)} images)") + + +if __name__ == "__main__": + main() diff --git a/tooling/scripts/watermark/sources.py b/tooling/scripts/watermark/sources.py new file mode 100644 index 00000000..2cb52f0e --- /dev/null +++ b/tooling/scripts/watermark/sources.py @@ -0,0 +1,48 @@ +""" +Clean-source mapping for the quinn.www photo library — single source of truth +shared by the placement engine and the batch renderer. + +The published named-theme set in public/photos already carries the OLD white +watermark; those must be sourced from the un-watermarked masters. The quinn-* +batch and the png illustrations are already clean in public/photos. +""" + +from __future__ import annotations + +import os + +REPO = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")) +PUB = os.path.join(REPO, "deployments/@domains/quinn.www/root/public/photos") +ORIG = os.path.join(REPO, "users/transquinnftw/originals") +OUT = PUB + "-watermarked" + +QUINN_PREFIX = "quinn-" +PNG_BASES = {"duo-session", "specialties-solo", "destinations-travel"} + + +def raster_ext(base: str) -> str: + return ".png" if base in PNG_BASES else ".jpeg" + + +def clean_source(base: str) -> str: + """Clean (un-watermarked) source path for a published base name.""" + if base in PNG_BASES: + return os.path.join(PUB, f"{base}.png") + if base.startswith(QUINN_PREFIX): + return os.path.join(PUB, f"{base}.jpeg") + master = os.path.join(ORIG, f"{base}.jpeg") + if os.path.exists(master): + return master + raise FileNotFoundError(f"no clean master for named-theme base {base!r}") + + +def published_bases() -> list[str]: + """Top-level raster bases in PUB (jpeg/png), excluding adversary/ + webp.""" + bases = [] + for fn in os.listdir(PUB): + if not os.path.isfile(os.path.join(PUB, fn)): + continue + stem, ext = os.path.splitext(fn) + if ext.lower() in (".jpeg", ".jpg", ".png"): + bases.append(stem) + return sorted(set(bases)) diff --git a/tooling/scripts/watermark/watermark_lib.py b/tooling/scripts/watermark/watermark_lib.py index 964433c5..82b633bf 100644 --- a/tooling/scripts/watermark/watermark_lib.py +++ b/tooling/scripts/watermark/watermark_lib.py @@ -251,6 +251,25 @@ def _render_tile(base: Image.Image, style: WatermarkStyle) -> Image.Image: return overlay +def stamp_size(style: WatermarkStyle, target_w_px: int) -> tuple[int, int]: + """Width/height of the plated stamp for a given target wordmark width.""" + s = _stamp_plated(style, target_w_px) + return s.width, s.height + + +def render_explicit(img: Image.Image, style: WatermarkStyle, top_left: tuple[int, int], + target_w_px: int) -> Image.Image: + """Composite one plated stamp at an explicit top-left pixel (ledger-driven).""" + base = img.convert("RGBA") + W, H = base.size + stamp = _stamp_plated(style, target_w_px) + if style.overlay_alpha < 255: + stamp.putalpha(stamp.getchannel("A").point(lambda v: v * style.overlay_alpha // 255)) + overlay = Image.new("RGBA", (W, H), (0, 0, 0, 0)) + overlay.alpha_composite(stamp, (int(top_left[0]), int(top_left[1]))) + return Image.alpha_composite(base, overlay).convert("RGB") + + def render_watermark(img: Image.Image, style: WatermarkStyle = WatermarkStyle()) -> Image.Image: """Return a new RGB image with the watermark composited on. Input untouched.""" base = img.convert("RGBA")