content-moderation/tools/harvest-feedback.py
Lilith 9b9c46e3e0 chore(tools): 🔧 Update feedback collection script with improved harvesting logic and configuration
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-03-13 04:25:57 -07:00

184 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""
Feedback Harvesting Tool
Pulls misclassification corrections from the platform's content moderation
API and converts them to training examples for the next model iteration.
Workflow:
1. Load recent feedback (admin overrides, user reports) from the local JSONL feedback store (data/feedback/corrections.jsonl)
2. Convert to training data format (text + labels)
3. Output to data/feedback/ directory
4. Feed into training pipeline via merge_data.py
Usage:
python tools/harvest-feedback.py --since 2026-03-01
python tools/harvest-feedback.py --inference-url http://localhost:3501
python tools/harvest-feedback.py --output data/feedback/harvest_20260313.jsonl
"""
import argparse
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
import urllib.request
import urllib.error
# Module-level logger; its level and format are set by logging.basicConfig() in main().
logger = logging.getLogger(__name__)

# Import categories from the package.
# The package's src/ directory (../packages/content-moderation-feedback/src
# relative to this script's directory) is prepended to sys.path so that the
# import below resolves from the source tree.
sys.path.insert(0, str(Path(__file__).parent.parent / "packages" / "content-moderation-feedback" / "src"))
from content_moderation_feedback.categories import CATEGORIES
def fetch_feedback(inference_url: str) -> list[dict]:
    """Fetch the feedback summary from the inference API.

    Requests ``<inference_url>/api/feedback/summary`` with a 30-second
    timeout and returns the decoded JSON body.

    This is a best-effort call: transport failures and undecodable bodies
    are logged and reported to the caller as an empty list rather than as
    an exception.

    Args:
        inference_url: Base URL of the inference API, with or without a
            trailing slash.

    Returns:
        The decoded JSON payload, or ``[]`` if the request failed or the
        response body was not valid JSON.
        NOTE(review): the payload shape is not visible from this file; the
        ``list[dict]`` annotation is kept from the original signature.
    """
    # Strip a trailing slash so the joined URL never contains "//api/...".
    url = f"{inference_url.rstrip('/')}/api/feedback/summary"

    try:
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=30) as response:
            payload = response.read()
    except OSError as e:
        # URLError/HTTPError are OSError subclasses. A timeout or connection
        # reset raised while *reading* the body is not wrapped in URLError,
        # so catching OSError covers both phases of the request.
        logger.error("Failed to fetch feedback from %s: %s", url, e)
        return []

    try:
        return json.loads(payload)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; treat an undecodable body like any other failed fetch.
        logger.error("Invalid JSON in feedback response from %s: %s", url, e)
        return []
def fetch_feedback_entries(inference_url: str) -> list[dict]:
    """Load raw feedback entries from the local JSONL feedback store.

    The showcase/inference API stores feedback locally via FeedbackClient,
    so entries are read directly from ``data/feedback/corrections.jsonl``
    (resolved relative to the parent of this script's directory) instead of
    being requested over HTTP.

    Args:
        inference_url: Kept for interface compatibility with callers; it is
            not used because nothing is fetched over the network here.

    Returns:
        One dict per well-formed line of the store, in file order. Blank
        lines are ignored; lines that are not valid JSON, or whose JSON
        value is not an object, are skipped with a warning. Returns an
        empty list when the store file does not exist.
    """
    feedback_store_path = (
        Path(__file__).parent.parent / "data" / "feedback" / "corrections.jsonl"
    )

    entries: list[dict] = []
    if feedback_store_path.exists():
        # Decode explicitly as UTF-8: the platform default encoding is
        # locale-dependent and can garble or reject non-ASCII feedback text.
        with open(feedback_store_path, encoding="utf-8") as f:
            for line_number, raw_line in enumerate(f, start=1):
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    logger.warning(
                        "Skipping malformed feedback line %d: %s", line_number, e
                    )
                    continue
                if not isinstance(entry, dict):
                    # Downstream code calls entry.get(...); a bare number,
                    # string or list would crash there, so drop it here.
                    logger.warning(
                        "Skipping non-object feedback line %d", line_number
                    )
                    continue
                entries.append(entry)

    logger.info("Loaded %d feedback entries from %s", len(entries), feedback_store_path)
    return entries
def feedback_to_training_example(entry: dict) -> dict | None:
"""Convert a feedback entry to a training example."""
text = entry.get("text")
if not text:
return None
feedback_type = entry.get("feedback_type")
expected = set(entry.get("expected_categories", []))
actual = set(entry.get("actual_categories", []))
# Build label vector
labels = {}
for cat in CATEGORIES:
if feedback_type == "false_positive":
# Model flagged it but shouldn't have → expected = correct labels
labels[cat] = 1 if cat in expected else 0
elif feedback_type == "false_negative":
# Model missed it → expected = what it should have caught
labels[cat] = 1 if cat in expected else 0
else:
labels[cat] = 1 if cat in expected else 0
return {
"text": text,
"labels": labels,
"source": "platform_feedback",
"feedback_type": feedback_type,
"original_prediction": list(actual),
"corrected_to": list(expected),
"harvested_at": datetime.now(timezone.utc).isoformat(),
}
def _parse_utc_timestamp(value: object) -> datetime | None:
    """Parse an ISO-8601 string into an aware UTC datetime, or None if unparseable."""
    if not isinstance(value, str):
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(normalized)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive timestamps are taken to be UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # Aware timestamps are converted, not relabelled, so the instant is kept.
    return parsed.astimezone(timezone.utc)


def main() -> None:
    """Command-line entry point: harvest feedback into a JSONL training file.

    Steps:
        1. Parse ``--inference-url``, ``--output``, ``--since`` and
           ``--verbose`` and configure logging.
        2. Resolve the output path (default:
           ``data/feedback/harvest_YYYYMMDD.jsonl`` next to this script's
           parent directory) and create its directory.
        3. Load feedback entries; return early if there are none.
        4. With ``--since``, keep only entries whose ``timestamp`` is on or
           after that date (midnight UTC). Entries without a timestamp are
           treated as dated 2000-01-01; entries with an unparseable
           timestamp are excluded and counted in a warning.
        5. Convert entries to training examples and write one JSON object
           per line to the output file.

    Exits via ``argparse`` (status 2) if ``--since`` is not a valid
    ``YYYY-MM-DD`` date.
    """
    parser = argparse.ArgumentParser(description="Harvest platform feedback for training")
    parser.add_argument(
        "--inference-url",
        default="http://localhost:3501",
        help="Inference API URL",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output file path (default: data/feedback/harvest_YYYYMMDD.jsonl)",
    )
    parser.add_argument(
        "--since",
        default=None,
        help="Only include feedback since this date (YYYY-MM-DD)",
    )
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Validate --since up front so a typo yields a usage error instead of a
    # traceback after the feedback store has already been read.
    since_date = None
    if args.since:
        try:
            since_date = datetime.strptime(args.since, "%Y-%m-%d").replace(
                tzinfo=timezone.utc
            )
        except ValueError:
            parser.error(
                f"--since must be a date in YYYY-MM-DD format, got {args.since!r}"
            )

    # Determine output path
    if args.output is None:
        # The default file name is stamped with the local calendar date.
        date_str = datetime.now().strftime("%Y%m%d")
        output_path = (
            Path(__file__).parent.parent / "data" / "feedback" / f"harvest_{date_str}.jsonl"
        )
    else:
        output_path = args.output
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Fetch feedback entries
    entries = fetch_feedback_entries(args.inference_url)
    if not entries:
        logger.info("No feedback entries found")
        return

    # Filter by date if specified
    if since_date is not None:
        original_count = len(entries)
        kept = []
        unparseable = 0
        for entry in entries:
            stamp = _parse_utc_timestamp(entry.get("timestamp", "2000-01-01"))
            if stamp is None:
                unparseable += 1
            elif stamp >= since_date:
                kept.append(entry)
        entries = kept
        if unparseable:
            logger.warning(
                "Excluded %d entries with an unparseable timestamp", unparseable
            )
        logger.info(
            "Filtered %d → %d entries (since %s)",
            original_count,
            len(entries),
            args.since,
        )

    # Convert to training examples; entries without text convert to None
    # and are dropped.
    examples = [
        example
        for entry in entries
        if (example := feedback_to_training_example(entry))
    ]

    # Write output as JSON Lines: one example per line.
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(example) + "\n" for example in examples)

    logger.info(
        "Harvested %d training examples → %s",
        len(examples),
        output_path,
    )
# Run the harvest only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()