_engineering-surface-metrics — schema, ingestion, rollup, attribution
Genre: engineering annex (non-UX). Executable spec for the cross-surface analytics stack. Sibling to _engineering-talent-scout-port.md §0.2 (which owns the design rationale); this doc owns the executable schema + adapter contracts + scheduler. Per SSOT.1 — one canonical home per concern.
Status: design-frozen 2026-05-18 (Quinn-directive). Schema not yet migrated; adapter not yet implemented. Implementation phased per talent-scout-port §0.2 (P1 → P4).
§1 — Concerns owned by this doc
| Concern | Owner |
|---|---|
| surface_metrics SQL migration | this doc §3 |
| prospect_touchpoints SQL migration | this doc §4 |
| metric_aggregates SQL migration + materialization scheduler | this doc §5 |
| Per-surface ingestion adapter contract | this doc §6 |
| Rollup job scheduler (BullMQ) | this doc §7 |
| Attribution model implementation (first/last/time-decay/position) | this doc §8 |
| Privacy + RLS enforcement | this doc §9 |
Not owned here (referenced only):
- Design rationale + Quinn's directive context → _engineering-talent-scout-port.md §0.2
- Aggregation UX + funnel viz → T-analytics-dashboard.brief.md §T-aggregate / §T-attribution / §T-attribution-model
- Per-surface tier-gating facts → surface-tryst.brief.md §canonical-facts (Tryst-specific) and sibling per-surface briefs
- Cross-surface identity dedup logic → specialist-prospect-resolver.contract.md
§2 — Phasing
Per talent-scout-port §0.2:
| Phase | Triggers | Scope |
|---|---|---|
| P1 | Tryst MVP live | surface_metrics + Tryst ingestion + dashboard T3 row |
| P2 | Second surface (OF or X) online | metric_aggregates + dashboard T1/T4 become aggregate-canonical |
| P3 | prospect-resolver specialist online | prospect_touchpoints + per-prospect attribution view + dashboard T2 funnel |
| P4 | Tuning | Attribution model picker; per-surface weight calibration |
This doc spec-completes P1+P2+P3 schema today (one migration each); P4 is a runtime-config concern, no schema change.
§3 — surface_metrics migration (P1)
File: @platform/infrastructure/sql/migrations/0004_surface_metrics.sql
-- Per-surface raw metrics snapshots.
-- Insert-only on each ingestion (no UPDATE); deduplicate on read via DISTINCT ON (window).
CREATE TYPE surface_metric_kind AS ENUM (
-- Discovery-funnel kinds
'profile_view',
'search_impression',
'search_rank',
'click_through',
-- Engagement-funnel kinds
'dm_inbound',
'dm_outbound',
'reply_rate',
-- Conversion-funnel kinds
'subscription_new',
'subscription_total',
'tip_amount',
'tip_count',
'booking_inquiry',
'booking_confirmed',
-- Revenue
'gross_revenue',
'net_revenue'
);
CREATE TABLE surface_metrics (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
surface TEXT NOT NULL, -- surface_kind (tryst, onlyfans, x, ...)
metric_kind surface_metric_kind NOT NULL,
window_start TIMESTAMPTZ NOT NULL,
window_end TIMESTAMPTZ NOT NULL,
value_numeric NUMERIC(18,4), -- counts, sums, rates
value_text TEXT, -- rank labels ("Boosted"), categorical
currency TEXT, -- ISO 4217 when value_numeric is monetary
source TEXT NOT NULL, -- 'native_api' | 'native_scrape' | 'derived' | 'manual'
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
payload_jsonb JSONB, -- raw surface-side response for audit
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (window_end > window_start),
CHECK (value_numeric IS NOT NULL OR value_text IS NOT NULL)
);
CREATE INDEX idx_surface_metrics_user_surface_kind
ON surface_metrics(user_id, surface, metric_kind, window_end DESC);
CREATE INDEX idx_surface_metrics_org_surface_kind
ON surface_metrics(org_id, surface, metric_kind, window_end DESC)
WHERE org_id IS NOT NULL;
-- Dedup: same (user, surface, metric, window) — keep newest fetched_at on read
CREATE INDEX idx_surface_metrics_dedup
ON surface_metrics(user_id, surface, metric_kind, window_start, window_end, fetched_at DESC);
ALTER TABLE surface_metrics ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON surface_metrics
USING (user_id = current_setting('app.user_id')::UUID
OR org_id = current_setting('app.org_id', true)::UUID);
Read pattern:
SELECT DISTINCT ON (surface, metric_kind, window_start, window_end)
*
FROM surface_metrics
WHERE user_id = $1 AND surface = $2 AND window_end >= $3
ORDER BY surface, metric_kind, window_start, window_end, fetched_at DESC;
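The same newest-wins rule can be sketched in application code, for example when rows arrive already loaded in memory. This is a minimal illustration, not the canonical read path: the `MetricSnapshot` shape and the `dedupeNewest` name are hypothetical, and only the column names come from the table above.

```typescript
// Hypothetical in-memory row shape; field names mirror surface_metrics columns.
interface MetricSnapshot {
  surface: string;
  metric_kind: string;
  window_start: string; // ISO 8601
  window_end: string;   // ISO 8601
  fetched_at: string;   // ISO 8601
  value_numeric?: number;
}

// Keep only the newest snapshot (by fetched_at) per
// (surface, metric_kind, window_start, window_end), like DISTINCT ON.
function dedupeNewest(rows: MetricSnapshot[]): MetricSnapshot[] {
  const newest = new Map<string, MetricSnapshot>();
  for (const row of rows) {
    const key = [row.surface, row.metric_kind, row.window_start, row.window_end].join('|');
    const seen = newest.get(key);
    if (!seen || Date.parse(row.fetched_at) > Date.parse(seen.fetched_at)) {
      newest.set(key, row);
    }
  }
  return Array.from(newest.values());
}
```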
§4 — prospect_touchpoints migration (P3)
File: @platform/infrastructure/sql/migrations/0006_prospect_touchpoints.sql
CREATE TYPE touchpoint_kind AS ENUM (
'profile_view',
'dm_inbound',
'dm_outbound',
'reply',
'reaction', -- like / heart / favorite
'tip',
'subscription_new',
'subscription_renew',
'subscription_cancel',
'booking_inquiry',
'booking_confirmed',
'booking_completed',
'block',
'unblock'
);
CREATE TABLE prospect_touchpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
prospect_id UUID NULL REFERENCES prospects(id) ON DELETE SET NULL, -- NULL = unresolved/anonymous
surface TEXT NOT NULL,
touchpoint touchpoint_kind NOT NULL,
external_id TEXT, -- surface-side event id, for idempotent ingest
identifier_hash BYTEA, -- SHA-256 of (handle | email | phone), Brief K-compliant
occurred_at TIMESTAMPTZ NOT NULL,
payload_jsonb JSONB, -- per-surface raw event
attributed_to_touchpoint_id UUID NULL REFERENCES prospect_touchpoints(id), -- attribution chain
ingestion_source TEXT NOT NULL, -- 'adapter_poll' | 'webhook' | 'manual'
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- One row per surface-side event; idempotency
UNIQUE (surface, external_id) DEFERRABLE INITIALLY DEFERRED
);
CREATE INDEX idx_touchpoints_prospect_time
ON prospect_touchpoints(prospect_id, occurred_at DESC)
WHERE prospect_id IS NOT NULL;
CREATE INDEX idx_touchpoints_user_surface_time
ON prospect_touchpoints(user_id, surface, occurred_at DESC);
CREATE INDEX idx_touchpoints_identifier_hash
ON prospect_touchpoints(user_id, identifier_hash)
WHERE identifier_hash IS NOT NULL;
CREATE INDEX idx_touchpoints_anonymous
ON prospect_touchpoints(user_id, surface, occurred_at DESC)
WHERE prospect_id IS NULL;
ALTER TABLE prospect_touchpoints ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON prospect_touchpoints
USING (user_id = current_setting('app.user_id')::UUID
OR org_id = current_setting('app.org_id', true)::UUID);
Invariants (enforced at adapter + specialist layer, not DB):
- Anonymous touchpoints (prospect_id NULL) MUST appear only in aggregate queries — never join into per-prospect view.
- attributed_to_touchpoint_id is set by prospect-resolver after identity-link resolves; initial insert leaves it NULL.
- identifier_hash is the only identifying fragment stored when prospect_id is NULL; never the plaintext handle.
§5 — metric_aggregates migration (P2)
File: @platform/infrastructure/sql/migrations/0005_metric_aggregates.sql
CREATE TABLE metric_aggregates (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
org_id UUID NULL REFERENCES orgs(id) ON DELETE CASCADE,
metric_kind surface_metric_kind NOT NULL,
window_start TIMESTAMPTZ NOT NULL,
window_end TIMESTAMPTZ NOT NULL,
value_numeric NUMERIC(18,4) NOT NULL,
currency TEXT,
surfaces_included TEXT[] NOT NULL, -- which surfaces contributed
surface_breakdown JSONB NOT NULL, -- {tryst: 1234, onlyfans: 567, ...}
computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
stale_after TIMESTAMPTZ NOT NULL, -- next-refresh-due; rollup job reads this
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (user_id, metric_kind, window_start, window_end)
);
CREATE INDEX idx_aggregates_user_kind_window
ON metric_aggregates(user_id, metric_kind, window_end DESC);
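-- Functions in a partial-index predicate must be IMMUTABLE, so now() cannot appear there.
-- A plain index on stale_after still serves the rollup job's "stale_after < now()" scan.
CREATE INDEX idx_aggregates_stale
ON metric_aggregates(stale_after ASC);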
ALTER TABLE metric_aggregates ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON metric_aggregates
USING (user_id = current_setting('app.user_id')::UUID
OR org_id = current_setting('app.org_id', true)::UUID);
Materialization cadence:
- Active window (last 24h): refresh every 15 minutes.
- Recent (24h–7d): every 1 hour.
- Older than 7d: daily.
- Older than 90d: never refresh once computed (immutable historical).
Read-time: clients query metric_aggregates directly. If stale_after < now() and the matching surface_metrics rows have advanced, the rollup job will refresh; UI shows "computed N min ago" stamp.
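A minimal sketch of how the cadence tiers above could map to a stale_after value. It assumes the tier is chosen by the age of window_end relative to the compute time, which the cadence list implies but does not state. The function names are hypothetical, and returning null for the immutable tier is an illustration only: stale_after is NOT NULL in the schema, so the stored representation for never-refresh rows is left open here.

```typescript
const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;
const DAY_MS = 24 * HOUR_MS;

// Refresh interval for an aggregate window, or null when the window is
// older than 90 days and treated as immutable historical data.
function refreshIntervalMs(windowEnd: Date, now: Date): number | null {
  const ageMs = now.getTime() - windowEnd.getTime();
  if (ageMs > 90 * DAY_MS) return null;    // older than 90d: never refresh
  if (ageMs > 7 * DAY_MS) return DAY_MS;   // older than 7d: daily
  if (ageMs > DAY_MS) return HOUR_MS;      // 24h to 7d: hourly
  return 15 * MINUTE_MS;                   // last 24h: every 15 minutes
}

// Next refresh-due timestamp, measured from the moment of computation.
function nextStaleAfter(windowEnd: Date, computedAt: Date): Date | null {
  const interval = refreshIntervalMs(windowEnd, computedAt);
  return interval === null ? null : new Date(computedAt.getTime() + interval);
}
```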
§6 — Adapter ingestion contract
Each surface adapter that supports native metrics implements fetch-metrics.ts:
// @cocottetech/@platform/codebase/@features/bookings-{surface}/adapter/fetch-metrics.ts
import type { SurfaceAdapterAction } from '@cocottetech/surface-adapter-contracts';
export interface FetchMetricsInput {
user_id: string;
org_id?: string;
/** Tier-gating decision: caller has already verified the user's surface tier supports metrics. */
reason: 'scheduled' | 'manual_refresh';
}
export interface SurfaceMetricRow {
metric_kind: SurfaceMetricKind;
window_start: string; // ISO 8601
window_end: string;
value_numeric?: number;
value_text?: string;
currency?: string;
payload_jsonb?: unknown;
}
export interface FetchMetricsOutput {
surface: string;
rows: SurfaceMetricRow[];
/** Adapter-determined next-poll delay; tier + surface specific. */
next_poll_after_ms: number;
/** Adapter-side health note for the trust-panel; e.g. "Tryst returned 3 metrics" or "rate-limited, retry in 15m" */
health_note: string;
}
export const action: SurfaceAdapterAction<FetchMetricsInput, FetchMetricsOutput> = {
surface: 'tryst',
verb: 'fetch-metrics',
capabilities: { reads: ['surface_native_analytics'], writes: [] },
async execute(ctx, input) {
// 1. Verify tier (Tryst: Standard+; bail-out with empty result + clear health_note for Basic)
// 2. Acquire browser context from container pool (per _engineering-surface-adapter-container.md)
// 3. Navigate to native analytics page
// 4. Scrape rolling 30-day data → SurfaceMetricRow[]
// 5. Return; caller persists to surface_metrics
// Placeholder so the stub type-checks until the adapter is implemented (see Status).
throw new Error('tryst fetch-metrics: not yet implemented');
},
};
Caller (the scheduler in §7) wraps the result in a Session (per talent-scout-port §"Pipeline / session schema") with steps tier_check → fetch → parse → persist → audit.
Tier-gating rule: fetch-metrics does NOT fail when the user's tier doesn't support metrics. It returns rows: [] + a health_note like "tryst: Basic tier — analytics require Standard+". The specialist surface ("Tryst metrics last refreshed N min ago" in dashboard T3 row) shows this verbatim.
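The tier-gating rule can be sketched as a small helper that builds the empty, non-failing result. Everything here is illustrative: the `unsupportedTierResult` name, the local `FetchMetricsOutputSketch` type, the daily re-check delay, and the exact health_note wording are assumptions, not part of the contract.

```typescript
// Local stand-in for the FetchMetricsOutput shape from the adapter contract.
interface FetchMetricsOutputSketch {
  surface: string;
  rows: unknown[];
  next_poll_after_ms: number;
  health_note: string;
}

// Assumption: re-check an unsupported tier once a day in case the user upgrades.
const UNSUPPORTED_TIER_RETRY_MS = 24 * 60 * 60 * 1000;

// Returns an empty result with an explanatory note instead of throwing.
function unsupportedTierResult(
  surface: string,
  tier: string,
  requiredTier: string,
): FetchMetricsOutputSketch {
  return {
    surface,
    rows: [],
    next_poll_after_ms: UNSUPPORTED_TIER_RETRY_MS,
    health_note: `${surface}: ${tier} tier, analytics require ${requiredTier}`,
  };
}
```

Returning a normal result keeps the scheduler's success path uniform: an unsupported tier is a reportable state, not an error to retry.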
§7 — Scheduler (BullMQ job pattern)
Queue: surface-metrics-ingestion (BullMQ on Redis per INFRA.md §3).
Job spec:
interface IngestionJob {
job_id: string;
user_id: string;
org_id?: string;
surface: SurfaceKind;
scheduled_at: string;
}
Producer (lives in @cocottetech/@platform/codebase/@features/platform-api/src/scheduler/):
- Cron-tick every 5 minutes.
- For each (user, surface) where:
  - User has a connected credential for that surface, AND
  - User's tier on that surface supports metrics (consults surface-{name}.brief.md §canonical-facts-derived tier registry), AND
  - Last successful surface_metrics row for (user, surface) is older than next_poll_after_ms returned by the previous adapter call.
- Enqueue an IngestionJob.
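The producer's three eligibility conditions can be sketched as a pure predicate evaluated on each cron tick. The `PollState` shape and `shouldEnqueue` name are hypothetical, and treating a never-polled pair as immediately due is an assumption the text does not spell out.

```typescript
// Hypothetical per-(user, surface) state the producer would assemble.
interface PollState {
  hasCredential: boolean;
  tierSupportsMetrics: boolean;
  lastSuccessAt: Date | null;     // fetched_at of the last successful surface_metrics row
  nextPollAfterMs: number | null; // returned by the previous adapter call
}

// True when an IngestionJob should be enqueued for this (user, surface).
function shouldEnqueue(state: PollState, now: Date): boolean {
  if (!state.hasCredential || !state.tierSupportsMetrics) return false;
  // Assumption: a pair with no prior successful poll is due right away.
  if (state.lastSuccessAt === null || state.nextPollAfterMs === null) return true;
  return now.getTime() - state.lastSuccessAt.getTime() > state.nextPollAfterMs;
}
```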
Consumer (the bookings-{surface} adapter):
- Acquires a container browser context.
- Calls fetch-metrics.ts action.
- Persists returned rows into surface_metrics.
- Emits a Session audit row per brief I.
- Schedules a metric_aggregates recompute if window overlaps active aggregate-window.
Rollup job (separate queue metric-aggregates-rollup):
- Cron-tick every 15 minutes.
- Selects from metric_aggregates WHERE stale_after < now().
- For each stale row: recompute by aggregating surface_metrics over the same (user, metric_kind, window). Update value_numeric + surface_breakdown + computed_at + stale_after.
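The recompute step can be sketched as a pure aggregation over already-deduplicated snapshot rows. This assumes an additive metric (counts and sums); rate-like or rank-like kinds such as reply_rate or search_rank would need a different combiner, which this document does not specify. The `rollup` name and the `SnapshotValue` and `RollupResult` types are hypothetical.

```typescript
// Hypothetical input: one deduplicated value per surface_metrics row.
interface SnapshotValue {
  surface: string;
  value_numeric: number;
}

// Mirrors the metric_aggregates columns that the rollup job rewrites.
interface RollupResult {
  value_numeric: number;
  surfaces_included: string[];
  surface_breakdown: Record<string, number>;
}

// Sum values per surface, then across surfaces (additive metrics only).
function rollup(rows: SnapshotValue[]): RollupResult {
  const breakdown: Record<string, number> = {};
  for (const row of rows) {
    breakdown[row.surface] = (breakdown[row.surface] ?? 0) + row.value_numeric;
  }
  const surfaces = Object.keys(breakdown).sort();
  const total = surfaces.reduce((sum, s) => sum + breakdown[s], 0);
  return { value_numeric: total, surfaces_included: surfaces, surface_breakdown: breakdown };
}
```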
§8 — Attribution model
The attribution model maps a sequence of prospect_touchpoints for a (user, prospect) pair → per-surface credit weights. Models implemented in @platform/codebase/@features/analytics/src/attribution/:
| Model ID | Logic | Use case |
|---|---|---|
| first_touch | 100% credit to the earliest touchpoint's surface | Brand-awareness reporting |
| last_touch | 100% credit to the latest touchpoint's surface before conversion | Conversion attribution |
| time_decay (default) | Exponential decay; half-life = 7 days. Most recent touchpoint = highest weight | Balanced view; Cocotte's default |
| position_based | 40% first + 40% last + 20% distributed across middle | When middle-funnel matters |
Conversion event: any touchpoint with kind in {subscription_new, tip, booking_confirmed, booking_completed}. The model runs per-conversion-event, looking backward over a configurable window (default 30 days) for prior touchpoints.
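A minimal sketch of the four weighting rules for a single conversion event, assuming the prior touchpoints are already filtered to the lookback window and sorted oldest first. The names (`touchWeights`, `creditBySurface`, `Touch`) are hypothetical. Two choices are assumptions the table leaves open: time_decay weights are normalized to sum to 1, and position_based with one or two touchpoints falls back to 100% and 50/50 respectively.

```typescript
type AttributionModel = 'first_touch' | 'last_touch' | 'time_decay' | 'position_based';

interface Touch {
  surface: string;
  occurredAt: Date;
}

const HALF_LIFE_MS = 7 * 24 * 60 * 60 * 1000; // 7-day half-life

// One weight per touchpoint; weights sum to 1 for a non-empty input.
function touchWeights(model: AttributionModel, touches: Touch[], conversionAt: Date): number[] {
  const n = touches.length;
  if (n === 0) return [];
  switch (model) {
    case 'first_touch':
      return touches.map((_, i) => (i === 0 ? 1 : 0));
    case 'last_touch':
      return touches.map((_, i) => (i === n - 1 ? 1 : 0));
    case 'time_decay': {
      // Weight halves for every half-life elapsed before the conversion.
      const raw = touches.map((t) =>
        Math.pow(0.5, (conversionAt.getTime() - t.occurredAt.getTime()) / HALF_LIFE_MS),
      );
      const total = raw.reduce((a, b) => a + b, 0);
      return raw.map((w) => w / total);
    }
    case 'position_based': {
      if (n === 1) return [1];        // assumption: single touch takes all credit
      if (n === 2) return [0.5, 0.5]; // assumption: no middle, split evenly
      const middle = 0.2 / (n - 2);
      return touches.map((_, i) => (i === 0 || i === n - 1 ? 0.4 : middle));
    }
  }
}

// Collapse per-touchpoint weights into per-surface credit.
function creditBySurface(model: AttributionModel, touches: Touch[], conversionAt: Date): Record<string, number> {
  const weights = touchWeights(model, touches, conversionAt);
  const credit: Record<string, number> = {};
  touches.forEach((t, i) => {
    credit[t.surface] = (credit[t.surface] ?? 0) + weights[i];
  });
  return credit;
}
```

Multiplying each surface's credit by the conversion value gives the attributed revenue for that event.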
Read API:
-- conceptual; actual impl is in TypeScript over the touchpoints table
SELECT
surface,
SUM(weight * conversion_value) AS attributed_revenue
FROM attribution_compute(
user_id := $1,
model := 'time_decay',
window := INTERVAL '90 days'
)
GROUP BY surface;
UI exposes the model picker per analytics-dashboard §T-attribution-model (per brief T extension).
§9 — Privacy + RLS enforcement
- Tenant isolation: every table has RLS scoped to user_id and (when set) org_id. The tenant_isolation policy is uniform across all three tables.
- Anonymous-only-aggregate rule: touchpoints with prospect_id IS NULL must never appear in per-prospect joins. Enforced at the application layer via:
  - specialist-prospect-resolver.contract.md Auto rule: "never expose anonymous-touchpoint rows in any prospect-scoped query".
  - Code-level lint check (planned): TS rule on any prospect_touchpoints query lacking WHERE prospect_id IS NOT NULL when joined with prospects.
- No org-shared attribution view: cross-surface attribution is Quinn-only (and per-user-only in a future org). W-brief org-overlay handles the future case with explicit consent.
- Data-export coverage: Brief V data-export covers surface_metrics + prospect_touchpoints + metric_aggregates.
- Identifier hashing: when a touchpoint has identifier info but no resolved prospect_id, store identifier_hash = sha256(lower(surface || ':' || raw_identifier) || user_salt). Salt is per-user, stored in users.attribution_salt (BYTEA, 32 bytes). Talent-scout's BlocklistService pattern is the reference.
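The identifier-hashing formula can be sketched with Node's built-in crypto module. The `identifierHash` name is hypothetical, and two details are assumptions: the lowercased string is encoded as UTF-8, and the salt bytes are appended after it, matching the concatenation order in the formula. The BlocklistService reference implementation may differ.

```typescript
import { createHash } from 'node:crypto';

// sha256(lower(surface || ':' || raw_identifier) || user_salt)
// userSalt is the 32-byte per-user value from users.attribution_salt.
function identifierHash(surface: string, rawIdentifier: string, userSalt: Uint8Array): Buffer {
  const normalized = `${surface}:${rawIdentifier}`.toLowerCase();
  return createHash('sha256')
    .update(normalized, 'utf8') // assumption: UTF-8 encoding of the text part
    .update(userSalt)           // salt bytes appended after the identifier
    .digest();                  // 32 raw bytes, suitable for a BYTEA column
}
```

Because the salt is per-user, the same handle seen by two different users produces unrelated hashes, so hashes cannot be joined across tenants.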
§10 — Migration / rollout order
- P1: Apply 0004_surface_metrics.sql. Build Tryst fetch-metrics.ts. Wire scheduler. Wire dashboard T3 row.
- P2 (after second surface ingestion live): Apply 0005_metric_aggregates.sql. Implement rollup job. Migrate dashboard T1/T4 to read aggregates.
- P3 (with prospect-resolver specialist live): Apply 0006_prospect_touchpoints.sql. Extend bookings-* adapters to emit touchpoints. Wire dashboard T2 funnel panel.
- P4: Add model picker. Calibrate weights.
Each phase is reversible: roll back the migration and disable the scheduler, and the previous-phase UX continues working (it never read the new tables).
§11 — Open questions
| ID | Question | Owner |
|---|---|---|
| SM-Q1 | Should surface_metrics retain raw scrape payload (payload_jsonb) indefinitely, or only N days? | Quinn (storage cost vs. audit) |
| SM-Q2 | Should anonymous touchpoint aggregation respect coop boundaries (per N brief)? | Quinn |
| SM-Q3 | Default time-decay half-life: 7 days proposed — confirm against Quinn's actual sales cycle | Quinn |
| SM-Q4 | Is conversion-value for subscription_new the renewal-projected LTV or first-payment-amount? | Quinn (modeling preference) |
| SM-Q5 | Cross-surface dedup: when two surfaces report the same person under different handles but matching photo-hash, do we collapse into one prospect at touchpoint-write-time or query-time? | prospect-resolver contract |
Related
- _engineering-talent-scout-port.md §0.2 — design rationale; this doc operationalizes it.
- T-analytics-dashboard.brief.md — UX consumer.
- specialist-prospect-resolver.contract.md — touchpoint linking.
- specialist-strategist.contract.md — aggregate reader.
- specialist-bookings-tryst.contract.md — first ingestion adapter consumer.
- surface-tryst.brief.md §canonical-facts — tier-gating fact source.
- I-audit-trust-replay.brief.md — every adapter ingestion writes a Session row.
- K-safety-blocklist.brief.md — privacy invariants.
- V-data-portability-erasure.brief.md — data-export coverage.