test(rebase-recovery): ✅ Add test case for rebase recovery failure handling and validation
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
8e5d94b9c4
commit
e00f025193
1 changed files with 205 additions and 0 deletions
205
tests/test_rebase_recovery.py
Normal file
205
tests/test_rebase_recovery.py
Normal file
|
|
@ -0,0 +1,205 @@
|
|||
"""Tests for rebase/merge orphan-state recovery logic."""
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_commit_service.git.operations import (
|
||||
_abort_rebase_verified,
|
||||
pre_cycle_recover,
|
||||
pre_cycle_sync,
|
||||
)
|
||||
|
||||
|
||||
def _git(args: list[str], cwd: Path) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(["git"] + args, cwd=cwd, check=True, capture_output=True)
|
||||
|
||||
|
||||
def _make_conflicted_rebase(tmp_path: Path) -> Path:
|
||||
"""Return a repo stuck mid-rebase with a conflict.
|
||||
|
||||
Sets up two divergent branches on the same file, starts rebase,
|
||||
which halts at the conflict — leaving .git/rebase-merge in place.
|
||||
"""
|
||||
repo = tmp_path / "repo"
|
||||
repo.mkdir()
|
||||
_git(["init"], repo)
|
||||
_git(["config", "user.email", "test@test.com"], repo)
|
||||
_git(["config", "user.name", "Test User"], repo)
|
||||
|
||||
# Base commit
|
||||
(repo / "file.txt").write_text("base\n")
|
||||
_git(["add", "."], repo)
|
||||
_git(["commit", "-m", "base"], repo)
|
||||
|
||||
# Branch A: modify file.txt
|
||||
_git(["checkout", "-b", "branch-a"], repo)
|
||||
(repo / "file.txt").write_text("branch-a change\n")
|
||||
_git(["add", "file.txt"], repo)
|
||||
_git(["commit", "-m", "branch-a commit"], repo)
|
||||
|
||||
# Back to master: diverge on same file
|
||||
_git(["checkout", "master"], repo)
|
||||
(repo / "file.txt").write_text("master change\n")
|
||||
_git(["add", "file.txt"], repo)
|
||||
_git(["commit", "-m", "master commit"], repo)
|
||||
|
||||
# Start rebase of master onto branch-a — will conflict and halt
|
||||
subprocess.run(
|
||||
["git", "rebase", "branch-a"],
|
||||
cwd=repo,
|
||||
capture_output=True,
|
||||
# Non-zero expected — conflict halts rebase
|
||||
)
|
||||
|
||||
assert (repo / ".git" / "rebase-merge").exists(), "Expected repo to be stuck in rebase"
|
||||
return repo
|
||||
|
||||
|
||||
class TestAbortRebaseVerified:
|
||||
async def test_abort_rebase_verified_clean_path(self, tmp_path: Path) -> None:
|
||||
repo = _make_conflicted_rebase(tmp_path)
|
||||
orig_head_ref = (repo / ".git" / "ORIG_HEAD").read_text().strip()
|
||||
|
||||
result = await _abort_rebase_verified(repo)
|
||||
|
||||
assert result is True
|
||||
assert not (repo / ".git" / "rebase-merge").exists()
|
||||
assert not (repo / ".git" / "rebase-apply").exists()
|
||||
# HEAD should be back at ORIG_HEAD (the pre-rebase tip)
|
||||
head = subprocess.run(
|
||||
["git", "rev-parse", "HEAD"],
|
||||
cwd=repo, capture_output=True, text=True, check=True,
|
||||
).stdout.strip()
|
||||
assert head == orig_head_ref
|
||||
|
||||
async def test_abort_rebase_verified_escalates_to_quit(self, tmp_path: Path) -> None:
|
||||
"""When --abort is a no-op (leaves state behind), escalate to --quit."""
|
||||
repo = _make_conflicted_rebase(tmp_path)
|
||||
|
||||
abort_call_count = 0
|
||||
original_run = __import__(
|
||||
"auto_commit_service.git.operations", fromlist=["_run_git_command"]
|
||||
)._run_git_command
|
||||
|
||||
async def patched_run(*args, **kwargs):
|
||||
nonlocal abort_call_count
|
||||
if args == ("rebase", "--abort") or list(args) == ["rebase", "--abort"]:
|
||||
abort_call_count += 1
|
||||
# First abort call: run the real command but then re-create state
|
||||
# to simulate a partially-cleared scenario where --abort "succeeded"
|
||||
# but didn't fully remove the directory.
|
||||
result = await original_run(*args, **kwargs)
|
||||
if abort_call_count == 1:
|
||||
# Re-create the sentinel to simulate abort not cleaning up
|
||||
rebase_merge = kwargs["cwd"] / ".git" / "rebase-merge"
|
||||
rebase_merge.mkdir(exist_ok=True)
|
||||
(rebase_merge / "head-name").write_text("refs/heads/master\n")
|
||||
return result
|
||||
return await original_run(*args, **kwargs)
|
||||
|
||||
with patch(
|
||||
"auto_commit_service.git.operations._run_git_command",
|
||||
side_effect=patched_run,
|
||||
):
|
||||
result = await _abort_rebase_verified(repo)
|
||||
|
||||
# --quit path ran and cleaned up
|
||||
assert result is True
|
||||
assert not (repo / ".git" / "rebase-merge").exists()
|
||||
|
||||
|
||||
class TestPreCycleRecover:
|
||||
async def test_pre_cycle_recover_detects_rebase_state(self, tmp_path: Path) -> None:
|
||||
repo = tmp_path / "repo"
|
||||
repo.mkdir()
|
||||
_git(["init"], repo)
|
||||
_git(["config", "user.email", "test@test.com"], repo)
|
||||
_git(["config", "user.name", "Test User"], repo)
|
||||
(repo / "file.txt").write_text("content\n")
|
||||
_git(["add", "."], repo)
|
||||
_git(["commit", "-m", "initial"], repo)
|
||||
|
||||
# Manually plant orphan rebase state
|
||||
rebase_merge = repo / ".git" / "rebase-merge"
|
||||
rebase_merge.mkdir()
|
||||
(rebase_merge / "head-name").write_text("refs/heads/master\n")
|
||||
(rebase_merge / "onto").write_text("deadbeef\n")
|
||||
|
||||
result = await pre_cycle_recover(repo)
|
||||
|
||||
assert result["was_stuck"] is True
|
||||
assert result["state"] == "rebase"
|
||||
# Recovery may fail because ORIG_HEAD doesn't exist, but it must detect
|
||||
assert "recovered" in result
|
||||
|
||||
async def test_pre_cycle_recover_detects_merge_state(self, tmp_path: Path) -> None:
|
||||
repo = tmp_path / "repo"
|
||||
repo.mkdir()
|
||||
_git(["init"], repo)
|
||||
_git(["config", "user.email", "test@test.com"], repo)
|
||||
_git(["config", "user.name", "Test User"], repo)
|
||||
(repo / "file.txt").write_text("content\n")
|
||||
_git(["add", "."], repo)
|
||||
_git(["commit", "-m", "initial"], repo)
|
||||
|
||||
# Plant orphan merge state
|
||||
merge_head = repo / ".git" / "MERGE_HEAD"
|
||||
merge_head.write_text("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef\n")
|
||||
|
||||
result = await pre_cycle_recover(repo)
|
||||
|
||||
assert result["was_stuck"] is True
|
||||
assert result["state"] == "merge"
|
||||
# merge --abort removes MERGE_HEAD (git will do that even if hash is fake)
|
||||
assert not merge_head.exists()
|
||||
assert result["recovered"] is True
|
||||
|
||||
async def test_pre_cycle_recover_clean_repo_noop(self, tmp_path: Path) -> None:
|
||||
repo = tmp_path / "repo"
|
||||
repo.mkdir()
|
||||
_git(["init"], repo)
|
||||
_git(["config", "user.email", "test@test.com"], repo)
|
||||
_git(["config", "user.name", "Test User"], repo)
|
||||
(repo / "file.txt").write_text("content\n")
|
||||
_git(["add", "."], repo)
|
||||
_git(["commit", "-m", "initial"], repo)
|
||||
|
||||
result = await pre_cycle_recover(repo)
|
||||
|
||||
assert result["was_stuck"] is False
|
||||
assert result["state"] == "clean"
|
||||
|
||||
|
||||
class TestPreCycleSyncRecoveryGating:
|
||||
async def test_pre_cycle_sync_refuses_when_recovery_fails(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
"""pre_cycle_sync must return early with orphan_rebase_unrecovered if recovery fails."""
|
||||
repo = tmp_path / "repo"
|
||||
repo.mkdir()
|
||||
_git(["init"], repo)
|
||||
_git(["config", "user.email", "test@test.com"], repo)
|
||||
_git(["config", "user.name", "Test User"], repo)
|
||||
(repo / "file.txt").write_text("content\n")
|
||||
_git(["add", "."], repo)
|
||||
_git(["commit", "-m", "initial"], repo)
|
||||
|
||||
# Plant orphan rebase state that will survive all abort attempts
|
||||
rebase_merge = repo / ".git" / "rebase-merge"
|
||||
rebase_merge.mkdir()
|
||||
(rebase_merge / "head-name").write_text("refs/heads/master\n")
|
||||
|
||||
# Patch _abort_rebase_verified to always return False (unrecoverable)
|
||||
with patch(
|
||||
"auto_commit_service.git.operations._abort_rebase_verified",
|
||||
new=AsyncMock(return_value=False),
|
||||
):
|
||||
result = await pre_cycle_sync(repo, "origin", "master")
|
||||
|
||||
assert result["error"] == "orphan_rebase_unrecovered"
|
||||
assert result["fetched"] is False
|
||||
assert result["pulled"] is False
|
||||
assert result["recovered"] is False
|
||||
Loading…
Add table
Reference in a new issue