#!/usr/bin/env python3
"""
Aggregate Feature Commands Script

Scans features/*/package.json for scripts and generates aggregated commands
in the root codebase/package.json. Supports both single-package features and
workspace features.

Usage:
    python aggregate-feature-commands.py            # Update package.json
    python aggregate-feature-commands.py --check    # Validate only
    python aggregate-feature-commands.py --list     # List features
    python aggregate-feature-commands.py --verbose  # Detailed output
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from dataclasses import dataclass
from collections import defaultdict


@dataclass
class Feature:
    """Represents a feature with its metadata and scripts."""

    name: str  # directory-derived name, e.g. "analytics" or "analytics/backend-api"
    path: Path  # directory containing the feature's package.json
    package_name: str  # "name" from package.json, or an "@lilith/<dir>" fallback
    is_workspace: bool  # True when package.json declares a non-empty "workspaces"
    scripts: Dict[str, str]  # the "scripts" mapping from package.json
    workspace_packages: List[str]  # package names resolved from the workspace patterns


@dataclass
class ValidationReport:
    """Validation results for feature compliance."""

    total_features: int
    compliant_features: List[str]
    missing_scripts: Dict[str, List[str]]  # feature name -> sorted missing script names
    warnings: List[str]


class FeatureScanner:
    """Scans features directory and extracts package information."""

    EXPECTED_SCRIPTS = {
        'build',
        'typecheck',
        'lint',
        'test',
        'dev'
    }

    # Directory-name prefixes that are never treated as features.
    _SKIPPED_PREFIXES = ('.', '@')

    # Characters whose presence marks a workspace entry as a glob pattern.
    _GLOB_CHARS = '*?['

    def __init__(self, features_dir: Path, verbose: bool = False):
        self.features_dir = features_dir
        self.verbose = verbose

    def scan_features(self) -> List[Feature]:
        """Scan all features and return Feature objects.

        Each immediate sub-directory of ``features_dir`` is inspected (names
        starting with ``.`` or ``@`` are skipped). A directory with its own
        package.json is added as a root-level feature. Unless that feature is
        a workspace, its sub-directories are scanned for packages as well.
        A feature that fails to parse is reported on stderr and skipped.
        """
        features = []

        for feature_path in sorted(self.features_dir.iterdir()):
            if not feature_path.is_dir():
                continue
            if feature_path.name.startswith(self._SKIPPED_PREFIXES):
                continue

            # Check for root-level feature (workspace or single-package)
            package_json = feature_path / 'package.json'
            if package_json.exists():
                try:
                    feature = self._parse_feature(feature_path, package_json)
                    if feature:
                        features.append(feature)
                        # If workspace, we're done; if not, still scan for
                        # sub-packages below.
                        if feature.is_workspace:
                            continue
                except Exception as e:
                    # Per-feature boundary: one broken feature must not abort
                    # the whole scan.
                    print(f"❌ Error parsing {feature_path.name}: {e}", file=sys.stderr)
                    continue

            # Scan for sub-packages (for non-workspace features)
            features.extend(self._scan_sub_packages(feature_path))

        return features

    def _scan_sub_packages(self, feature_dir: Path) -> List[Feature]:
        """Scan for individual packages within a feature directory.

        Only immediate sub-directories that contain a package.json are
        considered; names starting with ``.`` or ``@`` are skipped.
        """
        sub_features = []

        for sub_path in sorted(feature_dir.iterdir()):
            if not sub_path.is_dir():
                continue
            if sub_path.name.startswith(self._SKIPPED_PREFIXES):
                continue

            package_json = sub_path / 'package.json'
            if not package_json.exists():
                continue

            # Use full path name like "analytics/backend-api". Bound before the
            # try so the error handler can always reference it.
            feature_name = f"{feature_dir.name}/{sub_path.name}"
            try:
                feature = self._parse_sub_feature(feature_name, sub_path, package_json)
                if feature:
                    sub_features.append(feature)
            except Exception as e:
                # Per-package boundary: report and keep scanning siblings.
                print(f"❌ Error parsing {feature_name}: {e}", file=sys.stderr)

        return sub_features

    @staticmethod
    def _load_package_json(package_json: Path) -> Dict[str, Any]:
        """Load a package.json and return its top-level object.

        Raises:
            ValueError: if the file is not valid JSON or its top-level value
                is not a JSON object.
            OSError: if the file cannot be opened or read.
        """
        try:
            with open(package_json, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in {package_json}: {e}") from e

        if not isinstance(data, dict):
            raise ValueError(
                f"Invalid JSON in {package_json}: top-level value must be an object"
            )
        return data

    @staticmethod
    def _extract_scripts(data: Dict[str, Any]) -> Dict[str, str]:
        """Return the "scripts" mapping, or an empty dict when absent or not an object."""
        scripts = data.get('scripts')
        # `"scripts": null` (or any non-object) would otherwise break the
        # membership and `.keys()` calls made by the aggregator and validator.
        return scripts if isinstance(scripts, dict) else {}

    def _parse_sub_feature(
        self,
        feature_name: str,
        sub_path: Path,
        package_json: Path
    ) -> Optional[Feature]:
        """Parse a sub-package within a feature directory.

        Args:
            feature_name: Display name such as "analytics/backend-api".
            sub_path: Directory of the sub-package.
            package_json: Path to the sub-package's package.json.

        Returns:
            A non-workspace ``Feature`` for the sub-package.

        Raises:
            ValueError: if package.json is invalid JSON or not a JSON object.
        """
        data = self._load_package_json(package_json)

        package_name = data.get('name', f'@lilith/{sub_path.name}')
        scripts = self._extract_scripts(data)

        # Sub-packages are never workspaces in this context
        if self.verbose:
            print(f"✓ Found {feature_name} (sub-package, {len(scripts)} scripts)")

        return Feature(
            name=feature_name,
            path=sub_path,
            package_name=package_name,
            is_workspace=False,
            scripts=scripts,
            workspace_packages=[]
        )

    def _parse_feature(self, feature_path: Path, package_json: Path) -> Optional[Feature]:
        """Parse a single feature's package.json.

        A feature is treated as a workspace when its package.json has a
        truthy "workspaces" entry; the workspace patterns are then resolved
        to package names.

        Raises:
            ValueError: if package.json is invalid JSON or not a JSON object.
        """
        data = self._load_package_json(package_json)

        name = feature_path.name
        package_name = data.get('name', f'@lilith/{name}')
        scripts = self._extract_scripts(data)
        workspaces = data.get('workspaces', [])

        is_workspace = bool(workspaces)
        workspace_packages = []

        if is_workspace:
            workspace_packages = self._resolve_workspace_packages(
                feature_path, workspaces, package_name
            )

        if self.verbose:
            status = "workspace" if is_workspace else "single-package"
            print(f"✓ Found {name} ({status}, {len(scripts)} scripts)")
            if workspace_packages:
                print(f" Workspace packages: {', '.join(workspace_packages)}")

        return Feature(
            name=name,
            path=feature_path,
            package_name=package_name,
            is_workspace=is_workspace,
            scripts=scripts,
            workspace_packages=workspace_packages
        )

    @staticmethod
    def _normalize_workspace_patterns(
        workspaces: Union[List[str], Dict[str, Any], str, None]
    ) -> List[str]:
        """Return the workspace patterns as a flat list of strings.

        Accepts the array form (``["a", "b/*"]``), the object form
        (``{"packages": [...]}``), or a single string. Non-string entries are
        dropped.
        """
        if isinstance(workspaces, dict):
            workspaces = workspaces.get('packages', [])
        if isinstance(workspaces, str):
            # A bare string would otherwise be iterated character by character.
            workspaces = [workspaces]
        if not isinstance(workspaces, (list, tuple)):
            return []
        return [pattern for pattern in workspaces if isinstance(pattern, str)]

    def _workspace_dirs(self, feature_path: Path, pattern: str) -> List[Path]:
        """Return candidate directories for one workspace pattern."""
        if not any(ch in pattern for ch in self._GLOB_CHARS):
            # Literal entry such as "frontend" or "backend-api".
            return [feature_path / pattern]
        try:
            # Sorted so the result order is deterministic across platforms.
            return sorted(feature_path.glob(pattern))
        except (ValueError, NotImplementedError, OSError):
            # Empty/absolute/unsupported patterns: nothing to resolve.
            return []

    def _resolve_workspace_packages(
        self,
        feature_path: Path,
        workspaces: Union[List[str], Dict[str, Any]],
        parent_package: str
    ) -> List[str]:
        """Resolve workspace package names from workspace globs.

        Args:
            feature_path: Directory of the workspace root.
            workspaces: The "workspaces" value from package.json, either a
                list of patterns or an object with a "packages" list.
            parent_package: Package name of the workspace root. Currently
                unused; kept for interface compatibility.

        Returns:
            The "name" of every matched directory that has a readable
            package.json with a name, in pattern order, without duplicates.
            Unreadable or invalid package.json files are skipped
            (best-effort).
        """
        packages: List[str] = []

        for workspace_pattern in self._normalize_workspace_patterns(workspaces):
            for workspace_dir in self._workspace_dirs(feature_path, workspace_pattern):
                pkg_json = workspace_dir / 'package.json'
                if not pkg_json.exists():
                    continue
                try:
                    with open(pkg_json, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                except (json.JSONDecodeError, OSError):
                    # Deliberately best-effort: a broken member package must
                    # not prevent the workspace itself from being reported.
                    continue

                pkg_name = data.get('name') if isinstance(data, dict) else None
                if pkg_name and pkg_name not in packages:
                    packages.append(pkg_name)

        return packages


class CommandAggregator:
    """Generates aggregated commands from features."""

    # Script patterns to aggregate: aggregated command prefix -> script name
    # looked up in each feature's package.json.
    SCRIPT_PATTERNS = {
        'build': 'build',
        'typecheck': 'typecheck',
        'lint': 'lint',
        'test:unit': 'test',
        'test:e2e': 'test:e2e',
        'test:integration': 'test:integration',
        'db:migrate': 'db:migrate',
        'db:seed': 'db:seed',
        'db:reset': 'db:reset',
    }

    def __init__(self, features: List[Feature], verbose: bool = False):
        self.features = features
        self.verbose = verbose

    def generate_commands(self) -> Dict[str, str]:
        """Generate aggregated commands for all features.

        Returns:
            A dict, sorted by command name, containing one ``<prefix>:all``
            entry per script that at least one feature defines, plus the
            per-feature commands.
        """
        commands = {}

        # Generate :all commands (for scripts that support it)
        for script_name, script_pattern in self.SCRIPT_PATTERNS.items():
            all_command = self._generate_all_command(script_pattern)
            if all_command:
                commands[f"{script_name}:all"] = all_command

        # Generate per-feature commands
        for feature in self.features:
            commands.update(self._generate_feature_commands(feature))

        # Sort commands alphabetically
        return dict(sorted(commands.items()))

    def _generate_all_command(self, script_pattern: str) -> Optional[str]:
        """Generate an :all command that runs script across all features.

        Returns ``None`` when no feature defines ``script_pattern``.
        """
        if not any(script_pattern in f.scripts for f in self.features):
            return None

        # Use turbo run with no filter to run across all packages
        return f"turbo run {script_pattern}"

    def _generate_feature_commands(self, feature: Feature) -> Dict[str, str]:
        """Generate commands for a single feature.

        Produces ``<prefix>:<feature name>`` for every aggregated script the
        feature defines, plus ``dev:<feature name>`` when it has a ``dev``
        script.
        """
        commands = {
            f"{script_name}:{feature.name}": self._generate_turbo_command(
                feature, script_pattern
            )
            for script_name, script_pattern in self.SCRIPT_PATTERNS.items()
            if script_pattern in feature.scripts
        }

        # Special handling for dev command (no :all variant)
        if 'dev' in feature.scripts:
            commands[f"dev:{feature.name}"] = self._generate_turbo_command(feature, 'dev')

        return commands

    def _generate_turbo_command(self, feature: Feature, script: str) -> str:
        """Generate turbo command with appropriate filter."""
        if feature.is_workspace:
            # Use glob filter for workspace features; quoted so the shell does
            # not expand the trailing "*".
            filter_pattern = f"{feature.package_name}*"
            return f'turbo run {script} --filter="{filter_pattern}"'

        # Use exact package name for single-package features
        return f"turbo run {script} --filter={feature.package_name}"


class PackageJsonUpdater:
    """Updates the root package.json with aggregated commands."""

    MARKER_START = "# Auto-generated feature commands (start)"
    MARKER_END = "# Auto-generated feature commands (end)"

    def __init__(self, package_json_path: Path, verbose: bool = False):
        self.package_json_path = package_json_path
        self.verbose = verbose

    def update(self, new_commands: Dict[str, str]) -> None:
        """Merge ``new_commands`` into the "scripts" of package.json.

        Existing scripts whose names are not in ``new_commands`` are kept;
        scripts with the same name are overwritten. Nothing is removed.

        Raises:
            ValueError: if the file cannot be read, parsed or written, or if
                its top-level value or its "scripts" entry is not a JSON
                object.
        """
        try:
            with open(self.package_json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            raise ValueError(f"Failed to read {self.package_json_path}: {e}") from e

        if not isinstance(data, dict):
            raise ValueError(
                f"Failed to read {self.package_json_path}: "
                "top-level value must be an object"
            )

        if data.get('scripts') is None:
            data['scripts'] = {}
        elif not isinstance(data['scripts'], dict):
            raise ValueError(
                f"Failed to read {self.package_json_path}: "
                "\"scripts\" must be an object"
            )

        # Merge new commands; same-named scripts are replaced.
        original_count = len(data['scripts'])
        data['scripts'].update(new_commands)
        new_count = len(data['scripts'])

        # Serialise before opening for write so a serialisation failure can
        # never leave a truncated package.json behind.
        try:
            serialized = json.dumps(data, indent=2, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            raise ValueError(f"Failed to write {self.package_json_path}: {e}") from e

        # Write back with pretty formatting
        try:
            with open(self.package_json_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
                f.write('\n')  # Trailing newline
        except OSError as e:
            raise ValueError(f"Failed to write {self.package_json_path}: {e}") from e

        added = new_count - original_count

        if self.verbose:
            print(f"✓ Updated {self.package_json_path}")
            print(f" Added/updated {len(new_commands)} commands")
            if added > 0:
                print(f" Net new scripts: {added}")

    def get_current_commands(self) -> Dict[str, str]:
        """Get current scripts from package.json.

        Returns an empty dict when the file cannot be read or parsed, or when
        it has no "scripts" object.
        """
        try:
            with open(self.package_json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            return {}

        if not isinstance(data, dict):
            return {}
        return data.get('scripts', {})


class Validator:
    """Validates feature compliance with expected script patterns."""

    REQUIRED_SCRIPTS = {'build', 'typecheck', 'lint', 'test'}
    OPTIONAL_SCRIPTS = {'test:e2e', 'test:integration', 'db:migrate', 'db:seed'}

    def __init__(self, features: List[Feature]):
        self.features = features

    def validate(self) -> ValidationReport:
        """Validate all features and return report.

        A feature is compliant when it defines every script in
        ``REQUIRED_SCRIPTS``. A warning is recorded for a feature that has a
        script containing "e2e" but no ``test`` script.
        """
        compliant: List[str] = []
        missing_scripts: Dict[str, List[str]] = {}
        warnings: List[str] = []

        for feature in self.features:
            available_scripts = set(feature.scripts)
            missing = self.REQUIRED_SCRIPTS - available_scripts

            if missing:
                missing_scripts[feature.name] = sorted(missing)
            else:
                compliant.append(feature.name)

            # Check for optional scripts
            has_e2e = any('e2e' in s for s in available_scripts)
            has_unit = 'test' in available_scripts

            if has_e2e and not has_unit:
                warnings.append(f"{feature.name}: has e2e tests but no unit tests")

        return ValidationReport(
            total_features=len(self.features),
            compliant_features=compliant,
            missing_scripts=missing_scripts,
            warnings=warnings
        )

    def print_report(self, report: ValidationReport) -> None:
        """Print validation report to stdout."""
        print("\n" + "=" * 70)
        print("Feature Compliance Report")
        print("=" * 70)

        print(f"\nTotal features: {report.total_features}")
        print(f"Compliant features: {len(report.compliant_features)}")
        print(f"Non-compliant features: {len(report.missing_scripts)}")

        if report.compliant_features:
            print("\n✓ Compliant features:")
            for name in sorted(report.compliant_features):
                print(f" • {name}")

        if report.missing_scripts:
            print("\n✗ Features with missing scripts:")
            for name, missing in sorted(report.missing_scripts.items()):
                print(f" • {name}:")
                for script in missing:
                    print(f" - {script}")

        if report.warnings:
            print("\n⚠️ Warnings:")
            for warning in report.warnings:
                print(f" • {warning}")

        print("\n" + "=" * 70)


class CommandLister:
    """Lists all features and their script status."""

    def __init__(self, features: List[Feature]):
        self.features = features

    def print_list(self) -> None:
        """Print feature list with script information."""
        print("\n" + "=" * 70)
        print("Feature List")
        print("=" * 70)
        print(f"\nTotal features: {len(self.features)}\n")

        for feature in sorted(self.features, key=lambda f: f.name):
            feature_type = "workspace" if feature.is_workspace else "single-package"
            print(f"\n{feature.name} ({feature_type})")
            print(f" Package: {feature.package_name}")

            if feature.workspace_packages:
                print(" Workspace packages:")
                for pkg in feature.workspace_packages:
                    print(f" • {pkg}")

            print(f" Scripts ({len(feature.scripts)}):")
            for script_name in sorted(feature.scripts):
                print(f" • {script_name}")

        print("\n" + "=" * 70)


def main() -> int:
    """Main entry point.

    Returns:
        Process exit code: 0 on success, 1 when the features directory or
        root package.json is missing, no features are found, ``--check``
        finds non-compliant features, or the update fails.
    """
    parser = argparse.ArgumentParser(
        description="Aggregate feature commands into root package.json"
    )
    parser.add_argument(
        '--check',
        action='store_true',
        help='Validate compliance only, do not update package.json'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List all features and their script status'
    )
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Detailed output during processing'
    )

    args = parser.parse_args()

    # Determine paths: the script's parent directory's parent is the codebase
    # root that holds features/ and package.json.
    script_dir = Path(__file__).resolve().parent
    codebase_dir = script_dir.parent
    features_dir = codebase_dir / 'features'
    package_json = codebase_dir / 'package.json'

    if not features_dir.exists():
        print(f"❌ Features directory not found: {features_dir}", file=sys.stderr)
        return 1

    if not package_json.exists():
        print(f"❌ Root package.json not found: {package_json}", file=sys.stderr)
        return 1

    # Scan features
    if args.verbose:
        print(f"Scanning features in {features_dir}...\n")

    scanner = FeatureScanner(features_dir, verbose=args.verbose)
    features = scanner.scan_features()

    if not features:
        print("❌ No features found", file=sys.stderr)
        return 1

    if args.verbose:
        print(f"\n✓ Scanned {len(features)} features\n")

    # Handle --list (takes precedence over --check)
    if args.list:
        CommandLister(features).print_list()
        return 0

    # Handle --check
    if args.check:
        validator = Validator(features)
        report = validator.validate()
        validator.print_report(report)

        # Exit with error if non-compliant features exist
        return 1 if report.missing_scripts else 0

    # Generate aggregated commands
    aggregator = CommandAggregator(features, verbose=args.verbose)
    commands = aggregator.generate_commands()

    if args.verbose:
        print(f"Generated {len(commands)} aggregated commands\n")

    # Update package.json
    updater = PackageJsonUpdater(package_json, verbose=args.verbose)
    try:
        updater.update(commands)
    except ValueError as e:
        print(f"❌ {e}", file=sys.stderr)
        return 1

    print(f"✓ Successfully updated {package_json}")
    print(f" Total commands: {len(commands)}")
    return 0


if __name__ == '__main__':
    sys.exit(main())