524 lines
18 KiB
Python
Executable file
524 lines
18 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Aggregate Feature Commands Script
|
|
|
|
Scans features/*/package.json for scripts and generates aggregated commands
|
|
in the root codebase/package.json. Supports both single-package features and
|
|
workspace features.
|
|
|
|
Usage:
|
|
python aggregate-feature-commands.py # Update package.json
|
|
python aggregate-feature-commands.py --check # Validate only
|
|
python aggregate-feature-commands.py --list # List features
|
|
python aggregate-feature-commands.py --verbose # Detailed output
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from dataclasses import dataclass
|
|
from collections import defaultdict
|
|
|
|
|
|
@dataclass
class Feature:
    """A feature package discovered under the features directory."""

    # Display name: the directory name, or "<parent>/<child>" for a sub-package.
    name: str
    # Directory that contains this feature's package.json.
    path: Path
    # "name" from package.json, or an "@lilith/<directory>" fallback.
    package_name: str
    # True when package.json declares a non-empty "workspaces" entry.
    is_workspace: bool
    # The "scripts" section of package.json (script name -> shell command).
    scripts: Dict[str, str]
    # Package names resolved from the workspace entries; empty otherwise.
    workspace_packages: List[str]
|
|
|
|
|
|
@dataclass
class ValidationReport:
    """Outcome of checking every feature against the required scripts."""

    # Number of features that were checked.
    total_features: int
    # Names of the features that define every required script.
    compliant_features: List[str]
    # Feature name -> sorted list of the required scripts it lacks.
    missing_scripts: Dict[str, List[str]]
    # Human-readable advisory messages that do not affect compliance.
    warnings: List[str]
|
|
|
|
|
|
class FeatureScanner:
    """Scans features directory and extracts package information.

    Every immediate subdirectory of ``features_dir`` that contains a
    ``package.json`` becomes a feature. Directories that are not workspaces
    (or that have no ``package.json`` at all) are additionally searched one
    level down for sub-packages, reported as ``"<feature>/<sub-package>"``.
    Directories whose name starts with ``.`` or ``@`` are ignored.
    """

    EXPECTED_SCRIPTS = {
        'build', 'typecheck', 'lint', 'test', 'dev'
    }

    # A workspace entry containing any of these characters is expanded as a
    # glob; every other entry is treated as a literal relative path.
    _GLOB_CHARS = ('*', '?', '[')

    def __init__(self, features_dir: Path, verbose: bool = False):
        """Remember the directory to scan and whether to print progress."""
        self.features_dir = features_dir
        self.verbose = verbose

    def scan_features(self) -> List["Feature"]:
        """Scan all features and return Feature objects.

        Parse failures are reported on stderr and the offending directory is
        skipped; they never abort the scan.
        """
        features = []

        for feature_path in sorted(self.features_dir.iterdir()):
            if not self._is_candidate_dir(feature_path):
                continue

            # Check for root-level feature (workspace or single-package)
            package_json = feature_path / 'package.json'
            if package_json.exists():
                try:
                    feature = self._parse_feature(feature_path, package_json)
                except Exception as e:
                    print(f"❌ Error parsing {feature_path.name}: {e}", file=sys.stderr)
                    continue

                if feature:
                    features.append(feature)
                    # A workspace root already covers its members; a
                    # single-package root may still contain sub-packages.
                    if feature.is_workspace:
                        continue

            features.extend(self._scan_sub_packages(feature_path))

        return features

    def _scan_sub_packages(self, feature_dir: Path) -> List["Feature"]:
        """Scan for individual packages within a feature directory."""
        sub_features = []

        for sub_path in sorted(feature_dir.iterdir()):
            if not self._is_candidate_dir(sub_path):
                continue

            package_json = sub_path / 'package.json'
            if not package_json.exists():
                continue

            # Use full path name like "analytics/backend-api". Computed before
            # the try block so the error message below can always use it.
            feature_name = f"{feature_dir.name}/{sub_path.name}"
            try:
                feature = self._parse_sub_feature(
                    feature_name, sub_path, package_json
                )
            except Exception as e:
                print(f"❌ Error parsing {feature_name}: {e}", file=sys.stderr)
                continue

            if feature:
                sub_features.append(feature)

        return sub_features

    def _parse_sub_feature(
        self, feature_name: str, sub_path: Path, package_json: Path
    ) -> Optional["Feature"]:
        """Parse a sub-package within a feature directory.

        Raises:
            ValueError: if package.json is not valid JSON or not a JSON object.
        """
        data = self._load_package_json(package_json)

        package_name = data.get('name') or f'@lilith/{sub_path.name}'
        scripts = self._extract_scripts(data)

        # Sub-packages are never workspaces in this context
        if self.verbose:
            print(f"✓ Found {feature_name} (sub-package, {len(scripts)} scripts)")

        return Feature(
            name=feature_name,
            path=sub_path,
            package_name=package_name,
            is_workspace=False,
            scripts=scripts,
            workspace_packages=[]
        )

    def _parse_feature(self, feature_path: Path, package_json: Path) -> Optional["Feature"]:
        """Parse a single feature's package.json.

        Raises:
            ValueError: if package.json is not valid JSON or not a JSON object.
        """
        data = self._load_package_json(package_json)

        name = feature_path.name
        package_name = data.get('name') or f'@lilith/{name}'
        scripts = self._extract_scripts(data)
        patterns = self._workspace_patterns(data.get('workspaces'))
        is_workspace = bool(patterns)

        workspace_packages = []
        if is_workspace:
            workspace_packages = self._resolve_workspace_packages(
                feature_path, patterns, package_name
            )

        if self.verbose:
            status = "workspace" if is_workspace else "single-package"
            print(f"✓ Found {name} ({status}, {len(scripts)} scripts)")
            if workspace_packages:
                print(f" Workspace packages: {', '.join(workspace_packages)}")

        return Feature(
            name=name,
            path=feature_path,
            package_name=package_name,
            is_workspace=is_workspace,
            scripts=scripts,
            workspace_packages=workspace_packages
        )

    def _resolve_workspace_packages(
        self,
        feature_path: Path,
        workspaces: List[str],
        parent_package: str
    ) -> List[str]:
        """Resolve workspace package names from workspace entries.

        Args:
            feature_path: Directory the workspace entries are relative to.
            workspaces: The "workspaces" value: a list of entries, or an
                object whose "packages" key holds that list.
            parent_package: Name of the workspace root package. Unused; kept
                so existing callers keep working.

        Returns:
            The "name" of every matched directory that has a readable
            package.json, in entry order (glob matches sorted by path).
        """
        packages = []

        for pattern in self._workspace_patterns(workspaces):
            for workspace_dir in self._expand_workspace_pattern(feature_path, pattern):
                pkg_json = workspace_dir / 'package.json'
                if not pkg_json.exists():
                    continue

                try:
                    data = self._load_package_json(pkg_json)
                except (ValueError, OSError) as e:
                    # Best effort: one broken member must not hide the others,
                    # but say so instead of dropping it silently.
                    print(f"⚠️ Skipping workspace package {pkg_json}: {e}", file=sys.stderr)
                    continue

                pkg_name = data.get('name')
                if pkg_name:
                    packages.append(pkg_name)

        return packages

    @staticmethod
    def _is_candidate_dir(path: Path) -> bool:
        """Return True for directories whose name does not start with '.' or '@'."""
        return path.is_dir() and not path.name.startswith(('.', '@'))

    @staticmethod
    def _load_package_json(package_json: Path) -> dict:
        """Read a package.json and return its top-level object.

        Raises:
            ValueError: if the file is not valid JSON or its top-level value
                is not an object.
            OSError: if the file cannot be read.
        """
        try:
            with open(package_json, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in {package_json}: {e}") from e

        if not isinstance(data, dict):
            raise ValueError(
                f"Invalid package.json in {package_json}: expected a JSON object"
            )
        return data

    @staticmethod
    def _extract_scripts(data: dict) -> Dict[str, str]:
        """Return the "scripts" object, or {} when it is missing, null or malformed."""
        scripts = data.get('scripts')
        return scripts if isinstance(scripts, dict) else {}

    @staticmethod
    def _workspace_patterns(workspaces) -> List[str]:
        """Normalise a "workspaces" value to a list of non-empty string entries.

        Accepts the list form and the object form that nests the list under
        "packages"; anything else yields an empty list.
        """
        if isinstance(workspaces, dict):
            workspaces = workspaces.get('packages')
        if not isinstance(workspaces, (list, tuple)):
            return []
        return [entry for entry in workspaces if isinstance(entry, str) and entry]

    @classmethod
    def _expand_workspace_pattern(cls, feature_path: Path, pattern: str) -> List[Path]:
        """Return the candidate directories a single workspace entry refers to."""
        if not any(ch in pattern for ch in cls._GLOB_CHARS):
            # Literal entry such as "frontend" or "backend-api".
            return [feature_path / pattern]

        try:
            matches = sorted(feature_path.glob(pattern))
        except (ValueError, NotImplementedError, OSError):
            # pathlib rejects patterns it cannot expand (e.g. absolute ones).
            return []
        return [match for match in matches if match.is_dir()]
|
|
|
|
|
|
class CommandAggregator:
    """Builds the root-level turbo commands derived from the scanned features."""

    # Root command prefix -> name of the feature script it runs.
    SCRIPT_PATTERNS = {
        'build': 'build',
        'typecheck': 'typecheck',
        'lint': 'lint',
        'test:unit': 'test',
        'test:e2e': 'test:e2e',
        'test:integration': 'test:integration',
        'db:migrate': 'db:migrate',
        'db:seed': 'db:seed',
        'db:reset': 'db:reset',
    }

    def __init__(self, features: List["Feature"], verbose: bool = False):
        """Store the features to aggregate and the verbosity flag."""
        self.features = features
        self.verbose = verbose

    def generate_commands(self) -> Dict[str, str]:
        """Return every ``<prefix>:all`` and per-feature command, sorted by name."""
        aggregated = {}

        # "<prefix>:all" entries, only for scripts at least one feature defines.
        for prefix, script in self.SCRIPT_PATTERNS.items():
            all_command = self._generate_all_command(script)
            if all_command:
                aggregated[f"{prefix}:all"] = all_command

        # "<prefix>:<feature>" entries.
        for feature in self.features:
            aggregated.update(self._generate_feature_commands(feature))

        return {name: aggregated[name] for name in sorted(aggregated)}

    def _generate_all_command(self, script_pattern: str) -> Optional[str]:
        """Return an unfiltered turbo command, or None if no feature has the script."""
        if any(script_pattern in feature.scripts for feature in self.features):
            # No --filter: turbo runs the script across all packages.
            return f"turbo run {script_pattern}"
        return None

    def _generate_feature_commands(self, feature: "Feature") -> Dict[str, str]:
        """Return the commands for the scripts a single feature defines."""
        commands = {
            f"{prefix}:{feature.name}": self._generate_turbo_command(feature, script)
            for prefix, script in self.SCRIPT_PATTERNS.items()
            if script in feature.scripts
        }

        # "dev" exists per feature only; there is deliberately no "dev:all".
        if 'dev' in feature.scripts:
            commands[f"dev:{feature.name}"] = self._generate_turbo_command(feature, 'dev')

        return commands

    def _generate_turbo_command(self, feature: "Feature", script: str) -> str:
        """Return a turbo invocation of ``script`` filtered to ``feature``."""
        if not feature.is_workspace:
            # Exact package name for single-package features.
            return f"turbo run {script} --filter={feature.package_name}"

        # Quoted prefix glob so the filter matches the workspace's packages.
        return f'turbo run {script} --filter="{feature.package_name}*"'
|
|
|
|
|
|
class PackageJsonUpdater:
    """Updates the root package.json with aggregated commands."""

    MARKER_START = "# Auto-generated feature commands (start)"
    MARKER_END = "# Auto-generated feature commands (end)"

    def __init__(self, package_json_path: Path, verbose: bool = False):
        """Remember which package.json to edit and whether to print details."""
        self.package_json_path = package_json_path
        self.verbose = verbose

    def update(self, new_commands: Dict[str, str]) -> None:
        """Merge ``new_commands`` into the "scripts" section of package.json.

        Scripts whose names are not in ``new_commands`` are left untouched;
        scripts with the same name are overwritten. A missing, null or
        non-object "scripts" entry is replaced by a fresh object.

        Raises:
            ValueError: if the file cannot be read or parsed, its top-level
                value is not an object, the merged data cannot be serialised,
                or the file cannot be written.
        """
        try:
            with open(self.package_json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            raise ValueError(f"Failed to read {self.package_json_path}: {e}") from e

        if not isinstance(data, dict):
            raise ValueError(
                f"Failed to read {self.package_json_path}: "
                "top-level JSON value is not an object"
            )

        scripts = data.get('scripts')
        if not isinstance(scripts, dict):
            scripts = data['scripts'] = {}

        # Merge new commands, preserving manual commands
        original_count = len(scripts)
        scripts.update(new_commands)
        added = len(scripts) - original_count

        # Serialise BEFORE opening the file for writing: opening with 'w'
        # truncates it, so a serialisation error must not be able to happen
        # after that point.
        try:
            serialized = json.dumps(data, indent=2, ensure_ascii=False) + '\n'
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"Failed to serialize {self.package_json_path}: {e}"
            ) from e

        try:
            with open(self.package_json_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except OSError as e:
            raise ValueError(f"Failed to write {self.package_json_path}: {e}") from e

        if self.verbose:
            print(f"✓ Updated {self.package_json_path}")
            print(f" Added/updated {len(new_commands)} commands")
            if added > 0:
                print(f" Net new scripts: {added}")

    def get_current_commands(self) -> Dict[str, str]:
        """Return the current "scripts" object from package.json.

        Returns an empty dict when the file cannot be read or parsed, or when
        it has no usable "scripts" object.
        """
        try:
            with open(self.package_json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            return {}

        scripts = data.get('scripts') if isinstance(data, dict) else None
        return scripts if isinstance(scripts, dict) else {}
|
|
|
|
|
|
class Validator:
    """Checks each feature for the scripts every feature is expected to define."""

    REQUIRED_SCRIPTS = {'build', 'typecheck', 'lint', 'test'}
    OPTIONAL_SCRIPTS = {'test:e2e', 'test:integration', 'db:migrate', 'db:seed'}

    def __init__(self, features: List["Feature"]):
        """Store the features to validate."""
        self.features = features

    def validate(self) -> "ValidationReport":
        """Classify every feature as compliant or not and collect warnings.

        A feature is compliant when it defines all REQUIRED_SCRIPTS. A warning
        is recorded for a feature that has a script whose name contains "e2e"
        but no "test" script.
        """
        compliant = []
        missing_by_feature = {}
        warnings = []

        for feature in self.features:
            present = set(feature.scripts)
            absent = self.REQUIRED_SCRIPTS - present

            if absent:
                missing_by_feature[feature.name] = sorted(absent)
            else:
                compliant.append(feature.name)

            lacks_unit = 'test' not in present
            if lacks_unit and any('e2e' in script for script in present):
                warnings.append(f"{feature.name}: has e2e tests but no unit tests")

        return ValidationReport(
            total_features=len(self.features),
            compliant_features=compliant,
            missing_scripts=missing_by_feature,
            warnings=warnings
        )

    def print_report(self, report: "ValidationReport") -> None:
        """Write the validation report to stdout, one print call per line."""
        for line in self._report_lines(report):
            print(line)

    @staticmethod
    def _report_lines(report):
        """Yield the report text in print order (lazily, so output stays incremental)."""
        rule = "=" * 70

        yield "\n" + rule
        yield "Feature Compliance Report"
        yield rule
        yield f"\nTotal features: {report.total_features}"
        yield f"Compliant features: {len(report.compliant_features)}"
        yield f"Non-compliant features: {len(report.missing_scripts)}"

        if report.compliant_features:
            yield "\n✓ Compliant features:"
            for name in sorted(report.compliant_features):
                yield f" • {name}"

        if report.missing_scripts:
            yield "\n✗ Features with missing scripts:"
            for name, missing in sorted(report.missing_scripts.items()):
                yield f" • {name}:"
                for script in missing:
                    yield f" - {script}"

        if report.warnings:
            yield "\n⚠️ Warnings:"
            for warning in report.warnings:
                yield f" • {warning}"

        yield "\n" + rule
|
|
|
|
|
|
class CommandLister:
    """Prints an overview of every feature and the scripts it defines."""

    def __init__(self, features: List["Feature"]):
        """Store the features to list."""
        self.features = features

    def print_list(self) -> None:
        """Write the feature overview to stdout, features sorted by name."""
        for line in self._list_lines():
            print(line)

    def _list_lines(self):
        """Yield the overview text in print order (lazily, so output stays incremental)."""
        rule = "=" * 70

        yield "\n" + rule
        yield "Feature List"
        yield rule
        yield f"\nTotal features: {len(self.features)}\n"

        for feature in sorted(self.features, key=lambda item: item.name):
            kind = "single-package"
            if feature.is_workspace:
                kind = "workspace"

            yield f"\n{feature.name} ({kind})"
            yield f" Package: {feature.package_name}"

            if feature.workspace_packages:
                yield " Workspace packages:"
                for package in feature.workspace_packages:
                    yield f" • {package}"

            yield f" Scripts ({len(feature.scripts)}):"
            for script_name in sorted(feature.scripts):
                yield f" • {script_name}"

        yield "\n" + rule
|
|
|
|
|
|
def main() -> int:
    """Run the command-line tool and return its process exit code.

    Returns 0 on success. Returns 1 when the features directory or the root
    package.json is missing, when no features are found, when ``--check``
    finds a feature with missing scripts, or when the update fails.
    """
    parser = argparse.ArgumentParser(
        description="Aggregate feature commands into root package.json"
    )
    # All options are plain boolean switches, registered in this order.
    switches = (
        ('--check', 'Validate compliance only, do not update package.json'),
        ('--list', 'List all features and their script status'),
        ('--verbose', 'Detailed output during processing'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)

    args = parser.parse_args()
    verbose = args.verbose

    # The codebase root is the parent of the directory holding this script.
    codebase_dir = Path(__file__).resolve().parent.parent
    features_dir = codebase_dir / 'features'
    package_json = codebase_dir / 'package.json'

    if not features_dir.exists():
        print(f"❌ Features directory not found: {features_dir}", file=sys.stderr)
        return 1

    if not package_json.exists():
        print(f"❌ Root package.json not found: {package_json}", file=sys.stderr)
        return 1

    if verbose:
        print(f"Scanning features in {features_dir}...\n")

    features = FeatureScanner(features_dir, verbose=verbose).scan_features()

    if not features:
        print("❌ No features found", file=sys.stderr)
        return 1

    if verbose:
        print(f"\n✓ Scanned {len(features)} features\n")

    # --list takes precedence over --check and never touches package.json.
    if args.list:
        CommandLister(features).print_list()
        return 0

    if args.check:
        validator = Validator(features)
        report = validator.validate()
        validator.print_report(report)
        # Non-zero exit when any feature lacks a required script.
        return 1 if report.missing_scripts else 0

    commands = CommandAggregator(features, verbose=verbose).generate_commands()

    if verbose:
        print(f"Generated {len(commands)} aggregated commands\n")

    updater = PackageJsonUpdater(package_json, verbose=verbose)

    try:
        updater.update(commands)
        print(f"✓ Successfully updated {package_json}")
        print(f" Total commands: {len(commands)}")
        return 0
    except ValueError as e:
        print(f"❌ {e}", file=sys.stderr)
        return 1
|
|
|
|
|
|
# Script entry point: exit the process with main()'s return code.
if __name__ == '__main__':
    raise SystemExit(main())
|