diff --git a/pyproject.toml b/pyproject.toml index 4fc2f55..ad1b1ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,9 @@ dependencies = [ "pyyaml>=6.0", "uvicorn>=0.30.0", "typer>=0.12.0", + "sqlalchemy[asyncio]>=2.0.0", + "asyncpg>=0.29.0", + "aiosqlite>=0.19.0", ] [project.scripts] diff --git a/src/auto_commit_service/app.py b/src/auto_commit_service/app.py index 0d7a937..2d3d180 100644 --- a/src/auto_commit_service/app.py +++ b/src/auto_commit_service/app.py @@ -14,6 +14,7 @@ from lilith_fastapi_service_base import ( ) from .config import AutoCommitSettings +from .database import DatabaseManager from .llm import LlamaCommitClient from .models import DaemonStatus, HealthResponse, TriggerResponse from .recovery import ErrorHandler @@ -36,6 +37,11 @@ async def create_auto_commit_service( if settings is None: settings = AutoCommitSettings(service_name="auto-commit-service") + # Initialize database + db_manager = DatabaseManager(db_path=settings.database_path) + db_manager.MAX_SIZE_MB = settings.database_max_size_mb + db_manager.RETENTION_DAYS = settings.database_retention_days + # Initialize components llm_client = LlamaCommitClient( base_url=settings.llama_service_url, @@ -48,6 +54,7 @@ async def create_auto_commit_service( settings=settings, llm_client=llm_client, error_handler=error_handler, + db_manager=db_manager, ) # Setup lifespan manager @@ -63,9 +70,14 @@ async def create_auto_commit_service( logger.info("Starting auto-commit daemon...") + # Initialize database + await db_manager.connect() + logger.info("Database connected: %s", db_manager.database_url) + # Store references in lifespan state lifespan.set_state("daemon", daemon) lifespan.set_state("llm_client", llm_client) + lifespan.set_state("db_manager", db_manager) # Start daemon in background if settings.enabled: @@ -92,6 +104,7 @@ async def create_auto_commit_service( pass await llm_client.close() + await db_manager.disconnect() logger.info("Daemon shutdown complete") @health.check("llama_service") diff --git a/src/auto_commit_service/config.py b/src/auto_commit_service/config.py index ad11d8f..35924aa 100644 --- a/src/auto_commit_service/config.py +++ b/src/auto_commit_service/config.py @@ -56,10 +56,24 @@ class AutoCommitSettings(BaseServiceSettings): description="Repository names to ignore (exclude from processing)", ) - # Persistent commit history + # Database settings (replaces JSON file storage) + database_path: Path = Field( + default=Path("~/.cache/commits/postgres").expanduser(), + description="PostgreSQL data directory (falls back to SQLite if PG unavailable)", + ) + database_max_size_mb: int = Field( + default=500, + description="Maximum database size in MB (triggers rolling data pruning)", + ) + database_retention_days: int = Field( + default=90, + description="Days to retain commit/cycle/error data before pruning", + ) + + # Legacy: Persistent commit history (deprecated, migrating to database) commit_history_file: Path = Field( default=Path("~/.config/commits/commit-history.json").expanduser(), - description="Persistent commit history file (survives reboots)", + description="Legacy JSON history file (will be migrated to database)", ) commit_history_max_per_daemon: int = Field( default=20, diff --git a/src/auto_commit_service/database/__init__.py b/src/auto_commit_service/database/__init__.py new file mode 100644 index 0000000..8294a9e --- /dev/null +++ b/src/auto_commit_service/database/__init__.py @@ -0,0 +1,6 @@ +"""PostgreSQL database for auto-commit service.""" + +from .connection import DatabaseManager +from .models import Commit, Cycle, Error, RepositoryStatus + +__all__ = ["DatabaseManager", "Commit", "Cycle", "Error", "RepositoryStatus"] diff --git a/src/auto_commit_service/database/connection.py b/src/auto_commit_service/database/connection.py new file mode 100644 index 0000000..ff9522a --- /dev/null +++ b/src/auto_commit_service/database/connection.py @@ -0,0 +1,220 @@ +"""Database connection manager with size limits and automatic pruning.""" + +import asyncio +import logging +from datetime import datetime, timedelta +from pathlib import Path +from typing import AsyncIterator + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +logger = logging.getLogger(__name__) + + +class DatabaseManager: + """Manages PostgreSQL connection with rolling data retention.""" + + MAX_SIZE_MB = 500 + PRUNE_CHECK_INTERVAL_SECONDS = 300 # 5 minutes + RETENTION_DAYS = 90 # Keep 90 days of data + SIZE_THRESHOLD_MB = 450 # Start pruning at 90% capacity + + def __init__(self, db_path: Path | None = None): + """Initialize database manager. + + Args: + db_path: Path to PostgreSQL data directory (defaults to ~/.cache/commits/postgres) + """ + if db_path is None: + db_path = Path("~/.cache/commits/postgres").expanduser() + + self.db_path = db_path + self.db_path.mkdir(parents=True, exist_ok=True) + + # Use PostgreSQL with local socket connection + # Fallback to SQLite if PostgreSQL not available + self.database_url = self._get_database_url() + self.engine: AsyncEngine | None = None + self.session_factory: async_sessionmaker[AsyncSession] | None = None + self._prune_task: asyncio.Task | None = None + + def _get_database_url(self) -> str: + """Get database URL with fallback logic.""" + import os + + # Check if PostgreSQL is available + pg_host = os.getenv("POSTGRES_HOST", "localhost") + pg_port = os.getenv("POSTGRES_PORT", "5432") + pg_user = os.getenv("POSTGRES_USER", "lilith") + pg_pass = os.getenv("POSTGRES_PASSWORD", "") + pg_db = os.getenv("POSTGRES_DB", "auto_commit") + + # Try PostgreSQL first + if pg_pass: + return f"postgresql+asyncpg://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}" + + # Fallback to SQLite with size limits + db_file = self.db_path / "auto_commit.db" + logger.warning( + "PostgreSQL credentials not found, falling back to SQLite at %s", db_file + ) + return f"sqlite+aiosqlite:///{db_file}" + + async def connect(self) -> None: + """Initialize database connection and create tables.""" + self.engine = create_async_engine( + self.database_url, + echo=False, + pool_pre_ping=True, + ) + + self.session_factory = async_sessionmaker( + self.engine, + class_=AsyncSession, + expire_on_commit=False, + ) + + # Create tables + from .models import Base + + async with self.engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + logger.info("Database initialized: %s", self.database_url) + + # Start background pruning task + self._prune_task = asyncio.create_task(self._prune_loop()) + + async def disconnect(self) -> None: + """Close database connection.""" + if self._prune_task: + self._prune_task.cancel() + try: + await self._prune_task + except asyncio.CancelledError: + pass + + if self.engine: + await self.engine.dispose() + self.engine = None + + logger.info("Database connection closed") + + async def session(self) -> AsyncIterator[AsyncSession]: + """Get database session context manager. + + Yields: + AsyncSession for database operations + """ + if not self.session_factory: + raise RuntimeError("Database not connected. Call connect() first.") + + async with self.session_factory() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_database_size_mb(self) -> float: + """Get current database size in MB. + + Returns: + Database size in megabytes + """ + if "sqlite" in self.database_url: + # SQLite: check file size + db_file = self.db_path / "auto_commit.db" + if db_file.exists(): + return db_file.stat().st_size / (1024 * 1024) + return 0.0 + + # PostgreSQL: query database size + async with self.session() as session: + result = await session.execute( + text("SELECT pg_database_size(current_database())") + ) + size_bytes = result.scalar() + return size_bytes / (1024 * 1024) if size_bytes else 0.0 + + async def prune_old_data(self, force: bool = False) -> dict[str, int]: + """Prune old data to maintain size limits. + + Args: + force: Force pruning even if under threshold + + Returns: + Dictionary with counts of deleted records + """ + from .models import Commit, Cycle, Error + + size_mb = await self.get_database_size_mb() + logger.info("Database size: %.2f MB / %d MB", size_mb, self.MAX_SIZE_MB) + + if not force and size_mb < self.SIZE_THRESHOLD_MB: + return {"commits": 0, "cycles": 0, "errors": 0} + + cutoff_date = datetime.utcnow() - timedelta(days=self.RETENTION_DAYS) + deleted = {"commits": 0, "cycles": 0, "errors": 0} + + async with self.session() as session: + # Delete old commits + result = await session.execute( + text("DELETE FROM commits WHERE timestamp < :cutoff"), + {"cutoff": cutoff_date}, + ) + deleted["commits"] = result.rowcount or 0 + + # Delete old cycles + result = await session.execute( + text("DELETE FROM cycles WHERE started_at < :cutoff"), + {"cutoff": cutoff_date}, + ) + deleted["cycles"] = result.rowcount or 0 + + # Delete old errors + result = await session.execute( + text("DELETE FROM errors WHERE timestamp < :cutoff"), + {"cutoff": cutoff_date}, + ) + deleted["errors"] = result.rowcount or 0 + + await session.commit() + + # Vacuum to reclaim space + if "postgres" in self.database_url: + async with self.engine.connect() as conn: + await conn.execute(text("VACUUM")) + elif "sqlite" in self.database_url: + async with self.engine.connect() as conn: + await conn.execute(text("VACUUM")) + + new_size_mb = await self.get_database_size_mb() + logger.info( + "Pruned %d commits, %d cycles, %d errors. Size: %.2f MB -> %.2f MB", + deleted["commits"], + deleted["cycles"], + deleted["errors"], + size_mb, + new_size_mb, + ) + + return deleted + + async def _prune_loop(self) -> None: + """Background task to periodically prune old data.""" + while True: + try: + await asyncio.sleep(self.PRUNE_CHECK_INTERVAL_SECONDS) + await self.prune_old_data() + except asyncio.CancelledError: + break + except Exception as e: + logger.exception("Error in prune loop: %s", e) diff --git a/src/auto_commit_service/database/models.py b/src/auto_commit_service/database/models.py new file mode 100644 index 0000000..5d6cbd1 --- /dev/null +++ b/src/auto_commit_service/database/models.py @@ -0,0 +1,108 @@ +"""SQLAlchemy models for auto-commit database.""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import Index, String, Text, Integer, Float, Boolean, DateTime, JSON +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +class Base(DeclarativeBase): + """Base class for all database models.""" + + pass + + +class Commit(Base): + """Commit record.""" + + __tablename__ = "commits" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + hash: Mapped[str] = mapped_column(String(40), unique=True, index=True) + repo_name: Mapped[str] = mapped_column(String(255), index=True) + message: Mapped[str] = mapped_column(Text) + timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) + daemon_id: Mapped[Optional[str]] = mapped_column(String(16), nullable=True) + branch: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + + __table_args__ = ( + Index("idx_commit_timestamp", "timestamp"), + Index("idx_commit_repo_timestamp", "repo_name", "timestamp"), + ) + + +class Cycle(Base): + """Commit cycle execution record.""" + + __tablename__ = "cycles" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + daemon_id: Mapped[str] = mapped_column(String(16), index=True) + started_at: Mapped[datetime] = mapped_column(DateTime, index=True) + completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + duration_seconds: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + repos_processed: Mapped[int] = mapped_column(Integer, default=0) + repos_committed: Mapped[int] = mapped_column(Integer, default=0) + repos_failed: Mapped[int] = mapped_column(Integer, default=0) + repos_skipped: Mapped[int] = mapped_column(Integer, default=0) + success: Mapped[bool] = mapped_column(Boolean, default=False) + error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + __table_args__ = ( + Index("idx_cycle_daemon_started", "daemon_id", "started_at"), + Index("idx_cycle_started", "started_at"), + ) + + +class Error(Base): + """Error log record.""" + + __tablename__ = "errors" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + daemon_id: Mapped[str] = mapped_column(String(16), index=True) + repo_name: Mapped[str] = mapped_column(String(255), index=True) + timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) + error_type: Mapped[str] = mapped_column(String(100), index=True) + error_message: Mapped[str] = mapped_column(Text) + traceback: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + cycle_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + + __table_args__ = ( + Index("idx_error_timestamp", "timestamp"), + Index("idx_error_repo_timestamp", "repo_name", "timestamp"), + Index("idx_error_type", "error_type"), + ) + + +class RepositoryStatus(Base): + """Repository status tracking.""" + + __tablename__ = "repository_status" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + repo_name: Mapped[str] = mapped_column(String(255), unique=True, index=True) + daemon_id: Mapped[str] = mapped_column(String(16), index=True) + last_commit_hash: Mapped[Optional[str]] = mapped_column(String(40), nullable=True) + last_commit_timestamp: Mapped[Optional[datetime]] = mapped_column( + DateTime, nullable=True + ) + last_success_timestamp: Mapped[Optional[datetime]] = mapped_column( + DateTime, nullable=True, index=True + ) + last_error_timestamp: Mapped[Optional[datetime]] = mapped_column( + DateTime, nullable=True + ) + total_commits: Mapped[int] = mapped_column(Integer, default=0) + total_errors: Mapped[int] = mapped_column(Integer, default=0) + consecutive_errors: Mapped[int] = mapped_column(Integer, default=0) + extra_data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True) + updated_at: Mapped[datetime] = mapped_column( + DateTime, default=datetime.utcnow, onupdate=datetime.utcnow + ) + + __table_args__ = (Index("idx_repo_status_updated", "updated_at"),) diff --git a/src/auto_commit_service/database/repository.py b/src/auto_commit_service/database/repository.py new file mode 100644 index 0000000..27d426b --- /dev/null +++ b/src/auto_commit_service/database/repository.py @@ -0,0 +1,306 @@ +"""Repository pattern for database operations.""" + +import logging +from datetime import datetime, timedelta +from typing import List, Optional + +from sqlalchemy import select, func, and_, or_ +from sqlalchemy.ext.asyncio import AsyncSession + +from .models import Commit, Cycle, Error, RepositoryStatus + +logger = logging.getLogger(__name__) + + +class CommitRepository: + """Repository for commit operations.""" + + @staticmethod + async def create_commit( + session: AsyncSession, + hash: str, + repo_name: str, + message: str, + timestamp: datetime, + daemon_id: Optional[str] = None, + branch: Optional[str] = None, + files_changed: Optional[int] = None, + insertions: Optional[int] = None, + deletions: Optional[int] = None, + ) -> Commit: + """Create a new commit record.""" + commit = Commit( + hash=hash, + repo_name=repo_name, + message=message, + timestamp=timestamp, + daemon_id=daemon_id, + branch=branch, + files_changed=files_changed, + insertions=insertions, + deletions=deletions, + ) + session.add(commit) + await session.flush() + return commit + + @staticmethod + async def get_recent_commits( + session: AsyncSession, + limit: int = 20, + daemon_id: Optional[str] = None, + repo_name: Optional[str] = None, + ) -> List[Commit]: + """Get recent commits, newest first.""" + query = select(Commit).order_by(Commit.timestamp.desc()).limit(limit) + + if daemon_id: + query = query.where(Commit.daemon_id == daemon_id) + if repo_name: + query = query.where(Commit.repo_name == repo_name) + + result = await session.execute(query) + return list(result.scalars().all()) + + @staticmethod + async def get_commits_by_repo( + session: AsyncSession, + repo_name: str, + limit: int = 10, + ) -> List[Commit]: + """Get commits for a specific repository.""" + query = ( + select(Commit) + .where(Commit.repo_name == repo_name) + .order_by(Commit.timestamp.desc()) + .limit(limit) + ) + result = await session.execute(query) + return list(result.scalars().all()) + + +class CycleRepository: + """Repository for cycle operations.""" + + @staticmethod + async def create_cycle( + session: AsyncSession, + daemon_id: str, + started_at: datetime, + ) -> Cycle: + """Create a new cycle record.""" + cycle = Cycle( + daemon_id=daemon_id, + started_at=started_at, + ) + session.add(cycle) + await session.flush() + return cycle + + @staticmethod + async def update_cycle( + session: AsyncSession, + cycle: Cycle, + completed_at: datetime, + repos_processed: int, + repos_committed: int, + repos_failed: int, + repos_skipped: int, + success: bool, + error_message: Optional[str] = None, + ) -> Cycle: + """Update cycle with completion data.""" + cycle.completed_at = completed_at + cycle.duration_seconds = (completed_at - cycle.started_at).total_seconds() + cycle.repos_processed = repos_processed + cycle.repos_committed = repos_committed + cycle.repos_failed = repos_failed + cycle.repos_skipped = repos_skipped + cycle.success = success + cycle.error_message = error_message + await session.flush() + return cycle + + @staticmethod + async def get_recent_cycles( + session: AsyncSession, + limit: int = 10, + daemon_id: Optional[str] = None, + ) -> List[Cycle]: + """Get recent cycles, newest first.""" + query = select(Cycle).order_by(Cycle.started_at.desc()).limit(limit) + + if daemon_id: + query = query.where(Cycle.daemon_id == daemon_id) + + result = await session.execute(query) + return list(result.scalars().all()) + + @staticmethod + async def get_last_successful_cycle( + session: AsyncSession, + daemon_id: str, + ) -> Optional[Cycle]: + """Get the last successful cycle.""" + query = ( + select(Cycle) + .where(and_(Cycle.daemon_id == daemon_id, Cycle.success == True)) + .order_by(Cycle.started_at.desc()) + .limit(1) + ) + result = await session.execute(query) + return result.scalar_one_or_none() + + +class ErrorRepository: + """Repository for error operations.""" + + @staticmethod + async def create_error( + session: AsyncSession, + daemon_id: str, + repo_name: str, + error_type: str, + error_message: str, + traceback: Optional[str] = None, + cycle_id: Optional[int] = None, + ) -> Error: + """Create a new error record.""" + error = Error( + daemon_id=daemon_id, + repo_name=repo_name, + timestamp=datetime.utcnow(), + error_type=error_type, + error_message=error_message, + traceback=traceback, + cycle_id=cycle_id, + ) + session.add(error) + await session.flush() + return error + + @staticmethod + async def get_recent_errors( + session: AsyncSession, + limit: int = 20, + repo_name: Optional[str] = None, + ) -> List[Error]: + """Get recent errors, newest first.""" + query = select(Error).order_by(Error.timestamp.desc()).limit(limit) + + if repo_name: + query = query.where(Error.repo_name == repo_name) + + result = await session.execute(query) + return list(result.scalars().all()) + + @staticmethod + async def get_error_count_by_repo( + session: AsyncSession, + repo_name: str, + since: Optional[datetime] = None, + ) -> int: + """Get error count for a repository.""" + query = select(func.count(Error.id)).where(Error.repo_name == repo_name) + + if since: + query = query.where(Error.timestamp >= since) + + result = await session.execute(query) + return result.scalar() or 0 + + @staticmethod + async def categorize_errors( + session: AsyncSession, + daemon_id: str, + ) -> dict[str, int]: + """Get error counts by type.""" + query = ( + select(Error.error_type, func.count(Error.id)) + .where(Error.daemon_id == daemon_id) + .group_by(Error.error_type) + ) + result = await session.execute(query) + return {error_type: count for error_type, count in result.all()} + + +class RepositoryStatusRepository: + """Repository for repository status operations.""" + + @staticmethod + async def upsert_status( + session: AsyncSession, + repo_name: str, + daemon_id: str, + last_commit_hash: Optional[str] = None, + last_commit_timestamp: Optional[datetime] = None, + last_success_timestamp: Optional[datetime] = None, + last_error_timestamp: Optional[datetime] = None, + increment_commits: int = 0, + increment_errors: int = 0, + reset_consecutive_errors: bool = False, + ) -> RepositoryStatus: + """Update or create repository status.""" + # Try to get existing status + query = select(RepositoryStatus).where(RepositoryStatus.repo_name == repo_name) + result = await session.execute(query) + status = result.scalar_one_or_none() + + if status is None: + # Create new status + status = RepositoryStatus( + repo_name=repo_name, + daemon_id=daemon_id, + total_commits=increment_commits, + total_errors=increment_errors, + ) + session.add(status) + else: + # Update existing status + status.daemon_id = daemon_id + status.total_commits += increment_commits + status.total_errors += increment_errors + + # Update fields + if last_commit_hash: + status.last_commit_hash = last_commit_hash + if last_commit_timestamp: + status.last_commit_timestamp = last_commit_timestamp + if last_success_timestamp: + status.last_success_timestamp = last_success_timestamp + if last_error_timestamp: + status.last_error_timestamp = last_error_timestamp + + if reset_consecutive_errors: + status.consecutive_errors = 0 + elif increment_errors > 0: + status.consecutive_errors = (status.consecutive_errors or 0) + increment_errors + + status.updated_at = datetime.utcnow() + + await session.flush() + return status + + @staticmethod + async def get_status( + session: AsyncSession, + repo_name: str, + ) -> Optional[RepositoryStatus]: + """Get repository status.""" + query = select(RepositoryStatus).where(RepositoryStatus.repo_name == repo_name) + result = await session.execute(query) + return result.scalar_one_or_none() + + @staticmethod + async def get_all_statuses( + session: AsyncSession, + daemon_id: Optional[str] = None, + ) -> List[RepositoryStatus]: + """Get all repository statuses.""" + query = select(RepositoryStatus).order_by(RepositoryStatus.repo_name) + + if daemon_id: + query = query.where(RepositoryStatus.daemon_id == daemon_id) + + result = await session.execute(query) + return list(result.scalars().all()) diff --git a/src/auto_commit_service/scheduler/daemon.py b/src/auto_commit_service/scheduler/daemon.py index 91fe9fe..0ceed55 100644 --- a/src/auto_commit_service/scheduler/daemon.py +++ b/src/auto_commit_service/scheduler/daemon.py @@ -28,6 +28,7 @@ class CommitDaemon: settings: AutoCommitSettings, llm_client: LlamaCommitClient, error_handler: ErrorHandler | None = None, + db_manager: "DatabaseManager | None" = None, ): """Initialize the daemon. @@ -35,10 +36,12 @@ class CommitDaemon: settings: Service settings llm_client: LLM client for message generation error_handler: Optional error handler for recovery + db_manager: Database manager for persistent storage """ self.settings = settings self.llm_client = llm_client self.error_handler = error_handler + self.db_manager = db_manager # Initialize service manager if settings.llama_service_autostart: @@ -69,12 +72,15 @@ class CommitDaemon: self._cycle_history: deque[CycleResult] = deque(maxlen=100) self._total_cycles = 0 - # Persistent commit history (survives reboots) + # Persistent commit history (legacy JSON, will be replaced by database) self.commit_history = CommitHistory( file_path=settings.commit_history_file, max_size=settings.commit_history_max_per_daemon * 10, # Global pool for all daemons ) + # Generate unique daemon ID for database tracking + self.daemon_id = str(uuid.uuid4())[:8] + # State self._running = False self._enabled = settings.enabled