feat(root): ✨ add initial commit message
This commit is contained in:
parent
cba04bcf63
commit
4bef9533a8
2 changed files with 753 additions and 0 deletions
555
features/conversation-assistant/ml-service/tests/fixtures/README.md
vendored
Normal file
555
features/conversation-assistant/ml-service/tests/fixtures/README.md
vendored
Normal file
|
|
@ -0,0 +1,555 @@
|
|||
# ML Conversation Detection Test Fixtures
|
||||
|
||||
Comprehensive YAML test fixtures for validating the ML conversation detection system.
|
||||
|
||||
## Directory Structure
|
||||
|
||||
```
|
||||
fixtures/
|
||||
├── real/ # Real-world bug cases and edge cases
|
||||
│ └── automated_2fa_myedd.yaml # Bug: 17 identical 2FA codes misclassified
|
||||
│
|
||||
├── synthetic/ # Synthetic test cases for comprehensive coverage
|
||||
│ ├── bad_actor_sugar_daddy.yaml # Scam: Check overpayment fraud
|
||||
│ ├── legitimate_customer.yaml # Positive: Professional booking
|
||||
│ ├── emotional_manipulation.yaml # Risk: Gaslighting and DARVO tactics
|
||||
│ └── automated_marketing.yaml # Automated: Promotional SMS campaign
|
||||
│
|
||||
└── README.md # This file
|
||||
```
|
||||
|
||||
## Fixture Categories
|
||||
|
||||
### Real Cases (`real/`)
|
||||
Production bug reproductions and actual edge cases encountered in the wild.
|
||||
|
||||
**automated_2fa_myedd.yaml**
|
||||
- **Bug**: 17 identical 2FA verification codes treated as human conversation
|
||||
- **Issue**: System wasted ~8.5 seconds on ML analysis for obvious automated messages
|
||||
- **Expected**: Pre-ML detection should skip analysis entirely
|
||||
- **Assertions**: `skip_ml_analysis == true`, `source_type == 'automated_2fa'`
|
||||
|
||||
### Synthetic Cases (`synthetic/`)
|
||||
Carefully crafted test cases covering the full spectrum of conversation types.
|
||||
|
||||
**bad_actor_sugar_daddy.yaml**
|
||||
- **Pattern**: Sugar daddy scam with check overpayment scheme
|
||||
- **Tactics**: Flattery → money offer → forward to third party → guilt trip
|
||||
- **Expected**: `scam_risk >= 0.8`, `should_block == true`, DARVO pattern detection
|
||||
- **Assertions**: Scam detection, manipulation tactics, boundary violation recognition
|
||||
|
||||
**legitimate_customer.yaml**
|
||||
- **Pattern**: Professional booking inquiry with screening cooperation
|
||||
- **Flow**: Rate inquiry → screening → booking confirmation
|
||||
- **Expected**: `conversion_probability >= 0.85`, `risk_level == 'low'`
|
||||
- **Assertions**: High conversion signals, green flags, professional communication
|
||||
|
||||
**emotional_manipulation.yaml**
|
||||
- **Pattern**: DARVO (Deny, Attack, Reverse Victim/Offender) after boundary violation
|
||||
- **Tactics**: Minimization → guilt-tripping → false victimization → threats
|
||||
- **Expected**: `emotional_manipulation_score >= 0.75`, gaslighting detection
|
||||
- **Assertions**: Multiple manipulation tactics, lack of accountability, high risk
|
||||
|
||||
**automated_marketing.yaml**
|
||||
- **Pattern**: Promotional SMS with opt-out instructions
|
||||
- **Characteristics**: Sale announcements, URL links, "Reply STOP" legal compliance
|
||||
- **Expected**: `source_type == 'automated_marketing'`, `skip_ml_analysis == true`
|
||||
- **Assertions**: Pre-ML detection, promotional pattern matching, opt-out recognition
|
||||
|
||||
## Fixture Schema
|
||||
|
||||
Each YAML fixture follows this structure:
|
||||
|
||||
```yaml
|
||||
# Header: Description and purpose
|
||||
conversation_id: "unique_identifier"
|
||||
|
||||
contact:
|
||||
name: "Contact Name"
|
||||
phone: "+1234567890"
|
||||
is_known_contact: bool
|
||||
is_known_automated: bool # For automated systems
|
||||
|
||||
metadata:
|
||||
total_messages: int
|
||||
time_span_hours: int
|
||||
conversation_date: "YYYY-MM-DD"
|
||||
category: "string"
|
||||
|
||||
messages:
|
||||
- id: "msg_001"
|
||||
text: "Message content"
|
||||
direction: "inbound|outbound"
|
||||
timestamp: "ISO-8601"
|
||||
|
||||
expected_classification:
|
||||
source_type: "human|automated_2fa|automated_marketing|..."
|
||||
is_human: bool
|
||||
confidence: float # 0.0 to 1.0
|
||||
skip_ml_analysis: bool # True for automated messages
|
||||
|
||||
# Intent (human only)
|
||||
intent:
|
||||
primary: "string"
|
||||
secondary: "string"
|
||||
confidence: float
|
||||
|
||||
# Sentiment (human only)
|
||||
sentiment:
|
||||
overall: "string"
|
||||
tone: "string"
|
||||
manipulation_detected: bool
|
||||
|
||||
# Risk assessment
|
||||
scam_risk:
|
||||
overall_score: float
|
||||
scam_type: "string"
|
||||
red_flags: [list]
|
||||
green_flags: [list]
|
||||
|
||||
risk_level: "low|medium|high|critical"
|
||||
recommended_action: "string"
|
||||
should_block: bool
|
||||
|
||||
# Conversion analysis (legitimate inquiries)
|
||||
conversion_probability: float
|
||||
conversion_signals: [list]
|
||||
|
||||
detection_rationale:
|
||||
- "Reason 1"
|
||||
- "Reason 2"
|
||||
|
||||
assertions:
|
||||
- name: "Test assertion name"
|
||||
check: "Python-like assertion expression"
|
||||
severity: "critical|high|medium|low"
|
||||
|
||||
business_impact:
|
||||
provider_protection: "Description"
|
||||
time_saved_minutes: int
|
||||
revenue_potential_usd: int
|
||||
|
||||
educational_notes:
|
||||
key_indicators: [list]
|
||||
worker_response_quality: "Description"
|
||||
```
|
||||
|
||||
## Using Fixtures in Tests
|
||||
|
||||
### Python (pytest)
|
||||
|
||||
```python
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
def load_fixture(fixture_name: str) -> Dict[str, Any]:
    """Load a test fixture by name.

    Looks for ``<fixture_name>.yaml`` in the ``real`` and then the
    ``synthetic`` sub-directory of the ``fixtures`` directory that sits next
    to this module, and returns the parsed YAML of the first match.

    Args:
        fixture_name: File stem of the fixture, without the ``.yaml`` suffix.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file.

    Raises:
        FileNotFoundError: If neither category contains the fixture.
    """
    fixture_root = Path(__file__).parent / "fixtures"

    # Check both real/ and synthetic/
    for category in ("real", "synthetic"):
        file_path = fixture_root / category / f"{fixture_name}.yaml"
        if file_path.exists():
            # Explicit encoding: message text may contain non-ASCII
            # characters and the platform default is not UTF-8 everywhere.
            with open(file_path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)

    raise FileNotFoundError(f"Fixture {fixture_name} not found")
|
||||
|
||||
def test_automated_2fa_detection():
    """Check that the 2FA fixture is recognised as automated before ML runs."""
    fixture = load_fixture("automated_2fa_myedd")

    # Classify the conversation exactly as stored in the fixture.
    outcome = classify_conversation(
        conversation_id=fixture["conversation_id"],
        messages=fixture["messages"],
    )

    wanted = fixture["expected_classification"]

    # These three fields must match the fixture exactly.
    for attr in ("skip_ml_analysis", "source_type", "is_human"):
        assert getattr(outcome, attr) == wanted[attr]

    # The fixture's confidence is a lower bound, not an exact value.
    assert outcome.confidence >= wanted["confidence"]
|
||||
|
||||
def test_scam_detection():
    """Check that the sugar daddy scam fixture is flagged and blocked."""
    fixture = load_fixture("bad_actor_sugar_daddy")
    outcome = classify_conversation(
        conversation_id=fixture["conversation_id"],
        messages=fixture["messages"],
    )

    wanted = fixture["expected_classification"]

    # The fixture score is the minimum acceptable scam risk.
    minimum_score = wanted["scam_risk"]["overall_score"]
    assert outcome.scam_risk.overall_score >= minimum_score

    assert outcome.recommended_action == wanted["recommended_action"]
    assert outcome.should_block == wanted["should_block"]
|
||||
|
||||
def verify_assertion(assertion: Dict[str, Any], result: Any) -> bool:
    """Safely verify an assertion against a result object.

    Instead of using eval(), the ``check`` string is split on one operator
    and the field side is resolved with safe attribute/dictionary lookups.
    A leading ``result.`` on the field is optional, and the field may be
    wrapped in ``len(...)`` with any of the comparison operators.

    Supports expressions like:
    - "result.is_human == true"
    - "result.scam_risk.overall_score >= 0.8"
    - "len(result.red_flags) > 3"
    - "'urgency' in result.red_flags"

    Args:
        assertion: Fixture assertion entry; only its "check" key is read.
        result: Object or dict that field paths are resolved against.

    Returns:
        The outcome of the comparison. True (after printing a warning) when
        the expression contains no supported operator, so unparseable
        assertions do not fail a run. False when evaluation raises.
    """
    check = assertion["check"]

    # Same precedence as before: "==" first, then the two-character
    # inequalities ahead of ">" and "<".
    comparators = (
        (" == ", lambda actual, expected: actual == expected),
        (" >= ", lambda actual, expected: actual >= expected),
        (" <= ", lambda actual, expected: actual <= expected),
        (" > ", lambda actual, expected: actual > expected),
        (" < ", lambda actual, expected: actual < expected),
    )

    def resolve(expression: str) -> Any:
        """Resolve a field expression, honouring len(...) and 'result.'."""
        expression = expression.strip()
        wants_length = expression.startswith("len(") and expression.endswith(")")
        if wants_length:
            expression = expression[len("len("):-1].strip()
        try:
            value = get_nested_value(result, expression)
        except AttributeError:
            # Checks may be written relative to a variable named "result";
            # retry without that prefix before giving up.
            if not expression.startswith("result."):
                raise
            value = get_nested_value(result, expression[len("result."):])
        return len(value) if wants_length else value

    try:
        for token, compare in comparators:
            if token not in check:
                continue
            # partition() splits on the first occurrence only, so a repeated
            # operator inside the expected value no longer raises.
            field_expr, _, expected = check.partition(token)
            actual = resolve(field_expr)
            if token == " == ":
                expected_value = parse_value(expected.strip())
            else:
                expected_value = float(expected.strip())
            return compare(actual, expected_value)

        if " in " in check:
            # Handle "value" in field
            value_part, _, field_expr = check.partition(" in ")
            return parse_value(value_part.strip()) in resolve(field_expr)

        # Fallback for complex assertions: warn instead of using eval.
        print(f"Warning: Complex assertion not supported: {check}")
        return True  # Don't fail on unparseable assertions

    except Exception as e:
        print(f"Error verifying assertion '{check}': {e}")
        return False
|
||||
|
||||
def get_nested_value(obj: Any, path: str) -> Any:
    """Safely get a nested value from an object using dot notation.

    Each dot-separated part of *path* is looked up first as a dictionary key
    (when the current value is a dict) and otherwise as an attribute.

    Args:
        obj: Root object or dict to start from.
        path: Dot-separated path such as "scam_risk.overall_score".

    Returns:
        The value found at the end of the path.

    Raises:
        AttributeError: If any part of the path cannot be resolved.
    """
    current = obj

    for part in path.split("."):
        # Dict keys take precedence over attributes so that a key such as
        # "items" or "values" is not shadowed by the dict method of that name.
        if isinstance(current, dict) and part in current:
            current = current[part]
        elif hasattr(current, part):
            current = getattr(current, part)
        else:
            raise AttributeError(f"Path '{path}' not found")

    return current
|
||||
|
||||
def parse_value(value_str: str) -> Any:
    """Parse a string value to its Python equivalent.

    Recognises, in order: ``true``/``false`` (case-insensitive), ``none`` or
    ``null``, single- or double-quoted strings (quotes removed), numbers
    containing a ``.`` (float) and other numbers (int). Anything else is
    returned as the stripped string.

    Args:
        value_str: Raw token taken from an assertion expression.

    Returns:
        bool, None, str, float or int as described above.
    """
    text = value_str.strip()
    lowered = text.lower()

    if lowered == "true":
        return True
    if lowered == "false":
        return False
    if lowered in ("none", "null"):
        return None

    # Matching quote characters at both ends mark a string literal.
    if text[:1] in ('"', "'") and text.endswith(text[:1]):
        return text[1:-1]

    try:
        return float(text) if "." in text else int(text)
    except ValueError:
        # Unquoted non-numeric tokens (including dotted ones such as "v1.2")
        # are treated as plain strings instead of raising.
        return text
|
||||
|
||||
def test_all_assertions(fixture_name: str):
    """Check every assertion a fixture declares, stopping at the first failure.

    NOTE(review): the ``test_`` prefix means pytest will collect this and try
    to resolve ``fixture_name`` as a pytest fixture; presumably it is meant
    to be parametrized or called directly - confirm.
    """
    fixture = load_fixture(fixture_name)
    outcome = classify_conversation(
        conversation_id=fixture["conversation_id"],
        messages=fixture["messages"],
    )

    for spec in fixture["assertions"]:
        # The message is only built when the check fails.
        assert verify_assertion(spec, outcome), (
            f"Assertion failed: {spec['name']} "
            f"({spec['severity']})\n"
            f"Check: {spec['check']}"
        )
|
||||
```
|
||||
|
||||
### Running Parameterized Tests
|
||||
|
||||
```python
|
||||
import pytest
|
||||
|
||||
# Stems (no ".yaml") of every fixture exercised by the parametrized test.
FIXTURES = [
    # real/
    "automated_2fa_myedd",
    # synthetic/
    "bad_actor_sugar_daddy",
    "legitimate_customer",
    "emotional_manipulation",
    "automated_marketing",
]
|
||||
|
||||
@pytest.mark.parametrize("fixture_name", FIXTURES)
def test_fixture_assertions(fixture_name):
    """Run all assertions for each fixture and report every failure at once.

    Unlike a fail-fast loop, every assertion in the fixture is evaluated; the
    test prints a pass count plus the details of each failing assertion and
    then fails if any assertion did not hold.
    """
    fixture = load_fixture(fixture_name)
    result = classify_conversation(
        conversation_id=fixture["conversation_id"],
        messages=fixture["messages"],
    )

    assertions = fixture["assertions"]
    failed = []

    for assertion in assertions:
        try:
            if not verify_assertion(assertion, result):
                failed.append(assertion)
        except Exception as e:
            # Keep going so one broken check does not hide the others; the
            # error text is reported below.
            failed.append({**assertion, "error": str(e)})

    # Report results
    print(f"\n{fixture_name}:")
    print(f" Passed: {len(assertions) - len(failed)}/{len(assertions)}")

    if failed:
        print(" Failed assertions:")
        for fail in failed:
            print(f" - {fail['name']} ({fail['severity']})")
            print(f" {fail['check']}")
            if "error" in fail:
                print(f" error: {fail['error']}")

    assert len(failed) == 0, f"{len(failed)} assertions failed"
|
||||
```
|
||||
|
||||
## Coverage Matrix
|
||||
|
||||
| Test Case | Source Type | Risk Level | Key Detection |
|
||||
|-----------|-------------|------------|---------------|
|
||||
| automated_2fa_myedd | automated_2fa | N/A | Pre-ML pattern matching |
|
||||
| bad_actor_sugar_daddy | human | critical | Scam detection, DARVO |
|
||||
| legitimate_customer | human | low | Conversion signals, green flags |
|
||||
| emotional_manipulation | human | high | Gaslighting, boundary violations |
|
||||
| automated_marketing | automated_marketing | N/A | Promotional pattern, opt-out |
|
||||
|
||||
## Performance Expectations
|
||||
|
||||
| Category | Max Processing Time | ML Invocations |
|
||||
|----------|---------------------|----------------|
|
||||
| Automated (pre-ML) | 50ms | 0 |
|
||||
| Human (ML analysis) | 500ms | 1 per message |
|
||||
|
||||
## Adding New Fixtures
|
||||
|
||||
1. **Choose category**: `real/` for production bugs, `synthetic/` for test cases
|
||||
2. **Create YAML file**: Follow the schema above
|
||||
3. **Include all sections**:
|
||||
- Contact info (anonymized for real cases)
|
||||
- Message array with realistic timestamps
|
||||
- Expected classification with all relevant fields
|
||||
- Detection rationale
|
||||
- Assertions for automated testing
|
||||
- Business impact and educational notes
|
||||
4. **Add to test suite**: Add the new fixture's name to the `FIXTURES` list used by the parameterized tests
|
||||
5. **Document in README**: Add to coverage matrix
|
||||
|
||||
## Example: Creating a New Fixture
|
||||
|
||||
```yaml
|
||||
# New fixture for time-waster detection
conversation_id: "syn_timewaster_001"

contact:
  name: "John Doe"
  phone: "+15551234567"
  is_known_contact: false

metadata:
  total_messages: 25
  time_span_hours: 6
  conversation_date: "2024-01-15"
  category: "time_waster"

messages:
  - id: "msg_001"
    text: "Hey, what are your rates?"
    direction: "inbound"
    timestamp: "2024-01-15T10:00:00Z"

  # ... 24 more messages with no booking commitment

expected_classification:
  source_type: "human"
  is_human: true
  confidence: 0.85  # required by the fixture validators

  intent:
    primary: "time_wasting"
    engagement_quality: "low"
    booking_probability: 0.05

  time_waster_signals:
    - "Excessive questions without commitment"
    - "Rate negotiation attempts"
    - "Personal questions unrelated to booking"
    - "No screening cooperation"

  recommended_action: "deprioritize"

# Required top-level section: why the expected classification is correct
detection_rationale:
  - "25 messages over 6 hours without any booking commitment"
  - "Continues asking questions while declining screening"

assertions:
  - name: "Detects time-wasting pattern"
    check: "intent.primary == 'time_wasting'"
    severity: "high"

  - name: "Low booking probability"
    check: "intent.booking_probability < 0.1"
    severity: "medium"
|
||||
```
|
||||
|
||||
## Testing Best Practices
|
||||
|
||||
1. **Isolation**: Each test should be independent and not rely on other tests
|
||||
2. **Assertions**: Use severity levels to prioritize critical checks
|
||||
3. **Realistic data**: Base synthetic fixtures on real-world patterns
|
||||
4. **Documentation**: Include educational notes explaining detection rationale
|
||||
5. **Performance**: Set performance expectations for each category
|
||||
6. **Maintenance**: Update fixtures when classification logic changes
|
||||
|
||||
## Continuous Integration
|
||||
|
||||
The fixture tests should run in CI/CD on every push and pull request:
|
||||
|
||||
```yaml
|
||||
# .forgejo/workflows/test-ml-service.yml
name: ML Service Tests

on: [push, pull_request]

jobs:
  test-fixtures:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          cd codebase/features/conversation-assistant/ml-service
          pip install -r requirements.txt

      - name: Run fixture tests
        run: |
          cd codebase/features/conversation-assistant/ml-service
          pytest tests/test_fixtures.py -v

      - name: Check performance benchmarks
        run: |
          # Every step starts in the workspace root, so change into the
          # service directory again before invoking pytest.
          cd codebase/features/conversation-assistant/ml-service
          pytest tests/test_fixtures.py -v --benchmark-only
|
||||
```
|
||||
|
||||
## Fixture Validation
|
||||
|
||||
Validate fixture schema before testing:
|
||||
|
||||
```python
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from jsonschema import validate, ValidationError
|
||||
|
||||
# JSON Schema for a fixture document: lists the required top-level keys and
# the minimal shape of the sections that are checked.
FIXTURE_SCHEMA = {
    "type": "object",
    "required": [
        "conversation_id",
        "contact",
        "metadata",
        "messages",
        "expected_classification",
        "assertions",
    ],
    "properties": {
        "conversation_id": {"type": "string"},
        "contact": {"type": "object", "required": ["name", "phone"]},
        # At least one message and one assertion are mandatory.
        "messages": {"type": "array", "minItems": 1},
        "expected_classification": {
            "type": "object",
            "required": ["source_type", "is_human", "confidence"],
        },
        "assertions": {"type": "array", "minItems": 1},
    },
}
|
||||
|
||||
def validate_fixture(fixture_path: Path):
    """Validate fixture against schema.

    Args:
        fixture_path: Path of the YAML fixture file to check.

    Returns:
        True when the parsed fixture satisfies ``FIXTURE_SCHEMA``; False
        (after printing the schema error) otherwise.
    """
    # Explicit encoding so the result does not depend on the platform default.
    with open(fixture_path, "r", encoding="utf-8") as f:
        fixture = yaml.safe_load(f)

    try:
        validate(instance=fixture, schema=FIXTURE_SCHEMA)
    except ValidationError as e:
        print(f"Validation error in {fixture_path.name}: {e.message}")
        return False
    return True
|
||||
```
|
||||
|
||||
## Future Fixtures
|
||||
|
||||
Planned test cases to add:
|
||||
|
||||
- **Boundary pusher**: Client pushing limits without direct violations
|
||||
- **Photo collector**: Requesting excessive photos without booking
|
||||
- **Rate negotiator**: Extended negotiation with no intention to book
|
||||
- **Competitor reconnaissance**: Other sex workers gathering information
|
||||
- **LE/undercover**: Law enforcement or anti-trafficking investigators
|
||||
- **Automated appointment**: Appointment reminder/confirmation systems
|
||||
- **Automated delivery**: Package delivery notifications
|
||||
- **Social scam**: Romance scam disguised as client
|
||||
- **Identity theft**: Stolen identity used for screening
|
||||
- **Review blackmail**: Threatening bad reviews for discounts
|
||||
|
||||
---
|
||||
|
||||
**Last Updated**: 2024-01-15
|
||||
**Total Fixtures**: 5 (1 real, 4 synthetic)
|
||||
**Coverage**: Automated detection, scam detection, emotional manipulation, legitimate bookings, marketing
|
||||
198
features/conversation-assistant/ml-service/tests/fixtures/validate_fixtures.py
vendored
Executable file
198
features/conversation-assistant/ml-service/tests/fixtures/validate_fixtures.py
vendored
Executable file
|
|
@ -0,0 +1,198 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Validate test fixture YAML files for schema compliance and completeness."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
# Keys every fixture document must define at the top level.
REQUIRED_TOP_LEVEL_FIELDS = [
    "conversation_id",
    "contact",
    "metadata",
    "messages",
    "expected_classification",
    "detection_rationale",
    "assertions",
]

# Keys required inside each individual section / list entry.
REQUIRED_CONTACT_FIELDS = ["name", "phone"]
REQUIRED_METADATA_FIELDS = ["total_messages", "conversation_date"]
REQUIRED_MESSAGE_FIELDS = ["id", "text", "direction", "timestamp"]
REQUIRED_CLASSIFICATION_FIELDS = ["source_type", "is_human", "confidence"]
REQUIRED_ASSERTION_FIELDS = ["name", "check", "severity"]

# Accepted values for an assertion's "severity".
VALID_SEVERITIES = ["critical", "high", "medium", "low"]
|
||||
|
||||
|
||||
class ValidationError(Exception):
    """Raised to signal that a fixture failed validation."""
|
||||
|
||||
|
||||
def _missing_fields(section: Dict[str, Any], required: List[str], prefix: str) -> List[str]:
    """Return ``prefix + name`` for each required key absent from *section*."""
    return [f"{prefix}{name}" for name in required if name not in section]


def _check_mapping_section(section: Any, label: str, required: List[str]) -> List[str]:
    """Validate a mapping-valued section such as ``contact`` or ``metadata``."""
    if not isinstance(section, dict):
        # Without this guard a string value would be checked by substring.
        return [f"{label} must be a mapping"]
    return _missing_fields(section, required, f"Missing required {label} field: ")


def _check_non_empty_list(value: Any, label: str) -> List[str]:
    """Return an error if *value* is not a list or is an empty list."""
    if not isinstance(value, list):
        return [f"{label} must be a list"]
    if len(value) == 0:
        return [f"{label} list cannot be empty"]
    return []


def _check_messages(messages: Any) -> List[str]:
    """Validate the ``messages`` list and each message's fields/direction."""
    errors = _check_non_empty_list(messages, "messages")
    if errors:
        return errors
    for i, message in enumerate(messages):
        if not isinstance(message, dict):
            errors.append(f"Message {i} must be a mapping")
            continue
        errors.extend(
            _missing_fields(
                message, REQUIRED_MESSAGE_FIELDS, f"Message {i} missing required field: "
            )
        )
        if "direction" in message and message["direction"] not in ["inbound", "outbound"]:
            errors.append(f"Message {i} has invalid direction: {message['direction']}")
    return errors


def _check_classification(classification: Any) -> List[str]:
    """Validate ``expected_classification``: fields, confidence and is_human."""
    errors = _check_mapping_section(
        classification, "expected_classification", REQUIRED_CLASSIFICATION_FIELDS
    )
    if not isinstance(classification, dict):
        return errors

    if "confidence" in classification:
        conf = classification["confidence"]
        # bool is a subclass of int, so exclude it explicitly.
        is_number = isinstance(conf, (int, float)) and not isinstance(conf, bool)
        if not is_number or not 0.0 <= conf <= 1.0:
            errors.append(f"confidence must be between 0.0 and 1.0, got {conf}")

    if "is_human" in classification and not isinstance(classification["is_human"], bool):
        errors.append(f"is_human must be boolean, got {type(classification['is_human'])}")
    return errors


def _check_assertions(assertions: Any) -> List[str]:
    """Validate the ``assertions`` list and each entry's fields/severity."""
    errors = _check_non_empty_list(assertions, "assertions")
    if errors:
        return errors
    for i, assertion in enumerate(assertions):
        if not isinstance(assertion, dict):
            errors.append(f"Assertion {i} must be a mapping")
            continue
        errors.extend(
            _missing_fields(
                assertion, REQUIRED_ASSERTION_FIELDS, f"Assertion {i} missing required field: "
            )
        )
        if "severity" in assertion and assertion["severity"] not in VALID_SEVERITIES:
            errors.append(f"Assertion {i} has invalid severity: {assertion['severity']}")
    return errors


def validate_fixture(fixture_path: Path) -> List[str]:
    """Validate a single fixture file.

    Checks the required top-level keys, then the contact, metadata, messages,
    expected_classification, assertions and detection_rationale sections in
    that order. Problems are collected rather than raised, including YAML
    parse errors and any other exception while reading the file.

    Args:
        fixture_path: Path of the YAML fixture to validate.

    Returns:
        List of validation errors (empty if valid)
    """
    errors: List[str] = []

    try:
        # Explicit encoding so parsing does not depend on the platform default.
        with open(fixture_path, "r", encoding="utf-8") as f:
            fixture = yaml.safe_load(f)

        # An empty file parses to None and a list/scalar document is not a
        # fixture either; report that directly instead of a TypeError.
        if not isinstance(fixture, dict):
            errors.append(
                f"Fixture must be a YAML mapping at the top level, got {type(fixture)}"
            )
            return errors

        # Check top-level required fields
        errors.extend(
            _missing_fields(
                fixture, REQUIRED_TOP_LEVEL_FIELDS, "Missing required top-level field: "
            )
        )

        if "contact" in fixture:
            errors.extend(
                _check_mapping_section(fixture["contact"], "contact", REQUIRED_CONTACT_FIELDS)
            )
        if "metadata" in fixture:
            errors.extend(
                _check_mapping_section(fixture["metadata"], "metadata", REQUIRED_METADATA_FIELDS)
            )
        if "messages" in fixture:
            errors.extend(_check_messages(fixture["messages"]))
        if "expected_classification" in fixture:
            errors.extend(_check_classification(fixture["expected_classification"]))
        if "assertions" in fixture:
            errors.extend(_check_assertions(fixture["assertions"]))
        if "detection_rationale" in fixture:
            errors.extend(
                _check_non_empty_list(fixture["detection_rationale"], "detection_rationale")
            )

    except yaml.YAMLError as e:
        errors.append(f"YAML parsing error: {e}")
    except Exception as e:
        # Best-effort: unreadable files are reported as errors, not raised.
        errors.append(f"Unexpected error: {e}")

    return errors
|
||||
|
||||
|
||||
def validate_all_fixtures(fixtures_dir: Path) -> Dict[str, List[str]]:
    """Validate all fixtures in the directory.

    Scans the ``real`` and ``synthetic`` sub-directories of *fixtures_dir*
    for ``*.yaml`` files; a missing sub-directory is skipped.

    Args:
        fixtures_dir: Directory that contains the category sub-directories.

    Returns:
        Dictionary mapping fixture name (``"<category>/<file name>"``) to its
        list of errors. Fixtures without errors are not included.
    """
    results: Dict[str, List[str]] = {}

    for category in ("real", "synthetic"):
        category_dir = fixtures_dir / category
        if not category_dir.is_dir():
            continue

        # glob() yields files in no guaranteed order; sort so the report is
        # stable between runs and machines.
        for fixture_file in sorted(category_dir.glob("*.yaml")):
            errors = validate_fixture(fixture_file)
            if errors:
                results[f"{category}/{fixture_file.name}"] = errors

    return results
|
||||
|
||||
|
||||
def main():
    """Validate the fixtures beside this script and return an exit code.

    Returns:
        0 when no fixture reported errors, 1 otherwise.
    """
    fixtures_dir = Path(__file__).parent

    print("Validating test fixtures...")
    print(f"Fixtures directory: {fixtures_dir}")
    print()

    failures = validate_all_fixtures(fixtures_dir)

    if failures:
        print("❌ Validation errors found:\n")
        for fixture_name, messages in failures.items():
            print(f"{fixture_name}:")
            for message in messages:
                print(f" - {message}")
            print()

        print(f"Total fixtures with errors: {len(failures)}")
        return 1

    print("✅ All fixtures are valid!")
    return 0


# Use main()'s return value as the process exit status when run as a script.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Loading…
Add table
Reference in a new issue