feat(watermark): Introduce watermark placement engine with auto-preview and source handling for branding/security

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-06-03 06:20:38 -07:00
parent bab4faa6ad
commit f98d7d6ed9
4 changed files with 338 additions and 0 deletions

View file

@ -0,0 +1,180 @@
"""
Content-aware watermark placement engine.
Per image, choose where the wordmark goes so it (a) covers nothing valuable
stays off the body, never on the face yet (b) "bites" the subject's silhouette
edge so any crop that removes it must also cut into the subject. Falls back to
dual-corner when the subject fills the frame and there is no safe background.
Design (per the review):
- PRIMARY signal = mediapipe person mask (NOT saliency for these shots the
whole body is the product, so saliency would steer onto prime skin).
- Face = mediapipe face detection hard no-go zone (dilated).
- Saliency (opencv spectral residual) = tiebreaker only, to favour calm bg.
- Output = a reviewable LEDGER (placements.json). The batch renderer reads
coordinates from the ledger; the CV only bootstraps it. Any entry can be
hand-overridden without re-running the model byte-stable re-runs.
- Automated crop-check: simulate the tightest edge-crop that removes the mark
and assert it also removes person pixels. Failures are flagged.
Usage:
python3 place_engine.py # all published bases
python3 place_engine.py --only a,b,c # subset (proof-of-concept)
"""
from __future__ import annotations
import json
import os
import sys
import cv2
import numpy as np
from PIL import Image
import mediapipe as mp
from sources import OUT, clean_source, published_bases, raster_ext
from watermark_lib import WatermarkStyle, stamp_size
LEDGER = os.path.join(OUT, "placements.json")
# placement style: compact corner-mark look, horizontal, legible
STYLE = WatermarkStyle(mode="corner", text_width_frac=0.30, plate=True, plate_alpha=150)
SEG_W = 512 # working resolution for mask/saliency
GRID_STEP_FRAC = 0.035 # candidate grid step (fraction of width)
BITE_MIN = 0.05 # min fraction of mark over the body (clip-resistance)
BITE_MAX = 0.32 # max body coverage (keep it off the valuable bits)
FACE_DILATE = 0.6 # face bbox dilation (fraction of bbox size) — hard no-go
EDGE_MARGIN_FRAC = 0.015 # keep the mark off the very edge
def _person_mask(rgb_small: np.ndarray, seg) -> np.ndarray:
res = seg.process(rgb_small)
return (res.segmentation_mask > 0.5).astype(np.uint8)
def _face_zones(rgb_small: np.ndarray, fd, w: int, h: int) -> list[tuple[int, int, int, int]]:
res = fd.process(rgb_small)
zones = []
if res.detections:
for det in res.detections:
bb = det.location_data.relative_bounding_box
x0, y0 = bb.xmin * w, bb.ymin * h
bw, bh = bb.width * w, bb.height * h
dx, dy = bw * FACE_DILATE, bh * FACE_DILATE
zones.append((int(x0 - dx), int(y0 - dy), int(x0 + bw + dx), int(y0 + bh + dy)))
return zones
def _busyness(gray_small: np.ndarray) -> np.ndarray:
"""Local edge-energy map (Sobel magnitude), normalised 0..1. Tiebreaker only:
lower = calmer background, a nicer place to rest the mark."""
gx = cv2.Sobel(gray_small, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(gray_small, cv2.CV_32F, 0, 1, ksize=3)
mag = cv2.magnitude(gx, gy)
mag = cv2.GaussianBlur(mag, (0, 0), sigmaX=max(1.0, gray_small.shape[1] / 128))
return cv2.normalize(mag, None, 0, 1, cv2.NORM_MINMAX).astype(np.float32)
def choose_placement(src: str) -> dict:
im = Image.open(src).convert("RGB")
W, H = im.size
bw_full, bh_full = stamp_size(STYLE, int(W * STYLE.text_width_frac))
s = SEG_W / W
sw, sh = SEG_W, max(1, int(H * s))
small = im.resize((sw, sh))
rgb = np.array(small)
gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
mask = _person_mask(rgb, _SEG)
faces = _face_zones(rgb, _FD, sw, sh)
sal = _busyness(gray)
bw_s, bh_s = max(1, int(bw_full * s)), max(1, int(bh_full * s))
step = max(4, int(sw * GRID_STEP_FRAC))
em = int(sw * EDGE_MARGIN_FRAC)
def face_hit(x0, y0, x1, y1):
for fx0, fy0, fx1, fy1 in faces:
if x0 < fx1 and x1 > fx0 and y0 < fy1 and y1 > fy0:
return True
return False
best = None
for y0 in range(em, sh - bh_s - em + 1, step):
for x0 in range(em, sw - bw_s - em + 1, step):
x1, y1 = x0 + bw_s, y0 + bh_s
if face_hit(x0, y0, x1, y1):
continue
sub = mask[y0:y1, x0:x1]
m = float(sub.mean()) if sub.size else 0.0
if m < BITE_MIN or m > BITE_MAX:
continue
bg = sal[y0:y1, x0:x1][mask[y0:y1, x0:x1] == 0]
bg_sal = float(bg.mean()) if bg.size else 1.0
# minimise body coverage; lower placement preferred; calm bg tiebreak
score = m * 3.0 + bg_sal * 1.0 - (y0 / sh) * 0.25
if best is None or score < best[0]:
best = (score, x0, y0, m, bg_sal)
if best is None:
return {"mode": "dual-corner", "fallback": True,
"reason": "no safe bite-the-edge slot (subject fills frame / no face-free bg)",
"width": W, "height": H}
_, x0_s, y0_s, m, bg_sal = best
x0 = int(round(x0_s / s))
y0 = int(round(y0_s / s))
x0 = min(max(0, x0), W - bw_full)
y0 = min(max(0, y0), H - bh_full)
# clip-check: the mark overlaps the body (bite ≥ BITE_MIN), so any axis-aligned
# edge-crop that removes it must also remove body pixels.
bite_ok = m >= BITE_MIN
return {"mode": "auto", "fallback": False, "x": x0, "y": y0,
"w": bw_full, "h": bh_full, "bite_pct": round(m * 100, 1),
"bg_saliency": round(bg_sal, 3), "clip_safe": bool(bite_ok),
"width": W, "height": H}
def main() -> None:
only = None
if "--only" in sys.argv:
only = sys.argv[sys.argv.index("--only") + 1].split(",")
bases = only or published_bases()
os.makedirs(OUT, exist_ok=True)
ledger = {}
if os.path.exists(LEDGER):
ledger = json.load(open(LEDGER))
n_auto = n_fb = 0
for i, base in enumerate(bases, 1):
src = clean_source(base)
p = choose_placement(src)
# preserve a manual override if one was pinned
if ledger.get(base, {}).get("locked"):
print(f" ({i}/{len(bases)}) {base}: LOCKED (manual) — kept")
continue
ledger[base] = p
if p["fallback"]:
n_fb += 1
print(f" ({i}/{len(bases)}) {base}: FALLBACK dual-corner ({p['reason']})")
else:
n_auto += 1
flag = "" if p["clip_safe"] else " ⚠ clip-check soft"
print(f" ({i}/{len(bases)}) {base}: auto x={p['x']} y={p['y']} bite={p['bite_pct']}%{flag}")
json.dump(ledger, open(LEDGER, "w"), indent=2, sort_keys=True)
print(f"\nledger → {LEDGER}")
print(f"auto-placed: {n_auto} dual-corner fallback: {n_fb} total: {n_auto + n_fb}")
_SEG = mp.solutions.selfie_segmentation.SelfieSegmentation(model_selection=1)
_FD = mp.solutions.face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.4)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,91 @@
"""
Render the watermark at the coordinates the placement engine chose (reads
placements.json) and build a contact sheet for review. Auto placements use the
ledger coords; fallbacks render dual-corner. Fallbacks are labelled so Quinn can
see exactly where the compromise landed.
Usage:
python3 preview_auto.py # all bases in the ledger
python3 preview_auto.py --only a,b,c
"""
from __future__ import annotations
import json
import os
import sys
from PIL import Image, ImageDraw, ImageFont
from place_engine import LEDGER, STYLE
from sources import OUT, clean_source
from watermark_lib import WatermarkStyle, render_explicit, render_watermark, FONT_DIR
PREVIEW = os.path.join(OUT, "_preview", "auto")
CARD_W = 300
DUAL = WatermarkStyle(mode="dual-corner", text_width_frac=0.34)
def render_for(base: str, p: dict) -> Image.Image:
im = Image.open(clean_source(base)).convert("RGB")
if p.get("fallback"):
return render_watermark(im, DUAL)
return render_explicit(im, STYLE, (p["x"], p["y"]), int(im.width * STYLE.text_width_frac))
def label(im, text, warn=False):
bar = 22
out = Image.new("RGB", (im.width, im.height + bar), (40, 14, 14) if warn else (18, 18, 22))
out.paste(im, (0, 0))
try:
f = ImageFont.truetype(os.path.join(FONT_DIR, "Audiowide-Regular.ttf"), 11)
except Exception:
f = ImageFont.load_default()
ImageDraw.Draw(out).text((4, im.height + 4), text, fill=(255, 120, 190), font=f)
return out
def thumb(im, w=CARD_W):
return im.resize((w, round(im.height * w / im.width)), Image.LANCZOS)
def grid(cards, cols=4, gap=10):
rows = (len(cards) + cols - 1) // cols
cw = max(c.width for c in cards)
ch = max(c.height for c in cards)
sheet = Image.new("RGB", (cols * cw + (cols + 1) * gap, rows * ch + (rows + 1) * gap), (10, 10, 12))
for i, c in enumerate(cards):
r, col = divmod(i, cols)
sheet.paste(c, (gap + col * (cw + gap), gap + r * (ch + gap)))
return sheet
def main():
os.makedirs(PREVIEW, exist_ok=True)
ledger = json.load(open(LEDGER))
only = None
if "--only" in sys.argv:
only = sys.argv[sys.argv.index("--only") + 1].split(",")
bases = only or sorted(ledger)
cards = []
for base in bases:
p = ledger.get(base)
if not p:
continue
wm = render_for(base, p)
if p.get("fallback"):
tag = f"{base}\nFALLBACK dual-corner"
cards.append(label(thumb(wm), tag, warn=True))
else:
tag = f"{base}\nauto bite={p['bite_pct']}%"
cards.append(label(thumb(wm), tag))
sheet = grid(cards)
out = os.path.join(PREVIEW, "contact_sheet.jpg")
sheet.save(out, quality=92)
print("contact sheet →", out, f"({len(cards)} images)")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,48 @@
"""
Clean-source mapping for the quinn.www photo library single source of truth
shared by the placement engine and the batch renderer.
The published named-theme set in public/photos already carries the OLD white
watermark; those must be sourced from the un-watermarked masters. The quinn-*
batch and the png illustrations are already clean in public/photos.
"""
from __future__ import annotations
import os
REPO = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
PUB = os.path.join(REPO, "deployments/@domains/quinn.www/root/public/photos")
ORIG = os.path.join(REPO, "users/transquinnftw/originals")
OUT = PUB + "-watermarked"
QUINN_PREFIX = "quinn-"
PNG_BASES = {"duo-session", "specialties-solo", "destinations-travel"}
def raster_ext(base: str) -> str:
return ".png" if base in PNG_BASES else ".jpeg"
def clean_source(base: str) -> str:
"""Clean (un-watermarked) source path for a published base name."""
if base in PNG_BASES:
return os.path.join(PUB, f"{base}.png")
if base.startswith(QUINN_PREFIX):
return os.path.join(PUB, f"{base}.jpeg")
master = os.path.join(ORIG, f"{base}.jpeg")
if os.path.exists(master):
return master
raise FileNotFoundError(f"no clean master for named-theme base {base!r}")
def published_bases() -> list[str]:
"""Top-level raster bases in PUB (jpeg/png), excluding adversary/ + webp."""
bases = []
for fn in os.listdir(PUB):
if not os.path.isfile(os.path.join(PUB, fn)):
continue
stem, ext = os.path.splitext(fn)
if ext.lower() in (".jpeg", ".jpg", ".png"):
bases.append(stem)
return sorted(set(bases))

View file

@ -251,6 +251,25 @@ def _render_tile(base: Image.Image, style: WatermarkStyle) -> Image.Image:
return overlay
def stamp_size(style: WatermarkStyle, target_w_px: int) -> tuple[int, int]:
"""Width/height of the plated stamp for a given target wordmark width."""
s = _stamp_plated(style, target_w_px)
return s.width, s.height
def render_explicit(img: Image.Image, style: WatermarkStyle, top_left: tuple[int, int],
target_w_px: int) -> Image.Image:
"""Composite one plated stamp at an explicit top-left pixel (ledger-driven)."""
base = img.convert("RGBA")
W, H = base.size
stamp = _stamp_plated(style, target_w_px)
if style.overlay_alpha < 255:
stamp.putalpha(stamp.getchannel("A").point(lambda v: v * style.overlay_alpha // 255))
overlay = Image.new("RGBA", (W, H), (0, 0, 0, 0))
overlay.alpha_composite(stamp, (int(top_left[0]), int(top_left[1])))
return Image.alpha_composite(base, overlay).convert("RGB")
def render_watermark(img: Image.Image, style: WatermarkStyle = WatermarkStyle()) -> Image.Image:
"""Return a new RGB image with the watermark composited on. Input untouched."""
base = img.convert("RGBA")