auto-commit-service/src/auto_commit_service/app.py
2026-01-05 15:24:28 -08:00

402 lines
13 KiB
Python

"""FastAPI application factory."""
import asyncio
import logging
from datetime import datetime
from fastapi import FastAPI, HTTPException
from lilith_ml_service_base import (
create_ml_service,
LifespanManager,
HealthChecker,
)
from .config import AutoCommitSettings
from .llm import LlamaCommitClient
from .models import DaemonStatus, HealthResponse, TriggerResponse
from .recovery import ErrorHandler
from .scheduler import CommitDaemon
# Module-level logger shared by the factory and its nested hooks/endpoints.
logger = logging.getLogger(__name__)
def create_auto_commit_service(
    settings: AutoCommitSettings | None = None,
) -> FastAPI:
    """Create the auto-commit service application.

    Builds the LLM client, error handler and commit daemon, registers the
    lifespan startup/shutdown hooks and health checks, and mounts the HTTP
    API on the app returned by ``create_ml_service``.

    Args:
        settings: Service settings (creates default if None)

    Returns:
        Configured FastAPI application
    """
    if settings is None:
        settings = AutoCommitSettings(service_name="auto-commit-service")

    # Initialize components
    llm_client = LlamaCommitClient(
        base_url=settings.llama_service_url,
        model=settings.llama_model,
        timeout=settings.llama_timeout,
    )
    error_handler = ErrorHandler(settings)
    daemon = CommitDaemon(
        settings=settings,
        llm_client=llm_client,
        error_handler=error_handler,
    )

    # Setup lifespan manager
    lifespan = LifespanManager()
    health = HealthChecker()
    # Handle to the background task running ``daemon.start()``; written only
    # by ``_spawn_daemon_task`` and read by the shutdown hook.
    daemon_task: asyncio.Task | None = None

    # --- Internal helpers ---

    def _log_daemon_task_result(task: asyncio.Task) -> None:
        """Done-callback: log the exception of a daemon task that failed.

        Without this, an error raised inside ``daemon.start()`` would never be
        retrieved and would only show up as an asyncio warning.
        """
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Daemon task exited with an error", exc_info=exc)

    def _spawn_daemon_task() -> asyncio.Task:
        """Run ``daemon.start()`` as a background task and remember the handle."""
        nonlocal daemon_task
        daemon_task = asyncio.create_task(daemon.start())
        daemon_task.add_done_callback(_log_daemon_task_result)
        return daemon_task

    def _require_recursive_discovery() -> None:
        """Raise HTTP 400 unless recursive repo discovery is enabled."""
        if not settings.recursive_discovery:
            raise HTTPException(
                status_code=400,
                detail="Recursive discovery not enabled",
            )

    def _refresh_repo_cache() -> tuple[int, int]:
        """Re-run repository discovery and replace ``daemon.repos``.

        Returns:
            ``(old_count, new_count)`` repository counts.
        """
        old_count = len(daemon.repos)
        # NOTE(review): this private daemon method is called synchronously on
        # the event loop; if recursive discovery is slow it will block other
        # requests and the daemon loop — consider exposing an async API.
        daemon.repos = daemon._discover_and_cache_repos()
        return old_count, len(daemon.repos)

    async def _run_cycle_or_503():
        """Run one commit cycle; raise HTTP 503 if the LLM service crashed.

        A cycle that processed no repos while ``daemon.service_crashed`` is
        set is treated as the llama-service failing to start.
        """
        cycle_result = await daemon.trigger_cycle()
        if cycle_result.repos_processed == 0 and daemon.service_crashed:
            raise HTTPException(
                status_code=503,
                detail="llama-service has crashed",
            )
        return cycle_result

    # --- Lifespan hooks ---

    @lifespan.on_startup
    async def start_daemon() -> None:
        """Start the commit daemon as a background task."""
        logger.info("Starting auto-commit daemon...")
        # Store references in lifespan state
        lifespan.set_state("daemon", daemon)
        lifespan.set_state("llm_client", llm_client)
        # Start daemon in background
        if settings.enabled:
            lifespan.set_state("daemon_task", _spawn_daemon_task())
            logger.info("Daemon task started")
        else:
            logger.info("Daemon disabled, not starting")

    @lifespan.on_shutdown
    async def stop_daemon() -> None:
        """Stop the daemon, cancel its task, and close the LLM client.

        Cancellation and ``llm_client.close()`` run in ``finally`` so that a
        failure in ``daemon.stop()`` cannot leave the task running or leak the
        client; the original exception still propagates afterwards.
        """
        logger.info("Shutting down auto-commit daemon...")
        try:
            await daemon.stop()
        finally:
            task = daemon_task
            if task is not None and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                except Exception:
                    # Don't let a misbehaving task prevent closing the client.
                    logger.exception("Daemon task raised during cancellation")
            await llm_client.close()
        logger.info("Daemon shutdown complete")

    @health.check("llama_service")
    async def check_llama() -> bool:
        return await llm_client.is_available()

    @health.check("repositories")
    async def check_repos() -> bool:
        return all(repo.exists for repo in daemon.repos)

    # Create FastAPI app
    app = create_ml_service(
        title="Auto-Commit Service",
        description="Automated commit message generation and git operations",
        version="0.1.0",
        settings=settings,
        lifespan_manager=lifespan,
        health_checker=health,
    )
    # Store daemon reference
    app.state.daemon = daemon

    # --- API Endpoints ---

    @app.get("/health", response_model=HealthResponse)
    async def health_check() -> HealthResponse:
        """Check service health.

        ``status`` is "error" if the LLM service crashed or a dependency is
        missing, "ok" if everything is up and the daemon is running, and
        "degraded" if dependencies are fine but the daemon is not running.
        """
        llama_available = await llm_client.is_available()
        repos_accessible = all(repo.exists for repo in daemon.repos)
        service_crashed = daemon.service_crashed

        if service_crashed:
            status = "error"
        elif llama_available and repos_accessible and daemon.is_running:
            status = "ok"
        elif llama_available and repos_accessible:
            status = "degraded"
        else:
            status = "error"

        error = None
        if service_crashed:
            error = "llama-service has crashed"
        elif not llama_available:
            error = "llama-service not available"
        elif not repos_accessible:
            missing = [r.name for r in daemon.repos if not r.exists]
            error = f"Missing repos: {', '.join(missing)}"

        return HealthResponse(
            status=status,
            daemon_running=daemon.is_running,
            llama_service_available=llama_available,
            repos_accessible=repos_accessible,
            error=error,
        )

    @app.get("/status", response_model=DaemonStatus)
    async def get_status() -> DaemonStatus:
        """Get current daemon status and last cycle results."""
        return DaemonStatus(
            running=daemon.is_running,
            enabled=daemon.is_enabled,
            cycle_interval_seconds=settings.cycle_interval_seconds,
            repos=[r.name for r in daemon.repos],
            last_cycle=daemon.last_cycle,
            next_cycle_at=daemon.next_cycle_at,
            service_crashed=daemon.service_crashed,
            service_health=daemon.service_health,
            last_health_check=daemon.last_health_check,
        )

    @app.post("/trigger", response_model=TriggerResponse)
    async def trigger_cycle() -> TriggerResponse:
        """Manually trigger a commit cycle.

        Raises:
            HTTPException: 503 if the llama-service crashed, 500 on any other
                error.
        """
        try:
            cycle_result = await _run_cycle_or_503()
            return TriggerResponse(
                triggered=True,
                message=f"Cycle completed: {cycle_result.repos_committed} committed, "
                f"{cycle_result.repos_failed} failed",
                cycle_result=cycle_result,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception("Error during manual trigger")
            raise HTTPException(
                status_code=500,
                detail=str(e),
            ) from e

    @app.post("/enable")
    async def enable_daemon() -> dict:
        """Enable the daemon and start its task if it is not running."""
        daemon.enable()
        # NOTE(review): only ``is_running`` is consulted; if a previous task
        # can still be pending while ``is_running`` is False, this may start
        # a second loop — confirm against CommitDaemon semantics.
        if not daemon.is_running:
            _spawn_daemon_task()
        return {"enabled": True, "message": "Daemon enabled"}

    @app.post("/disable")
    async def disable_daemon() -> dict:
        """Disable the daemon."""
        daemon.disable()
        return {"enabled": False, "message": "Daemon will stop after current cycle"}

    @app.get("/repos")
    async def list_repos() -> list[dict]:
        """List configured repositories."""
        return [
            {
                "name": repo.name,
                "path": str(repo.path),
                "exists": repo.exists,
                "remote": repo.remote,
                "branch": repo.branch,
            }
            for repo in daemon.repos
        ]

    @app.get("/history")
    async def get_history(limit: int = 10) -> dict:
        """Get last N cycle results."""
        return {
            "cycles": daemon.get_history(limit),
            "total_cycles": daemon.total_cycles,
        }

    @app.post("/repos/refresh")
    async def refresh_repos() -> dict:
        """Force immediate repo cache refresh.

        Raises:
            HTTPException: 400 if recursive discovery is disabled.
        """
        _require_recursive_discovery()
        old_count, new_count = _refresh_repo_cache()
        return {
            "refreshed": True,
            "old_count": old_count,
            "new_count": new_count,
            "repos": [r.name for r in daemon.repos],
        }

    @app.post("/repos/refresh-and-run")
    async def refresh_and_run() -> dict:
        """Refresh repo cache and immediately run a commit cycle.

        Raises:
            HTTPException: 400 if recursive discovery is disabled, 503 if the
                llama-service crashed, 500 on any other error.
        """
        _require_recursive_discovery()
        try:
            old_count, new_count = _refresh_repo_cache()
            # Run cycle (will auto-start service if needed)
            cycle_result = await _run_cycle_or_503()
            return {
                "refreshed": True,
                "old_count": old_count,
                "new_count": new_count,
                "repos": [r.name for r in daemon.repos],
                "cycle_triggered": True,
                "repos_committed": cycle_result.repos_committed,
                "repos_failed": cycle_result.repos_failed,
                "cycle_result": cycle_result.model_dump(),
            }
        except HTTPException:
            raise
        except Exception as e:
            logger.exception("Error during refresh-and-run")
            raise HTTPException(
                status_code=500,
                detail=str(e),
            ) from e

    @app.get("/report")
    async def get_report() -> dict:
        """Comprehensive report: history + per-repo status + errors."""
        return {
            "last_cycles": daemon.get_history(10),
            "repos": [
                {
                    "name": r.name,
                    "path": str(r.path),
                    "last_commit": daemon.get_repo_last_commit(r.name),
                    "error_count": daemon.get_repo_error_count(r.name),
                }
                for r in daemon.repos
            ],
            "errors": daemon.get_error_history(20),
        }

    @app.get("/report/summary")
    async def get_report_summary() -> dict:
        """Enhanced report with health checks, success tracking, and error categorization."""
        llm_health = await llm_client.health_check()
        last_full_success = daemon.get_last_fully_successful_cycle()

        # Per-repo enhanced data
        repos_summary = [
            {
                "name": r.name,
                "path": str(r.path),
                "last_commit": daemon.get_repo_last_commit(r.name),
                "last_success": daemon.get_repo_last_success_timestamp(r.name),
                "error_count": daemon.get_repo_error_count(r.name),
            }
            for r in daemon.repos
        ]

        error_categories = daemon.categorize_errors()

        return {
            "service_health": {
                "llama_service": llm_health,
                # Public properties, consistent with /status (was _running/_enabled).
                "daemon_running": daemon.is_running,
                "daemon_enabled": daemon.is_enabled,
                "next_cycle_at": daemon.next_cycle_at,
                "cycle_interval_seconds": settings.cycle_interval_seconds,
            },
            "success_tracking": {
                "last_fully_successful_cycle": (
                    last_full_success.model_dump() if last_full_success else None
                ),
                "total_cycles_run": daemon.total_cycles,
            },
            "repos": repos_summary,
            "errors": {
                "categories": error_categories,
                "total_errors": sum(cat["count"] for cat in error_categories.values()),
            },
            "last_cycles": daemon.get_history(5),  # Recent context
        }

    @app.get("/report/commits")
    async def get_commit_history(limit: int = 20) -> dict:
        """Get recent commits grouped by repository (from persistent history)."""
        from collections import defaultdict

        # Read from persistent history file (survives reboots)
        recent_commits = daemon.commit_history.get_recent_commits(limit=limit)

        # Group by repository, keeping the order returned by the history.
        commits_by_repo = defaultdict(list)
        for commit in recent_commits:
            commits_by_repo[commit["repo_name"]].append({
                "hash": commit["hash"],
                "message": commit["message"],
                "timestamp": commit["timestamp"],
            })

        # Sort repos by their newest commit. Using max() instead of the first
        # entry makes this independent of the history's ordering; groups are
        # never empty because entries are only created by append().
        sorted_repos = sorted(
            commits_by_repo.items(),
            key=lambda item: max(c["timestamp"] for c in item[1]),
            reverse=True,
        )

        return {
            "repos": [
                {
                    "repo_name": repo_name,
                    "commit_count": len(commits),
                    "commits": commits,
                }
                for repo_name, commits in sorted_repos
            ],
            "total_commits": len(recent_commits),
            "limit": limit,
        }

    return app