feat(git): Add auto-commit service with Git operations (commits, pushes, submodules) and database-backed state tracking

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-04-17 21:20:13 -07:00
parent 1cd8aa6a07
commit 6358be5902
12 changed files with 722 additions and 355 deletions

View file

@ -2,9 +2,10 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import AsyncIterator
from typing import Any, AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
@ -44,7 +45,7 @@ class DatabaseManager:
self.database_url = f"sqlite+aiosqlite:///{self.db_path}"
self.engine: AsyncEngine | None = None
self.session_factory: async_sessionmaker[AsyncSession] | None = None
self._prune_task: asyncio.Task | None = None
self._prune_task: asyncio.Task[Any] | None = None
async def connect(self) -> None:
"""Initialize database connection and create tables."""
@ -66,6 +67,11 @@ class DatabaseManager:
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Apply schema migrations to add any missing columns
from .migrate import migrate_schema
await migrate_schema(self.engine)
logger.info("Database initialized: %s", self.database_url)
# Start background pruning task
@ -86,6 +92,7 @@ class DatabaseManager:
logger.info("Database connection closed")
@asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
"""Get database session context manager.
@ -139,27 +146,28 @@ class DatabaseManager:
text("DELETE FROM commits WHERE timestamp < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["commits"] = result.rowcount or 0
deleted["commits"] = result.rowcount or 0 # type: ignore[attr-defined]
# Delete old cycles
result = await session.execute(
text("DELETE FROM cycles WHERE started_at < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["cycles"] = result.rowcount or 0
deleted["cycles"] = result.rowcount or 0 # type: ignore[attr-defined]
# Delete old errors
result = await session.execute(
text("DELETE FROM errors WHERE timestamp < :cutoff"),
{"cutoff": cutoff_date},
)
deleted["errors"] = result.rowcount or 0
deleted["errors"] = result.rowcount or 0 # type: ignore[attr-defined]
await session.commit()
# Vacuum to reclaim space (SQLite)
async with self.engine.connect() as conn:
await conn.execute(text("VACUUM"))
if self.engine:
async with self.engine.connect() as conn:
await conn.execute(text("VACUUM"))
new_size_mb = await self.get_database_size_mb()
logger.info(

View file

@ -0,0 +1,90 @@
"""Database migration utilities.
Handles schema migrations for the auto-commit service database.
"""
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
logger = logging.getLogger(__name__)
async def migrate_schema(engine: AsyncEngine) -> None:
"""Apply schema migrations to bring database up to date.
Args:
engine: SQLAlchemy async engine
"""
async with engine.begin() as conn:
# Migration 001: Add monorepo tracking fields to commits table
await _add_monorepo_fields(conn)
# Migration 002: Add hostname tracking to commits, cycles, errors
await _add_hostname_field(conn)
logger.info("Schema migrations completed successfully")
async def _add_monorepo_fields(conn) -> None:
"""Add monorepo tracking fields to commits table.
Adds:
- is_monorepo_parent (BOOLEAN)
- submodule_count (INTEGER)
- submodule_metadata (JSON)
"""
# Check if columns already exist
result = await conn.execute(text("PRAGMA table_info(commits)"))
existing_columns = {row[1] for row in result}
migrations_needed = []
if "is_monorepo_parent" not in existing_columns:
migrations_needed.append(
"ALTER TABLE commits ADD COLUMN is_monorepo_parent BOOLEAN NOT NULL DEFAULT 0"
)
if "submodule_count" not in existing_columns:
migrations_needed.append(
"ALTER TABLE commits ADD COLUMN submodule_count INTEGER NOT NULL DEFAULT 0"
)
if "submodule_metadata" not in existing_columns:
migrations_needed.append(
"ALTER TABLE commits ADD COLUMN submodule_metadata JSON"
)
if migrations_needed:
logger.info("Adding monorepo tracking fields to commits table")
for migration in migrations_needed:
await conn.execute(text(migration))
logger.info(f"Applied: {migration}")
else:
logger.debug("Monorepo fields already exist in commits table")
async def _add_hostname_field(conn) -> None:
"""Add hostname field to commits, cycles, and errors tables.
Tracks which host produced each commit/cycle/error for multi-host setups.
"""
tables = ["commits", "cycles", "errors"]
for table in tables:
result = await conn.execute(text(f"PRAGMA table_info({table})"))
existing_columns = {row[1] for row in result}
if "hostname" not in existing_columns:
await conn.execute(
text(f"ALTER TABLE {table} ADD COLUMN hostname VARCHAR(255)")
)
logger.info(f"Added hostname column to {table}")
# Create index
await conn.execute(
text(f"CREATE INDEX IF NOT EXISTS idx_{table}_hostname ON {table}(hostname)")
)
logger.info(f"Created hostname index on {table}")
else:
logger.debug(f"hostname column already exists in {table}")

View file

@ -1,7 +1,7 @@
"""SQLAlchemy models for auto-commit database."""
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from sqlalchemy import Index, String, Text, Integer, Float, Boolean, DateTime, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
@ -24,17 +24,54 @@ class Commit(Base):
message: Mapped[str] = mapped_column(Text)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
daemon_id: Mapped[Optional[str]] = mapped_column(String(16), nullable=True)
hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True)
branch: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
# Monorepo/submodule tracking fields
is_monorepo_parent: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
submodule_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
submodule_metadata: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)
__table_args__ = (
Index("idx_commit_timestamp", "timestamp"),
Index("idx_commit_repo_timestamp", "repo_name", "timestamp"),
)
class SubmoduleCommit(Base):
"""Submodule commit record for monorepo aggregations.
Tracks individual submodule commits that are aggregated into
parent monorepo commits. Enables drill-down from aggregated
commit messages like "Monorepo: 2 commit(s)" to see the actual
commits.
"""
__tablename__ = "submodule_commits"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
parent_commit_hash: Mapped[str] = mapped_column(String(40), index=True)
submodule_name: Mapped[str] = mapped_column(String(255), index=True)
commit_hash: Mapped[str] = mapped_column(String(40))
commit_message: Mapped[str] = mapped_column(Text)
author: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index("idx_submodule_commit_parent", "parent_commit_hash"),
Index("idx_submodule_commit_name", "submodule_name"),
Index("idx_submodule_commit_timestamp", "timestamp"),
Index("idx_submodule_commit_parent_name", "parent_commit_hash", "submodule_name"),
)
class Cycle(Base):
"""Commit cycle execution record."""
@ -42,6 +79,7 @@ class Cycle(Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
daemon_id: Mapped[str] = mapped_column(String(16), index=True)
hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True)
started_at: Mapped[datetime] = mapped_column(DateTime, index=True)
completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
duration_seconds: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
@ -65,6 +103,7 @@ class Error(Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
daemon_id: Mapped[str] = mapped_column(String(16), index=True)
hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True)
repo_name: Mapped[str] = mapped_column(String(255), index=True)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
error_type: Mapped[str] = mapped_column(String(100), index=True)
@ -79,6 +118,20 @@ class Error(Base):
)
class Deployment(Base):
"""Deployment/upgrade tracking."""
__tablename__ = "deployments"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)
version: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
commit_hash: Mapped[Optional[str]] = mapped_column(String(40), nullable=True)
notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
__table_args__ = (Index("idx_deployment_timestamp", "timestamp"),)
class RepositoryStatus(Base):
"""Repository status tracking."""
@ -100,7 +153,7 @@ class RepositoryStatus(Base):
total_commits: Mapped[int] = mapped_column(Integer, default=0)
total_errors: Mapped[int] = mapped_column(Integer, default=0)
consecutive_errors: Mapped[int] = mapped_column(Integer, default=0)
extra_data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
extra_data: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)

View file

@ -7,7 +7,7 @@ from typing import List, Optional
from sqlalchemy import select, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from .models import Commit, Cycle, Error, RepositoryStatus
from .models import Commit, Cycle, Deployment, Error, RepositoryStatus
logger = logging.getLogger(__name__)
@ -27,6 +27,7 @@ class CommitRepository:
files_changed: Optional[int] = None,
insertions: Optional[int] = None,
deletions: Optional[int] = None,
hostname: Optional[str] = None,
) -> Commit:
"""Create a new commit record."""
commit = Commit(
@ -35,6 +36,7 @@ class CommitRepository:
message=message,
timestamp=timestamp,
daemon_id=daemon_id,
hostname=hostname,
branch=branch,
files_changed=files_changed,
insertions=insertions,
@ -50,6 +52,7 @@ class CommitRepository:
limit: int = 20,
daemon_id: Optional[str] = None,
repo_name: Optional[str] = None,
hostname: Optional[str] = None,
) -> List[Commit]:
"""Get recent commits, newest first."""
query = select(Commit).order_by(Commit.timestamp.desc()).limit(limit)
@ -58,6 +61,8 @@ class CommitRepository:
query = query.where(Commit.daemon_id == daemon_id)
if repo_name:
query = query.where(Commit.repo_name == repo_name)
if hostname:
query = query.where(Commit.hostname == hostname)
result = await session.execute(query)
return list(result.scalars().all())
@ -304,3 +309,81 @@ class RepositoryStatusRepository:
result = await session.execute(query)
return list(result.scalars().all())
class DeploymentRepository:
"""Repository for deployment/upgrade tracking."""
@staticmethod
async def record_deployment(
session: AsyncSession,
version: Optional[str] = None,
commit_hash: Optional[str] = None,
notes: Optional[str] = None,
) -> Deployment:
"""Record a new deployment/upgrade."""
deployment = Deployment(
timestamp=datetime.utcnow(),
version=version,
commit_hash=commit_hash,
notes=notes,
)
session.add(deployment)
await session.flush()
return deployment
@staticmethod
async def get_last_deployment(
session: AsyncSession,
) -> Optional[Deployment]:
"""Get the most recent deployment."""
query = (
select(Deployment)
.order_by(Deployment.timestamp.desc())
.limit(1)
)
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_deployments(
session: AsyncSession,
limit: int = 10,
) -> List[Deployment]:
"""Get recent deployments."""
query = (
select(Deployment)
.order_by(Deployment.timestamp.desc())
.limit(limit)
)
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def get_commits_since_deployment(
session: AsyncSession,
deployment_id: Optional[int] = None,
) -> List[Commit]:
"""Get all commits since the specified deployment (or last deployment if not specified)."""
# Get deployment timestamp
if deployment_id:
deployment_query = select(Deployment).where(Deployment.id == deployment_id)
deployment_result = await session.execute(deployment_query)
deployment = deployment_result.scalar_one_or_none()
else:
deployment = await DeploymentRepository.get_last_deployment(session)
if not deployment:
# No deployment recorded, return all commits
commits_query = select(Commit).order_by(Commit.timestamp.desc())
commits_result = await session.execute(commits_query)
return list(commits_result.scalars().all())
# Get commits since deployment
commits_query = (
select(Commit)
.where(Commit.timestamp >= deployment.timestamp)
.order_by(Commit.timestamp.desc())
)
commits_result = await session.execute(commits_query)
return list(commits_result.scalars().all())

View file

@ -13,6 +13,17 @@ from .operations import (
)
from .diff_parser import summarize_diff, DiffSummary
from .discovery import discover_git_repos
from .submodules import (
SubmoduleInfo,
SubmoduleCommitResult,
has_submodules,
parse_gitmodules,
get_submodule_status,
get_dirty_submodules,
get_submodule_head,
stage_submodule_reference,
get_submodule_remote_and_branch,
)
__all__ = [
"Repository",
@ -27,4 +38,14 @@ __all__ = [
"summarize_diff",
"DiffSummary",
"discover_git_repos",
# Submodule support
"SubmoduleInfo",
"SubmoduleCommitResult",
"has_submodules",
"parse_gitmodules",
"get_submodule_status",
"get_dirty_submodules",
"get_submodule_head",
"stage_submodule_reference",
"get_submodule_remote_and_branch",
]

View file

@ -9,6 +9,13 @@ from .repository import GitStatus, CommitResult, PushResult
logger = logging.getLogger(__name__)
def unquote_git_path(path: str) -> str:
"""Strip git's porcelain quoting from paths with spaces/special chars."""
if path.startswith('"') and path.endswith('"'):
return path[1:-1]
return path
class GitError(Exception):
"""Base exception for git operations."""
@ -118,7 +125,7 @@ async def git_status(repo_path: Path) -> GitStatus:
if len(line) >= 3:
index_status = line[0]
worktree_status = line[1]
filepath = line[3:]
filepath = unquote_git_path(line[3:])
if index_status == "?" and worktree_status == "?":
untracked.append(filepath)

View file

@ -26,8 +26,12 @@ class Repository:
@property
def exists(self) -> bool:
"""Check if the repository exists."""
return self.path.exists() and self.git_dir.exists()
"""Check if the repository exists.
Supports both regular repos (.git is a directory) and
submodules (.git is a gitlink file).
"""
return self.path.exists() and (self.git_dir.is_dir() or self.git_dir.is_file())
def __str__(self) -> str:
return f"Repository({self.name} @ {self.path})"

View file

@ -0,0 +1,212 @@
"""Submodule detection and status querying for monorepo support.
Provides primitives for detecting git submodules, checking their dirty state,
and staging submodule reference updates in parent repositories.
"""
import configparser
import logging
from dataclasses import dataclass, field
from pathlib import Path
from .operations import _run_git_command, unquote_git_path
logger = logging.getLogger(__name__)
@dataclass
class SubmoduleInfo:
"""Information about a git submodule."""
name: str
path: str
url: str
current_hash: str | None = None
is_dirty: bool = False
has_untracked: bool = False
branch: str | None = None
@dataclass
class SubmoduleCommitResult:
"""Result of committing within a submodule."""
submodule_name: str
submodule_path: str
success: bool
commit_hashes: list[str] = field(default_factory=list)
push_success: bool = False
error: str | None = None
new_hash: str | None = None
def has_submodules(repo_path: Path) -> bool:
"""Check if a repository uses submodules.
Quick synchronous check for .gitmodules file existence.
"""
return (repo_path / ".gitmodules").is_file()
def parse_gitmodules(repo_path: Path) -> list[SubmoduleInfo]:
"""Parse .gitmodules file to extract submodule definitions.
Uses configparser since .gitmodules follows INI-like format:
[submodule "name"]
path = relative/path
url = git@host:org/repo.git
branch = main
"""
gitmodules_path = repo_path / ".gitmodules"
if not gitmodules_path.exists():
return []
config = configparser.ConfigParser()
config.read(str(gitmodules_path))
submodules: list[SubmoduleInfo] = []
for section in config.sections():
if section.startswith('submodule "') and section.endswith('"'):
name = section[len('submodule "'):-1]
elif section.startswith("submodule "):
name = section[len("submodule "):]
else:
continue
path = config.get(section, "path", fallback=name)
url = config.get(section, "url", fallback="")
branch = config.get(section, "branch", fallback=None)
submodules.append(SubmoduleInfo(
name=name,
path=path,
url=url,
branch=branch,
))
return submodules
async def get_submodule_status(repo_path: Path) -> list[SubmoduleInfo]:
"""Get status of all submodules including dirty state.
Combines `git submodule status` (parent perspective) with
`git status --porcelain` (internal check) to determine which
submodules have uncommitted changes.
"""
submodules = parse_gitmodules(repo_path)
if not submodules:
return []
# Get current hash and modification status from parent perspective
stdout, _, returncode = await _run_git_command(
"submodule", "status", cwd=repo_path, check=False
)
# Parse output lines:
# " <hash> <path> (<describe>)" → clean
# "+<hash> <path> (<describe>)" → different commit than parent tracks
# "-<hash> <path>" → not initialized
hash_map: dict[str, tuple[str, bool]] = {}
if returncode == 0 and stdout:
for line in stdout.splitlines():
if not line.strip():
continue
dirty_marker = line[0] if line else " "
parts = line[1:].strip().split(None, 2)
if len(parts) >= 2:
commit_hash = parts[0]
sm_path = parts[1]
hash_map[sm_path] = (commit_hash, dirty_marker == "+")
# Check parent's porcelain status for modified submodule paths
status_stdout, _, _ = await _run_git_command(
"status", "--porcelain", cwd=repo_path, check=False
)
dirty_paths: set[str] = set()
if status_stdout:
for line in status_stdout.splitlines():
if not line or len(line) < 3:
continue
filepath = unquote_git_path(line[3:].strip())
for sm in submodules:
if filepath == sm.path or filepath.startswith(sm.path + "/"):
dirty_paths.add(sm.path)
# Populate each submodule's status
for sm in submodules:
sm_abs_path = repo_path / sm.path
if not sm_abs_path.exists():
continue
if sm.path in hash_map:
sm.current_hash, _ = hash_map[sm.path]
if sm.path in dirty_paths:
sm.is_dirty = True
else:
# Check internally for uncommitted changes
internal_stdout, _, rc = await _run_git_command(
"status", "--porcelain", cwd=sm_abs_path, check=False
)
if rc == 0 and internal_stdout.strip():
sm.is_dirty = True
sm.has_untracked = "??" in internal_stdout
return submodules
async def get_dirty_submodules(repo_path: Path) -> list[SubmoduleInfo]:
"""Get only submodules that have uncommitted changes."""
all_subs = await get_submodule_status(repo_path)
return [sm for sm in all_subs if sm.is_dirty]
async def get_submodule_head(repo_path: Path, submodule_path: str) -> str | None:
"""Get the current HEAD commit hash of a submodule."""
sm_abs = repo_path / submodule_path
stdout, _, rc = await _run_git_command(
"rev-parse", "HEAD", cwd=sm_abs, check=False
)
return stdout.strip() if rc == 0 and stdout else None
async def stage_submodule_reference(repo_path: Path, submodule_path: str) -> None:
"""Stage a submodule reference update in the parent repo.
Equivalent to `git add <submodule_path>` in the parent.
"""
await _run_git_command("add", submodule_path, cwd=repo_path)
async def get_submodule_remote_and_branch(
repo_path: Path, submodule_path: str
) -> tuple[str, str]:
"""Get the remote and branch for a submodule.
Checks the submodule's own git config, falling back to 'origin' and 'main'.
"""
sm_abs = repo_path / submodule_path
# Get remote name
remote_stdout, _, rc = await _run_git_command(
"remote", cwd=sm_abs, check=False
)
remote = (
remote_stdout.strip().split("\n")[0]
if (rc == 0 and remote_stdout.strip())
else "origin"
)
# Get current branch
branch_stdout, _, rc = await _run_git_command(
"symbolic-ref", "--short", "HEAD", cwd=sm_abs, check=False
)
branch = (
branch_stdout.strip()
if (rc == 0 and branch_stdout.strip())
else "main"
)
return remote, branch

View file

@ -1,15 +0,0 @@
"""Service lifecycle management."""
from .manager import (
LlamaServiceManager,
ServiceHealth,
ServiceManagerError,
ServiceStartError,
)
__all__ = [
"LlamaServiceManager",
"ServiceHealth",
"ServiceManagerError",
"ServiceStartError",
]

View file

@ -1,327 +0,0 @@
"""Llama service lifecycle manager."""
import asyncio
import fcntl
import logging
import os
import signal
import sys
import time
from enum import Enum
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)
class ServiceManagerError(Exception):
"""Base exception for service manager errors."""
class ServiceStartError(ServiceManagerError):
"""Failed to start service."""
class ServiceHealth(str, Enum):
"""Service health status."""
HEALTHY = "healthy"
CRASHED = "crashed"
UNREACHABLE = "unreachable"
class LlamaServiceManager:
"""Manages llama service lifecycle as subprocess."""
def __init__(
self,
service_url: str = "http://localhost:8000",
pid_file: Path | None = None,
lock_file: Path | None = None,
startup_timeout: float = 30.0,
health_check_timeout: float = 5.0,
model_id: str | None = None,
use_model_boss: bool = False,
):
self.service_url = service_url
self._pid_file = pid_file or Path.home() / ".config/commits/llama-service.pid"
self._lock_file = lock_file or Path.home() / ".config/commits/llama-service.lock"
self._startup_timeout = startup_timeout
self._health_check_timeout = health_check_timeout
self._model_id = model_id
self._use_model_boss = use_model_boss
self._spawned_pid: int | None = None
self._lock_fd: int | None = None
self._resolved_model_path: str | None = None
async def ensure_service_available(self) -> bool:
"""Ensure service is available, starting if necessary."""
health = await self.check_health()
if health == ServiceHealth.HEALTHY:
return True
if health == ServiceHealth.CRASHED:
logger.info("Detected crashed service (stale PID), cleaning up...")
self._cleanup_pid_file()
logger.info("Llama service unreachable, attempting to start...")
try:
if self._use_model_boss and self._model_id:
await self._resolve_model_path()
return await self.start_service()
except ServiceStartError as e:
logger.error(f"Failed to start llama-service: {e}")
return False
async def _resolve_model_path(self) -> None:
"""Resolve model ID to path via model-boss."""
try:
from lilith_model_boss import ensure_model
if self._model_id and not self._resolved_model_path:
logger.info(f"Resolving model via model-boss: {self._model_id}")
self._resolved_model_path = ensure_model(self._model_id)
logger.info(f"Resolved model path: {self._resolved_model_path}")
except ImportError:
raise ServiceStartError("model-boss not installed")
except Exception as e:
raise ServiceStartError(f"Failed to resolve model path: {e}")
async def start_service(self) -> bool:
"""Start llama service subprocess."""
pid = self._read_pid_file()
if pid and self._is_process_alive(pid):
logger.info(f"Service already running (PID: {pid})")
return True
if not self._acquire_lock():
await asyncio.sleep(2)
pid = self._read_pid_file()
if pid and self._is_process_alive(pid):
return True
return False
try:
pid = self._read_pid_file()
if pid and self._is_process_alive(pid):
return True
logger.info("Starting llama service subprocess...")
process = await self._spawn_service()
self._spawned_pid = process.pid
self._write_pid_file(process.pid)
logger.info(f"Llama service started (PID: {process.pid})")
if await self._wait_for_healthy(self._startup_timeout):
logger.info("✓ Llama service is healthy")
return True
else:
logger.error(f"✗ Service failed to start within {self._startup_timeout}s")
return False
except Exception as e:
logger.exception(f"Failed to start llama service: {e}")
return False
finally:
self._release_lock()
async def check_health(self) -> ServiceHealth:
"""Check service health."""
pid = self._read_pid_file()
if pid and not self._is_process_alive(pid):
return ServiceHealth.CRASHED
try:
async with httpx.AsyncClient(timeout=self._health_check_timeout) as client:
response = await client.get(f"{self.service_url}/health")
if response.status_code == 200:
data = response.json()
return ServiceHealth.HEALTHY if data.get("status") == "ok" else ServiceHealth.UNREACHABLE
return ServiceHealth.UNREACHABLE
except (httpx.ConnectError, httpx.TimeoutException):
return ServiceHealth.UNREACHABLE
except Exception:
return ServiceHealth.UNREACHABLE
async def stop_service(self) -> None:
"""Gracefully stop service if we own it."""
if self._spawned_pid is None or not self._is_process_alive(self._spawned_pid):
return
logger.info(f"Stopping llama service (PID: {self._spawned_pid})...")
try:
os.kill(self._spawned_pid, signal.SIGTERM)
for _ in range(10):
if not self._is_process_alive(self._spawned_pid):
self._cleanup_pid_file()
return
await asyncio.sleep(0.5)
os.kill(self._spawned_pid, signal.SIGKILL)
self._cleanup_pid_file()
except ProcessLookupError:
self._cleanup_pid_file()
except Exception as e:
logger.exception(f"Error stopping service: {e}")
def _acquire_lock(self) -> bool:
self._lock_file.parent.mkdir(parents=True, exist_ok=True)
try:
self._lock_fd = os.open(self._lock_file, os.O_CREAT | os.O_WRONLY)
fcntl.flock(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except BlockingIOError:
return False
def _release_lock(self) -> None:
if self._lock_fd is not None:
try:
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
os.close(self._lock_fd)
self._lock_fd = None
except Exception:
pass
def _read_pid_file(self) -> int | None:
if not self._pid_file.exists():
return None
try:
content = self._pid_file.read_text().strip()
return int(content) if content else None
except Exception:
return None
def _write_pid_file(self, pid: int) -> None:
self._pid_file.parent.mkdir(parents=True, exist_ok=True)
self._pid_file.write_text(str(pid))
def _cleanup_pid_file(self) -> None:
try:
if self._pid_file.exists():
self._pid_file.unlink()
except Exception:
pass
def _is_process_alive(self, pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except OSError:
return False
def _find_default_model(self) -> str | None:
"""Find a suitable model in standard cache locations."""
cache_dir = Path.home() / ".cache" / "models"
if not cache_dir.exists():
return None
preferred = [
"qwen2.5-1.5b-instruct-q4_k_m.gguf",
"Ministral-3-3B-Instruct-2512-Q8_0.gguf",
]
for name in preferred:
path = cache_dir / name
if path.exists():
return str(path)
for gguf in cache_dir.glob("*.gguf"):
if gguf.stat().st_size < 5 * 1024 * 1024 * 1024:
return str(gguf)
return None
def _find_python_with_module(self, module_name: str) -> str:
"""Find a Python executable that can import the given module.
Tries multiple Python paths to find one with access to user-installed packages.
This handles the case where sys.executable is /usr/bin/python (system Python)
but packages are installed in user site-packages.
"""
import shutil
import subprocess
candidates = [
str(Path.home() / ".local" / "bin" / "python"),
str(Path.home() / ".local" / "bin" / "python3"),
shutil.which("python3"),
shutil.which("python"),
sys.executable,
]
for python_path in candidates:
if not python_path or not Path(python_path).exists():
continue
try:
result = subprocess.run(
[python_path, "-c", f"import {module_name}"],
capture_output=True,
timeout=5,
)
if result.returncode == 0:
logger.info(f"Found Python with {module_name}: {python_path}")
return python_path
except (subprocess.TimeoutExpired, FileNotFoundError, PermissionError):
continue
logger.warning(f"Could not find Python with {module_name}, using {sys.executable}")
return sys.executable
async def _spawn_service(self) -> asyncio.subprocess.Process:
"""Spawn service subprocess."""
python_path = self._find_python_with_module("llama_http")
cmd = [python_path, "-m", "llama_http"]
env = os.environ.copy()
model_path = self._resolved_model_path or env.get("LLAMA_SERVICE_MODEL_PATH")
if not model_path:
model_path = self._find_default_model()
if not model_path:
raise ServiceStartError(
"No model found. Either:\n"
" 1. Set LLAMA_SERVICE_MODEL_PATH\n"
" 2. Place a GGUF model in ~/.cache/models/"
)
# Configure environment for llama-http service
# Note: llama-http uses LLAMA_HTTP_ prefix for env vars
env["LLAMA_HTTP_MODEL_PATH"] = model_path
# Extract port from service_url and configure llama-http to use it
from urllib.parse import urlparse
port = urlparse(self.service_url).port or 8000
env["LLAMA_HTTP_PORT"] = str(port)
logger.info(f"Using model: {model_path}, port: {port}")
log_file = self._pid_file.parent / "llama-service.log"
log_file.parent.mkdir(parents=True, exist_ok=True)
with open(log_file, "a") as log:
log.write(f"\n=== Service started at {time.ctime()} ===\n")
log.write(f"Model: {model_path}\n")
process = await asyncio.create_subprocess_exec(
*cmd,
env=env,
stdout=log,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
return process
async def _wait_for_healthy(self, timeout: float) -> bool:
start = time.time()
while time.time() - start < timeout:
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.get(f"{self.service_url}/health")
if response.status_code == 200:
data = response.json()
if data.get("status") == "ok":
return True
except Exception:
pass
await asyncio.sleep(1)
return False

View file

@ -0,0 +1,5 @@
"""Services for auto-commit system."""
from .submodule_commit_service import SubmoduleCommitService
__all__ = ["SubmoduleCommitService"]

View file

@ -0,0 +1,226 @@
"""Service for persisting and retrieving submodule commit details."""
import logging
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ..activity_log import ActionType, ActivityLogger
from ..database.models import SubmoduleCommit
from ..git.submodules import SubmoduleCommitResult
logger = logging.getLogger(__name__)
@dataclass
class CommitStats:
"""Statistics about a commit."""
files: int
insertions: int
deletions: int
@dataclass
class CommitDetails:
"""Detailed information about a single commit."""
hash: str
author: str
email: str
timestamp: datetime
message: str
stats: CommitStats
class SubmoduleCommitService:
"""Service for persisting and retrieving submodule commit details.
Captures individual commit information from submodule processing so it can
be displayed when viewing monorepo aggregated commits.
"""
def __init__(self, activity_log: ActivityLogger) -> None:
"""Initialize submodule commit service.
Args:
activity_log: Activity logger for audit trail
"""
self.activity_log = activity_log
async def persist_submodule_commits(
self,
session: AsyncSession,
parent_commit_hash: str,
submodule_results: list[SubmoduleCommitResult],
) -> int:
"""Persist submodule commits to database and activity log.
Args:
session: Database session
parent_commit_hash: Hash of parent monorepo commit
submodule_results: List of submodule commit results
Returns:
Number of submodule commits persisted
"""
persisted_count = 0
for result in submodule_results:
if not result.success or not result.commit_hashes:
continue
for commit_hash in result.commit_hashes:
try:
# Get commit details from git
details = await self._get_commit_details(
submodule_path=result.submodule_path,
commit_hash=commit_hash,
)
# Create database record
submodule_commit = SubmoduleCommit(
parent_commit_hash=parent_commit_hash,
submodule_name=result.submodule_name,
commit_hash=details.hash,
commit_message=details.message,
author=details.author,
timestamp=details.timestamp,
files_changed=details.stats.files,
insertions=details.stats.insertions,
deletions=details.stats.deletions,
)
session.add(submodule_commit)
persisted_count += 1
# Activity log for audit trail
self.activity_log.log(
ActionType.SUBMODULE_COMMIT,
{
"parent_commit": parent_commit_hash,
"submodule": result.submodule_name,
"hash": details.hash,
"message": details.message,
"author": details.author,
"stats": {
"files": details.stats.files,
"insertions": details.stats.insertions,
"deletions": details.stats.deletions,
},
},
)
except Exception as e:
logger.error(
f"Failed to persist submodule commit {commit_hash} "
f"for {result.submodule_name}: {e}"
)
await session.flush()
return persisted_count
async def get_submodule_commits(
self, session: AsyncSession, parent_commit_hash: str
) -> list[SubmoduleCommit]:
"""Retrieve submodule commits for a parent commit.
Args:
session: Database session
parent_commit_hash: Hash of parent monorepo commit
Returns:
List of submodule commits ordered by submodule name and timestamp
"""
result = await session.execute(
select(SubmoduleCommit)
.where(SubmoduleCommit.parent_commit_hash == parent_commit_hash)
.order_by(SubmoduleCommit.submodule_name, SubmoduleCommit.timestamp)
)
return list(result.scalars().all())
async def _get_commit_details(
self, submodule_path: str, commit_hash: str
) -> CommitDetails:
"""Extract commit details using git show.
Args:
submodule_path: Path to submodule
commit_hash: Commit hash to inspect
Returns:
CommitDetails with all extracted information
"""
# Resolve absolute path
path = Path(submodule_path).resolve()
# Get commit metadata and stats
result = subprocess.run(
[
"git",
"-C",
str(path),
"show",
commit_hash,
"--format=%H|%an|%ae|%at|%s",
"--shortstat",
],
capture_output=True,
text=True,
check=True,
)
return self._parse_commit_details(result.stdout)
def _parse_commit_details(self, git_output: str) -> CommitDetails:
"""Parse git show output into structured CommitDetails.
Args:
git_output: Output from git show command
Returns:
Parsed CommitDetails
"""
lines = git_output.strip().split("\n")
# First line: hash|author|email|timestamp|subject
metadata = lines[0].split("|", 4)
commit_hash = metadata[0]
author = metadata[1]
email = metadata[2]
timestamp = datetime.fromtimestamp(int(metadata[3]))
subject = metadata[4] if len(metadata) > 4 else ""
# Find shortstat line (e.g., "3 files changed, 45 insertions(+), 12 deletions(-)")
stats = CommitStats(files=0, insertions=0, deletions=0)
for line in lines[1:]:
if "changed" in line or "insertion" in line or "deletion" in line:
# Parse shortstat
files_match = re.search(r"(\d+) files? changed", line)
if files_match:
stats.files = int(files_match.group(1))
insertions_match = re.search(r"(\d+) insertion", line)
if insertions_match:
stats.insertions = int(insertions_match.group(1))
deletions_match = re.search(r"(\d+) deletion", line)
if deletions_match:
stats.deletions = int(deletions_match.group(1))
break
return CommitDetails(
hash=commit_hash,
author=author,
email=email,
timestamp=timestamp,
message=subject,
stats=stats,
)