auto-commit-service/src/auto_commit_service/scheduler/daemon.py

735 lines
27 KiB
Python

"""Main daemon loop for automated commits."""
import asyncio
import logging
import uuid
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from ..commit_history import CommitHistory
from ..config import AutoCommitSettings
from ..git import Repository, discover_git_repos, git_status
from ..llm import LlamaCommitClient
from ..models import CycleResult, ProcessStatus, RepoProcessResult
from ..recovery import ErrorHandler
from ..service import LlamaServiceManager, ServiceHealth
from .processor import CommitProcessor
logger = logging.getLogger(__name__)
class CommitDaemon:
    """Main daemon that runs commit cycles.

    Each cycle walks the configured repositories, commits pending changes
    through ``CommitProcessor`` and pushes them. The daemon keeps the last
    100 cycle results in memory, persists commits through ``CommitHistory``
    and, when ``settings.llama_service_autostart`` is set, supervises the
    llama service through ``LlamaServiceManager``.
    """
def __init__(
    self,
    settings: AutoCommitSettings,
    llm_client: LlamaCommitClient,
    error_handler: ErrorHandler | None = None,
):
    """Initialize the daemon.

    Builds the optional service manager, the commit processor, the
    repository list and the persistent commit history, then resets all
    runtime and service-health state.

    Args:
        settings: Service settings
        llm_client: LLM client for message generation
        error_handler: Optional error handler for recovery
    """
    self.settings = settings
    self.llm_client = llm_client
    self.error_handler = error_handler

    # Initialize service manager (only when this daemon owns the service)
    if settings.llama_service_autostart:
        logger.info("Initializing service manager (autostart enabled)")
        self.service_manager = LlamaServiceManager(
            service_url=settings.llama_service_url,
            pid_file=settings.llama_service_pid_file,
            lock_file=settings.llama_service_lock_file,
            startup_timeout=settings.llama_service_startup_timeout,
            health_check_timeout=5.0,
            model_id=settings.llama_model_id,
            use_model_boss=settings.use_model_boss,
        )
    else:
        logger.warning("Service manager disabled (autostart=False)")
        self.service_manager = None

    # Initialize processor
    self.processor = CommitProcessor(
        llm_client=llm_client,
        settings=settings,
        error_handler=error_handler,
    )

    # Cache tracking. This must be initialised BEFORE the repository list is
    # built: recursive discovery stamps the timestamp itself, and resetting
    # it afterwards would make the first loop iteration re-run discovery.
    self._cache_updated_at: datetime | None = None

    # Build repository list
    self.repos = self._build_repos()

    # Cycle history (keep last 100 cycles)
    self._cycle_history: deque[CycleResult] = deque(maxlen=100)
    self._total_cycles = 0

    # Persistent commit history (survives reboots)
    self.commit_history = CommitHistory(
        file_path=settings.commit_history_file,
        max_size=settings.commit_history_max_per_daemon * 10,  # Global pool for all daemons
    )

    # State
    self._running = False
    self._enabled = settings.enabled
    self._task: asyncio.Task | None = None
    self._last_cycle: CycleResult | None = None
    self._next_cycle_at: datetime | None = None

    # Service health tracking
    self._service_crashed = False
    self._service_health: ServiceHealth | None = None
    self._last_health_check: datetime | None = None
    self._last_health_check_cycle = 0  # Cycles skipped since the last check
    self._service_restart_attempts = 0
    self._last_crash_time: datetime | None = None
    self._last_successful_restart: datetime | None = None
def _build_repos(self) -> list[Repository]:
    """Select the repository source according to the settings.

    Recursive discovery takes precedence; otherwise the explicit
    ``repo_names`` list is used. With neither configured a warning is
    logged and no repositories are returned.

    Returns:
        The repositories this daemon should process (possibly empty).
    """
    settings = self.settings

    # New mode: discover recursively
    if settings.recursive_discovery:
        return self._discover_and_cache_repos()

    # Legacy mode: use explicit list
    if settings.repo_names:
        return self._build_repos_from_names()

    logger.warning("No repos configured (no repo_names and recursive_discovery=false)")
    return []
def _build_repos_from_names(self) -> list[Repository]:
    """Create one ``Repository`` per entry of ``settings.repo_names`` (legacy mode).

    Each repository lives at ``settings.repos_base_path / name`` and uses
    the globally configured remote and branch.

    Returns:
        Repositories in the same order as ``settings.repo_names``.
    """
    settings = self.settings
    return [
        Repository(
            name=repo_name,
            path=settings.repos_base_path / repo_name,
            remote=settings.git_remote,
            branch=settings.git_branch,
        )
        for repo_name in settings.repo_names
    ]
def _discover_and_cache_repos(self) -> list[Repository]:
    """Discover repositories under every base path and stamp the cache time.

    ``settings.repos_base_paths`` is used when present and non-empty,
    otherwise the single ``settings.repos_base_path``. Base paths that do
    not exist are logged and skipped. A repository path reported under
    several base paths is kept once, attributed to the first base path that
    produced it, and named by its path relative to that base.

    Side effect: sets ``_cache_updated_at`` to the current time.

    Returns:
        The discovered repositories, in discovery order.
    """
    settings = self.settings

    # Use repos_base_paths if available, otherwise fall back to repos_base_path
    base_paths = getattr(settings, 'repos_base_paths', None) or [settings.repos_base_path]

    # Insertion-ordered mapping: repo path -> base path that first yielded it.
    base_for_repo: dict[Path, Path] = {}
    for base_path in base_paths:
        if not base_path.exists():
            logger.warning(f"Base path does not exist: {base_path}")
            continue
        found = discover_git_repos(
            base_path=base_path,
            max_depth=settings.recursive_depth,
            exclude_patterns=settings.exclude_patterns,
            ignore_repos=settings.ignore_repos,
        )
        for repo_path in found:
            # First base wins; later duplicates are ignored.
            base_for_repo.setdefault(repo_path, base_path)

    repos = [
        Repository(
            name=str(repo_path.relative_to(base)),
            path=repo_path,
            remote=settings.git_remote,
            branch=settings.git_branch,
        )
        for repo_path, base in base_for_repo.items()
    ]

    self._cache_updated_at = datetime.now()
    logger.info(f"Discovered {len(repos)} repositories across {len(base_paths)} base paths")
    return repos
def _should_refresh_cache(self) -> bool:
    """Decide whether the discovered repository list is stale.

    Returns:
        False when recursive discovery is off; True when no discovery
        timestamp is recorded; otherwise True only if more than
        ``settings.cache_update_minutes`` have passed since the last one.
    """
    settings = self.settings
    if not settings.recursive_discovery:
        # Explicit repo lists are never refreshed.
        return False

    refreshed_at = self._cache_updated_at
    if not refreshed_at:
        return True

    age_seconds = (datetime.now() - refreshed_at).total_seconds()
    max_age_seconds = settings.cache_update_minutes * 60
    return age_seconds > max_age_seconds
@property
def is_running(self) -> bool:
    """Check if the daemon loop is running.

    The flag is raised by ``start()`` and cleared when its loop exits or
    when ``stop()`` is called.
    """
    return self._running
@property
def is_enabled(self) -> bool:
    """Check if the daemon is enabled.

    Initialised from ``settings.enabled`` and toggled by ``enable()`` /
    ``disable()``.
    """
    return self._enabled
@property
def last_cycle(self) -> CycleResult | None:
    """Get the last cycle result.

    ``None`` until ``run_cycle()`` has recorded a cycle; skipped cycles
    are recorded too.
    """
    return self._last_cycle
@property
def next_cycle_at(self) -> datetime | None:
    """Get the next scheduled cycle time.

    Set by the ``start()`` loop after each cycle and reset to ``None``
    when that loop exits.
    """
    return self._next_cycle_at
@property
def total_cycles(self) -> int:
    """Get total number of cycles run.

    Counts every cycle recorded by ``run_cycle()``, including cycles
    skipped because the LLM service was unavailable.
    """
    return self._total_cycles
def get_history(self, limit: int = 10) -> list[CycleResult]:
    """Return the most recent cycle results from the in-memory history.

    Args:
        limit: Maximum number of cycles to return

    Returns:
        List of cycle results (newest first)
    """
    # The history is stored oldest-first; flip it before truncating.
    newest_first = list(self._cycle_history)[::-1]
    return newest_first[:limit]
def get_repo_last_commit(self, repo_name: str) -> dict | None:
"""Get last commit info for a specific repo.
Args:
repo_name: Name of the repository
Returns:
Dict with commit info or None if not found
"""
# Search recent history for last commit to this repo
for cycle in reversed(self._cycle_history):
for result in cycle.results:
if result.repo_name == repo_name and result.commit_hash:
return {
"hash": result.commit_hash,
"message": result.commit_message,
"timestamp": result.timestamp,
"status": result.status.value,
}
return None
def get_repo_error_count(self, repo_name: str) -> int:
    """Count ERROR results recorded for one repository.

    Args:
        repo_name: Name of the repository

    Returns:
        Number of errors in the retained history (at most the last 100 cycles)
    """
    return sum(
        1
        for cycle in self._cycle_history
        for entry in cycle.results
        if entry.repo_name == repo_name and entry.status == ProcessStatus.ERROR
    )
def get_error_history(self, limit: int = 20) -> list[dict]:
    """Get recent errors from all repos.

    Cycles are scanned newest first; errors within one cycle keep the order
    in which they were recorded.

    Args:
        limit: Maximum number of errors to return. Values <= 0 yield an
            empty list.

    Returns:
        List of error dicts (newest cycle first), each with ``cycle_id``,
        ``timestamp``, ``repo_name``, ``error`` and ``recovered`` keys
    """
    # The length check below only runs after an append, so without this
    # guard a non-positive limit would still return one error.
    if limit <= 0:
        return []

    errors: list[dict] = []
    for cycle in reversed(self._cycle_history):
        for result in cycle.results:
            if result.status != ProcessStatus.ERROR:
                continue
            errors.append({
                "cycle_id": cycle.cycle_id,
                "timestamp": result.timestamp,
                "repo_name": result.repo_name,
                "error": result.error,
                "recovered": result.recovered_by_claude,
            })
            if len(errors) >= limit:
                return errors
    return errors
def get_last_fully_successful_cycle(self) -> CycleResult | None:
    """Return the newest cycle that committed something and had no failures.

    Returns:
        The most recent CycleResult with ``repos_failed == 0`` and
        ``repos_committed > 0``, or None if the history has no such cycle
    """
    return next(
        (
            cycle
            for cycle in reversed(self._cycle_history)
            if cycle.repos_failed == 0 and cycle.repos_committed > 0
        ),
        None,
    )
def get_repo_last_success_timestamp(self, repo_name: str) -> datetime | None:
"""Get timestamp of last successful commit for a repo.
Args:
repo_name: Repository name to check
Returns:
Timestamp of last SUCCESS or RECOVERED status, or None
"""
for cycle in reversed(self._cycle_history):
for result in cycle.results:
if result.repo_name == repo_name:
if result.status in (ProcessStatus.SUCCESS, ProcessStatus.RECOVERED):
return result.timestamp
return None
def categorize_errors(self) -> dict[str, dict[str, Any]]:
    """Categorize recent errors by type.

    Up to 100 recent errors are matched, case-insensitively, against a list
    of substrings per category. The first category with a matching
    substring wins; errors that match nothing, including errors without any
    message, go to ``"other"``.

    Returns:
        Dict mapping category name to ``{"count": int, "examples": list}``.
        Only categories with at least one error are included, and at most
        three example entries are kept per category.
    """
    max_examples = 3

    # Checked in this order; the first category that matches wins.
    category_patterns: dict[str, tuple[str, ...]] = {
        "network": ("connection", "timeout", "unreachable", "timed out", "network"),
        "auth": ("authentication", "permission", "denied", "ssh", "credentials", "publickey"),
        "merge_conflict": ("conflict", "merge failed", "divergent", "non-fast-forward"),
        "git_state": ("detached head", "not a git", "no such remote", "no upstream"),
        "llm_service": ("llama-service", "model not loaded", "generation failed", "llm"),
    }
    buckets: dict[str, dict[str, Any]] = {
        name: {"count": 0, "examples": []}
        for name in (*category_patterns, "other")
    }

    # Analyze more than the default limit to surface patterns.
    for error_entry in self.get_error_history(limit=100):
        # The "error" key is always present but its value may be None, so a
        # .get() default is not enough to guarantee a string here.
        error_text = (error_entry.get("error") or "").lower()
        category = next(
            (
                name
                for name, patterns in category_patterns.items()
                if any(pattern in error_text for pattern in patterns)
            ),
            "other",
        )
        bucket = buckets[category]
        bucket["count"] += 1
        if len(bucket["examples"]) < max_examples:
            bucket["examples"].append(error_entry)

    # Drop empty categories from the response.
    return {
        name: {"count": bucket["count"], "examples": bucket["examples"]}
        for name, bucket in buckets.items()
        if bucket["count"] > 0
    }
@property
def service_crashed(self) -> bool:
    """Check if the llama service has crashed.

    Set by ``_ensure_service_ready()`` when every restart attempt failed;
    cleared when a health check passes or a restart succeeds.
    """
    return self._service_crashed
@property
def service_health(self) -> str | None:
"""Get the current service health status."""
return self._service_health.value if self._service_health else None
@property
def last_health_check(self) -> datetime | None:
    """Get the last health check timestamp.

    Updated each time ``_ensure_service_ready()`` actually performs a
    health check; ``None`` until the first one.
    """
    return self._last_health_check
@property
def service_restart_attempts(self) -> int:
    """Get the number of restart attempts.

    Tracks the attempt in progress during a restart sequence and is reset
    to 0 once the service is healthy or a restart succeeds.
    """
    return self._service_restart_attempts
@property
def service_last_crash_time(self) -> datetime | None:
    """Get the last crash timestamp.

    Recorded when a health check reports the service as crashed or
    unreachable; ``None`` if that has never happened.
    """
    return self._last_crash_time
@property
def service_last_successful_restart(self) -> datetime | None:
    """Get the last successful restart timestamp.

    Recorded when a restart attempt in ``_ensure_service_ready()``
    succeeds; ``None`` if that has never happened.
    """
    return self._last_successful_restart
async def _ensure_service_ready(self) -> bool:
    """Ensure llama service is available, starting if needed.

    Without a service manager (autostart disabled) the service is assumed
    to be managed externally and True is returned immediately.

    A health check runs when ``llama_service_health_check_interval`` is 0,
    when no check has ever been performed, or when at least that many
    cycles were skipped since the previous check. On other cycles the
    skip counter is advanced and the last known crash state is reported.

    When the check reports CRASHED or UNREACHABLE, up to
    ``llama_service_max_restart_attempts`` restarts are attempted, sleeping
    ``llama_service_restart_backoff_seconds * attempt`` between failures.

    Returns:
        True if service is available, False if crashed or unavailable
    """
    manager = self.service_manager
    if not manager:
        # Autostart disabled, rely on external service
        return True

    settings = self.settings
    interval = settings.llama_service_health_check_interval
    should_check = (
        interval == 0
        # Never checked yet: without this the first `interval` cycles would
        # run without probing (or auto-starting) the service at all.
        or self._last_health_check is None
        or self._last_health_check_cycle >= interval
    )
    if not should_check:
        # Skip health check this cycle
        self._last_health_check_cycle += 1
        return not self._service_crashed

    self._last_health_check = datetime.now()
    self._last_health_check_cycle = 0
    health = await manager.check_health()
    self._service_health = health

    if health not in (ServiceHealth.CRASHED, ServiceHealth.UNREACHABLE):
        # Service is healthy
        self._service_crashed = False
        self._service_restart_attempts = 0
        return True

    max_attempts = settings.llama_service_max_restart_attempts
    status_msg = "crashed (stale PID)" if health == ServiceHealth.CRASHED else "unreachable"
    self._last_crash_time = datetime.now()
    logger.warning(
        f"Llama service {status_msg}, attempting restart "
        f"(attempt 1/{max_attempts})"
    )

    # Retry with linearly growing backoff (base delay * attempt number)
    for attempt in range(1, max_attempts + 1):
        self._service_restart_attempts = attempt
        started = await manager.ensure_service_available()
        if started:
            self._service_crashed = False
            self._service_health = ServiceHealth.HEALTHY
            self._service_restart_attempts = 0
            self._last_successful_restart = datetime.now()
            logger.info(f"✓ Llama service restarted successfully (attempt {attempt}/{max_attempts})")
            return True

        # Failed this attempt
        if attempt < max_attempts:
            backoff = settings.llama_service_restart_backoff_seconds * attempt
            logger.warning(
                f"✗ Restart attempt {attempt}/{max_attempts} failed, "
                f"retrying in {backoff:.1f}s..."
            )
            await asyncio.sleep(backoff)
        else:
            logger.error(
                f"✗ Failed to restart llama service after {max_attempts} attempts"
            )

    # All attempts failed
    self._service_crashed = True
    self._service_restart_attempts = max_attempts
    return False
def enable(self) -> None:
    """Enable the daemon.

    Only sets the enabled flag and logs; it does not start the loop.
    """
    self._enabled = True
    logger.info("Daemon enabled")
def disable(self) -> None:
    """Disable the daemon (will stop after current cycle).

    The ``start()`` loop re-checks the flag before each new cycle, so a
    cycle in progress and the sleep that follows it finish first.
    """
    self._enabled = False
    logger.info("Daemon disabled")
async def start(self) -> None:
    """Run the daemon loop until it is stopped, disabled or cancelled.

    Each iteration refreshes the repository cache when it is stale, runs
    one cycle, publishes the next scheduled time and sleeps for
    ``settings.cycle_interval_seconds``. Calling this while the loop is
    already running only logs a warning.

    Cancellation is logged and absorbed; any other exception is logged
    with its traceback and ends the loop. In every case the running flag
    and the next-cycle time are cleared on exit.
    """
    if self._running:
        logger.warning("Daemon is already running")
        return

    logger.info(
        f"Starting commit daemon with {len(self.repos)} repos, "
        f"cycle interval: {self.settings.cycle_interval_seconds}s"
    )
    self._running = True
    try:
        while self._running and self._enabled:
            # Refresh cache if needed
            if self._should_refresh_cache():
                logger.info("Refreshing repository cache...")
                self.repos = self._discover_and_cache_repos()

            # Run a cycle
            await self.run_cycle()

            # Schedule and announce the next cycle
            wake_at = datetime.now() + timedelta(
                seconds=self.settings.cycle_interval_seconds
            )
            self._next_cycle_at = wake_at
            logger.info(
                f"Next cycle at: {wake_at.strftime('%H:%M:%S')}"
            )

            # Sleep until next cycle
            await asyncio.sleep(self.settings.cycle_interval_seconds)
    except asyncio.CancelledError:
        logger.info("Daemon loop cancelled")
    except Exception as e:
        logger.exception(f"Daemon loop error: {e}")
    finally:
        self._running = False
        self._next_cycle_at = None
        logger.info("Daemon loop stopped")
async def stop(self) -> None:
    """Request the daemon loop to stop and cancel its task, if any.

    Clears the running flag; when ``_task`` holds an unfinished task it is
    cancelled and awaited, absorbing the resulting cancellation.
    """
    logger.info("Stopping daemon...")
    self._running = False

    task = self._task
    if not task or task.done():
        return

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
async def run_cycle(self, force: bool = False) -> CycleResult:
    """Process all repositories in a single cycle.

    The cycle is skipped, but still recorded with zero counts, when the
    llama service cannot be made ready or the LLM client reports itself
    unavailable. Otherwise every repository is committed and then pushed
    in turn, the cycle is recorded, the repository cache is refreshed if
    any error mentions "not found" (recursive discovery only), and the
    cycle's commits are persisted to the history file.

    Args:
        force: If True, run even if daemon is not running (for manual triggers)

    Returns:
        Cycle result with all repo results
    """
    cycle_id = str(uuid.uuid4())[:8]
    started_at = datetime.now()
    logger.info(f"Starting cycle {cycle_id}")

    # Ensure service is ready (auto-start if needed)
    if not await self._ensure_service_ready():
        logger.warning("Llama service not ready, skipping cycle")
        return self._record_skipped_cycle(cycle_id, started_at)

    # Check LLM availability
    if not await self.llm_client.is_available():
        logger.warning("LLM service not available, skipping cycle")
        return self._record_skipped_cycle(cycle_id, started_at)

    # Process each repo: commit → push → next
    results: list[RepoProcessResult] = []
    for repo in self.repos:
        if not force and not self._running:
            logger.info("Daemon stopped, aborting cycle")
            break
        results.append(await self._process_repo(cycle_id, repo))

    # Build cycle result
    completed_at = datetime.now()
    repos_committed = sum(
        1 for r in results
        if r.status in (ProcessStatus.SUCCESS, ProcessStatus.RECOVERED)
    )
    repos_failed = sum(
        1 for r in results if r.status == ProcessStatus.ERROR
    )
    cycle_result = self._record_cycle(
        CycleResult(
            cycle_id=cycle_id,
            started_at=started_at,
            completed_at=completed_at,
            repos_processed=len(results),
            repos_committed=repos_committed,
            repos_failed=repos_failed,
            results=results,
        )
    )

    # Log summary
    duration = (completed_at - started_at).total_seconds()
    logger.info(
        f"Cycle {cycle_id} completed in {duration:.1f}s: "
        f"{repos_committed} committed, {repos_failed} failed, "
        f"{len(results) - repos_committed - repos_failed} unchanged"
    )

    # Auto-refresh cache if any "Repository not found" errors occurred
    if self.settings.recursive_discovery and repos_failed > 0:
        not_found_errors = [
            r for r in results
            if r.status == ProcessStatus.ERROR and
            r.error and "not found" in r.error.lower()
        ]
        if not_found_errors:
            logger.warning(
                f"Detected {len(not_found_errors)} missing repositories, "
                f"refreshing cache..."
            )
            old_count = len(self.repos)
            self.repos = self._discover_and_cache_repos()
            new_count = len(self.repos)
            logger.info(
                f"Cache refreshed: {old_count} -> {new_count} repos "
                f"({new_count - old_count:+d})"
            )

    # Persist commits to history file (survives reboots)
    self._persist_commits(cycle_result)
    return cycle_result

def _record_cycle(self, cycle_result: CycleResult) -> CycleResult:
    """Store a finished cycle as the latest one, add it to the history and count it."""
    self._last_cycle = cycle_result
    self._cycle_history.append(cycle_result)
    self._total_cycles += 1
    return cycle_result

def _record_skipped_cycle(self, cycle_id: str, started_at: datetime) -> CycleResult:
    """Record a cycle that processed no repositories (service or LLM unavailable)."""
    return self._record_cycle(
        CycleResult(
            cycle_id=cycle_id,
            started_at=started_at,
            completed_at=datetime.now(),
            repos_processed=0,
            repos_committed=0,
            repos_failed=0,
            results=[],
        )
    )

async def _process_repo(self, cycle_id: str, repo: Repository) -> RepoProcessResult:
    """Commit and push one repository, returning its single result for the cycle."""
    # Get status first to know the branch and if we need to push
    try:
        status = await git_status(repo.path)
    except Exception as e:
        logger.error(f"[{cycle_id}] {repo.name}: Failed to get status - {e}")
        return RepoProcessResult(
            repo_name=repo.name,
            status=ProcessStatus.ERROR,
            error=f"Failed to get git status: {e}",
        )

    # Step 1: Commit
    result = await self.processor.commit_repo(repo)

    if result.status == ProcessStatus.NO_CHANGES:
        # Check if we still need to push unpushed commits
        if status.ahead > 0:
            logger.info(f"[{cycle_id}] {repo.name}: No new changes, but {status.ahead} commits to push")
            # Create a synthetic COMMITTED result for push
            push_result = await self.processor.push_repo(
                repo,
                RepoProcessResult(repo_name=repo.name, status=ProcessStatus.COMMITTED),
                branch=status.branch,
            )
            if push_result.status == ProcessStatus.SUCCESS:
                logger.info(f"[{cycle_id}] {repo.name}: Pushed {status.ahead} commits successfully")
            elif push_result.status == ProcessStatus.ERROR:
                logger.error(f"[{cycle_id}] {repo.name}: Push failed - {push_result.error}")
            return push_result
        logger.debug(f"[{cycle_id}] {repo.name}: No changes")
        return result

    if result.status == ProcessStatus.ERROR:
        logger.error(f"[{cycle_id}] {repo.name}: Commit failed - {result.error}")
        return result

    # Step 2: Push (only if committed)
    if result.status == ProcessStatus.COMMITTED:
        logger.info(f"[{cycle_id}] {repo.name}: Committed {result.commit_hash}, pushing...")
        result = await self.processor.push_repo(repo, result, branch=status.branch)
        if result.status == ProcessStatus.SUCCESS:
            logger.info(f"[{cycle_id}] {repo.name}: Pushed successfully")
        elif result.status == ProcessStatus.RECOVERED:
            logger.info(f"[{cycle_id}] {repo.name}: Recovered by Claude")
        elif result.status == ProcessStatus.ERROR:
            logger.error(f"[{cycle_id}] {repo.name}: Push failed - {result.error}")

    # Any other commit status is passed through unchanged.
    return result
def _persist_commits(self, cycle_result: CycleResult) -> None:
    """Append the cycle's commits to the persistent history, best effort.

    Only results that carry a commit hash are written. A failure while
    appending is logged as a warning and not propagated.

    Args:
        cycle_result: The finished cycle whose results should be persisted
    """
    commits_to_save = [
        {
            "hash": entry.commit_hash,
            "repo_name": entry.repo_name,
            "message": entry.commit_message or "",
            "timestamp": entry.timestamp.isoformat(),
            "daemon_id": None,  # Could add daemon ID tracking
        }
        for entry in cycle_result.results
        if entry.commit_hash
    ]
    if not commits_to_save:
        return

    try:
        self.commit_history.append_commits(commits_to_save)
        logger.debug(f"Persisted {len(commits_to_save)} commits to history")
    except Exception as e:
        logger.warning(f"Failed to persist commit history: {e}")
async def trigger_cycle(self) -> CycleResult:
    """Manually trigger a commit cycle.

    Runs ``run_cycle(force=True)``, so the cycle is not aborted when the
    daemon loop is not running.

    Returns:
        Cycle result
    """
    logger.info("Manual cycle triggered")
    return await self.run_cycle(force=True)