"""Async git command wrappers using subprocess.""" import asyncio import logging from pathlib import Path from .repository import GitStatus, CommitResult, PushResult logger = logging.getLogger(__name__) class GitError(Exception): """Base exception for git operations.""" def __init__(self, message: str, stderr: str = "", returncode: int = 1): super().__init__(message) self.stderr = stderr self.returncode = returncode class MergeConflictError(GitError): """Raised when there's a merge conflict.""" pass class PushRejectedError(GitError): """Raised when push is rejected by remote.""" pass async def _run_git_command( *args: str, cwd: Path, check: bool = True, stdin: bytes | None = None, ) -> tuple[str, str, int]: """Run a git command asynchronously. Uses asyncio subprocess with argument list (safe, no shell injection). This is equivalent to Node.js execFile - arguments are passed directly to the process without shell interpretation. Args: *args: Git command arguments cwd: Working directory for the command check: Raise GitError on non-zero exit code stdin: Optional stdin data to send to the process """ # asyncio.create_subprocess_exec is safe - no shell, args passed directly create_process = asyncio.create_subprocess_exec proc = await create_process( "git", *args, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE if stdin is not None else None, ) stdout, stderr = await proc.communicate(input=stdin) stdout_str = stdout.decode().strip() stderr_str = stderr.decode().strip() returncode = proc.returncode or 0 if check and returncode != 0: raise GitError( f"git {' '.join(args)} failed: {stderr_str}", stderr=stderr_str, returncode=returncode, ) return stdout_str, stderr_str, returncode async def git_status(repo_path: Path) -> GitStatus: """Get the git status of a repository.""" # Get porcelain status for parsing stdout, _, _ = await _run_git_command("status", "--porcelain", "-b", cwd=repo_path) lines = stdout.split("\n") if stdout else [] staged = [] modified = [] untracked = [] deleted = [] branch = "main" ahead = 0 behind = 0 for line in lines: if not line: continue if line.startswith("##"): # Parse branch info: ## main...origin/main [ahead 1, behind 2] branch_info = line[3:] if "..." in branch_info: branch = branch_info.split("...")[0] else: branch = branch_info.split()[0] if branch_info else "main" if "[ahead " in line: try: ahead_part = line.split("[ahead ")[1].split("]")[0].split(",")[0] ahead = int(ahead_part) except (IndexError, ValueError): pass if "behind " in line: try: behind_part = line.split("behind ")[1].split("]")[0] behind = int(behind_part) except (IndexError, ValueError): pass continue # Parse file status if len(line) >= 3: index_status = line[0] worktree_status = line[1] filepath = line[3:] if index_status == "?" and worktree_status == "?": untracked.append(filepath) elif index_status == "D" or worktree_status == "D": deleted.append(filepath) elif index_status in "MARC": staged.append(filepath) elif worktree_status == "M": modified.append(filepath) has_changes = bool(staged or modified or untracked or deleted) return GitStatus( has_changes=has_changes, staged=staged, modified=modified, untracked=untracked, deleted=deleted, branch=branch, ahead=ahead, behind=behind, ) async def git_diff(repo_path: Path, staged: bool = False) -> str: """Get the diff of changes. Args: repo_path: Path to the repository staged: If True, show only staged changes (--cached) """ args = ["diff"] if staged: args.append("--cached") else: args.append("HEAD") stdout, _, _ = await _run_git_command(*args, cwd=repo_path, check=False) return stdout async def git_add_all(repo_path: Path) -> None: """Stage all changes including untracked files.""" await _run_git_command("add", "-A", cwd=repo_path) async def git_check_ignored(repo_path: Path, files: list[str]) -> list[str]: """Check which files are ignored by .gitignore. Args: repo_path: Path to the repository files: List of file paths to check Returns: List of files that are NOT ignored (safe to stage) """ if not files: return [] try: # Use git check-ignore to filter out ignored files # --stdin allows us to check multiple files efficiently # -v flag would show matches, but we want non-matches # Exit code 0 = files are ignored, 1 = files are NOT ignored file_list = "\n".join(files) stdout, stderr, returncode = await _run_git_command( "check-ignore", "--stdin", cwd=repo_path, check=False, stdin=file_list.encode() ) # Files in stdout are ignored - we want to exclude these ignored_files = set(stdout.strip().split("\n")) if stdout.strip() else set() # Return only non-ignored files return [f for f in files if f not in ignored_files] except GitError: # If check-ignore fails, return all files (safer than blocking commits) logger.warning(f"git check-ignore failed in {repo_path}, proceeding without filter") return files async def git_add_specific(repo_path: Path, files: list[str]) -> None: """Stage specific files only, filtering out gitignored files. Args: repo_path: Path to the repository files: List of file paths to stage """ if not files: return # Filter out gitignored files before staging stageable_files = await git_check_ignored(repo_path, files) if not stageable_files: logger.warning(f"All {len(files)} files are gitignored, nothing to stage") return if len(stageable_files) < len(files): ignored_count = len(files) - len(stageable_files) logger.debug(f"Filtered out {ignored_count} gitignored files, staging {len(stageable_files)}") # Batch git add if too many files to avoid ARG_MAX limit # Most systems have ARG_MAX ~2MB, conservatively batch at 1000 files BATCH_SIZE = 1000 if len(stageable_files) > BATCH_SIZE: logger.info(f"Batching git add for {len(stageable_files)} files ({BATCH_SIZE} per batch)") for i in range(0, len(stageable_files), BATCH_SIZE): batch = stageable_files[i : i + BATCH_SIZE] logger.debug(f"Adding batch {i // BATCH_SIZE + 1}/{(len(stageable_files) + BATCH_SIZE - 1) // BATCH_SIZE}") await _run_git_command("add", "--", *batch, cwd=repo_path) else: # Add files in a single command for efficiency await _run_git_command("add", "--", *stageable_files, cwd=repo_path) async def git_commit(repo_path: Path, message: str) -> CommitResult: """Create a commit with the given message.""" try: stdout, stderr, returncode = await _run_git_command( "commit", "-m", message, cwd=repo_path, check=False ) if returncode != 0: # Check for "nothing to commit" if "nothing to commit" in stdout or "nothing to commit" in stderr: return CommitResult( success=False, error="Nothing to commit", ) return CommitResult( success=False, error=stderr or stdout, ) # Get commit hash hash_stdout, _, _ = await _run_git_command( "rev-parse", "--short", "HEAD", cwd=repo_path ) return CommitResult( success=True, commit_hash=hash_stdout, message=message, ) except GitError as e: return CommitResult( success=False, error=str(e), ) async def git_push( repo_path: Path, remote: str = "origin", branch: str = "main", ) -> PushResult: """Push commits to remote.""" try: _, stderr, returncode = await _run_git_command( "push", remote, branch, cwd=repo_path, check=False ) if returncode != 0: rejected = "rejected" in stderr.lower() or "non-fast-forward" in stderr.lower() if rejected: raise PushRejectedError( f"Push rejected: {stderr}", stderr=stderr, returncode=returncode, ) return PushResult( success=False, remote=remote, branch=branch, error=stderr, rejected=rejected, ) return PushResult( success=True, remote=remote, branch=branch, ) except PushRejectedError: raise except GitError as e: return PushResult( success=False, remote=remote, branch=branch, error=str(e), ) async def git_pull_rebase( repo_path: Path, remote: str = "origin", branch: str = "main", ) -> bool: """Pull with rebase to resolve diverged history. Returns True if successful, raises MergeConflictError on conflicts. """ try: _, stderr, returncode = await _run_git_command( "pull", "--rebase", remote, branch, cwd=repo_path, check=False ) if returncode != 0: if "conflict" in stderr.lower() or "CONFLICT" in stderr: # Abort the rebase await _run_git_command("rebase", "--abort", cwd=repo_path, check=False) raise MergeConflictError( f"Merge conflict during rebase: {stderr}", stderr=stderr, returncode=returncode, ) raise GitError( f"Pull rebase failed: {stderr}", stderr=stderr, returncode=returncode, ) return True except (MergeConflictError, GitError): raise except Exception as e: raise GitError(f"Unexpected error during pull rebase: {e}") async def git_log_recent( repo_path: Path, count: int = 5, ) -> list[str]: """Get recent commit messages for style reference.""" stdout, _, _ = await _run_git_command( "log", f"-{count}", "--format=%s", cwd=repo_path, check=False ) return [line for line in stdout.split("\n") if line]