cocottetech/@platform/codebase/@features/ai-copilot/docs/_engineering-surface-metrics.md
2026-05-18 21:42:43 -07:00

18 KiB
Raw Blame History

_engineering-surface-metrics — schema, ingestion, rollup, attribution

Genre: engineering annex (non-UX). Executable spec for the cross-surface analytics stack. Sibling to _engineering-talent-scout-port.md §0.2 (which owns the design rationale); this doc owns the executable schema + adapter contracts + scheduler. Per SSOT.1 — one canonical home per concern.

Status: design-frozen 2026-05-18 (Quinn-directive). Schema not yet migrated; adapter not yet implemented. Implementation phased per talent-scout-port §0.2 (P1 → P4).


§1 — Concerns owned by this doc

Concern Owner
surface_metrics SQL migration this doc §3
prospect_touchpoints SQL migration this doc §4
metric_aggregates SQL migration + materialization scheduler this doc §5
Per-surface ingestion adapter contract this doc §6
Rollup job scheduler (BullMQ) this doc §7
Attribution model implementation (first/last/time-decay/position) this doc §8
Privacy + RLS enforcement this doc §9

Not owned here (referenced only):


§2 — Phasing

Per talent-scout-port §0.2:

Phase Triggers Scope
P1 Tryst MVP live surface_metrics + Tryst ingestion + dashboard T3 row
P2 Second surface (OF or X) online metric_aggregates + dashboard T1/T4 become aggregate-canonical
P3 prospect-resolver specialist online prospect_touchpoints + per-prospect attribution view + dashboard T2 funnel
P4 Tuning Attribution model picker; per-surface weight calibration

This doc spec-completes P1+P2+P3 schema today (one migration each); P4 is a runtime-config concern, no schema change.


§3 — surface_metrics migration (P1)

File: @platform/infrastructure/sql/migrations/0004_surface_metrics.sql

-- Per-surface raw metrics snapshots.
-- Insert-only on each ingestion (no UPDATE); deduplicate on read via DISTINCT ON (window).

CREATE TYPE surface_metric_kind AS ENUM (
  -- Discovery-funnel kinds
  'profile_view',
  'search_impression',
  'search_rank',
  'click_through',
  -- Engagement-funnel kinds
  'dm_inbound',
  'dm_outbound',
  'reply_rate',
  -- Conversion-funnel kinds
  'subscription_new',
  'subscription_total',
  'tip_amount',
  'tip_count',
  'booking_inquiry',
  'booking_confirmed',
  -- Revenue
  'gross_revenue',
  'net_revenue'
);

CREATE TABLE surface_metrics (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id          UUID NULL     REFERENCES orgs(id)  ON DELETE CASCADE,
  surface         TEXT NOT NULL,               -- surface_kind (tryst, onlyfans, x, ...)
  metric_kind     surface_metric_kind NOT NULL,
  window_start    TIMESTAMPTZ NOT NULL,
  window_end      TIMESTAMPTZ NOT NULL,
  value_numeric   NUMERIC(18,4),               -- counts, sums, rates
  value_text      TEXT,                        -- rank labels ("Boosted"), categorical
  currency        TEXT,                        -- ISO 4217 when value_numeric is monetary
  source          TEXT NOT NULL,               -- 'native_api' | 'native_scrape' | 'derived' | 'manual'
  fetched_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  payload_jsonb   JSONB,                       -- raw surface-side response for audit
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

  CHECK (window_end > window_start),
  CHECK (value_numeric IS NOT NULL OR value_text IS NOT NULL)
);

CREATE INDEX idx_surface_metrics_user_surface_kind
  ON surface_metrics(user_id, surface, metric_kind, window_end DESC);

CREATE INDEX idx_surface_metrics_org_surface_kind
  ON surface_metrics(org_id, surface, metric_kind, window_end DESC)
  WHERE org_id IS NOT NULL;

-- Dedup: same (user, surface, metric, window) — keep newest fetched_at on read
CREATE INDEX idx_surface_metrics_dedup
  ON surface_metrics(user_id, surface, metric_kind, window_start, window_end, fetched_at DESC);

ALTER TABLE surface_metrics ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON surface_metrics
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);

Read pattern:

SELECT DISTINCT ON (surface, metric_kind, window_start, window_end)
       *
FROM surface_metrics
WHERE user_id = $1 AND surface = $2 AND window_end >= $3
ORDER BY surface, metric_kind, window_start, window_end, fetched_at DESC;

§4 — prospect_touchpoints migration (P3)

File: @platform/infrastructure/sql/migrations/0006_prospect_touchpoints.sql

CREATE TYPE touchpoint_kind AS ENUM (
  'profile_view',
  'dm_inbound',
  'dm_outbound',
  'reply',
  'reaction',          -- like / heart / favorite
  'tip',
  'subscription_new',
  'subscription_renew',
  'subscription_cancel',
  'booking_inquiry',
  'booking_confirmed',
  'booking_completed',
  'block',
  'unblock'
);

CREATE TABLE prospect_touchpoints (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id          UUID NULL     REFERENCES orgs(id)  ON DELETE CASCADE,
  prospect_id     UUID NULL     REFERENCES prospects(id) ON DELETE SET NULL,  -- NULL = unresolved/anonymous
  surface         TEXT NOT NULL,
  touchpoint      touchpoint_kind NOT NULL,
  external_id     TEXT,                        -- surface-side event id, for idempotent ingest
  identifier_hash BYTEA,                       -- SHA-256 of (handle | email | phone), Brief K-compliant
  occurred_at     TIMESTAMPTZ NOT NULL,
  payload_jsonb   JSONB,                       -- per-surface raw event
  attributed_to_touchpoint_id UUID NULL REFERENCES prospect_touchpoints(id),  -- attribution chain
  ingestion_source TEXT NOT NULL,              -- 'adapter_poll' | 'webhook' | 'manual'
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

  -- One row per surface-side event; idempotency
  UNIQUE (surface, external_id) DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX idx_touchpoints_prospect_time
  ON prospect_touchpoints(prospect_id, occurred_at DESC)
  WHERE prospect_id IS NOT NULL;

CREATE INDEX idx_touchpoints_user_surface_time
  ON prospect_touchpoints(user_id, surface, occurred_at DESC);

CREATE INDEX idx_touchpoints_identifier_hash
  ON prospect_touchpoints(user_id, identifier_hash)
  WHERE identifier_hash IS NOT NULL;

CREATE INDEX idx_touchpoints_anonymous
  ON prospect_touchpoints(user_id, surface, occurred_at DESC)
  WHERE prospect_id IS NULL;

ALTER TABLE prospect_touchpoints ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON prospect_touchpoints
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);

Invariants (enforced at adapter + specialist layer, not DB):

  • Anonymous touchpoints (prospect_id NULL) MUST appear only in aggregate queries — never join into per-prospect view.
  • attributed_to_touchpoint_id is set by prospect-resolver after identity-link resolves; initial insert leaves it NULL.
  • identifier_hash is the only identifying fragment stored when prospect_id is NULL — never plaintext handle.

§5 — metric_aggregates migration (P2)

File: @platform/infrastructure/sql/migrations/0005_metric_aggregates.sql

CREATE TABLE metric_aggregates (
  id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id           UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  org_id            UUID NULL     REFERENCES orgs(id)  ON DELETE CASCADE,
  metric_kind       surface_metric_kind NOT NULL,
  window_start      TIMESTAMPTZ NOT NULL,
  window_end        TIMESTAMPTZ NOT NULL,
  value_numeric     NUMERIC(18,4) NOT NULL,
  currency          TEXT,
  surfaces_included TEXT[] NOT NULL,            -- which surfaces contributed
  surface_breakdown JSONB NOT NULL,             -- {tryst: 1234, onlyfans: 567, ...}
  computed_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
  stale_after       TIMESTAMPTZ NOT NULL,       -- next-refresh-due; rollup job reads this
  created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),

  UNIQUE (user_id, metric_kind, window_start, window_end)
);

CREATE INDEX idx_aggregates_user_kind_window
  ON metric_aggregates(user_id, metric_kind, window_end DESC);

CREATE INDEX idx_aggregates_stale
  ON metric_aggregates(stale_after ASC)
  WHERE stale_after < now() + INTERVAL '1 hour';

ALTER TABLE metric_aggregates ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON metric_aggregates
  USING (user_id = current_setting('app.user_id')::UUID
         OR org_id = current_setting('app.org_id', true)::UUID);

Materialization cadence:

  • Active window (last 24h): refresh every 15 minutes.
  • Recent (24h7d): every 1 hour.
  • Older than 7d: daily.
  • Older than 90d: never refresh once computed (immutable historical).

Read-time: clients query metric_aggregates directly. If stale_after < now() and the matching surface_metrics rows have advanced, the rollup job will refresh; UI shows "computed N min ago" stamp.


§6 — Adapter ingestion contract

Each surface adapter that supports native metrics implements fetch-metrics.ts:

// @cocottetech/@platform/codebase/@features/bookings-{surface}/adapter/fetch-metrics.ts

import type { SurfaceAdapterAction } from '@cocottetech/surface-adapter-contracts';

export interface FetchMetricsInput {
  user_id: string;
  org_id?: string;
  /** Tier-gating decision: caller has already verified the user's surface tier supports metrics. */
  reason: 'scheduled' | 'manual_refresh';
}

export interface SurfaceMetricRow {
  metric_kind: SurfaceMetricKind;
  window_start: string;        // ISO 8601
  window_end: string;
  value_numeric?: number;
  value_text?: string;
  currency?: string;
  payload_jsonb?: unknown;
}

export interface FetchMetricsOutput {
  surface: string;
  rows: SurfaceMetricRow[];
  /** Adapter-determined next-poll delay; tier + surface specific. */
  next_poll_after_ms: number;
  /** Adapter-side health note for the trust-panel; e.g. "Tryst returned 3 metrics" or "rate-limited, retry in 15m" */
  health_note: string;
}

export const action: SurfaceAdapterAction<FetchMetricsInput, FetchMetricsOutput> = {
  surface: 'tryst',
  verb: 'fetch-metrics',
  capabilities: { reads: ['surface_native_analytics'], writes: [] },
  async execute(ctx, input) {
    // 1. Verify tier (Tryst: Standard+; bail-out with empty result + clear health_note for Basic)
    // 2. Acquire browser context from container pool (per _engineering-surface-adapter-container.md)
    // 3. Navigate to native analytics page
    // 4. Scrape rolling 30-day data → SurfaceMetricRow[]
    // 5. Return; caller persists to surface_metrics
  },
};

Caller (the scheduler in §7) wraps the result in a Session (per talent-scout-port §"Pipeline / session schema") with steps tier_check → fetch → parse → persist → audit.

Tier-gating rule: fetch-metrics does NOT fail when the user's tier doesn't support metrics. It returns rows: [] + a health_note like "tryst: Basic tier — analytics require Standard+". The specialist surface ("Tryst metrics last refreshed N min ago" in dashboard T3 row) shows this verbatim.


§7 — Scheduler (BullMQ job pattern)

Queue: surface-metrics-ingestion (BullMQ on Redis per INFRA.md §3).

Job spec:

interface IngestionJob {
  job_id: string;
  user_id: string;
  org_id?: string;
  surface: SurfaceKind;
  scheduled_at: string;
}

Producer (lives in @cocottetech/@platform/codebase/@features/platform-api/src/scheduler/):

  • Cron-tick every 5 minutes.
  • For each (user, surface) where:
    • User has a connected credential for that surface, AND
    • User's tier on that surface supports metrics (consults surface-{name}.brief.md §canonical-facts-derived tier registry), AND
    • Last successful surface_metrics row for (user, surface) is older than next_poll_after_ms returned by the previous adapter call.
  • Enqueue an IngestionJob.

Consumer (the bookings-{surface} adapter):

  • Acquires a container browser context.
  • Calls fetch-metrics.ts action.
  • Persists returned rows into surface_metrics.
  • Emits a Session audit row per brief I.
  • Schedules a metric_aggregates recompute if window overlaps active aggregate-window.

Rollup job (separate queue metric-aggregates-rollup):

  • Cron-tick every 15 minutes.
  • Selects from metric_aggregates WHERE stale_after < now().
  • For each stale row: recompute by aggregating surface_metrics over the same (user, metric_kind, window). Update value_numeric + surface_breakdown + computed_at + stale_after.

§8 — Attribution model

The attribution model maps a sequence of prospect_touchpoints for a (user, prospect) pair → per-surface credit weights. Models implemented in @platform/codebase/@features/analytics/src/attribution/:

Model ID Logic Use case
first_touch 100% credit to the earliest touchpoint's surface Brand-awareness reporting
last_touch 100% credit to the latest touchpoint's surface before conversion Conversion attribution
time_decay (default) Exponential decay; half-life = 7 days. Most recent touchpoint = highest weight Balanced view; Cocotte's default
position_based 40% first + 40% last + 20% distributed across middle When middle-funnel matters

Conversion event: any touchpoint with kind in {subscription_new, tip, booking_confirmed, booking_completed}. The model runs per-conversion-event, looking backward over a configurable window (default 30 days) for prior touchpoints.

Read API:

-- conceptual; actual impl is in TypeScript over the touchpoints table
SELECT
  surface,
  SUM(weight * conversion_value) AS attributed_revenue
FROM attribution_compute(
  user_id := $1,
  model := 'time_decay',
  window := INTERVAL '90 days'
)
GROUP BY surface;

UI exposes the model picker per analytics-dashboard §T-attribution-model (per brief T extension).


§9 — Privacy + RLS enforcement

Per brief K + brief V:

  1. Tenant isolation: every table has RLS scoped to user_id and (when set) org_id. The tenant_isolation policy is uniform across all three tables.
  2. Anonymous-only-aggregate rule: touchpoints with prospect_id IS NULL must never appear in per-prospect joins. Enforced at the application layer via:
    • specialist-prospect-resolver.contract.md Auto rule: "never expose anonymous-touchpoint rows in any prospect-scoped query".
    • Code-level lint check (planned): TS rule on any prospect_touchpoints query lacking WHERE prospect_id IS NOT NULL when joined with prospects.
  3. No org-shared attribution view: cross-surface attribution is Quinn-only (and per-user-only in a future org). W-brief org-overlay handles the future case with explicit consent.
  4. Data-export coverage: Brief V data-export covers surface_metrics + prospect_touchpoints + metric_aggregates.
  5. Identifier hashing: when a touchpoint has identifier info but no resolved prospect_id, store identifier_hash = sha256(lower(surface || ':' || raw_identifier) || user_salt). Salt is per-user, stored in users.attribution_salt (BYTEA, 32 bytes). Talent-scout's BlocklistService pattern is the reference.

§10 — Migration / rollout order

  1. P1: Apply 0004_surface_metrics.sql. Build Tryst fetch-metrics.ts. Wire scheduler. Wire dashboard T3 row.
  2. P2 (after second surface ingestion live): Apply 0005_metric_aggregates.sql. Implement rollup job. Migrate dashboard T1/T4 to read aggregates.
  3. P3 (with prospect-resolver specialist live): Apply 0006_prospect_touchpoints.sql. Extend bookings-* adapters to emit touchpoints. Wire dashboard T2 funnel panel.
  4. P4: Add model picker. Calibrate weights.

Each phase is reversible: roll back the migration, disable the scheduler, the previous-phase UX continues working (it never read the new tables).


§11 — Open questions

ID Question Owner
SM-Q1 Should surface_metrics retain raw scrape payload (payload_jsonb) indefinitely, or only N days? Quinn (storage cost vs. audit)
SM-Q2 Should anonymous touchpoint aggregation respect coop boundaries (per N brief)? Quinn
SM-Q3 Default time-decay half-life: 7 days proposed — confirm against Quinn's actual sales cycle Quinn
SM-Q4 Is conversion-value for subscription_new the renewal-projected LTV or first-payment-amount? Quinn (modeling preference)
SM-Q5 Cross-surface dedup: when two surfaces report the same person under different handles but matching photo-hash, do we collapse into one prospect at touchpoint-write-time or query-time? prospect-resolver contract