"""Tests for rebase/merge orphan-state recovery logic."""
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
from auto_commit_service.git.operations import (
|
|
_abort_rebase_verified,
|
|
pre_cycle_recover,
|
|
pre_cycle_sync,
|
|
)
|
|
|
|
|
|
def _git(args: list[str], cwd: Path) -> subprocess.CompletedProcess:
    """Run ``git`` with *args* inside *cwd* and return the completed process.

    Output is captured instead of being echoed, and ``check=True`` makes a
    non-zero exit status raise ``subprocess.CalledProcessError``.
    """
    command = ["git", *args]
    return subprocess.run(command, cwd=cwd, capture_output=True, check=True)
|
|
|
|
|
|
def _make_conflicted_rebase(tmp_path: Path) -> Path:
    """Return a repo stuck mid-rebase with a conflict.

    Sets up two divergent branches on the same file, starts rebase,
    which halts at the conflict — leaving .git/rebase-merge in place.

    The initial branch is pinned to ``master`` explicitly, so the later
    ``git checkout master`` does not depend on the ``init.defaultBranch``
    setting of the machine running the tests.

    Args:
        tmp_path: Scratch directory; the repo is created at ``tmp_path / "repo"``.

    Returns:
        Path to the repository, left halted inside the conflicted rebase.
    """
    repo = tmp_path / "repo"
    repo.mkdir()
    _git(["init"], repo)
    # Point the still-unborn HEAD at master. Without this, a configured
    # init.defaultBranch (e.g. "main") means no "master" branch ever exists
    # and the checkout below fails.
    _git(["symbolic-ref", "HEAD", "refs/heads/master"], repo)
    _git(["config", "user.email", "test@test.com"], repo)
    _git(["config", "user.name", "Test User"], repo)

    # Base commit
    (repo / "file.txt").write_text("base\n")
    _git(["add", "."], repo)
    _git(["commit", "-m", "base"], repo)

    # Branch A: modify file.txt
    _git(["checkout", "-b", "branch-a"], repo)
    (repo / "file.txt").write_text("branch-a change\n")
    _git(["add", "file.txt"], repo)
    _git(["commit", "-m", "branch-a commit"], repo)

    # Back to master: diverge on same file
    _git(["checkout", "master"], repo)
    (repo / "file.txt").write_text("master change\n")
    _git(["add", "file.txt"], repo)
    _git(["commit", "-m", "master commit"], repo)

    # Start rebase of master onto branch-a — will conflict and halt.
    # Deliberately no check=True: a non-zero exit is the expected outcome.
    rebase = subprocess.run(
        ["git", "rebase", "branch-a"],
        cwd=repo,
        capture_output=True,
    )

    # Sanity-check the fixture itself; include git's own output so a broken
    # setup is diagnosable from the failure message alone.
    assert (repo / ".git" / "rebase-merge").exists(), (
        "Expected repo to be stuck in rebase "
        f"(git rebase exited {rebase.returncode}: "
        f"{rebase.stderr.decode(errors='replace').strip()})"
    )
    return repo
|
|
|
|
|
|
class TestAbortRebaseVerified:
    """Exercise ``_abort_rebase_verified`` against a repo halted mid-rebase."""

    async def test_abort_rebase_verified_clean_path(self, tmp_path: Path) -> None:
        """A working abort clears the rebase state and puts HEAD back at ORIG_HEAD."""
        repo = _make_conflicted_rebase(tmp_path)
        orig_head_ref = (repo / ".git" / "ORIG_HEAD").read_text().strip()

        result = await _abort_rebase_verified(repo)

        assert result is True
        assert not (repo / ".git" / "rebase-merge").exists()
        assert not (repo / ".git" / "rebase-apply").exists()
        # HEAD should be back at ORIG_HEAD (the pre-rebase tip)
        head = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=repo, capture_output=True, text=True, check=True,
        ).stdout.strip()
        assert head == orig_head_ref

    async def test_abort_rebase_verified_escalates_to_quit(self, tmp_path: Path) -> None:
        """When --abort is a no-op (leaves state behind), escalate to --quit."""
        # Imported locally so the real helper is captured before it is patched.
        from auto_commit_service.git import operations as git_operations

        repo = _make_conflicted_rebase(tmp_path)
        original_run = git_operations._run_git_command
        abort_call_count = 0

        def is_rebase_abort(call_args: tuple) -> bool:
            """Tell whether the positional args spell ``rebase --abort``.

            Accepts both calling conventions: the command as separate
            positional strings, or as a single list/tuple first argument.
            """
            if call_args and isinstance(call_args[0], (list, tuple)):
                tokens = list(call_args[0])
            else:
                tokens = list(call_args)
            return tokens == ["rebase", "--abort"]

        async def patched_run(*args, **kwargs):
            nonlocal abort_call_count
            if not is_rebase_abort(args):
                return await original_run(*args, **kwargs)

            abort_call_count += 1
            # First abort call: run the real command but then re-create state
            # to simulate a partially-cleared scenario where --abort "succeeded"
            # but didn't fully remove the directory.
            result = await original_run(*args, **kwargs)
            if abort_call_count == 1:
                # Re-create the sentinel to simulate abort not cleaning up
                rebase_merge = repo / ".git" / "rebase-merge"
                rebase_merge.mkdir(exist_ok=True)
                (rebase_merge / "head-name").write_text("refs/heads/master\n")
            return result

        with patch(
            "auto_commit_service.git.operations._run_git_command",
            side_effect=patched_run,
        ):
            result = await _abort_rebase_verified(repo)

        # Guard against a vacuous pass: if the sabotaged --abort never ran,
        # nothing below proves that escalation happened.
        assert abort_call_count >= 1, "rebase --abort was never intercepted"
        # --quit path ran and cleaned up
        assert result is True
        assert not (repo / ".git" / "rebase-merge").exists()
|
|
|
|
|
|
class TestPreCycleRecover:
    """Check what ``pre_cycle_recover`` reports for stuck and clean repos."""

    @staticmethod
    def _single_commit_repo(tmp_path: Path) -> Path:
        """Create ``tmp_path / "repo"`` holding one commit and return its path."""
        workdir = tmp_path / "repo"
        workdir.mkdir()
        for git_args in (
            ["init"],
            ["config", "user.email", "test@test.com"],
            ["config", "user.name", "Test User"],
        ):
            _git(git_args, workdir)
        (workdir / "file.txt").write_text("content\n")
        _git(["add", "."], workdir)
        _git(["commit", "-m", "initial"], workdir)
        return workdir

    async def test_pre_cycle_recover_detects_rebase_state(self, tmp_path: Path) -> None:
        """A planted ``rebase-merge`` directory is reported as a stuck rebase."""
        workdir = self._single_commit_repo(tmp_path)

        # Hand-made orphan rebase state: just the directory and two files.
        state_dir = workdir / ".git" / "rebase-merge"
        state_dir.mkdir()
        planted_files = (
            ("head-name", "refs/heads/master\n"),
            ("onto", "deadbeef\n"),
        )
        for file_name, content in planted_files:
            (state_dir / file_name).write_text(content)

        outcome = await pre_cycle_recover(workdir)

        assert outcome["was_stuck"] is True
        assert outcome["state"] == "rebase"
        # Only detection is required here: recovery itself may fail because
        # ORIG_HEAD doesn't exist, so just check the key is reported.
        assert "recovered" in outcome

    async def test_pre_cycle_recover_detects_merge_state(self, tmp_path: Path) -> None:
        """A planted ``MERGE_HEAD`` is reported as a stuck merge and cleared."""
        workdir = self._single_commit_repo(tmp_path)

        # Hand-made orphan merge state: MERGE_HEAD naming a made-up commit.
        merge_marker = workdir / ".git" / "MERGE_HEAD"
        merge_marker.write_text("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef\n")

        outcome = await pre_cycle_recover(workdir)

        assert outcome["was_stuck"] is True
        assert outcome["state"] == "merge"
        # merge --abort removes MERGE_HEAD (git will do that even if hash is fake)
        assert not merge_marker.exists()
        assert outcome["recovered"] is True

    async def test_pre_cycle_recover_clean_repo_noop(self, tmp_path: Path) -> None:
        """A repo with no leftover state is reported as clean and not stuck."""
        workdir = self._single_commit_repo(tmp_path)

        outcome = await pre_cycle_recover(workdir)

        assert outcome["was_stuck"] is False
        assert outcome["state"] == "clean"
|
|
|
|
|
|
class TestPreCycleSyncRecoveryGating:
    """Check that ``pre_cycle_sync`` is gated on orphan-state recovery."""

    async def test_pre_cycle_sync_refuses_when_recovery_fails(
        self, tmp_path: Path
    ) -> None:
        """pre_cycle_sync must return early with orphan_rebase_unrecovered if recovery fails."""
        workdir = tmp_path / "repo"
        workdir.mkdir()
        for git_args in (
            ["init"],
            ["config", "user.email", "test@test.com"],
            ["config", "user.name", "Test User"],
        ):
            _git(git_args, workdir)
        (workdir / "file.txt").write_text("content\n")
        _git(["add", "."], workdir)
        _git(["commit", "-m", "initial"], workdir)

        # Orphan rebase state; it outlives the call because the abort helper
        # is replaced by a mock below.
        state_dir = workdir / ".git" / "rebase-merge"
        state_dir.mkdir()
        (state_dir / "head-name").write_text("refs/heads/master\n")

        # Stand-in for _abort_rebase_verified that always reports failure.
        always_fails = AsyncMock(return_value=False)
        with patch(
            "auto_commit_service.git.operations._abort_rebase_verified",
            new=always_fails,
        ):
            outcome = await pre_cycle_sync(workdir, "origin", "master")

        assert outcome["error"] == "orphan_rebase_unrecovered"
        # Each flag must be the False singleton, checked in the same order as
        # the original test listed them.
        for flag in ("fetched", "pulled", "recovered"):
            assert outcome[flag] is False
|