diff --git a/src/auto_commit_service/database/connection.py b/src/auto_commit_service/database/connection.py index 8b26532..01f8d9d 100644 --- a/src/auto_commit_service/database/connection.py +++ b/src/auto_commit_service/database/connection.py @@ -2,9 +2,10 @@ import asyncio import logging +from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path -from typing import AsyncIterator +from typing import Any, AsyncIterator from sqlalchemy import text from sqlalchemy.ext.asyncio import ( @@ -44,7 +45,7 @@ class DatabaseManager: self.database_url = f"sqlite+aiosqlite:///{self.db_path}" self.engine: AsyncEngine | None = None self.session_factory: async_sessionmaker[AsyncSession] | None = None - self._prune_task: asyncio.Task | None = None + self._prune_task: asyncio.Task[Any] | None = None async def connect(self) -> None: """Initialize database connection and create tables.""" @@ -66,6 +67,11 @@ class DatabaseManager: async with self.engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) + # Apply schema migrations to add any missing columns + from .migrate import migrate_schema + + await migrate_schema(self.engine) + logger.info("Database initialized: %s", self.database_url) # Start background pruning task @@ -86,6 +92,7 @@ class DatabaseManager: logger.info("Database connection closed") + @asynccontextmanager async def session(self) -> AsyncIterator[AsyncSession]: """Get database session context manager. 
@@ -139,27 +146,28 @@ class DatabaseManager: text("DELETE FROM commits WHERE timestamp < :cutoff"), {"cutoff": cutoff_date}, ) - deleted["commits"] = result.rowcount or 0 + deleted["commits"] = result.rowcount or 0 # type: ignore[attr-defined] # Delete old cycles result = await session.execute( text("DELETE FROM cycles WHERE started_at < :cutoff"), {"cutoff": cutoff_date}, ) - deleted["cycles"] = result.rowcount or 0 + deleted["cycles"] = result.rowcount or 0 # type: ignore[attr-defined] # Delete old errors result = await session.execute( text("DELETE FROM errors WHERE timestamp < :cutoff"), {"cutoff": cutoff_date}, ) - deleted["errors"] = result.rowcount or 0 + deleted["errors"] = result.rowcount or 0 # type: ignore[attr-defined] await session.commit() # Vacuum to reclaim space (SQLite) - async with self.engine.connect() as conn: - await conn.execute(text("VACUUM")) + if self.engine: + async with self.engine.connect() as conn: + await conn.execute(text("VACUUM")) new_size_mb = await self.get_database_size_mb() logger.info( diff --git a/src/auto_commit_service/database/migrate.py b/src/auto_commit_service/database/migrate.py new file mode 100644 index 0000000..112d0af --- /dev/null +++ b/src/auto_commit_service/database/migrate.py @@ -0,0 +1,90 @@ +"""Database migration utilities. + +Handles schema migrations for the auto-commit service database. +""" + +import logging +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine + +logger = logging.getLogger(__name__) + + +async def migrate_schema(engine: AsyncEngine) -> None: + """Apply schema migrations to bring database up to date. 
+ + Args: + engine: SQLAlchemy async engine + """ + async with engine.begin() as conn: + # Migration 001: Add monorepo tracking fields to commits table + await _add_monorepo_fields(conn) + + # Migration 002: Add hostname tracking to commits, cycles, errors + await _add_hostname_field(conn) + + logger.info("Schema migrations completed successfully") + + +async def _add_monorepo_fields(conn) -> None: + """Add monorepo tracking fields to commits table. + + Adds: + - is_monorepo_parent (BOOLEAN) + - submodule_count (INTEGER) + - submodule_metadata (JSON) + """ + # Check if columns already exist + result = await conn.execute(text("PRAGMA table_info(commits)")) + existing_columns = {row[1] for row in result} + + migrations_needed = [] + + if "is_monorepo_parent" not in existing_columns: + migrations_needed.append( + "ALTER TABLE commits ADD COLUMN is_monorepo_parent BOOLEAN NOT NULL DEFAULT 0" + ) + + if "submodule_count" not in existing_columns: + migrations_needed.append( + "ALTER TABLE commits ADD COLUMN submodule_count INTEGER NOT NULL DEFAULT 0" + ) + + if "submodule_metadata" not in existing_columns: + migrations_needed.append( + "ALTER TABLE commits ADD COLUMN submodule_metadata JSON" + ) + + if migrations_needed: + logger.info("Adding monorepo tracking fields to commits table") + for migration in migrations_needed: + await conn.execute(text(migration)) + logger.info(f"Applied: {migration}") + else: + logger.debug("Monorepo fields already exist in commits table") + + +async def _add_hostname_field(conn) -> None: + """Add hostname field to commits, cycles, and errors tables. + + Tracks which host produced each commit/cycle/error for multi-host setups. 
+ """ + tables = ["commits", "cycles", "errors"] + + for table in tables: + result = await conn.execute(text(f"PRAGMA table_info({table})")) + existing_columns = {row[1] for row in result} + + if "hostname" not in existing_columns: + await conn.execute( + text(f"ALTER TABLE {table} ADD COLUMN hostname VARCHAR(255)") + ) + logger.info(f"Added hostname column to {table}") + + # Create index + await conn.execute( + text(f"CREATE INDEX IF NOT EXISTS idx_{table}_hostname ON {table}(hostname)") + ) + logger.info(f"Created hostname index on {table}") + else: + logger.debug(f"hostname column already exists in {table}") diff --git a/src/auto_commit_service/database/models.py b/src/auto_commit_service/database/models.py index 5d6cbd1..c98b15d 100644 --- a/src/auto_commit_service/database/models.py +++ b/src/auto_commit_service/database/models.py @@ -1,7 +1,7 @@ """SQLAlchemy models for auto-commit database.""" from datetime import datetime -from typing import Optional +from typing import Any, Optional from sqlalchemy import Index, String, Text, Integer, Float, Boolean, DateTime, JSON from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column @@ -24,17 +24,54 @@ class Commit(Base): message: Mapped[str] = mapped_column(Text) timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) daemon_id: Mapped[Optional[str]] = mapped_column(String(16), nullable=True) + hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True) branch: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + # Monorepo/submodule tracking fields + is_monorepo_parent: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + submodule_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + 
submodule_metadata: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True) + __table_args__ = ( Index("idx_commit_timestamp", "timestamp"), Index("idx_commit_repo_timestamp", "repo_name", "timestamp"), ) +class SubmoduleCommit(Base): + """Submodule commit record for monorepo aggregations. + + Tracks individual submodule commits that are aggregated into + parent monorepo commits. Enables drill-down from aggregated + commit messages like "Monorepo: 2 commit(s)" to see the actual + commits. + """ + + __tablename__ = "submodule_commits" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + parent_commit_hash: Mapped[str] = mapped_column(String(40), index=True) + submodule_name: Mapped[str] = mapped_column(String(255), index=True) + commit_hash: Mapped[str] = mapped_column(String(40)) + commit_message: Mapped[str] = mapped_column(Text) + author: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) + files_changed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + insertions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + deletions: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index("idx_submodule_commit_parent", "parent_commit_hash"), + Index("idx_submodule_commit_name", "submodule_name"), + Index("idx_submodule_commit_timestamp", "timestamp"), + Index("idx_submodule_commit_parent_name", "parent_commit_hash", "submodule_name"), + ) + + class Cycle(Base): """Commit cycle execution record.""" @@ -42,6 +79,7 @@ class Cycle(Base): id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) daemon_id: Mapped[str] = mapped_column(String(16), index=True) + hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True) started_at: Mapped[datetime] = mapped_column(DateTime, 
index=True) completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) duration_seconds: Mapped[Optional[float]] = mapped_column(Float, nullable=True) @@ -65,6 +103,7 @@ class Error(Base): id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) daemon_id: Mapped[str] = mapped_column(String(16), index=True) + hostname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True) repo_name: Mapped[str] = mapped_column(String(255), index=True) timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) error_type: Mapped[str] = mapped_column(String(100), index=True) @@ -79,6 +118,20 @@ class Error(Base): ) +class Deployment(Base): + """Deployment/upgrade tracking.""" + + __tablename__ = "deployments" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + timestamp: Mapped[datetime] = mapped_column(DateTime, index=True) + version: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) + commit_hash: Mapped[Optional[str]] = mapped_column(String(40), nullable=True) + notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + __table_args__ = (Index("idx_deployment_timestamp", "timestamp"),) + + class RepositoryStatus(Base): """Repository status tracking.""" @@ -100,7 +153,7 @@ class RepositoryStatus(Base): total_commits: Mapped[int] = mapped_column(Integer, default=0) total_errors: Mapped[int] = mapped_column(Integer, default=0) consecutive_errors: Mapped[int] = mapped_column(Integer, default=0) - extra_data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True) + extra_data: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow ) diff --git a/src/auto_commit_service/database/repository.py b/src/auto_commit_service/database/repository.py index 27d426b..da68a22 100644 --- a/src/auto_commit_service/database/repository.py +++ 
b/src/auto_commit_service/database/repository.py @@ -7,7 +7,7 @@ from typing import List, Optional from sqlalchemy import select, func, and_, or_ from sqlalchemy.ext.asyncio import AsyncSession -from .models import Commit, Cycle, Error, RepositoryStatus +from .models import Commit, Cycle, Deployment, Error, RepositoryStatus logger = logging.getLogger(__name__) @@ -27,6 +27,7 @@ class CommitRepository: files_changed: Optional[int] = None, insertions: Optional[int] = None, deletions: Optional[int] = None, + hostname: Optional[str] = None, ) -> Commit: """Create a new commit record.""" commit = Commit( @@ -35,6 +36,7 @@ class CommitRepository: message=message, timestamp=timestamp, daemon_id=daemon_id, + hostname=hostname, branch=branch, files_changed=files_changed, insertions=insertions, @@ -50,6 +52,7 @@ class CommitRepository: limit: int = 20, daemon_id: Optional[str] = None, repo_name: Optional[str] = None, + hostname: Optional[str] = None, ) -> List[Commit]: """Get recent commits, newest first.""" query = select(Commit).order_by(Commit.timestamp.desc()).limit(limit) @@ -58,6 +61,8 @@ class CommitRepository: query = query.where(Commit.daemon_id == daemon_id) if repo_name: query = query.where(Commit.repo_name == repo_name) + if hostname: + query = query.where(Commit.hostname == hostname) result = await session.execute(query) return list(result.scalars().all()) @@ -304,3 +309,81 @@ class RepositoryStatusRepository: result = await session.execute(query) return list(result.scalars().all()) + + +class DeploymentRepository: + """Repository for deployment/upgrade tracking.""" + + @staticmethod + async def record_deployment( + session: AsyncSession, + version: Optional[str] = None, + commit_hash: Optional[str] = None, + notes: Optional[str] = None, + ) -> Deployment: + """Record a new deployment/upgrade.""" + deployment = Deployment( + timestamp=datetime.utcnow(), + version=version, + commit_hash=commit_hash, + notes=notes, + ) + session.add(deployment) + await 
session.flush() + return deployment + + @staticmethod + async def get_last_deployment( + session: AsyncSession, + ) -> Optional[Deployment]: + """Get the most recent deployment.""" + query = ( + select(Deployment) + .order_by(Deployment.timestamp.desc()) + .limit(1) + ) + result = await session.execute(query) + return result.scalar_one_or_none() + + @staticmethod + async def get_deployments( + session: AsyncSession, + limit: int = 10, + ) -> List[Deployment]: + """Get recent deployments.""" + query = ( + select(Deployment) + .order_by(Deployment.timestamp.desc()) + .limit(limit) + ) + result = await session.execute(query) + return list(result.scalars().all()) + + @staticmethod + async def get_commits_since_deployment( + session: AsyncSession, + deployment_id: Optional[int] = None, + ) -> List[Commit]: + """Get all commits since the specified deployment (or last deployment if not specified).""" + # Get deployment timestamp + if deployment_id: + deployment_query = select(Deployment).where(Deployment.id == deployment_id) + deployment_result = await session.execute(deployment_query) + deployment = deployment_result.scalar_one_or_none() + else: + deployment = await DeploymentRepository.get_last_deployment(session) + + if not deployment: + # No deployment recorded, return all commits + commits_query = select(Commit).order_by(Commit.timestamp.desc()) + commits_result = await session.execute(commits_query) + return list(commits_result.scalars().all()) + + # Get commits since deployment + commits_query = ( + select(Commit) + .where(Commit.timestamp >= deployment.timestamp) + .order_by(Commit.timestamp.desc()) + ) + commits_result = await session.execute(commits_query) + return list(commits_result.scalars().all()) diff --git a/src/auto_commit_service/git/__init__.py b/src/auto_commit_service/git/__init__.py index ca0ebe6..dce88ba 100644 --- a/src/auto_commit_service/git/__init__.py +++ b/src/auto_commit_service/git/__init__.py @@ -13,6 +13,17 @@ from .operations import ( ) 
from .diff_parser import summarize_diff, DiffSummary from .discovery import discover_git_repos +from .submodules import ( + SubmoduleInfo, + SubmoduleCommitResult, + has_submodules, + parse_gitmodules, + get_submodule_status, + get_dirty_submodules, + get_submodule_head, + stage_submodule_reference, + get_submodule_remote_and_branch, +) __all__ = [ "Repository", @@ -27,4 +38,14 @@ __all__ = [ "summarize_diff", "DiffSummary", "discover_git_repos", + # Submodule support + "SubmoduleInfo", + "SubmoduleCommitResult", + "has_submodules", + "parse_gitmodules", + "get_submodule_status", + "get_dirty_submodules", + "get_submodule_head", + "stage_submodule_reference", + "get_submodule_remote_and_branch", ] diff --git a/src/auto_commit_service/git/operations.py b/src/auto_commit_service/git/operations.py index 00fb2e0..6f2a760 100644 --- a/src/auto_commit_service/git/operations.py +++ b/src/auto_commit_service/git/operations.py @@ -9,6 +9,13 @@ from .repository import GitStatus, CommitResult, PushResult logger = logging.getLogger(__name__) +def unquote_git_path(path: str) -> str: + """Strip git's porcelain quoting from paths with spaces/special chars.""" + if path.startswith('"') and path.endswith('"'): + return path[1:-1] + return path + + class GitError(Exception): """Base exception for git operations.""" @@ -118,7 +125,7 @@ async def git_status(repo_path: Path) -> GitStatus: if len(line) >= 3: index_status = line[0] worktree_status = line[1] - filepath = line[3:] + filepath = unquote_git_path(line[3:]) if index_status == "?" 
and worktree_status == "?": untracked.append(filepath) diff --git a/src/auto_commit_service/git/repository.py b/src/auto_commit_service/git/repository.py index f612a61..a701c8e 100644 --- a/src/auto_commit_service/git/repository.py +++ b/src/auto_commit_service/git/repository.py @@ -26,8 +26,12 @@ class Repository: @property def exists(self) -> bool: - """Check if the repository exists.""" - return self.path.exists() and self.git_dir.exists() + """Check if the repository exists. + + Supports both regular repos (.git is a directory) and + submodules (.git is a gitlink file). + """ + return self.path.exists() and (self.git_dir.is_dir() or self.git_dir.is_file()) def __str__(self) -> str: return f"Repository({self.name} @ {self.path})" diff --git a/src/auto_commit_service/git/submodules.py b/src/auto_commit_service/git/submodules.py new file mode 100644 index 0000000..dc10ae7 --- /dev/null +++ b/src/auto_commit_service/git/submodules.py @@ -0,0 +1,212 @@ +"""Submodule detection and status querying for monorepo support. + +Provides primitives for detecting git submodules, checking their dirty state, +and staging submodule reference updates in parent repositories. 
+""" + +import configparser +import logging +from dataclasses import dataclass, field +from pathlib import Path + +from .operations import _run_git_command, unquote_git_path + +logger = logging.getLogger(__name__) + + +@dataclass +class SubmoduleInfo: + """Information about a git submodule.""" + + name: str + path: str + url: str + current_hash: str | None = None + is_dirty: bool = False + has_untracked: bool = False + branch: str | None = None + + +@dataclass +class SubmoduleCommitResult: + """Result of committing within a submodule.""" + + submodule_name: str + submodule_path: str + success: bool + commit_hashes: list[str] = field(default_factory=list) + push_success: bool = False + error: str | None = None + new_hash: str | None = None + + +def has_submodules(repo_path: Path) -> bool: + """Check if a repository uses submodules. + + Quick synchronous check for .gitmodules file existence. + """ + return (repo_path / ".gitmodules").is_file() + + +def parse_gitmodules(repo_path: Path) -> list[SubmoduleInfo]: + """Parse .gitmodules file to extract submodule definitions. 
+ + Uses configparser since .gitmodules follows INI-like format: + [submodule "name"] + path = relative/path + url = git@host:org/repo.git + branch = main + """ + gitmodules_path = repo_path / ".gitmodules" + if not gitmodules_path.exists(): + return [] + + config = configparser.ConfigParser() + config.read(str(gitmodules_path)) + + submodules: list[SubmoduleInfo] = [] + for section in config.sections(): + if section.startswith('submodule "') and section.endswith('"'): + name = section[len('submodule "'):-1] + elif section.startswith("submodule "): + name = section[len("submodule "):] + else: + continue + + path = config.get(section, "path", fallback=name) + url = config.get(section, "url", fallback="") + branch = config.get(section, "branch", fallback=None) + + submodules.append(SubmoduleInfo( + name=name, + path=path, + url=url, + branch=branch, + )) + + return submodules + + +async def get_submodule_status(repo_path: Path) -> list[SubmoduleInfo]: + """Get status of all submodules including dirty state. + + Combines `git submodule status` (parent perspective) with + `git status --porcelain` (internal check) to determine which + submodules have uncommitted changes. 
+ """ + submodules = parse_gitmodules(repo_path) + if not submodules: + return [] + + # Get current hash and modification status from parent perspective + stdout, _, returncode = await _run_git_command( + "submodule", "status", cwd=repo_path, check=False + ) + + # Parse output lines: + # " ()" → clean + # "+ ()" → different commit than parent tracks + # "- " → not initialized + hash_map: dict[str, tuple[str, bool]] = {} + if returncode == 0 and stdout: + for line in stdout.splitlines(): + if not line.strip(): + continue + dirty_marker = line[0] if line else " " + parts = line[1:].strip().split(None, 2) + if len(parts) >= 2: + commit_hash = parts[0] + sm_path = parts[1] + hash_map[sm_path] = (commit_hash, dirty_marker == "+") + + # Check parent's porcelain status for modified submodule paths + status_stdout, _, _ = await _run_git_command( + "status", "--porcelain", cwd=repo_path, check=False + ) + + dirty_paths: set[str] = set() + if status_stdout: + for line in status_stdout.splitlines(): + if not line or len(line) < 3: + continue + filepath = unquote_git_path(line[3:].strip()) + for sm in submodules: + if filepath == sm.path or filepath.startswith(sm.path + "/"): + dirty_paths.add(sm.path) + + # Populate each submodule's status + for sm in submodules: + sm_abs_path = repo_path / sm.path + if not sm_abs_path.exists(): + continue + + if sm.path in hash_map: + sm.current_hash, _ = hash_map[sm.path] + + if sm.path in dirty_paths: + sm.is_dirty = True + else: + # Check internally for uncommitted changes + internal_stdout, _, rc = await _run_git_command( + "status", "--porcelain", cwd=sm_abs_path, check=False + ) + if rc == 0 and internal_stdout.strip(): + sm.is_dirty = True + sm.has_untracked = "??" 
in internal_stdout + + return submodules + + +async def get_dirty_submodules(repo_path: Path) -> list[SubmoduleInfo]: + """Get only submodules that have uncommitted changes.""" + all_subs = await get_submodule_status(repo_path) + return [sm for sm in all_subs if sm.is_dirty] + + +async def get_submodule_head(repo_path: Path, submodule_path: str) -> str | None: + """Get the current HEAD commit hash of a submodule.""" + sm_abs = repo_path / submodule_path + stdout, _, rc = await _run_git_command( + "rev-parse", "HEAD", cwd=sm_abs, check=False + ) + return stdout.strip() if rc == 0 and stdout else None + + +async def stage_submodule_reference(repo_path: Path, submodule_path: str) -> None: + """Stage a submodule reference update in the parent repo. + + Equivalent to `git add ` in the parent. + """ + await _run_git_command("add", submodule_path, cwd=repo_path) + + +async def get_submodule_remote_and_branch( + repo_path: Path, submodule_path: str +) -> tuple[str, str]: + """Get the remote and branch for a submodule. + + Checks the submodule's own git config, falling back to 'origin' and 'main'. 
+ """ + sm_abs = repo_path / submodule_path + + # Get remote name + remote_stdout, _, rc = await _run_git_command( + "remote", cwd=sm_abs, check=False + ) + remote = ( + remote_stdout.strip().split("\n")[0] + if (rc == 0 and remote_stdout.strip()) + else "origin" + ) + + # Get current branch + branch_stdout, _, rc = await _run_git_command( + "symbolic-ref", "--short", "HEAD", cwd=sm_abs, check=False + ) + branch = ( + branch_stdout.strip() + if (rc == 0 and branch_stdout.strip()) + else "main" + ) + + return remote, branch diff --git a/src/auto_commit_service/service/__init__.py b/src/auto_commit_service/service/__init__.py deleted file mode 100644 index f50258f..0000000 --- a/src/auto_commit_service/service/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Service lifecycle management.""" - -from .manager import ( - LlamaServiceManager, - ServiceHealth, - ServiceManagerError, - ServiceStartError, -) - -__all__ = [ - "LlamaServiceManager", - "ServiceHealth", - "ServiceManagerError", - "ServiceStartError", -] diff --git a/src/auto_commit_service/service/manager.py b/src/auto_commit_service/service/manager.py deleted file mode 100644 index 3f81d74..0000000 --- a/src/auto_commit_service/service/manager.py +++ /dev/null @@ -1,327 +0,0 @@ -"""Llama service lifecycle manager.""" - -import asyncio -import fcntl -import logging -import os -import signal -import sys -import time -from enum import Enum -from pathlib import Path - -import httpx - -logger = logging.getLogger(__name__) - - -class ServiceManagerError(Exception): - """Base exception for service manager errors.""" - - -class ServiceStartError(ServiceManagerError): - """Failed to start service.""" - - -class ServiceHealth(str, Enum): - """Service health status.""" - - HEALTHY = "healthy" - CRASHED = "crashed" - UNREACHABLE = "unreachable" - - -class LlamaServiceManager: - """Manages llama service lifecycle as subprocess.""" - - def __init__( - self, - service_url: str = "http://localhost:8000", - pid_file: Path | None 
= None, - lock_file: Path | None = None, - startup_timeout: float = 30.0, - health_check_timeout: float = 5.0, - model_id: str | None = None, - use_model_boss: bool = False, - ): - self.service_url = service_url - self._pid_file = pid_file or Path.home() / ".config/commits/llama-service.pid" - self._lock_file = lock_file or Path.home() / ".config/commits/llama-service.lock" - self._startup_timeout = startup_timeout - self._health_check_timeout = health_check_timeout - self._model_id = model_id - self._use_model_boss = use_model_boss - self._spawned_pid: int | None = None - self._lock_fd: int | None = None - self._resolved_model_path: str | None = None - - async def ensure_service_available(self) -> bool: - """Ensure service is available, starting if necessary.""" - health = await self.check_health() - - if health == ServiceHealth.HEALTHY: - return True - if health == ServiceHealth.CRASHED: - logger.info("Detected crashed service (stale PID), cleaning up...") - self._cleanup_pid_file() - - logger.info("Llama service unreachable, attempting to start...") - - try: - if self._use_model_boss and self._model_id: - await self._resolve_model_path() - return await self.start_service() - except ServiceStartError as e: - logger.error(f"Failed to start llama-service: {e}") - return False - - async def _resolve_model_path(self) -> None: - """Resolve model ID to path via model-boss.""" - try: - from lilith_model_boss import ensure_model - - if self._model_id and not self._resolved_model_path: - logger.info(f"Resolving model via model-boss: {self._model_id}") - self._resolved_model_path = ensure_model(self._model_id) - logger.info(f"Resolved model path: {self._resolved_model_path}") - except ImportError: - raise ServiceStartError("model-boss not installed") - except Exception as e: - raise ServiceStartError(f"Failed to resolve model path: {e}") - - async def start_service(self) -> bool: - """Start llama service subprocess.""" - pid = self._read_pid_file() - if pid and 
self._is_process_alive(pid): - logger.info(f"Service already running (PID: {pid})") - return True - - if not self._acquire_lock(): - await asyncio.sleep(2) - pid = self._read_pid_file() - if pid and self._is_process_alive(pid): - return True - return False - - try: - pid = self._read_pid_file() - if pid and self._is_process_alive(pid): - return True - - logger.info("Starting llama service subprocess...") - process = await self._spawn_service() - self._spawned_pid = process.pid - self._write_pid_file(process.pid) - logger.info(f"Llama service started (PID: {process.pid})") - - if await self._wait_for_healthy(self._startup_timeout): - logger.info("✓ Llama service is healthy") - return True - else: - logger.error(f"✗ Service failed to start within {self._startup_timeout}s") - return False - except Exception as e: - logger.exception(f"Failed to start llama service: {e}") - return False - finally: - self._release_lock() - - async def check_health(self) -> ServiceHealth: - """Check service health.""" - pid = self._read_pid_file() - if pid and not self._is_process_alive(pid): - return ServiceHealth.CRASHED - - try: - async with httpx.AsyncClient(timeout=self._health_check_timeout) as client: - response = await client.get(f"{self.service_url}/health") - if response.status_code == 200: - data = response.json() - return ServiceHealth.HEALTHY if data.get("status") == "ok" else ServiceHealth.UNREACHABLE - return ServiceHealth.UNREACHABLE - except (httpx.ConnectError, httpx.TimeoutException): - return ServiceHealth.UNREACHABLE - except Exception: - return ServiceHealth.UNREACHABLE - - async def stop_service(self) -> None: - """Gracefully stop service if we own it.""" - if self._spawned_pid is None or not self._is_process_alive(self._spawned_pid): - return - - logger.info(f"Stopping llama service (PID: {self._spawned_pid})...") - try: - os.kill(self._spawned_pid, signal.SIGTERM) - for _ in range(10): - if not self._is_process_alive(self._spawned_pid): - self._cleanup_pid_file() 
- return - await asyncio.sleep(0.5) - os.kill(self._spawned_pid, signal.SIGKILL) - self._cleanup_pid_file() - except ProcessLookupError: - self._cleanup_pid_file() - except Exception as e: - logger.exception(f"Error stopping service: {e}") - - def _acquire_lock(self) -> bool: - self._lock_file.parent.mkdir(parents=True, exist_ok=True) - try: - self._lock_fd = os.open(self._lock_file, os.O_CREAT | os.O_WRONLY) - fcntl.flock(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - return True - except BlockingIOError: - return False - - def _release_lock(self) -> None: - if self._lock_fd is not None: - try: - fcntl.flock(self._lock_fd, fcntl.LOCK_UN) - os.close(self._lock_fd) - self._lock_fd = None - except Exception: - pass - - def _read_pid_file(self) -> int | None: - if not self._pid_file.exists(): - return None - try: - content = self._pid_file.read_text().strip() - return int(content) if content else None - except Exception: - return None - - def _write_pid_file(self, pid: int) -> None: - self._pid_file.parent.mkdir(parents=True, exist_ok=True) - self._pid_file.write_text(str(pid)) - - def _cleanup_pid_file(self) -> None: - try: - if self._pid_file.exists(): - self._pid_file.unlink() - except Exception: - pass - - def _is_process_alive(self, pid: int) -> bool: - try: - os.kill(pid, 0) - return True - except OSError: - return False - - def _find_default_model(self) -> str | None: - """Find a suitable model in standard cache locations.""" - cache_dir = Path.home() / ".cache" / "models" - if not cache_dir.exists(): - return None - - preferred = [ - "qwen2.5-1.5b-instruct-q4_k_m.gguf", - "Ministral-3-3B-Instruct-2512-Q8_0.gguf", - ] - - for name in preferred: - path = cache_dir / name - if path.exists(): - return str(path) - - for gguf in cache_dir.glob("*.gguf"): - if gguf.stat().st_size < 5 * 1024 * 1024 * 1024: - return str(gguf) - - return None - - def _find_python_with_module(self, module_name: str) -> str: - """Find a Python executable that can import the given module. 
- - Tries multiple Python paths to find one with access to user-installed packages. - This handles the case where sys.executable is /usr/bin/python (system Python) - but packages are installed in user site-packages. - """ - import shutil - import subprocess - - candidates = [ - str(Path.home() / ".local" / "bin" / "python"), - str(Path.home() / ".local" / "bin" / "python3"), - shutil.which("python3"), - shutil.which("python"), - sys.executable, - ] - - for python_path in candidates: - if not python_path or not Path(python_path).exists(): - continue - - try: - result = subprocess.run( - [python_path, "-c", f"import {module_name}"], - capture_output=True, - timeout=5, - ) - if result.returncode == 0: - logger.info(f"Found Python with {module_name}: {python_path}") - return python_path - except (subprocess.TimeoutExpired, FileNotFoundError, PermissionError): - continue - - logger.warning(f"Could not find Python with {module_name}, using {sys.executable}") - return sys.executable - - async def _spawn_service(self) -> asyncio.subprocess.Process: - """Spawn service subprocess.""" - python_path = self._find_python_with_module("llama_http") - cmd = [python_path, "-m", "llama_http"] - env = os.environ.copy() - - model_path = self._resolved_model_path or env.get("LLAMA_SERVICE_MODEL_PATH") - - if not model_path: - model_path = self._find_default_model() - - if not model_path: - raise ServiceStartError( - "No model found. Either:\n" - " 1. Set LLAMA_SERVICE_MODEL_PATH\n" - " 2. 
Place a GGUF model in ~/.cache/models/" - ) - - # Configure environment for llama-http service - # Note: llama-http uses LLAMA_HTTP_ prefix for env vars - env["LLAMA_HTTP_MODEL_PATH"] = model_path - # Extract port from service_url and configure llama-http to use it - from urllib.parse import urlparse - port = urlparse(self.service_url).port or 8000 - env["LLAMA_HTTP_PORT"] = str(port) - logger.info(f"Using model: {model_path}, port: {port}") - - log_file = self._pid_file.parent / "llama-service.log" - log_file.parent.mkdir(parents=True, exist_ok=True) - - with open(log_file, "a") as log: - log.write(f"\n=== Service started at {time.ctime()} ===\n") - log.write(f"Model: {model_path}\n") - - process = await asyncio.create_subprocess_exec( - *cmd, - env=env, - stdout=log, - stderr=asyncio.subprocess.STDOUT, - start_new_session=True, - ) - return process - - async def _wait_for_healthy(self, timeout: float) -> bool: - start = time.time() - while time.time() - start < timeout: - try: - async with httpx.AsyncClient(timeout=2.0) as client: - response = await client.get(f"{self.service_url}/health") - if response.status_code == 200: - data = response.json() - if data.get("status") == "ok": - return True - except Exception: - pass - await asyncio.sleep(1) - return False diff --git a/src/auto_commit_service/services/__init__.py b/src/auto_commit_service/services/__init__.py new file mode 100644 index 0000000..f0f08bd --- /dev/null +++ b/src/auto_commit_service/services/__init__.py @@ -0,0 +1,5 @@ +"""Services for auto-commit system.""" + +from .submodule_commit_service import SubmoduleCommitService + +__all__ = ["SubmoduleCommitService"] diff --git a/src/auto_commit_service/services/submodule_commit_service.py b/src/auto_commit_service/services/submodule_commit_service.py new file mode 100644 index 0000000..73c0926 --- /dev/null +++ b/src/auto_commit_service/services/submodule_commit_service.py @@ -0,0 +1,226 @@ +"""Service for persisting and retrieving submodule commit 
logger = logging.getLogger(__name__)


@dataclass
class CommitStats:
    """Change counts parsed from a ``git show --shortstat`` summary line."""

    files: int
    insertions: int
    deletions: int


@dataclass
class CommitDetails:
    """Metadata and change counts for a single commit."""

    hash: str
    author: str
    email: str
    timestamp: datetime
    message: str
    stats: CommitStats


class SubmoduleCommitService:
    """Service for persisting and retrieving submodule commit details.

    Captures individual commit information from submodule processing so it can
    be displayed when viewing monorepo aggregated commits.
    """

    def __init__(self, activity_log: ActivityLogger) -> None:
        """Initialize submodule commit service.

        Args:
            activity_log: Activity logger for audit trail
        """
        self.activity_log = activity_log

    async def persist_submodule_commits(
        self,
        session: AsyncSession,
        parent_commit_hash: str,
        submodule_results: list[SubmoduleCommitResult],
    ) -> int:
        """Persist submodule commits to database and activity log.

        Results that are unsuccessful or carry no commit hashes are skipped.
        A failure on one commit is logged and does not stop the remaining
        commits from being processed. The session is flushed at the end but
        not committed here.

        Args:
            session: Database session
            parent_commit_hash: Hash of parent monorepo commit
            submodule_results: List of submodule commit results

        Returns:
            Number of submodule commits added to the session
        """
        persisted_count = 0

        for result in submodule_results:
            if not result.success or not result.commit_hashes:
                continue

            for commit_hash in result.commit_hashes:
                try:
                    # Get commit details from git
                    details = await self._get_commit_details(
                        submodule_path=result.submodule_path,
                        commit_hash=commit_hash,
                    )

                    # Create database record
                    submodule_commit = SubmoduleCommit(
                        parent_commit_hash=parent_commit_hash,
                        submodule_name=result.submodule_name,
                        commit_hash=details.hash,
                        commit_message=details.message,
                        author=details.author,
                        timestamp=details.timestamp,
                        files_changed=details.stats.files,
                        insertions=details.stats.insertions,
                        deletions=details.stats.deletions,
                    )

                    session.add(submodule_commit)
                    persisted_count += 1

                    # Activity log for audit trail
                    self.activity_log.log(
                        ActionType.SUBMODULE_COMMIT,
                        {
                            "parent_commit": parent_commit_hash,
                            "submodule": result.submodule_name,
                            "hash": details.hash,
                            "message": details.message,
                            "author": details.author,
                            "stats": {
                                "files": details.stats.files,
                                "insertions": details.stats.insertions,
                                "deletions": details.stats.deletions,
                            },
                        },
                    )

                except Exception as e:
                    # Best effort: one bad commit must not abort the batch.
                    logger.error(
                        "Failed to persist submodule commit %s for %s: %s",
                        commit_hash,
                        result.submodule_name,
                        e,
                    )

        await session.flush()
        return persisted_count

    async def get_submodule_commits(
        self, session: AsyncSession, parent_commit_hash: str
    ) -> list[SubmoduleCommit]:
        """Retrieve submodule commits for a parent commit.

        Args:
            session: Database session
            parent_commit_hash: Hash of parent monorepo commit

        Returns:
            List of submodule commits ordered by submodule name and timestamp
        """
        result = await session.execute(
            select(SubmoduleCommit)
            .where(SubmoduleCommit.parent_commit_hash == parent_commit_hash)
            .order_by(SubmoduleCommit.submodule_name, SubmoduleCommit.timestamp)
        )
        return list(result.scalars().all())

    async def _get_commit_details(
        self, submodule_path: str, commit_hash: str
    ) -> CommitDetails:
        """Extract commit details using git show.

        The git call runs in a worker thread so the event loop is not blocked
        while the child process executes.

        Args:
            submodule_path: Path to submodule
            commit_hash: Commit hash to inspect

        Returns:
            CommitDetails with all extracted information

        Raises:
            subprocess.CalledProcessError: If git exits with a non-zero status
            ValueError: If the git output cannot be parsed
        """
        # Local import: no other code in this module needs asyncio.
        import asyncio

        # Resolve absolute path
        path = Path(submodule_path).resolve()

        command = [
            "git",
            "-C",
            str(path),
            "show",
            commit_hash,
            "--format=%H|%an|%ae|%at|%s",
            "--shortstat",
        ]

        # subprocess.run is blocking; off-load it instead of stalling the loop.
        # Decode explicitly as UTF-8 (git's default log output encoding) rather
        # than the locale encoding, and never fail on stray bytes.
        result = await asyncio.to_thread(
            subprocess.run,
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=True,
        )

        return self._parse_commit_details(result.stdout)

    def _parse_commit_details(self, git_output: str) -> CommitDetails:
        """Parse git show output into structured CommitDetails.

        The first line must be ``hash|author|email|timestamp|subject``. The
        first later line that looks like a shortstat summary supplies the
        change counts; counts that are absent stay at zero.

        Args:
            git_output: Output from git show command

        Returns:
            Parsed CommitDetails

        Raises:
            ValueError: If the metadata line is missing fields or the
                timestamp is not an integer
        """
        lines = git_output.strip().split("\n")

        # First line: hash|author|email|timestamp|subject
        # maxsplit=4 keeps any "|" inside the subject intact.
        metadata = lines[0].split("|", 4)
        if len(metadata) < 4:
            raise ValueError(f"Unexpected git show output: {lines[0]!r}")

        commit_hash, author, email, raw_timestamp = metadata[:4]
        subject = metadata[4] if len(metadata) > 4 else ""

        try:
            timestamp = datetime.fromtimestamp(int(raw_timestamp))
        except ValueError as err:
            raise ValueError(
                f"Invalid commit timestamp in git show output: {raw_timestamp!r}"
            ) from err

        # Find shortstat line (e.g., "3 files changed, 45 insertions(+), 12 deletions(-)")
        stats = CommitStats(files=0, insertions=0, deletions=0)

        for line in lines[1:]:
            if (
                "changed" not in line
                and "insertion" not in line
                and "deletion" not in line
            ):
                continue

            files_match = re.search(r"(\d+) files? changed", line)
            if files_match:
                stats.files = int(files_match.group(1))

            insertions_match = re.search(r"(\d+) insertion", line)
            if insertions_match:
                stats.insertions = int(insertions_match.group(1))

            deletions_match = re.search(r"(\d+) deletion", line)
            if deletions_match:
                stats.deletions = int(deletions_match.group(1))

            # Only the first summary line is used.
            break

        return CommitDetails(
            hash=commit_hash,
            author=author,
            email=email,
            timestamp=timestamp,
            message=subject,
            stats=stats,
        )