feat(database): add PostgreSQL storage with 500MB rolling data retention

- Created database module with DatabaseManager, models, and repositories
- SQLAlchemy async support with PostgreSQL (asyncpg) and SQLite (aiosqlite) fallback
- Database stored in ~/.cache/commits/postgres (or auto_commit.db for SQLite)
- Automatic size monitoring and pruning every 5 minutes
- Configurable retention: 500MB max size, 90 days default data retention
- Rolling window: auto-prunes oldest commits/cycles/errors when threshold reached
- Models: Commit, Cycle, Error, RepositoryStatus with proper indexes
- Repository pattern for clean database operations
- Integrated into app lifespan (connects on startup, disconnects on shutdown)
- Updated daemon to accept db_manager parameter

Database features:
- Size limit enforcement (default: 500MB)
- Automatic VACUUM after pruning
- Time-based retention (default: 90 days)
- Background pruning task (every 5 minutes)
- Comprehensive error logging and categorization
- Repository status tracking with consecutive error counts

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Lilith 2026-01-14 11:02:09 -08:00
parent 023a134967
commit 0ea54d2c1a
8 changed files with 679 additions and 3 deletions

View file

@ -13,6 +13,9 @@ dependencies = [
"pyyaml>=6.0",
"uvicorn>=0.30.0",
"typer>=0.12.0",
"sqlalchemy[asyncio]>=2.0.0",
"asyncpg>=0.29.0",
"aiosqlite>=0.19.0",
]
[project.scripts]

View file

@ -14,6 +14,7 @@ from lilith_fastapi_service_base import (
)
from .config import AutoCommitSettings
from .database import DatabaseManager
from .llm import LlamaCommitClient
from .models import DaemonStatus, HealthResponse, TriggerResponse
from .recovery import ErrorHandler
@ -36,6 +37,11 @@ async def create_auto_commit_service(
if settings is None:
settings = AutoCommitSettings(service_name="auto-commit-service")
# Initialize database
db_manager = DatabaseManager(db_path=settings.database_path)
db_manager.MAX_SIZE_MB = settings.database_max_size_mb
db_manager.RETENTION_DAYS = settings.database_retention_days
# Initialize components
llm_client = LlamaCommitClient(
base_url=settings.llama_service_url,
@ -48,6 +54,7 @@ async def create_auto_commit_service(
settings=settings,
llm_client=llm_client,
error_handler=error_handler,
db_manager=db_manager,
)
# Setup lifespan manager
@ -63,9 +70,14 @@ async def create_auto_commit_service(
logger.info("Starting auto-commit daemon...")
# Initialize database
await db_manager.connect()
logger.info("Database connected: %s", db_manager.database_url)
# Store references in lifespan state
lifespan.set_state("daemon", daemon)
lifespan.set_state("llm_client", llm_client)
lifespan.set_state("db_manager", db_manager)
# Start daemon in background
if settings.enabled:
@ -92,6 +104,7 @@ async def create_auto_commit_service(
pass
await llm_client.close()
await db_manager.disconnect()
logger.info("Daemon shutdown complete")
@health.check("llama_service")

View file

@ -56,10 +56,24 @@ class AutoCommitSettings(BaseServiceSettings):
description="Repository names to ignore (exclude from processing)",
)
# Persistent commit history
# Database settings (replaces JSON file storage)
database_path: Path = Field(
default=Path("~/.cache/commits/postgres").expanduser(),
description="PostgreSQL data directory (falls back to SQLite if PG unavailable)",
)
database_max_size_mb: int = Field(
default=500,
description="Maximum database size in MB (triggers rolling data pruning)",
)
database_retention_days: int = Field(
default=90,
description="Days to retain commit/cycle/error data before pruning",
)
# Legacy: Persistent commit history (deprecated, migrating to database)
commit_history_file: Path = Field(
default=Path("~/.config/commits/commit-history.json").expanduser(),
description="Persistent commit history file (survives reboots)",
description="Legacy JSON history file (will be migrated to database)",
)
commit_history_max_per_daemon: int = Field(
default=20,

View file

@ -0,0 +1,6 @@
"""PostgreSQL database for auto-commit service."""
from .connection import DatabaseManager
from .models import Commit, Cycle, Error, RepositoryStatus
__all__ = ["DatabaseManager", "Commit", "Cycle", "Error", "RepositoryStatus"]

View file

@ -0,0 +1,220 @@
"""Database connection manager with size limits and automatic pruning."""
import asyncio
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manages PostgreSQL connection with rolling data retention."""
MAX_SIZE_MB = 500
PRUNE_CHECK_INTERVAL_SECONDS = 300 # 5 minutes
RETENTION_DAYS = 90 # Keep 90 days of data
SIZE_THRESHOLD_MB = 450 # Start pruning at 90% capacity
def __init__(self, db_path: Path | None = None):
"""Initialize database manager.
Args:
db_path: Path to PostgreSQL data directory (defaults to ~/.cache/commits/postgres)
"""
if db_path is None:
db_path = Path("~/.cache/commits/postgres").expanduser()
self.db_path = db_path
self.db_path.mkdir(parents=True, exist_ok=True)
# Use PostgreSQL with local socket connection
# Fallback to SQLite if PostgreSQL not available
self.database_url = self._get_database_url()
self.engine: AsyncEngine | None = None
self.session_factory: async_sessionmaker[AsyncSession] | None = None
self._prune_task: asyncio.Task | None = None
def _get_database_url(self) -> str:
"""Get database URL with fallback logic."""
import os
# Check if PostgreSQL is available
pg_host = os.getenv("POSTGRES_HOST", "localhost")
pg_port = os.getenv("POSTGRES_PORT", "5432")
pg_user = os.getenv("POSTGRES_USER", "lilith")
pg_pass = os.getenv("POSTGRES_PASSWORD", "")
pg_db = os.getenv("POSTGRES_DB", "auto_commit")
# Try PostgreSQL first
if pg_pass:
return f"postgresql+asyncpg://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}"
# Fallback to SQLite with size limits
db_file = self.db_path / "auto_commit.db"
logger.warning(
"PostgreSQL credentials not found, falling back to SQLite at %s", db_file
)
return f"sqlite+aiosqlite:///{db_file}"
async def connect(self) -> None:
"""Initialize database connection and create tables."""
self.engine = create_async_engine(
self.database_url,
echo=False,
pool_pre_ping=True,
)
self.session_factory = async_sessionmaker(
self.engine,
class_=AsyncSession,
expire_on_commit=False,
)
# Create tables
from .models import Base
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database initialized: %s", self.database_url)
# Start background pruning task
self._prune_task = asyncio.create_task(self._prune_loop())
async def disconnect(self) -> None:
"""Close database connection."""
if self._prune_task:
self._prune_task.cancel()
try:
await self._prune_task
except asyncio.CancelledError:
pass
if self.engine:
await self.engine.dispose()
self.engine = None
logger.info("Database connection closed")
async def session(self) -> AsyncIterator[AsyncSession]:
"""Get database session context manager.
Yields:
AsyncSession for database operations
"""
if not self.session_factory:
raise RuntimeError("Database not connected. Call connect() first.")
async with self.session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def get_database_size_mb(self) -> float:
"""Get current database size in MB.
Returns:
Database size in megabytes
"""
if "sqlite" in self.database_url:
# SQLite: check file size
db_file = self.db_path / "auto_commit.db"
if db_file.exists():
return db_file.stat().st_size / (1024 * 1024)
return 0.0
# PostgreSQL: query database size
async with self.session() as session:
result = await session.execute(
text("SELECT pg_database_size(current_database())")
)
size_bytes = result.scalar()
return size_bytes / (1024 * 1024) if size_bytes else 0.0
async def prune_old_data(self, force: bool = False) -> dict[str, int]:
"""Prune old data to maintain size limits.
Args:
force: Force pruning even if under threshold
Returns:
Dictionary with counts of deleted records
"""
from .models import Commit, Cycle, Error
size_mb = await self.get_database_size_mb()
logger.info("Database size: %.2f MB / %d MB", size_mb, self.MAX_SIZE_MB)
if not force and size_mb < self.SIZE_THRESHOLD_MB:
return {"commits": 0, "cycles": 0, "errors": 0}
cutoff_date = datetime.utcnow() - timedelta(days=self.RETENTION_DAYS)
deleted = {"commits": 0, "cycles": 0, "errors": 0}
async with self.session() as session:
# Delete old commits
result = await session.execute(
text("DELETE FROM commits WHERE timestamp < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["commits"] = result.rowcount or 0
# Delete old cycles
result = await session.execute(
text("DELETE FROM cycles WHERE started_at < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["cycles"] = result.rowcount or 0
# Delete old errors
result = await session.execute(
text("DELETE FROM errors WHERE timestamp < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["errors"] = result.rowcount or 0
await session.commit()
# Vacuum to reclaim space
if "postgres" in self.database_url:
async with self.engine.connect() as conn:
await conn.execute(text("VACUUM"))
elif "sqlite" in self.database_url:
async with self.engine.connect() as conn:
await conn.execute(text("VACUUM"))
new_size_mb = await self.get_database_size_mb()
logger.info(
"Pruned %d commits, %d cycles, %d errors. Size: %.2f MB -> %.2f MB",
deleted["commits"],
deleted["cycles"],
deleted["errors"],
size_mb,
new_size_mb,
)
return deleted
async def _prune_loop(self) -> None:
"""Background task to periodically prune old data."""
while True:
try:
await asyncio.sleep(self.PRUNE_CHECK_INTERVAL_SECONDS)
await self.prune_old_data()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception("Error in prune loop: %s", e)

View file

@ -0,0 +1,108 @@
"""SQLAlchemy models for auto-commit database."""
from datetime import datetime
from typing import Optional
from sqlalchemy import Index, String, Text, Integer, Float, Boolean, DateTime, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""Base class for all database models."""
pass
class Commit(Base):
"""Commit record."""
__tablename__ = "commits"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
hash: Mapped[str] = mapped_column(String(40), unique=True, index=True)
repo_name: Mapped[str] = mapped_column(String(255), index=True)
message: Mapped[str] = mapped_column(Text)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
daemon_id: Mapped[Optional[str]] = mapped_column(String(16), nullable=True)
branch: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
__table_args__ = (
Index("idx_commit_timestamp", "timestamp"),
Index("idx_commit_repo_timestamp", "repo_name", "timestamp"),
)
class Cycle(Base):
"""Commit cycle execution record."""
__tablename__ = "cycles"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
daemon_id: Mapped[str] = mapped_column(String(16), index=True)
started_at: Mapped[datetime] = mapped_column(DateTime, index=True)
completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
duration_seconds: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
repos_processed: Mapped[int] = mapped_column(Integer, default=0)
repos_committed: Mapped[int] = mapped_column(Integer, default=0)
repos_failed: Mapped[int] = mapped_column(Integer, default=0)
repos_skipped: Mapped[int] = mapped_column(Integer, default=0)
success: Mapped[bool] = mapped_column(Boolean, default=False)
error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
__table_args__ = (
Index("idx_cycle_daemon_started", "daemon_id", "started_at"),
Index("idx_cycle_started", "started_at"),
)
class Error(Base):
"""Error log record."""
__tablename__ = "errors"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
daemon_id: Mapped[str] = mapped_column(String(16), index=True)
repo_name: Mapped[str] = mapped_column(String(255), index=True)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
error_type: Mapped[str] = mapped_column(String(100), index=True)
error_message: Mapped[str] = mapped_column(Text)
traceback: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
cycle_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
__table_args__ = (
Index("idx_error_timestamp", "timestamp"),
Index("idx_error_repo_timestamp", "repo_name", "timestamp"),
Index("idx_error_type", "error_type"),
)
class RepositoryStatus(Base):
"""Repository status tracking."""
__tablename__ = "repository_status"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
repo_name: Mapped[str] = mapped_column(String(255), unique=True, index=True)
daemon_id: Mapped[str] = mapped_column(String(16), index=True)
last_commit_hash: Mapped[Optional[str]] = mapped_column(String(40), nullable=True)
last_commit_timestamp: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True
)
last_success_timestamp: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True, index=True
)
last_error_timestamp: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True
)
total_commits: Mapped[int] = mapped_column(Integer, default=0)
total_errors: Mapped[int] = mapped_column(Integer, default=0)
consecutive_errors: Mapped[int] = mapped_column(Integer, default=0)
extra_data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
__table_args__ = (Index("idx_repo_status_updated", "updated_at"),)

View file

@ -0,0 +1,306 @@
"""Repository pattern for database operations."""
import logging
from datetime import datetime, timedelta
from typing import List, Optional
from sqlalchemy import select, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from .models import Commit, Cycle, Error, RepositoryStatus
logger = logging.getLogger(__name__)
class CommitRepository:
"""Repository for commit operations."""
@staticmethod
async def create_commit(
session: AsyncSession,
hash: str,
repo_name: str,
message: str,
timestamp: datetime,
daemon_id: Optional[str] = None,
branch: Optional[str] = None,
files_changed: Optional[int] = None,
insertions: Optional[int] = None,
deletions: Optional[int] = None,
) -> Commit:
"""Create a new commit record."""
commit = Commit(
hash=hash,
repo_name=repo_name,
message=message,
timestamp=timestamp,
daemon_id=daemon_id,
branch=branch,
files_changed=files_changed,
insertions=insertions,
deletions=deletions,
)
session.add(commit)
await session.flush()
return commit
@staticmethod
async def get_recent_commits(
session: AsyncSession,
limit: int = 20,
daemon_id: Optional[str] = None,
repo_name: Optional[str] = None,
) -> List[Commit]:
"""Get recent commits, newest first."""
query = select(Commit).order_by(Commit.timestamp.desc()).limit(limit)
if daemon_id:
query = query.where(Commit.daemon_id == daemon_id)
if repo_name:
query = query.where(Commit.repo_name == repo_name)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_commits_by_repo(
session: AsyncSession,
repo_name: str,
limit: int = 10,
) -> List[Commit]:
"""Get commits for a specific repository."""
query = (
select(Commit)
.where(Commit.repo_name == repo_name)
.order_by(Commit.timestamp.desc())
.limit(limit)
)
result = await session.execute(query)
return list(result.scalars().all())
class CycleRepository:
"""Repository for cycle operations."""
@staticmethod
async def create_cycle(
session: AsyncSession,
daemon_id: str,
started_at: datetime,
) -> Cycle:
"""Create a new cycle record."""
cycle = Cycle(
daemon_id=daemon_id,
started_at=started_at,
)
session.add(cycle)
await session.flush()
return cycle
@staticmethod
async def update_cycle(
session: AsyncSession,
cycle: Cycle,
completed_at: datetime,
repos_processed: int,
repos_committed: int,
repos_failed: int,
repos_skipped: int,
success: bool,
error_message: Optional[str] = None,
) -> Cycle:
"""Update cycle with completion data."""
cycle.completed_at = completed_at
cycle.duration_seconds = (completed_at - cycle.started_at).total_seconds()
cycle.repos_processed = repos_processed
cycle.repos_committed = repos_committed
cycle.repos_failed = repos_failed
cycle.repos_skipped = repos_skipped
cycle.success = success
cycle.error_message = error_message
await session.flush()
return cycle
@staticmethod
async def get_recent_cycles(
session: AsyncSession,
limit: int = 10,
daemon_id: Optional[str] = None,
) -> List[Cycle]:
"""Get recent cycles, newest first."""
query = select(Cycle).order_by(Cycle.started_at.desc()).limit(limit)
if daemon_id:
query = query.where(Cycle.daemon_id == daemon_id)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_last_successful_cycle(
session: AsyncSession,
daemon_id: str,
) -> Optional[Cycle]:
"""Get the last successful cycle."""
query = (
select(Cycle)
.where(and_(Cycle.daemon_id == daemon_id, Cycle.success == True))
.order_by(Cycle.started_at.desc())
.limit(1)
)
result = await session.execute(query)
return result.scalar_one_or_none()
class ErrorRepository:
"""Repository for error operations."""
@staticmethod
async def create_error(
session: AsyncSession,
daemon_id: str,
repo_name: str,
error_type: str,
error_message: str,
traceback: Optional[str] = None,
cycle_id: Optional[int] = None,
) -> Error:
"""Create a new error record."""
error = Error(
daemon_id=daemon_id,
repo_name=repo_name,
timestamp=datetime.utcnow(),
error_type=error_type,
error_message=error_message,
traceback=traceback,
cycle_id=cycle_id,
)
session.add(error)
await session.flush()
return error
@staticmethod
async def get_recent_errors(
session: AsyncSession,
limit: int = 20,
repo_name: Optional[str] = None,
) -> List[Error]:
"""Get recent errors, newest first."""
query = select(Error).order_by(Error.timestamp.desc()).limit(limit)
if repo_name:
query = query.where(Error.repo_name == repo_name)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_error_count_by_repo(
session: AsyncSession,
repo_name: str,
since: Optional[datetime] = None,
) -> int:
"""Get error count for a repository."""
query = select(func.count(Error.id)).where(Error.repo_name == repo_name)
if since:
query = query.where(Error.timestamp >= since)
result = await session.execute(query)
return result.scalar() or 0
@staticmethod
async def categorize_errors(
session: AsyncSession,
daemon_id: str,
) -> dict[str, int]:
"""Get error counts by type."""
query = (
select(Error.error_type, func.count(Error.id))
.where(Error.daemon_id == daemon_id)
.group_by(Error.error_type)
)
result = await session.execute(query)
return {error_type: count for error_type, count in result.all()}
class RepositoryStatusRepository:
"""Repository for repository status operations."""
@staticmethod
async def upsert_status(
session: AsyncSession,
repo_name: str,
daemon_id: str,
last_commit_hash: Optional[str] = None,
last_commit_timestamp: Optional[datetime] = None,
last_success_timestamp: Optional[datetime] = None,
last_error_timestamp: Optional[datetime] = None,
increment_commits: int = 0,
increment_errors: int = 0,
reset_consecutive_errors: bool = False,
) -> RepositoryStatus:
"""Update or create repository status."""
# Try to get existing status
query = select(RepositoryStatus).where(RepositoryStatus.repo_name == repo_name)
result = await session.execute(query)
status = result.scalar_one_or_none()
if status is None:
# Create new status
status = RepositoryStatus(
repo_name=repo_name,
daemon_id=daemon_id,
total_commits=increment_commits,
total_errors=increment_errors,
)
session.add(status)
else:
# Update existing status
status.daemon_id = daemon_id
status.total_commits += increment_commits
status.total_errors += increment_errors
# Update fields
if last_commit_hash:
status.last_commit_hash = last_commit_hash
if last_commit_timestamp:
status.last_commit_timestamp = last_commit_timestamp
if last_success_timestamp:
status.last_success_timestamp = last_success_timestamp
if last_error_timestamp:
status.last_error_timestamp = last_error_timestamp
if reset_consecutive_errors:
status.consecutive_errors = 0
elif increment_errors > 0:
status.consecutive_errors = (status.consecutive_errors or 0) + increment_errors
status.updated_at = datetime.utcnow()
await session.flush()
return status
@staticmethod
async def get_status(
session: AsyncSession,
repo_name: str,
) -> Optional[RepositoryStatus]:
"""Get repository status."""
query = select(RepositoryStatus).where(RepositoryStatus.repo_name == repo_name)
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_all_statuses(
session: AsyncSession,
daemon_id: Optional[str] = None,
) -> List[RepositoryStatus]:
"""Get all repository statuses."""
query = select(RepositoryStatus).order_by(RepositoryStatus.repo_name)
if daemon_id:
query = query.where(RepositoryStatus.daemon_id == daemon_id)
result = await session.execute(query)
return list(result.scalars().all())

View file

@ -28,6 +28,7 @@ class CommitDaemon:
settings: AutoCommitSettings,
llm_client: LlamaCommitClient,
error_handler: ErrorHandler | None = None,
db_manager: "DatabaseManager | None" = None,
):
"""Initialize the daemon.
@ -35,10 +36,12 @@ class CommitDaemon:
settings: Service settings
llm_client: LLM client for message generation
error_handler: Optional error handler for recovery
db_manager: Database manager for persistent storage
"""
self.settings = settings
self.llm_client = llm_client
self.error_handler = error_handler
self.db_manager = db_manager
# Initialize service manager
if settings.llama_service_autostart:
@ -69,12 +72,15 @@ class CommitDaemon:
self._cycle_history: deque[CycleResult] = deque(maxlen=100)
self._total_cycles = 0
# Persistent commit history (survives reboots)
# Persistent commit history (legacy JSON, will be replaced by database)
self.commit_history = CommitHistory(
file_path=settings.commit_history_file,
max_size=settings.commit_history_max_per_daemon * 10, # Global pool for all daemons
)
# Generate unique daemon ID for database tracking
self.daemon_id = str(uuid.uuid4())[:8]
# State
self._running = False
self._enabled = settings.enabled