184 lines
5.8 KiB
Python
184 lines
5.8 KiB
Python
#!/usr/bin/env python3
"""
Feedback Harvesting Tool

Collects misclassification corrections recorded by the platform's content
moderation API and converts them to training examples for the next model
iteration.

Workflow:
    1. Load recent feedback (admin overrides, user reports) from the local
       feedback store, data/feedback/corrections.jsonl
    2. Convert to training data format (text + labels)
    3. Output to data/feedback/ directory
    4. Feed into training pipeline via merge_data.py

Usage:
    python tools/harvest-feedback.py --since 2026-03-01
    python tools/harvest-feedback.py --inference-url http://localhost:3501
    python tools/harvest-feedback.py --output data/feedback/harvest_20260313.jsonl
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Import categories from the package
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "packages" / "content-moderation-feedback" / "src"))
|
|
from content_moderation_feedback.categories import CATEGORIES
|
|
|
|
|
|
def fetch_feedback(inference_url: str) -> list[dict]:
    """Fetch the feedback summary from the inference API.

    Requests ``<inference_url>/api/feedback/summary`` with a 30-second
    timeout and returns the decoded JSON body. This is a best-effort call:
    transport failures and undecodable bodies are logged, not raised.

    Args:
        inference_url: Base URL of the inference API, with or without a
            trailing slash.

    Returns:
        The decoded JSON payload, or an empty list when the request fails or
        the body is not valid JSON.
        NOTE(review): the annotation promises ``list[dict]`` but the payload
        shape is not validated here -- confirm against the API.
    """
    url = f"{inference_url.rstrip('/')}/api/feedback/summary"

    try:
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=30) as response:
            body = response.read()
    except OSError as e:
        # URLError/HTTPError are OSError subclasses; a timeout while reading
        # the body surfaces as a plain TimeoutError, which is one as well.
        logger.error("Failed to fetch feedback from %s: %s", url, e)
        return []

    try:
        return json.loads(body)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError, so a garbage body no longer escapes as an exception.
        logger.error("Invalid JSON in feedback response from %s: %s", url, e)
        return []
|
|
|
|
|
|
def fetch_feedback_entries(inference_url: str) -> list[dict]:
|
|
"""Fetch raw feedback entries from the feedback store."""
|
|
url = f"{inference_url}/api/v1/feedback"
|
|
|
|
# The showcase/inference API stores feedback locally via FeedbackClient
|
|
# We read directly from the JSONL store instead
|
|
feedback_store_path = (
|
|
Path(__file__).parent.parent / "data" / "feedback" / "corrections.jsonl"
|
|
)
|
|
|
|
entries = []
|
|
|
|
if feedback_store_path.exists():
|
|
with open(feedback_store_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
entry = json.loads(line)
|
|
entries.append(entry)
|
|
except json.JSONDecodeError:
|
|
logger.warning("Skipping malformed feedback line")
|
|
|
|
logger.info("Loaded %d feedback entries from %s", len(entries), feedback_store_path)
|
|
return entries
|
|
|
|
|
|
def feedback_to_training_example(entry: dict) -> dict | None:
|
|
"""Convert a feedback entry to a training example."""
|
|
text = entry.get("text")
|
|
if not text:
|
|
return None
|
|
|
|
feedback_type = entry.get("feedback_type")
|
|
expected = set(entry.get("expected_categories", []))
|
|
actual = set(entry.get("actual_categories", []))
|
|
|
|
# Build label vector
|
|
labels = {}
|
|
for cat in CATEGORIES:
|
|
if feedback_type == "false_positive":
|
|
# Model flagged it but shouldn't have → expected = correct labels
|
|
labels[cat] = 1 if cat in expected else 0
|
|
elif feedback_type == "false_negative":
|
|
# Model missed it → expected = what it should have caught
|
|
labels[cat] = 1 if cat in expected else 0
|
|
else:
|
|
labels[cat] = 1 if cat in expected else 0
|
|
|
|
return {
|
|
"text": text,
|
|
"labels": labels,
|
|
"source": "platform_feedback",
|
|
"feedback_type": feedback_type,
|
|
"original_prediction": list(actual),
|
|
"corrected_to": list(expected),
|
|
"harvested_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
|
|
def _entry_timestamp(entry: dict) -> datetime | None:
    """Return the entry's ``timestamp`` as an aware UTC datetime, or None if unusable."""
    # Entries without a timestamp are treated as very old (2000-01-01).
    raw = entry.get("timestamp", "2000-01-01")
    if not isinstance(raw, str):
        return None
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive timestamps are interpreted as UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # Aware timestamps must be converted; replace() would only relabel the
    # offset and shift the instant being compared.
    return parsed.astimezone(timezone.utc)


def main() -> None:
    """Command-line entry point: harvest feedback into a JSONL training file.

    Parses ``--inference-url``, ``--output``, ``--since`` and ``--verbose``,
    loads the feedback entries, optionally keeps only those at or after the
    ``--since`` date (UTC), converts them to training examples and writes one
    JSON object per line to the output path. Returns without writing
    anything when no feedback entries are found.
    """
    parser = argparse.ArgumentParser(description="Harvest platform feedback for training")
    parser.add_argument(
        "--inference-url",
        default="http://localhost:3501",
        help="Inference API URL",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output file path (default: data/feedback/harvest_YYYYMMDD.jsonl)",
    )
    parser.add_argument(
        "--since",
        default=None,
        help="Only include feedback since this date (YYYY-MM-DD)",
    )
    parser.add_argument("--verbose", action="store_true")

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Validate --since up front so a typo is a usage error, not a traceback.
    since_date = None
    if args.since:
        try:
            since_date = datetime.strptime(args.since, "%Y-%m-%d").replace(
                tzinfo=timezone.utc
            )
        except ValueError:
            parser.error(f"--since must be a date in YYYY-MM-DD format, got {args.since!r}")

    # Determine output path
    if args.output is None:
        date_str = datetime.now().strftime("%Y%m%d")
        output_dir = Path(__file__).parent.parent / "data" / "feedback"
        output_path = output_dir / f"harvest_{date_str}.jsonl"
    else:
        output_path = args.output
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Fetch feedback entries
    entries = fetch_feedback_entries(args.inference_url)
    if not entries:
        logger.info("No feedback entries found")
        return

    # Filter by date if specified
    if since_date is not None:
        original_count = len(entries)
        kept = []
        unparseable = 0
        for entry in entries:
            stamp = _entry_timestamp(entry)
            if stamp is None:
                # Cannot show the entry is recent enough, so leave it out
                # instead of aborting the whole harvest.
                unparseable += 1
            elif stamp >= since_date:
                kept.append(entry)
        if unparseable:
            logger.warning("Skipped %d entries with unparseable timestamps", unparseable)
        entries = kept
        logger.info("Filtered %d → %d entries (since %s)", original_count, len(entries), args.since)

    # Convert to training examples, dropping entries that yield none
    examples = [
        example
        for example in map(feedback_to_training_example, entries)
        if example
    ]

    # Write output
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(example) + "\n" for example in examples)

    logger.info(
        "Harvested %d training examples → %s",
        len(examples),
        output_path,
    )
|
|
|
|
|
|
# Run the harvest only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|