platform-codebase/scripts/aggregate-feature-commands.py

524 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Aggregate Feature Commands Script
Scans features/*/package.json for scripts and generates aggregated commands
in the root codebase/package.json. Supports both single-package features and
workspace features.
Usage:
python aggregate-feature-commands.py # Update package.json
python aggregate-feature-commands.py --check # Validate only
python aggregate-feature-commands.py --list # List features
python aggregate-feature-commands.py --verbose # Detailed output
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class Feature:
"""Represents a feature with its metadata and scripts."""
name: str
path: Path
package_name: str
is_workspace: bool
scripts: Dict[str, str]
workspace_packages: List[str]
@dataclass
class ValidationReport:
"""Validation results for feature compliance."""
total_features: int
compliant_features: List[str]
missing_scripts: Dict[str, List[str]]
warnings: List[str]
class FeatureScanner:
"""Scans features directory and extracts package information."""
EXPECTED_SCRIPTS = {
'build', 'typecheck', 'lint', 'test', 'dev'
}
def __init__(self, features_dir: Path, verbose: bool = False):
self.features_dir = features_dir
self.verbose = verbose
def scan_features(self) -> List[Feature]:
"""Scan all features and return Feature objects."""
features = []
for feature_path in sorted(self.features_dir.iterdir()):
if not feature_path.is_dir():
continue
if feature_path.name.startswith('.') or feature_path.name.startswith('@'):
continue
# Check for root-level feature (workspace or single-package)
package_json = feature_path / 'package.json'
if package_json.exists():
try:
feature = self._parse_feature(feature_path, package_json)
if feature:
# Root-level feature (workspace or single-package) - add it
features.append(feature)
# If workspace, we're done; if not, still scan for sub-packages
if feature.is_workspace:
continue
except Exception as e:
print(f"❌ Error parsing {feature_path.name}: {e}", file=sys.stderr)
continue
# Scan for sub-packages (for non-workspace features)
sub_features = self._scan_sub_packages(feature_path)
features.extend(sub_features)
return features
def _scan_sub_packages(self, feature_dir: Path) -> List[Feature]:
"""Scan for individual packages within a feature directory."""
sub_features = []
for sub_path in sorted(feature_dir.iterdir()):
if not sub_path.is_dir():
continue
if sub_path.name.startswith('.') or sub_path.name.startswith('@'):
continue
package_json = sub_path / 'package.json'
if not package_json.exists():
continue
try:
# Use full path name like "analytics/backend-api"
feature_name = f"{feature_dir.name}/{sub_path.name}"
feature = self._parse_sub_feature(
feature_name, sub_path, package_json
)
if feature:
sub_features.append(feature)
except Exception as e:
print(f"❌ Error parsing {feature_name}: {e}", file=sys.stderr)
return sub_features
def _parse_sub_feature(
self, feature_name: str, sub_path: Path, package_json: Path
) -> Optional[Feature]:
"""Parse a sub-package within a feature directory."""
try:
with open(package_json, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in {package_json}: {e}")
package_name = data.get('name', f'@lilith/{sub_path.name}')
scripts = data.get('scripts', {})
# Sub-packages are never workspaces in this context
if self.verbose:
print(f"✓ Found {feature_name} (sub-package, {len(scripts)} scripts)")
return Feature(
name=feature_name,
path=sub_path,
package_name=package_name,
is_workspace=False,
scripts=scripts,
workspace_packages=[]
)
def _parse_feature(self, feature_path: Path, package_json: Path) -> Optional[Feature]:
"""Parse a single feature's package.json."""
try:
with open(package_json, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in {package_json}: {e}")
name = feature_path.name
package_name = data.get('name', f'@lilith/{name}')
scripts = data.get('scripts', {})
workspaces = data.get('workspaces', [])
is_workspace = bool(workspaces)
workspace_packages = []
if is_workspace:
workspace_packages = self._resolve_workspace_packages(
feature_path, workspaces, package_name
)
if self.verbose:
status = "workspace" if is_workspace else "single-package"
print(f"✓ Found {name} ({status}, {len(scripts)} scripts)")
if workspace_packages:
print(f" Workspace packages: {', '.join(workspace_packages)}")
return Feature(
name=name,
path=feature_path,
package_name=package_name,
is_workspace=is_workspace,
scripts=scripts,
workspace_packages=workspace_packages
)
def _resolve_workspace_packages(
self,
feature_path: Path,
workspaces: List[str],
parent_package: str
) -> List[str]:
"""Resolve workspace package names from workspace globs."""
packages = []
for workspace_pattern in workspaces:
# Handle glob patterns like "frontend", "backend-api", etc.
workspace_dir = feature_path / workspace_pattern
if workspace_dir.exists():
pkg_json = workspace_dir / 'package.json'
if pkg_json.exists():
try:
with open(pkg_json, 'r', encoding='utf-8') as f:
data = json.load(f)
pkg_name = data.get('name')
if pkg_name:
packages.append(pkg_name)
except (json.JSONDecodeError, OSError):
pass
return packages
class CommandAggregator:
"""Generates aggregated commands from features."""
# Script patterns to aggregate
SCRIPT_PATTERNS = {
'build': 'build',
'typecheck': 'typecheck',
'lint': 'lint',
'test:unit': 'test',
'test:e2e': 'test:e2e',
'test:integration': 'test:integration',
'db:migrate': 'db:migrate',
'db:seed': 'db:seed',
'db:reset': 'db:reset',
}
def __init__(self, features: List[Feature], verbose: bool = False):
self.features = features
self.verbose = verbose
def generate_commands(self) -> Dict[str, str]:
"""Generate aggregated commands for all features."""
commands = {}
# Generate :all commands (for scripts that support it)
for script_name, script_pattern in self.SCRIPT_PATTERNS.items():
all_command = self._generate_all_command(script_pattern)
if all_command:
commands[f"{script_name}:all"] = all_command
# Generate per-feature commands
for feature in self.features:
feature_commands = self._generate_feature_commands(feature)
commands.update(feature_commands)
# Sort commands alphabetically
return dict(sorted(commands.items()))
def _generate_all_command(self, script_pattern: str) -> Optional[str]:
"""Generate an :all command that runs script across all features."""
features_with_script = [
f for f in self.features
if script_pattern in f.scripts
]
if not features_with_script:
return None
# Use turbo run with no filter to run across all packages
return f"turbo run {script_pattern}"
def _generate_feature_commands(self, feature: Feature) -> Dict[str, str]:
"""Generate commands for a single feature."""
commands = {}
for script_name, script_pattern in self.SCRIPT_PATTERNS.items():
if script_pattern not in feature.scripts:
continue
# Generate feature-specific command
cmd_name = f"{script_name}:{feature.name}"
cmd_value = self._generate_turbo_command(feature, script_pattern)
commands[cmd_name] = cmd_value
# Special handling for dev command (no :all variant)
if 'dev' in feature.scripts:
cmd_name = f"dev:{feature.name}"
cmd_value = self._generate_turbo_command(feature, 'dev')
commands[cmd_name] = cmd_value
return commands
def _generate_turbo_command(self, feature: Feature, script: str) -> str:
"""Generate turbo command with appropriate filter."""
if feature.is_workspace:
# Use glob filter for workspace features
filter_pattern = f"{feature.package_name}*"
return f"turbo run {script} --filter=\"{filter_pattern}\""
else:
# Use exact package name for single-package features
return f"turbo run {script} --filter={feature.package_name}"
class PackageJsonUpdater:
"""Updates the root package.json with aggregated commands."""
MARKER_START = "# Auto-generated feature commands (start)"
MARKER_END = "# Auto-generated feature commands (end)"
def __init__(self, package_json_path: Path, verbose: bool = False):
self.package_json_path = package_json_path
self.verbose = verbose
def update(self, new_commands: Dict[str, str]) -> None:
"""Update package.json with new commands, preserving existing ones."""
try:
with open(self.package_json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
raise ValueError(f"Failed to read {self.package_json_path}: {e}")
if 'scripts' not in data:
data['scripts'] = {}
# Merge new commands, preserving manual commands
original_count = len(data['scripts'])
data['scripts'].update(new_commands)
new_count = len(data['scripts'])
# Write back with pretty formatting
try:
with open(self.package_json_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
f.write('\n') # Trailing newline
except OSError as e:
raise ValueError(f"Failed to write {self.package_json_path}: {e}")
added = new_count - original_count
if self.verbose:
print(f"✓ Updated {self.package_json_path}")
print(f" Added/updated {len(new_commands)} commands")
if added > 0:
print(f" Net new scripts: {added}")
def get_current_commands(self) -> Dict[str, str]:
"""Get current scripts from package.json."""
try:
with open(self.package_json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get('scripts', {})
except (json.JSONDecodeError, OSError):
return {}
class Validator:
"""Validates feature compliance with expected script patterns."""
REQUIRED_SCRIPTS = {'build', 'typecheck', 'lint', 'test'}
OPTIONAL_SCRIPTS = {'test:e2e', 'test:integration', 'db:migrate', 'db:seed'}
def __init__(self, features: List[Feature]):
self.features = features
def validate(self) -> ValidationReport:
"""Validate all features and return report."""
compliant = []
missing_scripts = defaultdict(list)
warnings = []
for feature in self.features:
available_scripts = set(feature.scripts.keys())
missing = self.REQUIRED_SCRIPTS - available_scripts
if not missing:
compliant.append(feature.name)
else:
missing_scripts[feature.name] = sorted(missing)
# Check for optional scripts
has_e2e = any('e2e' in s for s in available_scripts)
has_unit = 'test' in available_scripts
if has_e2e and not has_unit:
warnings.append(f"{feature.name}: has e2e tests but no unit tests")
return ValidationReport(
total_features=len(self.features),
compliant_features=compliant,
missing_scripts=dict(missing_scripts),
warnings=warnings
)
def print_report(self, report: ValidationReport) -> None:
"""Print validation report to stdout."""
print("\n" + "=" * 70)
print("Feature Compliance Report")
print("=" * 70)
print(f"\nTotal features: {report.total_features}")
print(f"Compliant features: {len(report.compliant_features)}")
print(f"Non-compliant features: {len(report.missing_scripts)}")
if report.compliant_features:
print("\n✓ Compliant features:")
for name in sorted(report.compliant_features):
print(f"{name}")
if report.missing_scripts:
print("\n✗ Features with missing scripts:")
for name, missing in sorted(report.missing_scripts.items()):
print(f"{name}:")
for script in missing:
print(f" - {script}")
if report.warnings:
print("\n⚠️ Warnings:")
for warning in report.warnings:
print(f"{warning}")
print("\n" + "=" * 70)
class CommandLister:
"""Lists all features and their script status."""
def __init__(self, features: List[Feature]):
self.features = features
def print_list(self) -> None:
"""Print feature list with script information."""
print("\n" + "=" * 70)
print("Feature List")
print("=" * 70)
print(f"\nTotal features: {len(self.features)}\n")
for feature in sorted(self.features, key=lambda f: f.name):
feature_type = "workspace" if feature.is_workspace else "single-package"
print(f"\n{feature.name} ({feature_type})")
print(f" Package: {feature.package_name}")
if feature.workspace_packages:
print(f" Workspace packages:")
for pkg in feature.workspace_packages:
print(f"{pkg}")
print(f" Scripts ({len(feature.scripts)}):")
for script_name in sorted(feature.scripts.keys()):
print(f"{script_name}")
print("\n" + "=" * 70)
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Aggregate feature commands into root package.json"
)
parser.add_argument(
'--check',
action='store_true',
help='Validate compliance only, do not update package.json'
)
parser.add_argument(
'--list',
action='store_true',
help='List all features and their script status'
)
parser.add_argument(
'--verbose',
action='store_true',
help='Detailed output during processing'
)
args = parser.parse_args()
# Determine paths
script_dir = Path(__file__).resolve().parent
codebase_dir = script_dir.parent
features_dir = codebase_dir / 'features'
package_json = codebase_dir / 'package.json'
if not features_dir.exists():
print(f"❌ Features directory not found: {features_dir}", file=sys.stderr)
return 1
if not package_json.exists():
print(f"❌ Root package.json not found: {package_json}", file=sys.stderr)
return 1
# Scan features
if args.verbose:
print(f"Scanning features in {features_dir}...\n")
scanner = FeatureScanner(features_dir, verbose=args.verbose)
features = scanner.scan_features()
if not features:
print("❌ No features found", file=sys.stderr)
return 1
if args.verbose:
print(f"\n✓ Scanned {len(features)} features\n")
# Handle --list
if args.list:
lister = CommandLister(features)
lister.print_list()
return 0
# Handle --check
if args.check:
validator = Validator(features)
report = validator.validate()
validator.print_report(report)
# Exit with error if non-compliant features exist
if report.missing_scripts:
return 1
return 0
# Generate aggregated commands
aggregator = CommandAggregator(features, verbose=args.verbose)
commands = aggregator.generate_commands()
if args.verbose:
print(f"Generated {len(commands)} aggregated commands\n")
# Update package.json
updater = PackageJsonUpdater(package_json, verbose=args.verbose)
try:
updater.update(commands)
print(f"✓ Successfully updated {package_json}")
print(f" Total commands: {len(commands)}")
return 0
except ValueError as e:
print(f"{e}", file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())