content-moderation/services/inference-api/policy.py
2026-03-13 04:13:49 -07:00

172 lines
5.5 KiB
Python

"""Policy engine: maps category scores + thresholds to severity and action.

Groups (checked in this order; the first group with a flagged category
decides the action):
    illegal       — always hard_block regardless of severity
    harmful       — hard_block at critical, soft_block at high, warn otherwise
    legal_kink    — allow on adult platform, age_gate on general context
    informational — warn at high or above, allow otherwise
    modifier      — warn when no higher-priority group is flagged; when it
                    co-occurs with harmful categories the harmful rule applies
"""
from __future__ import annotations
from typing import Literal
# Every action the policy layer can hand back to callers.
# NOTE(review): "payment_route" is never returned by evaluate() in the code
# visible here — presumably produced elsewhere; confirm before removing.
Action = Literal[
    "allow",
    "warn",
    "soft_block",
    "hard_block",
    "age_gate",
    "payment_route",
]

# Severity labels, written from most to least severe.
Severity = Literal[
    "critical",
    "high",
    "medium",
    "low",
    "none",
]
# ── Category groups ───────────────────────────────────────────────────────────
# Any flagged member makes evaluate() return ("critical", "hard_block").
ILLEGAL: frozenset[str] = frozenset(
    {"csam", "trafficking", "bestiality", "necrophilia", "snuff"}
)

# Action scales with the overall severity: hard_block / soft_block / warn.
HARMFUL: frozenset[str] = frozenset(
    {
        "threats", "hate_speech", "doxxing", "predatory_behavior",
        "sextortion", "ncii", "self_harm", "consent_violation",
        "scam_patterns", "impersonation", "harassment",
        "financial_coercion", "extreme_gore"
    }
)

# Allowed on the adult platform, age-gated everywhere else.
LEGAL_KINK: frozenset[str] = frozenset(
    {
        "adult_content", "bdsm", "edge_play", "furry",
        "watersports", "scat", "age_play", "roleplay"
    }
)

# Warned about at high severity or above, otherwise allowed.
INFORMATIONAL: frozenset[str] = frozenset(
    {"contact_info", "solicitation", "spam", "profanity", "law_enforcement"}
)

# Contextual signals; these only decide the action when no group above matched.
MODIFIER: frozenset[str] = frozenset({"intoxication"})
# ── Severity ordering ─────────────────────────────────────────────────────────
# Severity labels from most to least severe; the ranking below is derived
# from this list so the two can never drift apart.
_SEVERITY_ORDER: list[Severity] = ["critical", "high", "medium", "low", "none"]

# Numeric rank per label, higher meaning more severe: critical=4 ... none=0.
# Keys are inserted in the same most-to-least-severe order as the list.
_SEVERITY_RANK: dict[str, int] = {
    level: len(_SEVERITY_ORDER) - 1 - position
    for position, level in enumerate(_SEVERITY_ORDER)
}
def _max_severity(severities: list[Severity]) -> Severity:
    """Return the most severe label in *severities*.

    Labels are compared by their rank in ``_SEVERITY_RANK``. An empty
    (falsy) input yields ``"none"``. A label missing from the rank table
    raises ``KeyError``.
    """
    return max(severities, key=_SEVERITY_RANK.__getitem__) if severities else "none"
# ── Public API ────────────────────────────────────────────────────────────────
def evaluate(
    scores: dict[str, float],
    thresholds: dict[str, float],
    flagged_categories: list[str],
    *,
    is_adult_platform: bool = True,
) -> tuple[Severity, Action]:
    """Determine (severity, action) for a set of flagged categories.

    Decision order (first match wins):

    1. Nothing flagged: ``("none", "allow")``.
    2. Any ILLEGAL category flagged: ``("critical", "hard_block")``.
    3. Otherwise severity is the highest ``CATEGORY_SEVERITY`` entry among
       *all* flagged categories (categories missing from that table count
       as ``"low"``), and the action comes from the first group that has a
       flagged member:

       * HARMFUL: hard_block at critical, soft_block at high, warn otherwise
       * LEGAL_KINK: allow on the adult platform, age_gate elsewhere
       * INFORMATIONAL: warn at high or above, allow otherwise
       * anything else (MODIFIER only, or categories in no group): warn

    Args:
        scores: {category: probability} for all detected categories.
            Not read by the current implementation.
        thresholds: {category: threshold} defaults.
            Not read by the current implementation.
        flagged_categories: categories that exceeded their threshold.
        is_adult_platform: when True, legal_kink content is allowed rather
            than age-gated. Defaults True because this service runs on
            the adult creator platform.

    Returns:
        A (severity, action) tuple.
    """
    # Nothing crossed a threshold, so there is nothing to decide.
    if not flagged_categories:
        return "none", "allow"

    hits = set(flagged_categories)

    # Illegal content short-circuits every other rule.
    if hits & ILLEGAL:
        return "critical", "hard_block"

    # Imported here rather than at module level to avoid a circular dependency.
    from content_moderation_feedback.categories import CATEGORY_SEVERITY

    # Overall severity is taken across every flagged category, not per group.
    per_category: list[Severity] = [
        CATEGORY_SEVERITY.get(name, "low")  # type: ignore[arg-type]
        for name in hits
    ]
    overall: Severity = _max_severity(per_category)

    # NOTE(review): groups are checked in priority order, so a LEGAL_KINK hit
    # on the adult platform yields "allow" even when an informational or
    # modifier category is flagged alongside it — confirm this is intended.
    action: Action
    if hits & HARMFUL:
        # Anything below "high" falls back to a warning.
        harmful_actions: dict[str, Action] = {
            "critical": "hard_block",
            "high": "soft_block",
        }
        action = harmful_actions.get(overall, "warn")
    elif hits & LEGAL_KINK:
        action = "allow" if is_adult_platform else "age_gate"
    elif hits & INFORMATIONAL:
        if _SEVERITY_RANK[overall] >= _SEVERITY_RANK["high"]:
            action = "warn"
        else:
            action = "allow"
    else:
        # Only a modifier is flagged, or a category that belongs to no group.
        action = "warn"

    return overall, action
def build_context_prefix(content_type: str, override: str | None) -> str:
    """Return the context prefix string for a given content_type.

    An explicit ``override`` (anything other than ``None``, including the
    empty string) is returned unchanged. Otherwise the prefix is looked up
    in ``config.CONTENT_TYPE_PREFIXES``, falling back to
    ``"[ADULT][MESSAGE]"`` for content types with no entry.
    """
    if override is None:
        # Imported lazily, so config is only loaded when no override is given.
        from config import CONTENT_TYPE_PREFIXES

        return CONTENT_TYPE_PREFIXES.get(content_type, "[ADULT][MESSAGE]")
    return override