feat(@ml): add menu-bar tray agent for local commits

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-04-12 19:43:37 -07:00
parent 81c66c1991
commit 783c0117e4
9 changed files with 986 additions and 0 deletions

50
commits-tray Executable file
View file

@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""ACS menu bar app with local commit agent.
Manages a lightweight commit agent that discovers local repos, asks the
remote ACS daemon for LLM-generated commit messages, and commits+pushes.
Usage:
./commits-tray --url http://apricot.local:8200
./commits-tray --url http://apricot.local:8200 --repos ~/Code --cycle 300
"""
import argparse
import os
import sys
from pathlib import Path
_script_dir = os.path.dirname(os.path.abspath(__file__))
_tray_dir = os.path.join(_script_dir, "src", "auto_commit_service", "tray")
sys.path.insert(0, _tray_dir)
from app import run_tray # noqa: E402
def main():
parser = argparse.ArgumentParser(description="ACS menu bar app + local commit agent")
parser.add_argument(
"--url", "-u",
default="http://apricot.local:8200",
help="Remote ACS daemon URL for LLM + recording (default: http://apricot.local:8200)",
)
parser.add_argument(
"--repos", "-r",
nargs="+",
default=None,
help="Base paths to scan for local git repos (default: ~/Code)",
)
parser.add_argument(
"--cycle", "-c",
type=int,
default=300,
help="Seconds between commit cycles (default: 300)",
)
args = parser.parse_args()
repos_paths = [Path(p).expanduser() for p in args.repos] if args.repos else None
run_tray(daemon_url=args.url, repos_paths=repos_paths, cycle_seconds=args.cycle)
if __name__ == "__main__":
main()

View file

@ -24,6 +24,9 @@ commits = "auto_commit_service.cli:main"
model-boss = [
"lilith-model-boss>=0.1.0",
]
tray = [
"rumps>=0.4.0",
]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",

View file

@ -1024,6 +1024,170 @@ WantedBy=default.target
typer.echo(f" Disable: systemctl --user disable {service_name}")
@app.command()
def install_launchd(
directory: Annotated[str, typer.Argument(help="Directory to monitor")],
interval: Annotated[str, typer.Argument(help="Commit interval")] = "5m",
recursive: Annotated[bool, typer.Option("-R", "--recursive")] = False,
depth: Annotated[Optional[int], typer.Option(help="Recursion depth")] = None,
llm_url: Annotated[
str, typer.Option("--llm-url", help="LLM service URL (e.g., http://apricot.local:8100)")
] = "",
) -> None:
"""Install macOS launchd agent for auto-starting daemon.
Creates a LaunchAgent plist that starts the commits daemon on login.
Use --llm-url to point inference at a remote model-boss instance.
Examples:
commits install-launchd ~/Code 5m -R
commits install-launchd ~/Code 5m -R --llm-url http://apricot.local:8100
"""
directory = str(Path(directory).resolve())
# Generate label from directory
label = "com.lilith.commits." + directory.replace("/", ".").strip(".")
plist_dir = Path.home() / "Library" / "LaunchAgents"
plist_file = plist_dir / f"{label}.plist"
# Find the commits binary
commits_bin = subprocess.run(
["which", "commits"], capture_output=True, text=True
).stdout.strip()
if not commits_bin:
_echo_error("'commits' binary not found in PATH")
raise typer.Exit(1)
# Build arguments for the start command
start_args = [interval]
if recursive:
start_args.append("-R")
if depth is not None:
start_args.append(str(depth))
# Build environment variables
env_vars: dict[str, str] = {
"PATH": "/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin",
"HOME": str(Path.home()),
}
if llm_url:
env_vars["AUTO_COMMIT_LLAMA_SERVICE_URL"] = llm_url
typer.echo(f"Creating launchd agent: {label}")
typer.echo(f" Directory: {directory}")
typer.echo(f" Interval args: {' '.join(start_args)}")
typer.echo(f" Plist: {plist_file}")
if llm_url:
typer.echo(f" LLM URL: {llm_url}")
# Build the plist XML
env_xml = "\n".join(
f" <key>{k}</key>\n <string>{v}</string>"
for k, v in env_vars.items()
)
program_args_xml = "\n".join(
f" <string>{arg}</string>"
for arg in [commits_bin, "start", *start_args]
)
plist_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{label}</string>
<key>ProgramArguments</key>
<array>
{program_args_xml}
</array>
<key>WorkingDirectory</key>
<string>{directory}</string>
<key>EnvironmentVariables</key>
<dict>
{env_xml}
</dict>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>StandardOutPath</key>
<string>{Path.home() / "Library" / "Logs" / "commits-daemon.log"}</string>
<key>StandardErrorPath</key>
<string>{Path.home() / "Library" / "Logs" / "commits-daemon.log"}</string>
<key>ThrottleInterval</key>
<integer>30</integer>
</dict>
</plist>
"""
plist_dir.mkdir(parents=True, exist_ok=True)
plist_file.write_text(plist_content)
_echo_success("Plist file created")
# Load the agent
# Unload first if it already exists (ignore errors)
subprocess.run(
["launchctl", "unload", str(plist_file)],
capture_output=True,
)
result = subprocess.run(
["launchctl", "load", str(plist_file)],
capture_output=True,
text=True,
)
if result.returncode == 0:
_echo_success("LaunchAgent loaded")
else:
_echo_error(f"Failed to load: {result.stderr.strip()}")
raise typer.Exit(1)
typer.echo()
typer.echo("LaunchAgent installed successfully!")
typer.echo()
typer.echo("Commands:")
typer.echo(f" Status: launchctl list | grep {label}")
typer.echo(f" Logs: tail -f ~/Library/Logs/commits-daemon.log")
typer.echo(f" Stop: launchctl unload {plist_file}")
typer.echo(f" Remove: launchctl unload {plist_file} && rm {plist_file}")
@app.command()
def tray(
url: Annotated[
str, typer.Option("--url", "-u", help="Daemon API URL")
] = "http://localhost:8200",
) -> None:
"""Launch the macOS menu bar status app.
Provides a system tray icon with daemon status, controls, and recent
commit history. Connects to the daemon API via HTTP.
Examples:
commits tray # Local daemon
commits tray --url http://apricot.local:8200 # Remote daemon
"""
try:
from ..tray import run_tray
except ImportError:
_echo_error("Tray dependencies not installed. Run: pip install 'auto-commit-service[tray]'")
raise typer.Exit(1)
_echo_header(f"Starting menu bar app (connecting to {url})")
run_tray(daemon_url=url)
def main() -> None:
"""Entry point for the CLI."""
app()

View file

@ -0,0 +1,5 @@
"""macOS system tray (menu bar) client for the auto-commit daemon."""
from .app import run_tray
__all__ = ["run_tray"]

View file

@ -0,0 +1,37 @@
"""Standalone entry point for the tray app.
Usage:
python -m auto_commit_service.tray [--url http://apricot.local:8200]
Can also be run directly without the full auto_commit_service package installed,
as long as httpx and rumps are available:
PYTHONPATH=src python -m auto_commit_service.tray --url http://apricot.local:8200
"""
import argparse
import sys
import os
# Allow running as standalone without the full package by injecting
# the tray directory into sys.path for relative imports to work.
_tray_dir = os.path.dirname(os.path.abspath(__file__))
_pkg_dir = os.path.dirname(_tray_dir)
if _pkg_dir not in sys.path:
sys.path.insert(0, os.path.dirname(_pkg_dir))
from auto_commit_service.tray.app import run_tray
def main():
parser = argparse.ArgumentParser(description="ACS menu bar app")
parser.add_argument(
"--url", "-u",
default="http://localhost:8200",
help="Daemon API URL (default: http://localhost:8200)",
)
args = parser.parse_args()
run_tray(daemon_url=args.url)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,259 @@
"""macOS menu bar application for controlling the ACS local commit agent.
Uses `rumps` (Ridiculously Uncomplicated macOS Python Statusbar apps)
to provide a lightweight native menu bar interface. Manages a local
commit agent that discovers repos, gets commit messages from the remote
ACS daemon's LLM, and commits+pushes locally.
"""
from __future__ import annotations
import logging
import threading
from datetime import datetime, timezone
from pathlib import Path
try:
import rumps
except ImportError:
raise ImportError(
"rumps is required for the tray app. Install with: pip install 'auto-commit-service[tray]'"
)
try:
from .client import DaemonClient
from .local_git import discover_repos, recent_commits as local_recent_commits
from .local_agent import LocalCommitAgent
except ImportError:
from client import DaemonClient # type: ignore[no-redef]
from local_git import discover_repos, recent_commits as local_recent_commits # type: ignore[no-redef]
from local_agent import LocalCommitAgent # type: ignore[no-redef]
logger = logging.getLogger(__name__)
ICON_OK = ""
ICON_DEGRADED = ""
ICON_ERROR = ""
ICON_OFFLINE = ""
POLL_INTERVAL = 30 # seconds
DEFAULT_CYCLE_SECONDS = 300 # 5 minutes
class CommitsTrayApp(rumps.App):
"""Menu bar app managing a local commit agent + remote ACS connection."""
def __init__(
self,
daemon_url: str = "http://localhost:8200",
repos_paths: list[Path] | None = None,
cycle_seconds: int = DEFAULT_CYCLE_SECONDS,
):
super().__init__(
name="Commits",
title=f"{ICON_OFFLINE} ACS",
quit_button=None,
)
self._daemon_url = daemon_url
self._repos_paths = repos_paths or [Path.home() / "Code"]
# Remote daemon client (for status + LLM health checks)
self.client = DaemonClient(base_url=daemon_url)
# Local commit agent
self.agent = LocalCommitAgent(
acs_url=daemon_url,
repos_paths=self._repos_paths,
cycle_seconds=cycle_seconds,
)
# For local commit history display
self._local_repos: list[Path] = []
self._repos_discovered = False
# Build menu
self._status_item = rumps.MenuItem("Status: starting...", callback=None)
self._status_item.set_callback(None)
self._agent_status_item = rumps.MenuItem("Agent: starting...", callback=None)
self._agent_status_item.set_callback(None)
self._repos_item = rumps.MenuItem("Repos: --", callback=None)
self._repos_item.set_callback(None)
self._toggle_item = rumps.MenuItem("Disable Agent", callback=self._on_toggle)
self._trigger_item = rumps.MenuItem("Commit Now", callback=self._on_trigger)
self._commits_menu = rumps.MenuItem("Recent Commits")
self._commits_menu.add(rumps.MenuItem("Loading...", callback=None))
self._quit_item = rumps.MenuItem("Quit", callback=self._on_quit)
self.menu = [
self._status_item,
self._agent_status_item,
self._repos_item,
None,
self._toggle_item,
self._trigger_item,
None,
self._commits_menu,
None,
self._quit_item,
]
# Start the local agent
self.agent.start()
# Start polling
self._poll_timer = rumps.Timer(self._poll, POLL_INTERVAL)
self._poll_timer.start()
threading.Thread(target=self._poll, args=(None,), daemon=True).start()
# -- Polling --
def _poll(self, _sender) -> None:
"""Update menu bar status from remote daemon + local agent."""
# Check remote LLM health
health = self.client.health()
llm_up = health is not None and health.get("llama_service_available", False)
# Local agent state
agent_running = self.agent.is_running
agent_enabled = self.agent.is_enabled
# Icon
if agent_running and llm_up:
icon = ICON_OK
elif agent_running and not llm_up:
icon = ICON_DEGRADED
else:
icon = ICON_ERROR
self.title = f"{icon} ACS"
# Status lines
llm_label = "up" if llm_up else "down"
self._status_item.title = f"LLM: {llm_label} ({self._daemon_url})"
if agent_running and agent_enabled:
last = self.agent.last_cycle
if last:
self._agent_status_item.title = (
f"Agent: running | last: {last.repos_committed} committed"
)
else:
self._agent_status_item.title = "Agent: running | no cycles yet"
elif agent_running and not agent_enabled:
self._agent_status_item.title = "Agent: paused"
else:
self._agent_status_item.title = "Agent: stopped"
# Repos count
repo_count = len(self.agent._repos)
self._repos_item.title = f"Repos: {repo_count} local"
# Toggle label
self._toggle_item.title = "Disable Agent" if agent_enabled else "Enable Agent"
# Update commits menu in background
threading.Thread(target=self._update_commits_menu, daemon=True).start()
def _update_commits_menu(self) -> None:
"""Refresh the recent commits submenu from local git repos."""
if not self._repos_discovered:
self._local_repos = discover_repos(self._repos_paths, max_depth=4)
self._repos_discovered = True
if not self._local_repos:
self._commits_menu.clear()
self._commits_menu.add(rumps.MenuItem("No local repos found", callback=None))
return
commits = local_recent_commits(self._local_repos, limit=15, since="7 days ago")
self._commits_menu.clear()
if not commits:
self._commits_menu.add(rumps.MenuItem("No recent commits", callback=None))
return
for commit in commits:
msg = commit.message
if len(msg) > 45:
msg = msg[:42] + "..."
age = _format_relative_time(commit.timestamp)
label = f"{commit.repo_name}: {commit.hash[:7]} {msg} ({age})"
self._commits_menu.add(rumps.MenuItem(label, callback=None))
# -- Actions --
def _on_toggle(self, _sender) -> None:
if self.agent.is_enabled:
self.agent.disable()
rumps.notification("ACS", "Agent paused", "Auto-commit disabled")
else:
self.agent.enable()
rumps.notification("ACS", "Agent resumed", "Auto-commit enabled")
threading.Thread(target=self._poll, args=(None,), daemon=True).start()
def _on_trigger(self, _sender) -> None:
rumps.notification("ACS", "Running commit cycle...", "")
threading.Thread(target=self._do_trigger, daemon=True).start()
def _do_trigger(self) -> None:
result = self.agent.trigger()
msg = f"{result.repos_committed} committed, {result.repos_failed} failed"
if result.commits:
details = "; ".join(f"{c['repo_name']}: {c['hash']}" for c in result.commits[:3])
msg += f"\n{details}"
rumps.notification("ACS", "Cycle complete", msg)
self._poll(None)
def _on_quit(self, _sender) -> None:
self.agent.close()
self.client.close()
rumps.quit_application()
def _format_relative_time(iso_timestamp: str) -> str:
"""Format an ISO timestamp as a relative time string."""
try:
ts = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
delta = now - ts
seconds = delta.total_seconds()
if seconds < 0:
seconds = abs(seconds)
if seconds < 60:
return f"in {int(seconds)}s"
if seconds < 3600:
return f"in {int(seconds / 60)}m"
return f"in {seconds / 3600:.1f}h"
if seconds < 60:
return f"{int(seconds)}s ago"
if seconds < 3600:
return f"{int(seconds / 60)}m ago"
if seconds < 86400:
return f"{int(seconds / 3600)}h ago"
return f"{int(seconds / 86400)}d ago"
except Exception:
return iso_timestamp
def run_tray(
daemon_url: str = "http://localhost:8200",
repos_paths: list[Path] | None = None,
cycle_seconds: int = DEFAULT_CYCLE_SECONDS,
) -> None:
"""Launch the menu bar application with local commit agent."""
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
app = CommitsTrayApp(
daemon_url=daemon_url,
repos_paths=repos_paths,
cycle_seconds=cycle_seconds,
)
app.run()

View file

@ -0,0 +1,76 @@
"""HTTP client for communicating with the ACS daemon API."""
from __future__ import annotations
import httpx
class DaemonClient:
"""Thin HTTP client wrapping the ACS daemon's FastAPI endpoints."""
def __init__(self, base_url: str = "http://localhost:8200", timeout: float = 5.0):
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self._client = httpx.Client(base_url=self.base_url, timeout=self.timeout)
def close(self) -> None:
self._client.close()
def _get(self, path: str, **params) -> dict | None:
try:
resp = self._client.get(path, params=params)
resp.raise_for_status()
return resp.json()
except Exception:
return None
def _post(self, path: str, **json_body) -> dict | None:
try:
resp = self._client.post(path, json=json_body if json_body else None)
resp.raise_for_status()
return resp.json()
except Exception:
return None
# -- Queries --
def health(self) -> dict | None:
return self._get("/health")
def status(self) -> dict | None:
return self._get("/status")
def repos(self) -> list[dict] | None:
data = self._get("/repos")
return data if isinstance(data, list) else None
def report_summary(self) -> dict | None:
return self._get("/report/summary")
def recent_commits(self, limit: int = 15) -> dict | None:
return self._get("/report/commits/db", limit=limit)
# -- Mutations --
def trigger(self) -> dict | None:
return self._post("/trigger")
def enable(self) -> dict | None:
return self._post("/enable")
def disable(self) -> dict | None:
return self._post("/disable")
def refresh_and_run(self) -> dict | None:
return self._post("/repos/refresh-and-run")
def set_priority(self, level: str, ttl_seconds: int | None = None) -> dict | None:
body: dict = {"level": level}
if ttl_seconds is not None:
body["ttl_seconds"] = ttl_seconds
try:
resp = self._client.post("/priority", json=body)
resp.raise_for_status()
return resp.json()
except Exception:
return None

View file

@ -0,0 +1,265 @@
"""Lightweight local commit agent for macOS hosts without full ACS.
Discovers local git repos, stages changes, asks the remote ACS daemon
for commit message generation, commits locally, pushes, and records
the commit to the central DB.
Runs as a background thread managed by the tray app.
"""
from __future__ import annotations
import logging
import socket
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
import httpx
try:
from .local_git import discover_repos
except ImportError:
from local_git import discover_repos # type: ignore[no-redef]
logger = logging.getLogger(__name__)
HOSTNAME = socket.gethostname().split(".")[0] # e.g., "plum" from "plum.voyager.nasty.sh"
@dataclass
class CycleResult:
repos_scanned: int = 0
repos_committed: int = 0
repos_failed: int = 0
commits: list[dict] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
timestamp: str = ""
class LocalCommitAgent:
"""Runs commit cycles against local repos using a remote ACS for LLM."""
def __init__(
self,
acs_url: str,
repos_paths: list[Path],
cycle_seconds: int = 300,
co_author: str = "Lilith Autocommit <noreply@atlilith.com>",
):
self.acs_url = acs_url.rstrip("/")
self.repos_paths = repos_paths
self.cycle_seconds = cycle_seconds
self.co_author = co_author
self._client = httpx.Client(base_url=self.acs_url, timeout=30.0)
self._running = False
self._enabled = True
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._repos: list[Path] = []
self._last_cycle: CycleResult | None = None
self._total_cycles = 0
@property
def is_running(self) -> bool:
return self._running
@property
def is_enabled(self) -> bool:
return self._enabled
@property
def last_cycle(self) -> CycleResult | None:
return self._last_cycle
def start(self) -> None:
if self._running:
return
self._stop_event.clear()
self._repos = discover_repos(self.repos_paths, max_depth=4)
logger.info(f"Local agent starting: {len(self._repos)} repos, cycle={self.cycle_seconds}s")
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
self._running = False
if self._thread:
self._thread.join(timeout=10)
def enable(self) -> None:
self._enabled = True
def disable(self) -> None:
self._enabled = False
def trigger(self) -> CycleResult:
return self._run_cycle()
def _loop(self) -> None:
while not self._stop_event.is_set():
if self._enabled:
try:
self._run_cycle()
except Exception as e:
logger.error(f"Cycle error: {e}")
self._stop_event.wait(timeout=self.cycle_seconds)
self._running = False
def _run_cycle(self) -> CycleResult:
result = CycleResult(timestamp=datetime.now(timezone.utc).isoformat())
for repo_path in self._repos:
result.repos_scanned += 1
try:
committed = self._process_repo(repo_path, result)
if committed:
result.repos_committed += 1
except Exception as e:
result.repos_failed += 1
result.errors.append(f"{repo_path.name}: {e}")
logger.error(f"Failed {repo_path.name}: {e}")
self._last_cycle = result
self._total_cycles += 1
logger.info(
f"Cycle done: {result.repos_committed} committed, "
f"{result.repos_failed} failed, {result.repos_scanned} scanned"
)
return result
def _process_repo(self, repo_path: Path, result: CycleResult) -> bool:
"""Process a single repo. Returns True if a commit was made."""
# Check for changes
status = _git(repo_path, "status", "--porcelain")
if not status.strip():
return False
# Stage all changes
_git(repo_path, "add", "-A")
# Get the diff of staged changes
diff = _git(repo_path, "diff", "--cached", "--stat") + "\n" + _git(
repo_path, "diff", "--cached", max_bytes=6000
)
if not diff.strip():
return False
# Get repo name and branch
repo_name = _repo_display_name(repo_path)
branch = _git(repo_path, "rev-parse", "--abbrev-ref", "HEAD").strip() or "main"
# Ask ACS for commit message
try:
resp = self._client.post("/generate-message", json={
"diff": diff,
"repo_name": repo_name,
"branch": branch,
})
resp.raise_for_status()
message = resp.json().get("message", "")
except Exception as e:
logger.error(f"LLM generation failed for {repo_name}: {e}")
return False
if not message:
logger.warning(f"Empty message for {repo_name}, skipping")
_git(repo_path, "reset", "HEAD")
return False
# Commit
full_message = f"{message}\n\nCo-Authored-By: {self.co_author}"
_git(repo_path, "commit", "-m", full_message)
# Get commit hash
commit_hash = _git(repo_path, "rev-parse", "HEAD").strip()
# Push
try:
_git(repo_path, "push")
except Exception as e:
logger.warning(f"Push failed for {repo_name}: {e}")
# Still record the commit even if push fails
# Parse stats from diff
stat_line = diff.split("\n")[0] if diff else ""
files_changed = insertions = deletions = None
for part in stat_line.split(","):
part = part.strip()
if "file" in part:
try:
files_changed = int(part.split()[0])
except ValueError:
pass
elif "insertion" in part:
try:
insertions = int(part.split()[0])
except ValueError:
pass
elif "deletion" in part:
try:
deletions = int(part.split()[0])
except ValueError:
pass
# Record to central DB
try:
self._client.post("/record-commit", json={
"hash": commit_hash,
"repo_name": repo_name,
"message": message,
"timestamp": datetime.now(timezone.utc).isoformat(),
"hostname": HOSTNAME,
"branch": branch,
"files_changed": files_changed,
"insertions": insertions,
"deletions": deletions,
})
except Exception as e:
logger.warning(f"Failed to record commit to central DB: {e}")
result.commits.append({
"repo_name": repo_name,
"hash": commit_hash[:8],
"message": message,
})
return True
def close(self) -> None:
self.stop()
self._client.close()
def _git(repo_path: Path, *args: str, max_bytes: int = 0) -> str:
"""Run a git command and return stdout."""
result = subprocess.run(
["git", "-C", str(repo_path), *args],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
raise RuntimeError(f"git {args[0]} failed: {result.stderr.strip()}")
output = result.stdout
if max_bytes and len(output) > max_bytes:
output = output[:max_bytes] + "\n... (truncated)"
return output
def _repo_display_name(repo_path: Path) -> str:
"""Short display name for a repo path."""
home = Path.home()
try:
relative = repo_path.relative_to(home / "Code")
parts = relative.parts
if len(parts) >= 2:
return str(Path(*parts[-2:]))
return str(relative)
except ValueError:
return repo_path.name

View file

@ -0,0 +1,127 @@
"""Scan local git repos for recent commits.
Discovers git repos under configured base paths and reads their
recent commit history via `git log`. No daemon connection needed.
"""
from __future__ import annotations
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
@dataclass
class LocalCommit:
repo_name: str
hash: str
message: str
timestamp: str # ISO format
author: str
# Directories to skip during discovery
SKIP_DIRS = frozenset({
"node_modules", ".venv", "venv", "dist", "build", "__pycache__",
".git", ".cache", ".Trash",
})
def discover_repos(base_paths: list[Path], max_depth: int = 4) -> list[Path]:
"""Find git repositories under the given base paths."""
repos: list[Path] = []
for base in base_paths:
if not base.is_dir():
continue
_walk_for_repos(base, repos, depth=0, max_depth=max_depth)
return repos
def _walk_for_repos(path: Path, repos: list[Path], depth: int, max_depth: int) -> None:
if depth > max_depth:
return
if (path / ".git").exists():
repos.append(path)
return # Don't recurse into sub-repos
try:
entries = sorted(path.iterdir())
except PermissionError:
return
for entry in entries:
if not entry.is_dir() or entry.name.startswith(".") and entry.name != ".git":
continue
if entry.name in SKIP_DIRS:
continue
_walk_for_repos(entry, repos, depth + 1, max_depth)
def recent_commits(
repos: list[Path],
limit: int = 20,
since: str = "7 days ago",
) -> list[LocalCommit]:
"""Get recent commits across all repos, sorted newest first."""
all_commits: list[LocalCommit] = []
for repo_path in repos:
try:
result = subprocess.run(
[
"git", "-C", str(repo_path), "log",
f"--since={since}",
f"--max-count={limit}",
"--format=%H%x00%s%x00%aI%x00%an",
],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
continue
repo_name = _repo_display_name(repo_path)
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split("\0")
if len(parts) < 4:
continue
all_commits.append(LocalCommit(
repo_name=repo_name,
hash=parts[0],
message=parts[1],
timestamp=parts[2],
author=parts[3],
))
except (subprocess.TimeoutExpired, OSError):
continue
# Sort by timestamp descending and take top N
all_commits.sort(key=lambda c: c.timestamp, reverse=True)
return all_commits[:limit]
def _repo_display_name(repo_path: Path) -> str:
"""Generate a short display name for a repo path.
~/Code/@projects/@lilith/lilith-platform.live @lilith/lilith-platform.live
~/Code/@packages/@py/tray @packages/@py/tray
"""
home = Path.home()
try:
relative = repo_path.relative_to(home / "Code")
except ValueError:
return repo_path.name
parts = relative.parts
# Skip leading generic directories, keep @ prefixed ones
if len(parts) >= 2:
return str(Path(*parts[-2:]))
return str(relative)