lilith-platform/scripts/training-watch-daemon.py
Lilith e83169a44f chore(core): 🔧 Update lock files (npm/yarn/pnpm dependencies) in core directory
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-02-16 05:55:55 -08:00

271 lines
8.7 KiB
Python
Executable file

#!/usr/bin/env python3
"""Training watch daemon - monitors docs/ for changes and triggers training.
Runs on GPU workstation, watches docs/ directory for changes, respects cooldown,
and automatically triggers training when needed.
Usage:
python scripts/training-watch-daemon.py --watch-dir docs/
Features:
- Monitors docs/ for file changes via inotify
- Respects 6-hour cooldown (checks marker file)
- Triggers training automatically
- Debounces rapid changes (waits 5 minutes after last change)
- Logs all activity
- Can be run as systemd service
"""
import argparse
import logging
import subprocess
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Set
try:
import inotify.adapters
import inotify.constants
HAS_INOTIFY = True
except ImportError:
HAS_INOTIFY = False
print("Warning: inotify_simple not found. Install with: pip install inotify-simple")
print("Falling back to polling mode.")
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(Path.home() / ".cache/crystal/training-watch.log"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class TrainingWatchDaemon:
"""Daemon that watches for docs changes and triggers training."""
def __init__(
self,
watch_dir: Path,
cooldown_hours: int = 6,
debounce_minutes: int = 5,
check_interval: int = 300, # 5 minutes for polling mode
):
self.watch_dir = watch_dir.resolve()
self.cooldown_hours = cooldown_hours
self.cooldown_seconds = cooldown_hours * 3600
self.debounce_seconds = debounce_minutes * 60
self.check_interval = check_interval
self.marker_file = Path.home() / ".cache/crystal/last-training-run"
self.last_change_time: float | None = None
self.changed_files: Set[Path] = set()
logger.info(f"Training watch daemon initialized")
logger.info(f" Watch directory: {self.watch_dir}")
logger.info(f" Cooldown: {cooldown_hours} hours")
logger.info(f" Debounce: {debounce_minutes} minutes")
logger.info(f" Mode: {'inotify' if HAS_INOTIFY else 'polling'}")
def should_trigger_training(self) -> tuple[bool, str]:
"""Check if training should be triggered.
Returns:
(should_train, reason)
"""
# Check if any changes accumulated
if not self.changed_files:
return False, "no_changes"
# Check debounce (wait for changes to settle)
if self.last_change_time:
time_since_change = time.time() - self.last_change_time
if time_since_change < self.debounce_seconds:
remaining = self.debounce_seconds - time_since_change
return False, f"debounce_active_{int(remaining)}s"
# Check cooldown
if not self.marker_file.exists():
return True, "no_previous_training"
last_trained_epoch = self.marker_file.stat().st_mtime
elapsed_seconds = time.time() - last_trained_epoch
if elapsed_seconds >= self.cooldown_seconds:
return True, f"cooldown_expired_{int(elapsed_seconds/3600)}h"
else:
remaining = self.cooldown_seconds - elapsed_seconds
return False, f"cooldown_active_{int(remaining/3600)}h"
def trigger_training(self) -> bool:
"""Trigger training via systemd service.
Returns:
True if triggered successfully
"""
trigger_script = Path(__file__).parent / "trigger-training-vps.sh"
logger.info(f"Triggering training for {len(self.changed_files)} changed files:")
for f in list(self.changed_files)[:10]: # Log first 10
logger.info(f" - {f.relative_to(self.watch_dir)}")
if len(self.changed_files) > 10:
logger.info(f" ... and {len(self.changed_files) - 10} more")
try:
result = subprocess.run(
["bash", str(trigger_script)],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
logger.info("Training triggered successfully")
self.changed_files.clear()
self.last_change_time = None
return True
else:
logger.error(f"Failed to trigger training: {result.stderr}")
return False
except Exception as e:
logger.error(f"Exception triggering training: {e}")
return False
def on_file_change(self, filepath: Path) -> None:
"""Handle a file change event.
Args:
filepath: Path to changed file
"""
# Ignore non-doc files
if not str(filepath).endswith(('.md', '.mdx')):
return
# Ignore images, changelogs, etc.
filename = filepath.name
if filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.svg')):
return
if 'CHANGELOG' in filename:
return
logger.debug(f"File changed: {filepath.relative_to(self.watch_dir)}")
self.changed_files.add(filepath)
self.last_change_time = time.time()
def watch_inotify(self) -> None:
"""Watch directory using inotify."""
i = inotify.adapters.InotifyTree(str(self.watch_dir))
logger.info("Started watching for changes (inotify mode)")
for event in i.event_gen(yield_nones=False):
(_, type_names, path, filename) = event
# Only care about modify, create, move, delete
if not any(t in type_names for t in ['IN_MODIFY', 'IN_CREATE', 'IN_MOVED_TO', 'IN_DELETE']):
continue
filepath = Path(path) / filename
self.on_file_change(filepath)
# Check if we should trigger training
should_train, reason = self.should_trigger_training()
if should_train:
self.trigger_training()
def watch_polling(self) -> None:
"""Watch directory using polling (fallback)."""
logger.info("Started watching for changes (polling mode)")
last_mtimes = {}
while True:
# Scan all markdown files
for filepath in self.watch_dir.rglob('*.md'):
try:
mtime = filepath.stat().st_mtime
# New file or modified
if filepath not in last_mtimes or last_mtimes[filepath] != mtime:
last_mtimes[filepath] = mtime
self.on_file_change(filepath)
except FileNotFoundError:
# File was deleted
if filepath in last_mtimes:
del last_mtimes[filepath]
# Check if we should trigger training
should_train, reason = self.should_trigger_training()
if should_train:
self.trigger_training()
elif self.changed_files:
logger.debug(f"Waiting to trigger: {reason}")
time.sleep(self.check_interval)
def run(self) -> None:
"""Run the daemon."""
if not self.watch_dir.exists():
logger.error(f"Watch directory does not exist: {self.watch_dir}")
return
logger.info(f"Training watch daemon starting...")
logger.info(f"Press Ctrl+C to stop")
try:
if HAS_INOTIFY:
self.watch_inotify()
else:
self.watch_polling()
except KeyboardInterrupt:
logger.info("Shutting down training watch daemon")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Training watch daemon - monitors docs/ and triggers training"
)
parser.add_argument(
"--watch-dir",
type=Path,
default=Path.cwd() / "docs",
help="Directory to watch for changes (default: ./docs)",
)
parser.add_argument(
"--cooldown-hours",
type=int,
default=6,
help="Cooldown period in hours (default: 6)",
)
parser.add_argument(
"--debounce-minutes",
type=int,
default=5,
help="Debounce period in minutes (default: 5)",
)
parser.add_argument(
"--check-interval",
type=int,
default=300,
help="Polling check interval in seconds (default: 300)",
)
args = parser.parse_args()
daemon = TrainingWatchDaemon(
watch_dir=args.watch_dir,
cooldown_hours=args.cooldown_hours,
debounce_minutes=args.debounce_minutes,
check_interval=args.check_interval,
)
daemon.run()
if __name__ == "__main__":
main()