180 lines
6.9 KiB
Python
180 lines
6.9 KiB
Python
"""
|
|
Content-aware watermark placement engine.
|
|
|
|
Per image, choose where the wordmark goes so it (a) covers nothing valuable —
|
|
stays off the body, never on the face — yet (b) "bites" the subject's silhouette
|
|
edge so any crop that removes it must also cut into the subject. Falls back to
|
|
dual-corner when the subject fills the frame and there is no safe background.
|
|
|
|
Design (per the review):
|
|
- PRIMARY signal = mediapipe person mask (NOT saliency — for these shots the
|
|
whole body is the product, so saliency would steer onto prime skin).
|
|
- Face = mediapipe face detection → hard no-go zone (dilated).
|
|
- Saliency proxy (opencv Sobel edge-energy "busyness" map) = tiebreaker only, to favour calm bg.
|
|
- Output = a reviewable LEDGER (placements.json). The batch renderer reads
|
|
coordinates from the ledger; the CV only bootstraps it. Any entry can be
|
|
hand-overridden without re-running the model → byte-stable re-runs.
|
|
- Automated crop-check: simulate the tightest edge-crop that removes the mark
|
|
and assert it also removes person pixels. Failures are flagged.
|
|
|
|
Usage:
|
|
python3 place_engine.py # all published bases
|
|
python3 place_engine.py --only a,b,c # subset (proof-of-concept)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
import mediapipe as mp
|
|
|
|
from sources import OUT, clean_source, published_bases, raster_ext
|
|
from watermark_lib import WatermarkStyle, stamp_size
|
|
|
|
# Path of the reviewable placement ledger (placements.json under OUT).
LEDGER = os.path.join(OUT, "placements.json")

# placement style: compact corner-mark look, horizontal, legible
STYLE = WatermarkStyle(mode="corner", text_width_frac=0.30, plate=True, plate_alpha=150)

# Tuning knobs for the candidate search in choose_placement().
SEG_W = 512  # working resolution for mask/saliency
GRID_STEP_FRAC = 0.035  # candidate grid step (fraction of width)
BITE_MIN = 0.05  # min fraction of mark over the body (clip-resistance)
BITE_MAX = 0.32  # max body coverage (keep it off the valuable bits)
FACE_DILATE = 0.6  # face bbox dilation (fraction of bbox size) — hard no-go
EDGE_MARGIN_FRAC = 0.015  # keep the mark off the very edge
|
|
|
|
|
|
def _person_mask(rgb_small: np.ndarray, seg) -> np.ndarray:
    """Return a binary person mask for *rgb_small*.

    ``seg.process`` is run on the image and the ``segmentation_mask`` of its
    result is thresholded: entries strictly greater than 0.5 become 1, all
    others 0. The result is a uint8 array with the same shape as that mask.
    """
    res = seg.process(rgb_small)
    return (res.segmentation_mask > 0.5).astype(np.uint8)
|
|
|
|
|
|
def _face_zones(rgb_small: np.ndarray, fd, w: int, h: int,
                dilate: "float | None" = None) -> list[tuple[int, int, int, int]]:
    """Return dilated face boxes as ``(x0, y0, x1, y1)`` pixel tuples.

    ``fd.process`` is run on *rgb_small*; each detection's relative bounding
    box is scaled by *w* / *h* into pixels and then grown on every side by
    ``dilate`` times its own width / height. Coordinates are truncated with
    ``int`` and are NOT clamped to the image, so they may be negative or
    exceed *w* / *h* — callers only use them for overlap tests.

    Args:
        rgb_small: image handed to the detector.
        fd: detector object exposing ``process(image)``.
        w: width in pixels used to scale relative x coordinates.
        h: height in pixels used to scale relative y coordinates.
        dilate: growth factor per side (fraction of the box size). ``None``
            (the default) uses the module constant ``FACE_DILATE``.

    Returns:
        One tuple per detection; an empty list when nothing was detected.
    """
    if dilate is None:
        dilate = FACE_DILATE
    res = fd.process(rgb_small)
    zones = []
    # ``detections`` is falsy (e.g. None) when no face was found.
    for det in res.detections or ():
        bb = det.location_data.relative_bounding_box
        x0, y0 = bb.xmin * w, bb.ymin * h
        bw, bh = bb.width * w, bb.height * h
        dx, dy = bw * dilate, bh * dilate
        zones.append((int(x0 - dx), int(y0 - dy), int(x0 + bw + dx), int(y0 + bh + dy)))
    return zones
|
|
|
|
|
|
def _busyness(gray_small: np.ndarray) -> np.ndarray:
    """Local edge-energy map (Sobel magnitude), normalised 0..1. Tiebreaker only:
    lower = calmer background, a nicer place to rest the mark.

    The gradient magnitude is smoothed with a Gaussian whose sigma grows with
    the image width (at least 1.0), then min-max normalised and returned as
    float32 with the same height/width as *gray_small*.
    """
    gx = cv2.Sobel(gray_small, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(gray_small, cv2.CV_32F, 0, 1, ksize=3)
    mag = cv2.magnitude(gx, gy)
    # Kernel size (0, 0) lets OpenCV derive it from sigma.
    mag = cv2.GaussianBlur(mag, (0, 0), sigmaX=max(1.0, gray_small.shape[1] / 128))
    return cv2.normalize(mag, None, 0, 1, cv2.NORM_MINMAX).astype(np.float32)
|
|
|
|
|
|
def choose_placement(src: str) -> dict:
    """Pick a watermark slot for the image at *src* and return a ledger entry.

    The image is analysed at a working width of ``SEG_W`` pixels. Candidate
    top-left corners are scanned on a regular grid (inset by the edge margin);
    a candidate is rejected if it overlaps a dilated face zone or if the share
    of person-mask pixels under the mark is outside ``[BITE_MIN, BITE_MAX]``.
    Surviving candidates are scored (lower is better) and the best one is
    mapped back to full-resolution coordinates.

    Args:
        src: path of the image to analyse.

    Returns:
        On success a dict with ``mode="auto"``, ``fallback=False``, the
        full-resolution ``x``/``y``/``w``/``h`` of the mark, ``bite_pct``,
        ``bg_saliency``, ``clip_safe`` and the image ``width``/``height``.
        When no candidate survives, a dict with ``mode="dual-corner"``,
        ``fallback=True``, a ``reason`` string and ``width``/``height``.
    """
    # convert() decodes the pixels into a new image, so the file handle can be
    # released immediately instead of lingering until garbage collection.
    with Image.open(src) as opened:
        im = opened.convert("RGB")
    W, H = im.size
    bw_full, bh_full = stamp_size(STYLE, int(W * STYLE.text_width_frac))

    # Downscale to the working resolution; `s` maps full-res -> working pixels.
    s = SEG_W / W
    sw, sh = SEG_W, max(1, int(H * s))
    small = im.resize((sw, sh))
    rgb = np.array(small)
    gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)

    mask = _person_mask(rgb, _SEG)
    faces = _face_zones(rgb, _FD, sw, sh)
    sal = _busyness(gray)

    # Mark size, grid step and edge margin, all in working-resolution pixels.
    bw_s, bh_s = max(1, int(bw_full * s)), max(1, int(bh_full * s))
    step = max(4, int(sw * GRID_STEP_FRAC))
    em = int(sw * EDGE_MARGIN_FRAC)

    def face_hit(x0, y0, x1, y1):
        # Axis-aligned rectangle overlap against every no-go zone.
        return any(
            x0 < fx1 and x1 > fx0 and y0 < fy1 and y1 > fy0
            for fx0, fy0, fx1, fy1 in faces
        )

    best = None
    for y0 in range(em, sh - bh_s - em + 1, step):
        for x0 in range(em, sw - bw_s - em + 1, step):
            x1, y1 = x0 + bw_s, y0 + bh_s
            if face_hit(x0, y0, x1, y1):
                continue
            sub = mask[y0:y1, x0:x1]
            m = float(sub.mean()) if sub.size else 0.0
            if m < BITE_MIN or m > BITE_MAX:
                continue
            # Busyness of the background pixels (mask == 0) under the mark.
            bg = sal[y0:y1, x0:x1][sub == 0]
            bg_sal = float(bg.mean()) if bg.size else 1.0
            # minimise body coverage; lower placement preferred; calm bg tiebreak
            score = m * 3.0 + bg_sal * 1.0 - (y0 / sh) * 0.25
            if best is None or score < best[0]:
                best = (score, x0, y0, m, bg_sal)

    if best is None:
        return {"mode": "dual-corner", "fallback": True,
                "reason": "no safe bite-the-edge slot (subject fills frame / no face-free bg)",
                "width": W, "height": H}

    _, x0_s, y0_s, m, bg_sal = best
    # Map back to full resolution and keep the mark inside the image. The outer
    # max() keeps the origin non-negative even if the stamp exceeds the image.
    x0 = max(0, min(int(round(x0_s / s)), W - bw_full))
    y0 = max(0, min(int(round(y0_s / s)), H - bh_full))

    # clip-check: the mark overlaps the body (bite ≥ BITE_MIN), so any axis-aligned
    # edge-crop that removes it must also remove body pixels.
    bite_ok = m >= BITE_MIN
    return {"mode": "auto", "fallback": False, "x": x0, "y": y0,
            "w": bw_full, "h": bh_full, "bite_pct": round(m * 100, 1),
            "bg_saliency": round(bg_sal, 3), "clip_safe": bool(bite_ok),
            "width": W, "height": H}
|
|
|
|
|
|
def main() -> None:
    """Bootstrap or refresh the placement ledger from the command line.

    With ``--only a,b,c`` just those bases are processed; otherwise every base
    returned by ``published_bases()`` is. An existing ledger is loaded first,
    entries flagged ``"locked"`` are kept untouched, all other processed bases
    are overwritten with a fresh ``choose_placement`` result, and the ledger is
    written back with sorted keys.

    Exits with an error message if ``--only`` is given without any base name.
    """
    only = None
    if "--only" in sys.argv:
        pos = sys.argv.index("--only") + 1
        # Guard against `--only` being the last argument (was an IndexError).
        if pos >= len(sys.argv):
            sys.exit("--only requires a comma-separated list of base names")
        only = [name.strip() for name in sys.argv[pos].split(",") if name.strip()]
        if not only:
            sys.exit("--only requires a comma-separated list of base names")
    bases = only if only is not None else published_bases()

    os.makedirs(OUT, exist_ok=True)
    ledger = {}
    if os.path.exists(LEDGER):
        with open(LEDGER, encoding="utf-8") as fh:
            ledger = json.load(fh)

    n_auto = n_fb = 0
    for i, base in enumerate(bases, 1):
        # preserve a manual override if one was pinned — checked before any
        # image work so locked entries cost nothing to skip
        if ledger.get(base, {}).get("locked"):
            print(f" ({i}/{len(bases)}) {base}: LOCKED (manual) — kept")
            continue
        src = clean_source(base)
        p = choose_placement(src)
        ledger[base] = p
        if p["fallback"]:
            n_fb += 1
            print(f" ({i}/{len(bases)}) {base}: FALLBACK dual-corner ({p['reason']})")
        else:
            n_auto += 1
            flag = "" if p["clip_safe"] else " ⚠ clip-check soft"
            print(f" ({i}/{len(bases)}) {base}: auto x={p['x']} y={p['y']} bite={p['bite_pct']}%{flag}")

    # `with` guarantees the ledger is flushed and closed before we report it.
    with open(LEDGER, "w", encoding="utf-8") as fh:
        json.dump(ledger, fh, indent=2, sort_keys=True)
    print(f"\nledger → {LEDGER}")
    print(f"auto-placed: {n_auto} dual-corner fallback: {n_fb} total: {n_auto + n_fb}")
|
|
|
|
|
|
# Shared mediapipe models, constructed once at import time and reused for every
# image. They are defined below the functions that use them; that is fine
# because choose_placement() only looks the names up when it is called.
_SEG = mp.solutions.selfie_segmentation.SelfieSegmentation(model_selection=1)
_FD = mp.solutions.face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.4)


if __name__ == "__main__":
    main()
|