auto-commit-service/src/auto_commit_service/git/operations.py
2026-04-19 00:25:06 -07:00

579 lines
19 KiB
Python

"""Async git command wrappers using subprocess."""
import asyncio
import logging
import os
from pathlib import Path
from .repository import GitStatus, CommitResult, PushResult
logger = logging.getLogger(__name__)
# Single-character escapes used by git's C-style path quoting, mapped to the
# byte value each one stands for.
_GIT_C_ESCAPES = {
    "a": 0x07,
    "b": 0x08,
    "f": 0x0C,
    "n": 0x0A,
    "r": 0x0D,
    "t": 0x09,
    "v": 0x0B,
    '"': 0x22,
    "\\": 0x5C,
}


def unquote_git_path(path: str) -> str:
    """Undo git's porcelain quoting of a path.

    Git wraps "unusual" paths in double quotes and writes special bytes as
    C-style escapes (``\\"``, ``\\\\``, ``\\t``, three-digit octal such as
    ``\\303\\251`` for non-ASCII bytes). This strips the quotes and decodes
    those escapes back into the real path.

    Args:
        path: A path exactly as printed by git, quoted or not.

    Returns:
        The decoded path. Input that is not wrapped in double quotes is
        returned unchanged.
    """
    if len(path) < 2 or not (path.startswith('"') and path.endswith('"')):
        return path

    inner = path[1:-1]
    if "\\" not in inner:
        # Fast path: quoted only because of spaces, nothing to decode.
        return inner

    # Octal escapes describe raw bytes (usually UTF-8 fragments), so rebuild
    # the byte sequence first and decode it once at the end.
    raw = bytearray()
    pos, end = 0, len(inner)
    while pos < end:
        char = inner[pos]
        if char != "\\" or pos + 1 >= end:
            raw += char.encode("utf-8", "surrogateescape")
            pos += 1
            continue

        code = inner[pos + 1]
        if code in _GIT_C_ESCAPES:
            raw.append(_GIT_C_ESCAPES[code])
            pos += 2
        elif code in "01234567":
            stop = pos + 1
            while stop < end and stop < pos + 4 and inner[stop] in "01234567":
                stop += 1
            raw.append(int(inner[pos + 1:stop], 8) & 0xFF)
            pos = stop
        else:
            # Unknown escape: keep the backslash literally rather than guess.
            raw += b"\\"
            pos += 1

    # surrogateescape keeps undecodable bytes round-trippable to the OS.
    return raw.decode("utf-8", "surrogateescape")
class GitError(Exception):
    """Root of the exception hierarchy for git operations.

    Besides the human-readable message, each instance carries the stderr
    text and the exit code supplied by whoever raised it.
    """

    def __init__(self, message: str, stderr: str = "", returncode: int = 1):
        self.returncode = returncode
        self.stderr = stderr
        super().__init__(message)
class MergeConflictError(GitError):
    """Signals that a git operation ran into a merge conflict."""
class PushRejectedError(GitError):
    """Signals that the remote rejected a push."""
async def _run_git_command(
*args: str,
cwd: Path,
check: bool = True,
stdin: bytes | None = None,
env: dict[str, str] | None = None,
) -> tuple[str, str, int]:
"""Run a git command asynchronously.
Uses asyncio subprocess with argument list (safe, no shell injection).
This is equivalent to Node.js execFile - arguments are passed directly
to the process without shell interpretation.
Args:
*args: Git command arguments
cwd: Working directory for the command
check: Raise GitError on non-zero exit code
stdin: Optional stdin data to send to the process
env: Optional env overrides merged onto os.environ
"""
merged_env = {**os.environ, **(env or {})}
# asyncio.create_subprocess_exec is safe - no shell, args passed directly
create_process = asyncio.create_subprocess_exec
proc = await create_process(
"git",
*args,
cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if stdin is not None else None,
env=merged_env,
)
stdout, stderr = await proc.communicate(input=stdin)
stdout_str = stdout.decode().strip()
stderr_str = stderr.decode().strip()
returncode = proc.returncode or 0
if check and returncode != 0:
raise GitError(
f"git {' '.join(args)} failed: {stderr_str}",
stderr=stderr_str,
returncode=returncode,
)
return stdout_str, stderr_str, returncode
def _parse_branch_header(header: str) -> tuple[str, int, int]:
    """Parse a porcelain ``## ...`` header into (branch, ahead, behind)."""
    info = header[3:]
    # A repository without commits reports "## No commits yet on <branch>"
    # (older git: "Initial commit on <branch>"); drop that lead-in so the
    # real branch name is what gets parsed.
    for lead_in in ("No commits yet on ", "Initial commit on "):
        if info.startswith(lead_in):
            info = info[len(lead_in):]
            break

    local, has_upstream, upstream = info.partition("...")
    if has_upstream:
        branch = local
    else:
        words = info.split()
        branch = words[0] if words else "main"
        upstream = ""

    ahead = behind = 0
    # Tracking info looks like "origin/main [ahead 1, behind 2]".
    _, has_counts, counts = upstream.partition(" [")
    if has_counts:
        for item in counts.rstrip("]").split(","):
            label, _, value = item.strip().partition(" ")
            if not value.isdigit():
                continue  # e.g. "[gone]" carries no number
            if label == "ahead":
                ahead = int(value)
            elif label == "behind":
                behind = int(value)
    return branch, ahead, behind


def _rename_target(raw_path: str) -> str:
    """Return the destination part of a porcelain ``old -> new`` path field.

    The field is returned unchanged when it contains no `` -> `` separator.
    """
    if raw_path.startswith('"'):
        # The source is quoted: skip to its closing quote so a " -> " that
        # is part of the old name is not mistaken for the separator.
        pos = 1
        while pos < len(raw_path):
            if raw_path[pos] == "\\":
                pos += 2
                continue
            if raw_path[pos] == '"':
                break
            pos += 1
        remainder = raw_path[pos + 1:]
        return remainder[4:] if remainder.startswith(" -> ") else raw_path

    _, separator, target = raw_path.partition(" -> ")
    return target if separator else raw_path


async def git_status(repo_path: Path) -> GitStatus:
    """Get the git status of a repository.

    Runs ``git status --porcelain -b`` and sorts each entry into exactly one
    bucket, checked in this order: untracked (``??``), deleted (``D`` in
    either column), staged (index ``M``/``A``/``R``/``C``/``T``), modified
    (worktree ``M``/``T``). Any other code (for example unmerged ``U``
    entries) is left uncategorised.

    Args:
        repo_path: Path to the repository

    Returns:
        GitStatus with the file buckets, the current branch name and the
        ahead/behind counts from the branch header. ``has_changes`` is true
        when any bucket is non-empty.

    Raises:
        GitError: If ``git status`` exits with a non-zero status.
    """
    # The helper strips the whole output, which would eat the leading space
    # of a " M file" entry if it were first; ``-b`` puts the "##" header
    # line first, so entry lines keep both status columns.
    stdout, _, _ = await _run_git_command("status", "--porcelain", "-b", cwd=repo_path)

    staged: list[str] = []
    modified: list[str] = []
    untracked: list[str] = []
    deleted: list[str] = []
    branch = "main"
    ahead = 0
    behind = 0

    for line in stdout.split("\n"):
        if not line:
            continue
        if line.startswith("##"):
            # Branch info: ## main...origin/main [ahead 1, behind 2]
            branch, ahead, behind = _parse_branch_header(line)
            continue
        if len(line) < 3:
            continue

        index_status, worktree_status = line[0], line[1]
        raw_path = line[3:]
        if index_status in "RC" or worktree_status in "RC":
            # Renames/copies are printed as "old -> new"; the path that
            # exists now (and can be staged) is the new one.
            raw_path = _rename_target(raw_path)
        filepath = unquote_git_path(raw_path)

        if index_status == "?" and worktree_status == "?":
            untracked.append(filepath)
        elif "D" in (index_status, worktree_status):
            deleted.append(filepath)
        elif index_status in "MARCT":
            staged.append(filepath)
        elif worktree_status in "MT":
            modified.append(filepath)

    has_changes = bool(staged or modified or untracked or deleted)
    return GitStatus(
        has_changes=has_changes,
        staged=staged,
        modified=modified,
        untracked=untracked,
        deleted=deleted,
        branch=branch,
        ahead=ahead,
        behind=behind,
    )
async def git_diff(repo_path: Path, staged: bool = False) -> str:
    """Return the textual diff of pending changes.

    The command runs with ``check=False``, so a failing ``git diff`` does
    not raise; whatever was written to stdout is returned.

    Args:
        repo_path: Path to the repository
        staged: If True, diff only what is staged (``--cached``); otherwise
            diff against ``HEAD``.

    Returns:
        The diff text captured from stdout.
    """
    comparison = "--cached" if staged else "HEAD"
    diff_text, _stderr, _returncode = await _run_git_command(
        "diff", comparison, cwd=repo_path, check=False
    )
    return diff_text
async def git_add_all(repo_path: Path) -> None:
    """Stage every change in the repository, untracked files included.

    Args:
        repo_path: Path to the repository

    Raises:
        GitError: If ``git add -A`` exits with a non-zero status.
    """
    stage_everything = ("add", "-A")
    await _run_git_command(*stage_everything, cwd=repo_path)
def _parse_ignored_paths(stderr: str) -> list[str]:
    """Extract the paths git lists after its "paths are ignored" notice.

    Git error format:
        The following paths are ignored by one of your .gitignore files:
        path1
        path2
        hint: ...
    """
    lines = stderr.split("\n")
    ignored: list[str] = []
    for index, line in enumerate(lines):
        if "following paths are ignored" not in line.lower():
            continue
        # Every line up to the first "hint:" or blank line is an ignored path.
        for candidate in lines[index + 1:]:
            if candidate.startswith("hint:") or not candidate.strip():
                break
            ignored.append(candidate.strip())
    return ignored


async def git_check_ignored(repo_path: Path, files: list[str]) -> list[str]:
    """Filter out the files git would refuse to stage due to .gitignore.

    Uses `git add --dry-run` to detect files that will fail to stage,
    including tracked files now covered by .gitignore patterns.

    Args:
        repo_path: Path to the repository
        files: List of file paths to check

    Returns:
        List of files that are safe to stage (not ignored), in input order.
        If the dry-run fails in a way that cannot be interpreted, the
        affected files are kept rather than dropped.
    """
    if not files:
        return []

    # CRITICAL: The only reliable way to detect stageable files is to try staging them
    # with --dry-run. Neither check-ignore nor ls-files -i work for tracked-but-ignored files.
    # Batch to avoid ARG_MAX
    BATCH_SIZE = 1000
    safe_files: list[str] = []
    ignored_total = 0

    try:
        for start in range(0, len(files), BATCH_SIZE):
            batch = files[start : start + BATCH_SIZE]
            # Dry-run add: exit code 0 = success, 1 = some files ignored,
            # 128 = all files ignored.
            _, stderr, returncode = await _run_git_command(
                "add", "--dry-run", "--",
                *batch,
                cwd=repo_path,
                check=False,
            )
            if returncode == 0:
                safe_files.extend(batch)
            elif returncode in (1, 128) and "ignored" in stderr.lower():
                batch_ignored = _parse_ignored_paths(stderr)
                ignored_total += len(batch_ignored)
                # Set + tuple keep the per-file test cheap for 1000-file batches.
                ignored_exact = set(batch_ignored)
                # A reported path may be a directory; normalise to a single
                # trailing slash so files beneath it are excluded as well.
                ignored_dirs = tuple(p.rstrip("/") + "/" for p in ignored_exact)
                safe_files.extend(
                    f
                    for f in batch
                    if f not in ignored_exact and not f.startswith(ignored_dirs)
                )
            else:
                # Unknown error - be conservative and include all files
                logger.warning(f"git add --dry-run returned {returncode}: {stderr}, including all files from batch")
                safe_files.extend(batch)
    except GitError as e:
        # If check fails, return all files (safer than blocking commits)
        logger.warning(f"git ignore check failed in {repo_path}: {e}, proceeding without filter")
        return files

    if ignored_total:
        logger.info(f"Filtered {ignored_total} gitignored files (detected via dry-run)")
    return safe_files
async def git_add_specific(repo_path: Path, files: list[str]) -> None:
    """Stage only the given files, skipping any that are gitignored.

    The list is first passed through ``git_check_ignored``; what survives
    is staged with ``git add --``, split into batches when it is long.

    Args:
        repo_path: Path to the repository
        files: List of file paths to stage
    """
    if not files:
        return

    # Drop gitignored entries up front so the real add does not fail on them.
    stageable_files = await git_check_ignored(repo_path, files)
    if not stageable_files:
        logger.warning(f"All {len(files)} files are gitignored, nothing to stage")
        return

    total = len(stageable_files)
    if total < len(files):
        ignored_count = len(files) - total
        logger.debug(f"Filtered out {ignored_count} gitignored files, staging {len(stageable_files)}")

    # Most systems have ARG_MAX ~2MB; conservatively cap one command at
    # 1000 files.
    BATCH_SIZE = 1000
    if total <= BATCH_SIZE:
        # Small enough for a single command.
        await _run_git_command("add", "--", *stageable_files, cwd=repo_path)
        return

    logger.info(f"Batching git add for {len(stageable_files)} files ({BATCH_SIZE} per batch)")
    batch_count = (total + BATCH_SIZE - 1) // BATCH_SIZE
    for batch_number, offset in enumerate(range(0, total, BATCH_SIZE), start=1):
        logger.debug(f"Adding batch {batch_number}/{batch_count}")
        chunk = stageable_files[offset : offset + BATCH_SIZE]
        await _run_git_command("add", "--", *chunk, cwd=repo_path)
async def git_commit(repo_path: Path, message: str) -> CommitResult:
    """Commit the staged changes with ``message``.

    Failures are reported through the returned CommitResult rather than
    raised.

    Args:
        repo_path: Path to the repository
        message: Commit message passed to ``git commit -m``

    Returns:
        CommitResult with ``success=True`` plus the short commit hash and
        message, or ``success=False`` plus an error string ("Nothing to
        commit" when git reports there was nothing to commit).
    """
    out, err, exit_code = await _run_git_command(
        "commit", "-m", message, cwd=repo_path, check=False
    )

    if exit_code != 0:
        marker = "nothing to commit"
        if marker in out or marker in err:
            reason = "Nothing to commit"
        else:
            reason = err or out
        return CommitResult(success=False, error=reason)

    # The commit went through; look up its abbreviated hash.
    try:
        short_hash, _, _ = await _run_git_command(
            "rev-parse", "--short", "HEAD", cwd=repo_path
        )
    except GitError as exc:
        return CommitResult(success=False, error=str(exc))

    return CommitResult(success=True, commit_hash=short_hash, message=message)
async def git_push(
    repo_path: Path,
    remote: str = "origin",
    branch: str = "main",
) -> PushResult:
    """Push ``branch`` to ``remote``.

    ``--no-verify`` skips pre-push hooks (e.g. adversarial-view generators
    that require GPU services). ``-u`` sets upstream tracking so that a
    later `git status -b` reports ahead/behind correctly.

    Args:
        repo_path: Path to the repository
        remote: Name of the remote to push to
        branch: Branch to push

    Returns:
        PushResult describing success, or a non-rejection failure with the
        stderr text as ``error``.

    Raises:
        PushRejectedError: If stderr mentions "rejected" or
            "non-fast-forward" (case-insensitive).
    """
    try:
        _, err_text, exit_code = await _run_git_command(
            "push", "--no-verify", "-u", remote, branch, cwd=repo_path, check=False
        )
    except GitError as exc:
        return PushResult(
            success=False,
            remote=remote,
            branch=branch,
            error=str(exc),
        )

    if exit_code == 0:
        return PushResult(success=True, remote=remote, branch=branch)

    lowered = err_text.lower()
    if "rejected" in lowered or "non-fast-forward" in lowered:
        raise PushRejectedError(
            f"Push rejected: {err_text}",
            stderr=err_text,
            returncode=exit_code,
        )

    # Any other failure is reported in the result, not raised.
    return PushResult(
        success=False,
        remote=remote,
        branch=branch,
        error=err_text,
        rejected=False,
    )
async def _abort_rebase_verified(repo_path: Path) -> bool:
    """Abort an in-progress rebase and confirm its state is really gone.

    A rebase counts as in progress while ``.git/rebase-merge`` or
    ``.git/rebase-apply`` exists under ``repo_path``. Recovery escalates
    only as far as needed: ``rebase --abort``, then ``rebase --quit``, then
    ``reset --hard ORIG_HEAD``.

    Returns:
        True if neither marker exists after recovery, False otherwise.
    """
    git_dir = repo_path / ".git"
    markers = (git_dir / "rebase-merge", git_dir / "rebase-apply")

    def rebase_in_progress() -> bool:
        return any(marker.exists() for marker in markers)

    await _run_git_command("rebase", "--abort", cwd=repo_path, check=False)
    if not rebase_in_progress():
        return True

    logger.warning(f"rebase --abort didn't clear state in {repo_path}, escalating to --quit")
    await _run_git_command("rebase", "--quit", cwd=repo_path, check=False)
    if not rebase_in_progress():
        return True

    logger.error(f"rebase --quit also failed in {repo_path}, resetting to ORIG_HEAD")
    await _run_git_command("reset", "--hard", "ORIG_HEAD", cwd=repo_path, check=False)
    return not rebase_in_progress()
async def pre_cycle_recover(repo_path: Path) -> dict:
    """Detect and clear a rebase or merge left unfinished in the repository.

    Detection is by marker files under ``repo_path / ".git"``:
    ``rebase-merge`` / ``rebase-apply`` for a rebase, ``MERGE_HEAD`` for a
    merge. A rebase is checked for first.

    Returns:
        dict: {was_stuck: bool, recovered: bool, state: str}, where state is
        "rebase", "merge" or "clean".
    """
    git_dir = repo_path / ".git"
    rebase_markers = (git_dir / "rebase-merge", git_dir / "rebase-apply")
    merge_marker = git_dir / "MERGE_HEAD"

    if any(marker.exists() for marker in rebase_markers):
        logger.warning(f"Orphan rebase detected in {repo_path}, attempting recovery")
        cleared = await _abort_rebase_verified(repo_path)
        return {"was_stuck": True, "recovered": cleared, "state": "rebase"}

    if not merge_marker.exists():
        return {"was_stuck": False, "recovered": False, "state": "clean"}

    logger.warning(f"Orphan merge detected in {repo_path}, aborting")
    await _run_git_command("merge", "--abort", cwd=repo_path, check=False)
    # The merge counts as recovered only if the marker file is gone.
    return {"was_stuck": True, "recovered": not merge_marker.exists(), "state": "merge"}
async def git_pull_rebase(
    repo_path: Path,
    remote: str = "origin",
    branch: str = "main",
) -> bool:
    """Pull with rebase to resolve diverged history.

    On a conflict, lockfile conflicts are auto-resolved where possible and
    the rebase is continued; otherwise the rebase is aborted so the
    repository is not left mid-rebase.

    Args:
        repo_path: Path to the repository
        remote: Name of the remote to pull from
        branch: Branch to pull

    Returns:
        True if the pull (or the auto-resolved rebase) succeeded.

    Raises:
        MergeConflictError: On a conflict that could not be auto-resolved;
            the rebase has been aborted.
        GitError: If the pull failed for another reason, if aborting the
            rebase did not clear its state, or on any unexpected error.
    """
    try:
        stdout, stderr, returncode = await _run_git_command(
            "pull", "--rebase", remote, branch, cwd=repo_path, check=False
        )
        if returncode == 0:
            return True

        # Git reports "CONFLICT (...)" lines on stdout; stderr only mentions
        # conflicts through hint text that can be switched off. Check both,
        # so a real conflict is never mistaken for a generic failure (which
        # would skip the abort below and leave the rebase in progress).
        conflicted = "conflict" in stderr.lower() or "CONFLICT" in stdout
        if not conflicted:
            raise GitError(
                f"Pull rebase failed: {stderr}",
                stderr=stderr,
                returncode=returncode,
            )

        from .conflict_resolution import try_auto_resolve_lockfiles

        resolution = await try_auto_resolve_lockfiles(repo_path)
        if resolution["resolved"]:
            # GIT_EDITOR=true keeps "rebase --continue" from opening an editor.
            _, cont_stderr, cont_rc = await _run_git_command(
                "rebase", "--continue",
                cwd=repo_path, check=False,
                env={"GIT_EDITOR": "true"},
            )
            if cont_rc == 0:
                logger.info(
                    f"Auto-resolved lockfile conflicts in {repo_path}: "
                    f"{resolution['files_resolved']}"
                )
                return True
            logger.warning(
                f"rebase --continue failed after auto-resolution: {cont_stderr}"
            )

        if not await _abort_rebase_verified(repo_path):
            raise GitError(
                f"Rebase conflict and abort failed — repo stuck in {repo_path}",
                stderr=stderr,
                returncode=returncode,
            )
        raise MergeConflictError(
            f"Merge conflict during rebase: {stderr}",
            stderr=stderr,
            returncode=returncode,
        )
    except GitError:
        # Covers MergeConflictError too (it subclasses GitError).
        raise
    except Exception as e:
        raise GitError(f"Unexpected error during pull rebase: {e}") from e
async def git_fetch(
    repo_path: Path,
    remote: str = "origin",
) -> bool:
    """Fetch from ``remote``, updating refs without touching the working tree.

    Never raises: every failure is logged as a warning and reported through
    the return value.

    Args:
        repo_path: Path to the repository
        remote: Name of the remote to fetch from

    Returns:
        True on success, False on failure (non-fatal).
    """
    try:
        _, err_text, exit_code = await _run_git_command(
            "fetch", remote, cwd=repo_path, check=False
        )
    except Exception as e:
        logger.warning(f"git fetch error in {repo_path}: {e}")
        return False

    if exit_code == 0:
        return True

    logger.warning(f"git fetch failed in {repo_path}: {err_text}")
    return False
async def pre_cycle_sync(
    repo_path: Path,
    remote: str = "origin",
    branch: str = "main",
) -> dict:
    """Bring the repository up to date with its remote before a commit cycle.

    Sequence: recover orphan state -> fetch -> check if behind -> skip if
    dirty -> pull --rebase. Never stashes — dirty trees are left alone and
    committed normally. All failures are non-fatal: they are logged and
    reported in the result dict.

    Args:
        repo_path: Path to the repository
        remote: Name of the remote to sync with
        branch: Branch to pull

    Returns:
        dict with keys: fetched, pulled, behind_count, skipped_dirty,
        recovered, error
    """
    outcome = {
        "fetched": False,
        "pulled": False,
        "behind_count": 0,
        "skipped_dirty": False,
        "recovered": False,
        "error": None,
    }

    # Step 0: clear any rebase/merge left over from an earlier cycle.
    recovery = await pre_cycle_recover(repo_path)
    if recovery["was_stuck"]:
        outcome["recovered"] = recovery["recovered"]
        if not recovery["recovered"]:
            outcome["error"] = f"orphan_{recovery['state']}_unrecovered"
            return outcome

    # Step 1: fetch.
    fetched = await git_fetch(repo_path, remote)
    if not fetched:
        outcome["error"] = "fetch_failed"
        return outcome
    outcome["fetched"] = True

    # Step 2: re-read status now that the fetch has updated the refs.
    try:
        status = await git_status(repo_path)
    except Exception as e:
        outcome["error"] = f"status_failed: {e}"
        return outcome

    outcome["behind_count"] = status.behind
    if status.behind == 0:
        return outcome  # Nothing to pull

    # Step 3: leave a dirty tree alone. Multiple agents may be working
    # concurrently, so stashing is unsafe. The dirty changes will get
    # committed this cycle, pushed, and the next cycle will pull clean.
    if status.has_changes:
        outcome["skipped_dirty"] = True
        logger.debug(
            f"Skipping pull for {repo_path}: dirty working tree "
            f"({status.behind} commits behind, will sync after commit)"
        )
        return outcome

    # Step 4: pull --rebase (clean tree only).
    try:
        await git_pull_rebase(repo_path, remote, branch)
    except MergeConflictError as e:
        logger.warning(f"Conflict during pre-cycle pull in {repo_path}: {e}")
        outcome["error"] = "merge_conflict"
    except GitError as e:
        logger.warning(f"Pull failed in {repo_path}: {e}")
        outcome["error"] = f"pull_failed: {e}"
    else:
        outcome["pulled"] = True

    return outcome
async def git_log_recent(
    repo_path: Path,
    count: int = 5,
) -> list[str]:
    """Return the subject lines of the most recent commits, for style reference.

    The command runs with ``check=False``, so a failing ``git log`` does
    not raise; the result is built from whatever stdout contained.

    Args:
        repo_path: Path to the repository
        count: Maximum number of commits to list

    Returns:
        Non-empty subject lines in the order git printed them.
    """
    limit_flag = f"-{count}"
    log_output, _stderr, _returncode = await _run_git_command(
        "log", limit_flag, "--format=%s", cwd=repo_path, check=False
    )
    # filter(None, ...) drops the empty strings left by blank lines.
    return list(filter(None, log_output.split("\n")))