"""Policy engine: maps category scores + thresholds to severity and action.

Groups (checked in this order; the first group that contains a flagged
category decides the action):
  illegal       — always hard_block regardless of severity
  harmful       — hard_block at critical, soft_block at high, warn otherwise
  legal_kink    — allow on adult platform, age_gate on general context
  informational — warn at high or above, allow otherwise
  modifier      — warn; when co-occurring with harmful categories the
                  harmful rule above decides the action
"""

from __future__ import annotations
from typing import Literal
# Enforcement decision produced by the policy engine.
Action = Literal[
    "allow",
    "warn",
    "soft_block",
    "hard_block",
    "age_gate",
    "payment_route",
]
# Severity label attached to a decision, listed from most to least severe.
Severity = Literal["critical", "high", "medium", "low", "none"]

# ── Category groups ───────────────────────────────────────────────────────────

# Any flagged member makes evaluate() return ("critical", "hard_block").
ILLEGAL: frozenset[str] = frozenset({
    "csam",
    "trafficking",
    "bestiality",
    "necrophilia",
    "snuff",
})

# Flagged members lead to hard_block / soft_block / warn depending on severity.
HARMFUL: frozenset[str] = frozenset({
    "threats",
    "hate_speech",
    "doxxing",
    "predatory_behavior",
    "sextortion",
    "ncii",
    "self_harm",
    "consent_violation",
    "scam_patterns",
    "impersonation",
    "harassment",
    "financial_coercion",
    "extreme_gore",
})

# Allowed on the adult platform, age-gated elsewhere.
LEGAL_KINK: frozenset[str] = frozenset({
    "adult_content",
    "bdsm",
    "edge_play",
    "furry",
    "watersports",
    "scat",
    "age_play",
    "roleplay",
})

# Warned at high severity or above, otherwise allowed.
INFORMATIONAL: frozenset[str] = frozenset({
    "contact_info",
    "solicitation",
    "spam",
    "profanity",
    "law_enforcement",
})

# Context modifiers; flagged on their own they result in a warn.
MODIFIER: frozenset[str] = frozenset({
    "intoxication",
})

# ── Severity ordering ─────────────────────────────────────────────────────────

# Numeric rank per severity label; a larger number means more severe.
_SEVERITY_RANK: dict[str, int] = {
    "critical": 4,
    "high": 3,
    "medium": 2,
    "low": 1,
    "none": 0,
}

# The same labels listed from most to least severe.
_SEVERITY_ORDER: list[Severity] = ["critical", "high", "medium", "low", "none"]


def _max_severity(severities: list[Severity]) -> Severity:
    """Return the most severe label in *severities*.

    Labels are compared by their ``_SEVERITY_RANK`` value; on a tie the
    first occurrence wins.

    Args:
        severities: severity labels to compare.

    Returns:
        The highest-ranked label, or "none" when there is nothing to compare.

    Raises:
        KeyError: if a label is not a key of ``_SEVERITY_RANK``.
    """
    if not severities:
        return "none"
    # default= additionally covers a truthy-but-empty iterable (e.g. a
    # generator), which would otherwise make max() raise ValueError.
    return max(severities, key=_SEVERITY_RANK.__getitem__, default="none")


# ── Public API ────────────────────────────────────────────────────────────────

def evaluate(
    scores: dict[str, float],
    thresholds: dict[str, float],
    flagged_categories: list[str],
    *,
    is_adult_platform: bool = True,
) -> tuple[Severity, Action]:
    """Determine (severity, action) from scores and flagged categories.

    Groups are checked in a fixed order — illegal, harmful, legal_kink,
    informational, then everything else — and the first group containing a
    flagged category decides the action. Severity is the highest
    ``CATEGORY_SEVERITY`` value across *all* flagged categories (a category
    missing from that mapping counts as "low"), not only the categories of
    the deciding group.

    Args:
        scores: {category: probability} for all detected categories.
            Accepted as part of the interface; not read by this function.
        thresholds: {category: threshold} defaults. Accepted as part of
            the interface; not read by this function.
        flagged_categories: categories that exceeded their threshold.
        is_adult_platform: when True, legal_kink content is allowed rather
            than age-gated. Defaults True because this service runs on
            the adult creator platform.

    Returns:
        A (severity, action) tuple; ("none", "allow") when nothing is
        flagged and ("critical", "hard_block") for any illegal category.
    """
    if not flagged_categories:
        return "none", "allow"

    flagged = set(flagged_categories)

    # ── 1. Illegal — always hard_block, no further evaluation needed ──────────
    if flagged & ILLEGAL:
        return "critical", "hard_block"

    # ── 2. Overall severity across every flagged category ─────────────────────
    # Local import avoids a circular dependency at module level.
    from content_moderation_feedback.categories import CATEGORY_SEVERITY

    all_severities: list[Severity] = [
        CATEGORY_SEVERITY.get(cat, "low")  # type: ignore[arg-type]
        for cat in flagged
    ]
    severity: Severity = _max_severity(all_severities)

    # ── 3. Action — the first matching group wins ─────────────────────────────

    # Harmful content.
    # NOTE(review): severity is the maximum over all flagged categories, so a
    # high-severity category from another group can escalate the action taken
    # here — confirm this is intended.
    if flagged & HARMFUL:
        if severity == "critical":
            return severity, "hard_block"
        if severity == "high":
            return severity, "soft_block"
        # Anything below high still warns.
        return severity, "warn"

    # Legal kink: adult platform vs general context.
    # NOTE(review): this returns before the informational rule runs, so a
    # co-flagged high-severity informational category ends up "allow" on the
    # adult platform — confirm this precedence is intended.
    if flagged & LEGAL_KINK:
        return severity, ("allow" if is_adult_platform else "age_gate")

    # Informational categories: warn from "high" upwards, otherwise allow.
    if flagged & INFORMATIONAL:
        if _SEVERITY_RANK[severity] >= _SEVERITY_RANK["high"]:
            return severity, "warn"
        return severity, "allow"

    # What remains is a MODIFIER category and/or a category that belongs to
    # no group definition; both cases warn.
    return severity, "warn"


def build_context_prefix(content_type: str, override: str | None) -> str:
    """Return the context prefix string for a given content_type.

    Args:
        content_type: key looked up in ``config.CONTENT_TYPE_PREFIXES``
            when no override is given.
        override: returned unchanged whenever it is not None (an empty
            string counts as an override); no lookup happens in that case.

    Returns:
        The override, the prefix mapped to content_type, or
        "[ADULT][MESSAGE]" when content_type has no mapping.
    """
    if override is not None:
        return override

    # Function-scope import: config is only imported when a lookup is needed.
    from config import CONTENT_TYPE_PREFIXES

    return CONTENT_TYPE_PREFIXES.get(content_type, "[ADULT][MESSAGE]")