# _engineering-surface-metrics — schema, ingestion, rollup, attribution

**Genre**: engineering annex (non-UX). Executable spec for the cross-surface analytics stack. Sibling to [_engineering-talent-scout-port.md §0.2](./_engineering-talent-scout-port.md) (which owns the *design rationale*); this doc owns the *executable schema + adapter contracts + scheduler*. Per SSOT.1 — one canonical home per concern.

**Status**: design-frozen 2026-05-18 (Quinn-directive). Schema not yet migrated; adapter not yet implemented. Implementation phased per talent-scout-port §0.2 (P1 → P4).

---

## §1 — Concerns owned by this doc

| Concern | Owner |
|---|---|
| `surface_metrics` SQL migration | this doc §3 |
| `prospect_touchpoints` SQL migration | this doc §4 |
| `metric_aggregates` SQL migration + materialization scheduler | this doc §5 |
| Per-surface ingestion adapter contract | this doc §6 |
| Rollup job scheduler (BullMQ) | this doc §7 |
| Attribution model implementation (first/last/time-decay/position) | this doc §8 |
| Privacy + RLS enforcement | this doc §9 |

**Not owned here** (referenced only):

- Design rationale + Quinn's directive context → [_engineering-talent-scout-port.md §0.2](./_engineering-talent-scout-port.md)
- Aggregation UX + funnel viz → [T-analytics-dashboard.brief.md](./T-analytics-dashboard.brief.md) §T-aggregate / §T-attribution / §T-attribution-model
- Per-surface tier-gating facts → [surface-tryst.brief.md §canonical-facts](./surface-tryst.brief.md) (Tryst-specific) and sibling per-surface briefs
- Cross-surface identity dedup logic → [specialist-prospect-resolver.contract.md](./specialist-prospect-resolver.contract.md)

---

## §2 — Phasing

Per talent-scout-port §0.2:

| Phase | Triggers | Scope |
|---|---|---|
| **P1** | Tryst MVP live | `surface_metrics` + Tryst ingestion + dashboard T3 row |
| **P2** | Second surface (OF or X) online | `metric_aggregates` + dashboard T1/T4 become aggregate-canonical |
| **P3** | `prospect-resolver` specialist online | `prospect_touchpoints` + per-prospect attribution view + dashboard T2 funnel |
| **P4** | Tuning | Attribution model picker; per-surface weight calibration |

This doc spec-completes P1+P2+P3 schema today (one migration each); P4 is a runtime-config concern, no schema change.

---

## §3 — `surface_metrics` migration (P1)

**File**: `@platform/infrastructure/sql/migrations/0004_surface_metrics.sql`

```sql
-- Per-surface raw metrics snapshots.
-- Insert-only on each ingestion (no UPDATE); deduplicate on read via DISTINCT ON (window).
CREATE TYPE surface_metric_kind AS ENUM (
  -- Discovery-funnel kinds
  'profile_view', 'search_impression', 'search_rank', 'click_through',
  -- Engagement-funnel kinds
  'dm_inbound', 'dm_outbound', 'reply_rate',
  -- Conversion-funnel kinds
  'subscription_new', 'subscription_total', 'tip_amount', 'tip_count',
  'booking_inquiry', 'booking_confirmed',
  -- Revenue
  'gross_revenue', 'net_revenue'
);

CREATE TABLE surface_metrics (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
  surface TEXT NOT NULL, -- surface_kind (tryst, onlyfans, x, ...)
  metric_kind surface_metric_kind NOT NULL,
  window_start TIMESTAMPTZ NOT NULL,
  window_end TIMESTAMPTZ NOT NULL,
  value_numeric NUMERIC(18,4), -- counts, sums, rates
  value_text TEXT, -- rank labels ("Boosted"), categorical
  currency TEXT, -- ISO 4217 when value_numeric is monetary
  source TEXT NOT NULL, -- 'native_api' | 'native_scrape' | 'derived' | 'manual'
  fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  payload_jsonb JSONB, -- raw surface-side response for audit
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  CHECK (window_end > window_start),
  CHECK (value_numeric IS NOT NULL OR value_text IS NOT NULL)
);

CREATE INDEX idx_surface_metrics_user_surface_kind
  ON surface_metrics(user_id, surface, metric_kind, window_end DESC);

CREATE INDEX idx_surface_metrics_org_surface_kind
  ON surface_metrics(org_id, surface, metric_kind, window_end DESC)
  WHERE org_id IS NOT NULL;

-- Dedup: same (user, surface, metric, window) — keep newest fetched_at on read
CREATE INDEX idx_surface_metrics_dedup
  ON surface_metrics(user_id, surface, metric_kind, window_start, window_end, fetched_at DESC);

ALTER TABLE surface_metrics ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON surface_metrics
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);
```

**Read pattern**:

```sql
SELECT DISTINCT ON (surface, metric_kind, window_start, window_end) *
FROM surface_metrics
WHERE user_id = $1
  AND surface = $2
  AND window_end >= $3
ORDER BY surface, metric_kind, window_start, window_end, fetched_at DESC;
```

---

## §4 — `prospect_touchpoints` migration (P3)

**File**: `@platform/infrastructure/sql/migrations/0006_prospect_touchpoints.sql`

```sql
CREATE TYPE touchpoint_kind AS ENUM (
  'profile_view',
  'dm_inbound', 'dm_outbound',
  'reply',
  'reaction', -- like / heart / favorite
  'tip',
  'subscription_new', 'subscription_renew', 'subscription_cancel',
  'booking_inquiry', 'booking_confirmed', 'booking_completed',
  'block', 'unblock'
);

CREATE TABLE prospect_touchpoints (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
  prospect_id UUID NULL REFERENCES prospects(id) ON DELETE SET NULL, -- NULL = unresolved/anonymous
  surface TEXT NOT NULL,
  touchpoint touchpoint_kind NOT NULL,
  external_id TEXT, -- surface-side event id, for idempotent ingest
  identifier_hash BYTEA, -- SHA-256 of (handle | email | phone), Brief K-compliant
  occurred_at TIMESTAMPTZ NOT NULL,
  payload_jsonb JSONB, -- per-surface raw event
  attributed_to_touchpoint_id UUID NULL REFERENCES prospect_touchpoints(id), -- attribution chain
  ingestion_source TEXT NOT NULL, -- 'adapter_poll' | 'webhook' | 'manual'
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  -- One row per surface-side event; idempotency.
  -- NOTE: PostgreSQL cannot use a DEFERRABLE constraint as an ON CONFLICT arbiter,
  -- so an upsert-style ingest needs either a non-deferrable constraint or another dedup path.
  UNIQUE (surface, external_id) DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX idx_touchpoints_prospect_time
  ON prospect_touchpoints(prospect_id, occurred_at DESC)
  WHERE prospect_id IS NOT NULL;

CREATE INDEX idx_touchpoints_user_surface_time
  ON prospect_touchpoints(user_id, surface, occurred_at DESC);

CREATE INDEX idx_touchpoints_identifier_hash
  ON prospect_touchpoints(user_id, identifier_hash)
  WHERE identifier_hash IS NOT NULL;

CREATE INDEX idx_touchpoints_anonymous
  ON prospect_touchpoints(user_id, surface, occurred_at DESC)
  WHERE prospect_id IS NULL;

ALTER TABLE prospect_touchpoints ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON prospect_touchpoints
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);
```

**Invariants** (enforced at adapter + specialist layer, not DB):

- Anonymous touchpoints (`prospect_id` NULL) MUST appear only in aggregate queries; never join them into a per-prospect view.
- `attributed_to_touchpoint_id` is set by `prospect-resolver` after identity-link resolves; initial insert leaves it NULL.
- `identifier_hash` is the only identifying fragment stored when `prospect_id` is NULL; the plaintext handle is never stored.

---

## §5 — `metric_aggregates` migration (P2)

**File**: `@platform/infrastructure/sql/migrations/0005_metric_aggregates.sql`

```sql
CREATE TABLE metric_aggregates (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
  metric_kind surface_metric_kind NOT NULL,
  window_start TIMESTAMPTZ NOT NULL,
  window_end TIMESTAMPTZ NOT NULL,
  value_numeric NUMERIC(18,4) NOT NULL,
  currency TEXT,
  surfaces_included TEXT[] NOT NULL, -- which surfaces contributed
  surface_breakdown JSONB NOT NULL, -- {tryst: 1234, onlyfans: 567, ...}
  computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  stale_after TIMESTAMPTZ NOT NULL, -- next-refresh-due; rollup job reads this
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (user_id, metric_kind, window_start, window_end)
);

CREATE INDEX idx_aggregates_user_kind_window
  ON metric_aggregates(user_id, metric_kind, window_end DESC);

-- Plain (non-partial) index: PostgreSQL requires index predicates to be IMMUTABLE,
-- so a predicate on now() is rejected. The rollup job filters on stale_after at query time.
CREATE INDEX idx_aggregates_stale
  ON metric_aggregates(stale_after ASC);

ALTER TABLE metric_aggregates ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON metric_aggregates
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);
```

**Materialization cadence** (by age of the window):

- Active window (last 24h): refresh every 15 minutes.
- Recent (24h–7d): refresh every 1 hour.
- Older (7d–90d): refresh daily.
- Older than 90d: never refreshed once computed (immutable historical).

**Read-time**: clients query `metric_aggregates` directly. If `stale_after < now()` and the matching `surface_metrics` rows have advanced, the rollup job (§7) refreshes the row; the UI shows a "computed N min ago" stamp.

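
The cadence tiers above can be expressed as a small pure function that the rollup job might call when it writes `stale_after`. This is only a sketch: the name `nextStaleAfter` and the `null` return for windows older than 90 days are assumptions, not part of the spec. Because `stale_after` is declared `NOT NULL`, a real implementation would have to map that `null` to a sentinel (PostgreSQL's `'infinity'` timestamp is one option), which is likewise an assumption. Window age is measured here from `window_end` to the computation time.

```typescript
// Hypothetical helper; the function name and the null-means-immutable convention are assumptions.
const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;
const DAY_MS = 24 * HOUR_MS;

/**
 * Returns the next refresh-due time for an aggregate, following the §5 cadence,
 * or null when the window ended more than 90 days ago (immutable historical).
 */
function nextStaleAfter(windowEnd: Date, computedAt: Date): Date | null {
  const ageMs = computedAt.getTime() - windowEnd.getTime();

  // Older than 90 days: computed once, never refreshed.
  if (ageMs > 90 * DAY_MS) return null;

  let refreshEveryMs: number;
  if (ageMs <= DAY_MS) {
    refreshEveryMs = 15 * MINUTE_MS; // active window (last 24h)
  } else if (ageMs <= 7 * DAY_MS) {
    refreshEveryMs = HOUR_MS; // recent (24h to 7d)
  } else {
    refreshEveryMs = DAY_MS; // 7d to 90d
  }
  return new Date(computedAt.getTime() + refreshEveryMs);
}
```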

---

## §6 — Adapter ingestion contract

Each surface adapter that supports native metrics implements `fetch-metrics.ts`:

```typescript
// @cocottetech/@platform/codebase/@features/bookings-{surface}/adapter/fetch-metrics.ts
import type { SurfaceAdapterAction } from '@cocottetech/surface-adapter-contracts';

export interface FetchMetricsInput {
  user_id: string;
  org_id?: string;
  /** Tier-gating decision: caller has already verified the user's surface tier supports metrics. */
  reason: 'scheduled' | 'manual_refresh';
}

export interface SurfaceMetricRow {
  metric_kind: SurfaceMetricKind;
  window_start: string; // ISO 8601
  window_end: string;
  value_numeric?: number;
  value_text?: string;
  currency?: string;
  payload_jsonb?: unknown;
}

export interface FetchMetricsOutput {
  surface: string;
  rows: SurfaceMetricRow[];
  /** Adapter-determined next-poll delay; tier + surface specific. */
  next_poll_after_ms: number;
  /** Adapter-side health note for the trust-panel; e.g. "Tryst returned 3 metrics" or "rate-limited, retry in 15m" */
  health_note: string;
}

export const action: SurfaceAdapterAction = {
  surface: 'tryst',
  verb: 'fetch-metrics',
  capabilities: { reads: ['surface_native_analytics'], writes: [] },
  async execute(ctx, input) {
    // 1. Verify tier (Tryst: Standard+; bail-out with empty result + clear health_note for Basic)
    // 2. Acquire browser context from container pool (per _engineering-surface-adapter-container.md)
    // 3. Navigate to native analytics page
    // 4. Scrape rolling 30-day data → SurfaceMetricRow[]
    // 5. Return; caller persists to surface_metrics
  },
};
```

**Caller** (the scheduler in §7) wraps the result in a `Session` (per talent-scout-port §"Pipeline / session schema") with steps `tier_check → fetch → parse → persist → audit`.

**Tier-gating rule**: `fetch-metrics` does NOT fail when the user's tier doesn't support metrics. It returns `rows: []` + a `health_note` like `"tryst: Basic tier — analytics require Standard+"`.
The specialist surface ("Tryst metrics last refreshed N min ago" in dashboard T3 row) shows this verbatim.

---

## §7 — Scheduler (BullMQ job pattern)

**Queue**: `surface-metrics-ingestion` (BullMQ on Redis per [INFRA.md §3](../../../../../INFRA.md)).

**Job spec**:

```typescript
interface IngestionJob {
  job_id: string;
  user_id: string;
  org_id?: string;
  surface: SurfaceKind;
  scheduled_at: string;
}
```

**Producer** (lives in `@cocottetech/@platform/codebase/@features/platform-api/src/scheduler/`):

- Cron-tick every 5 minutes.
- For each (user, surface) where:
  - User has a connected credential for that surface, AND
  - User's tier on that surface supports metrics (consults `surface-{name}.brief.md §canonical-facts`-derived tier registry), AND
  - Last successful `surface_metrics` row for (user, surface) is older than `next_poll_after_ms` returned by the previous adapter call.
- Enqueue an `IngestionJob`.

**Consumer** (the bookings-{surface} adapter):

- Acquires a container browser context.
- Calls `fetch-metrics.ts` action.
- Persists returned `rows` into `surface_metrics`.
- Emits a `Session` audit row per [brief I](./I-audit-trust-replay.brief.md).
- Schedules a `metric_aggregates` recompute if window overlaps active aggregate-window.

**Rollup job** (separate queue `metric-aggregates-rollup`):

- Cron-tick every 15 minutes.
- Selects from `metric_aggregates WHERE stale_after < now()`.
- For each stale row: recompute by aggregating `surface_metrics` over the same (user, metric_kind, window). Update `value_numeric + surface_breakdown + computed_at + stale_after`.

---

## §8 — Attribution model

The attribution model maps a sequence of `prospect_touchpoints` for a (user, prospect) pair → per-surface credit weights.

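
One way to picture that mapping is the time-decay variant: each touchpoint inside the lookback window gets a raw weight of 0.5^(age / half-life), and the raw weights are normalized so that the credits for one conversion sum to 1. The sketch below is illustrative only and is not the actual implementation; the `Touchpoint` shape and the function name `timeDecayCredit` are hypothetical, and the defaults (7-day half-life, 30-day lookback) are the values this section specifies for `time_decay`.

```typescript
// Hypothetical shape; mirrors two columns of prospect_touchpoints, but the type itself is an assumption.
interface Touchpoint {
  surface: string;
  occurredAt: Date;
}

const DAY_IN_MS = 24 * 60 * 60 * 1000;

/**
 * Splits credit for a single conversion across surfaces using exponential time decay.
 * A touchpoint that is `halfLifeDays` older than the conversion gets half the raw weight
 * of one that occurred at conversion time. Touchpoints after the conversion or outside
 * the lookback window are ignored. Returns {} when nothing qualifies.
 */
function timeDecayCredit(
  touchpoints: Touchpoint[],
  conversionAt: Date,
  halfLifeDays: number = 7,
  lookbackDays: number = 30,
): Record<string, number> {
  const raw: Record<string, number> = {};
  let total = 0;

  for (const tp of touchpoints) {
    const ageDays = (conversionAt.getTime() - tp.occurredAt.getTime()) / DAY_IN_MS;
    if (ageDays < 0 || ageDays > lookbackDays) continue;
    const weight = Math.pow(0.5, ageDays / halfLifeDays);
    raw[tp.surface] = (raw[tp.surface] ?? 0) + weight;
    total += weight;
  }

  // Normalize so per-surface credits sum to 1 for this conversion.
  const credit: Record<string, number> = {};
  for (const surface of Object.keys(raw)) {
    credit[surface] = raw[surface] / total;
  }
  return credit;
}
```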
Models implemented in `@platform/codebase/@features/analytics/src/attribution/`:

| Model ID | Logic | Use case |
|---|---|---|
| `first_touch` | 100% credit to the earliest touchpoint's surface | Brand-awareness reporting |
| `last_touch` | 100% credit to the latest touchpoint's surface before conversion | Conversion attribution |
| `time_decay` (**default**) | Exponential decay; half-life = 7 days. Most recent touchpoint = highest weight | Balanced view; Cocotte's default |
| `position_based` | 40% first + 40% last + 20% distributed across middle | When middle-funnel matters |

**Conversion event**: any touchpoint with kind in `{subscription_new, tip, booking_confirmed, booking_completed}`. The model runs per-conversion-event, looking backward over a configurable window (default 30 days) for prior touchpoints.

**Read API**:

```sql
-- conceptual; actual impl is in TypeScript over the touchpoints table
SELECT surface, SUM(weight * conversion_value) AS attributed_revenue
FROM attribution_compute(
  user_id := $1,
  model := 'time_decay',
  window := INTERVAL '90 days'
)
GROUP BY surface;
```

UI exposes the model picker per analytics-dashboard §T-attribution-model (per brief T extension).

---

## §9 — Privacy + RLS enforcement

Per [brief K](./K-safety-blocklist.brief.md) + [brief V](./V-data-portability-erasure.brief.md):

1. **Tenant isolation**: every table has RLS scoped to `user_id` and (when set) `org_id`. The `tenant_isolation` policy is uniform across all three tables.
2. **Anonymous-only-aggregate rule**: touchpoints with `prospect_id IS NULL` must never appear in per-prospect joins. Enforced at the application layer via:
   - `specialist-prospect-resolver.contract.md` Auto rule: "never expose anonymous-touchpoint rows in any prospect-scoped query".
   - Code-level lint check (planned): TS rule on any `prospect_touchpoints` query lacking `WHERE prospect_id IS NOT NULL` when joined with prospects.
3. **No org-shared attribution view**: cross-surface attribution is Quinn-only (and per-user-only in a future org). W-brief org-overlay handles the future case with explicit consent.
4. **Data-export coverage**: Brief V data-export covers `surface_metrics` + `prospect_touchpoints` + `metric_aggregates`.
5. **Identifier hashing**: when a touchpoint has identifier info but no resolved `prospect_id`, store `identifier_hash = sha256(lower(surface || ':' || raw_identifier) || user_salt)`. Salt is per-user, stored in `users.attribution_salt` (BYTEA, 32 bytes). Talent-scout's `BlocklistService` pattern is the reference.

---

## §10 — Migration / rollout order

1. **P1**: Apply `0004_surface_metrics.sql`. Build Tryst `fetch-metrics.ts`. Wire scheduler. Wire dashboard T3 row.
2. **P2** (after second surface ingestion live): Apply `0005_metric_aggregates.sql`. Implement rollup job. Migrate dashboard T1/T4 to read aggregates.
3. **P3** (with `prospect-resolver` specialist live): Apply `0006_prospect_touchpoints.sql`. Extend `bookings-*` adapters to emit touchpoints. Wire dashboard T2 funnel panel.
4. **P4**: Add model picker. Calibrate weights.

Each phase is reversible: roll back the migration and disable the scheduler, and the previous-phase UX continues working (it never read the new tables).

---

## §11 — Open questions

| ID | Question | Owner |
|---|---|---|
| SM-Q1 | Should `surface_metrics` retain raw scrape payload (`payload_jsonb`) indefinitely, or only N days? | Quinn (storage cost vs. audit) |
| SM-Q2 | Should anonymous touchpoint aggregation respect coop boundaries (per [N brief](./N-provider-coop.brief.md))? | Quinn |
| SM-Q3 | Default time-decay half-life: 7 days proposed; confirm against Quinn's actual sales cycle | Quinn |
| SM-Q4 | Is conversion-value for `subscription_new` the renewal-projected LTV or first-payment-amount? | Quinn (modeling preference) |
| SM-Q5 | Cross-surface dedup: when two surfaces report the same person under different handles but matching photo-hash, do we collapse into one prospect at touchpoint-write-time or query-time? | `prospect-resolver` contract |

---

## Related

- [_engineering-talent-scout-port.md §0.2](./_engineering-talent-scout-port.md) — design rationale; this doc operationalizes it.
- [T-analytics-dashboard.brief.md](./T-analytics-dashboard.brief.md) — UX consumer.
- [specialist-prospect-resolver.contract.md](./specialist-prospect-resolver.contract.md) — touchpoint linking.
- [specialist-strategist.contract.md](./specialist-strategist.contract.md) — aggregate reader.
- [specialist-bookings-tryst.contract.md](./specialist-bookings-tryst.contract.md) — first ingestion adapter consumer.
- [surface-tryst.brief.md §canonical-facts](./surface-tryst.brief.md) — tier-gating fact source.
- [I-audit-trust-replay.brief.md](./I-audit-trust-replay.brief.md) — every adapter ingestion writes a Session row.
- [K-safety-blocklist.brief.md](./K-safety-blocklist.brief.md) — privacy invariants.
- [V-data-portability-erasure.brief.md](./V-data-portability-erasure.brief.md) — data-export coverage.