240 lines
8.4 KiB
Python
240 lines
8.4 KiB
Python
"""Tests for git operations module."""
|
|
|
|
import subprocess
|
|
|
|
import pytest
|
|
from pathlib import Path
|
|
|
|
from auto_commit_service.git import (
|
|
git_status,
|
|
git_diff,
|
|
git_add_all,
|
|
git_commit,
|
|
git_fetch,
|
|
pre_cycle_sync,
|
|
)
|
|
from auto_commit_service.git.repository import Repository
|
|
|
|
|
|
class TestGitStatus:
    """Checks on the status report produced by ``git_status``."""

    async def test_clean_repo(self, temp_git_repo: Path) -> None:
        """An untouched fixture repository reports no changes in any bucket."""
        report = await git_status(temp_git_repo)

        assert not report.has_changes
        assert report.staged == []
        assert report.modified == []
        assert report.untracked == []

    async def test_with_untracked_file(self, temp_git_repo: Path) -> None:
        """A newly written file is listed under ``untracked``."""
        new_path = temp_git_repo / "new_file.txt"
        new_path.write_text("content")

        report = await git_status(temp_git_repo)

        assert report.has_changes
        assert "new_file.txt" in report.untracked

    async def test_with_modified_file(self, temp_git_repo: Path) -> None:
        """Rewriting README.md makes it show up under ``modified``."""
        # Presumably README.md is already committed by the fixture — verify
        # against the ``temp_git_repo`` definition.
        readme = temp_git_repo / "README.md"
        readme.write_text("# Modified\n")

        report = await git_status(temp_git_repo)

        assert report.has_changes
        assert "README.md" in report.modified
|
|
|
|
|
|
class TestGitDiff:
    """Checks on the text returned by ``git_diff``."""

    async def test_no_changes(self, temp_git_repo: Path) -> None:
        """With nothing edited, the diff is the empty string."""
        patch_text = await git_diff(temp_git_repo)
        assert patch_text == ""

    async def test_with_changes(self, temp_git_repo: Path) -> None:
        """After README.md is rewritten, the diff names the file and the new text."""
        readme = temp_git_repo / "README.md"
        readme.write_text("# Modified content\n")

        patch_text = await git_diff(temp_git_repo)

        assert "README.md" in patch_text
        assert "Modified content" in patch_text
|
|
|
|
|
|
class TestGitCommit:
    """Checks on the result object returned by ``git_commit``."""

    async def test_commit_changes(self, temp_git_repo: Path) -> None:
        """Staging a new file and committing yields a successful result with a hash."""
        # Arrange: write a file and stage everything.
        script = temp_git_repo / "new_file.py"
        script.write_text("print('hello')")
        await git_add_all(temp_git_repo)

        # Act: create the commit.
        outcome = await git_commit(temp_git_repo, "✨ Add new file")

        # Assert: success flag set and a hash of at least 7 characters returned.
        assert outcome.success
        assert outcome.commit_hash is not None
        assert len(outcome.commit_hash) >= 7

    async def test_commit_nothing_to_commit(self, temp_git_repo: Path) -> None:
        """Committing with nothing staged fails and reports "Nothing to commit"."""
        outcome = await git_commit(temp_git_repo, "Empty commit")

        assert not outcome.success
        # ``error`` may be None, so fall back to an empty string before searching.
        assert "Nothing to commit" in (outcome.error or "")
|
|
|
|
|
|
class TestRepository:
    """Checks on the ``Repository.exists`` property."""

    def test_exists_valid_repo(self, temp_git_repo: Path) -> None:
        """``exists`` is truthy when the path is the fixture git repository."""
        assert Repository(name="test", path=temp_git_repo).exists

    def test_exists_invalid_path(self, tmp_path: Path) -> None:
        """``exists`` is falsy when the path was never created."""
        missing = tmp_path / "nonexistent"
        assert not Repository(name="test", path=missing).exists

    def test_exists_non_git_directory(self, tmp_path: Path) -> None:
        """``exists`` is falsy for a plain directory that was never git-initialised."""
        plain_dir = tmp_path / "not-a-repo"
        plain_dir.mkdir()
        assert not Repository(name="test", path=plain_dir).exists
|
|
|
|
|
|
def _make_bare_remote(tmp_path: Path, repo_path: Path) -> Path:
    """Create a bare remote, register it as ``origin`` and push ``master`` to it.

    Args:
        tmp_path: Directory in which the bare repository ``remote.git`` is created.
        repo_path: Existing working copy that receives the ``origin`` remote and
            whose ``master`` branch is pushed.

    Returns:
        Path of the newly created bare repository.

    Raises:
        subprocess.CalledProcessError: If any of the git commands exits non-zero.
    """
    bare = tmp_path / "remote.git"
    subprocess.run(["git", "init", "--bare", str(bare)], check=True, capture_output=True)
    # Pin the remote's HEAD to master. ``git init`` otherwise follows the
    # machine's ``init.defaultBranch``; if that is not "master", HEAD would point
    # at a branch that is never pushed and clones of this remote would end up
    # with nothing checked out.
    subprocess.run(
        ["git", "symbolic-ref", "HEAD", "refs/heads/master"],
        cwd=bare, check=True, capture_output=True,
    )
    subprocess.run(
        ["git", "remote", "add", "origin", str(bare)],
        cwd=repo_path, check=True, capture_output=True,
    )
    subprocess.run(
        ["git", "push", "-u", "origin", "master"],
        cwd=repo_path, check=True, capture_output=True,
    )
    return bare
|
|
|
|
|
|
def _clone_from_bare(tmp_path: Path, bare: Path, name: str) -> Path:
    """Clone ``bare`` into ``tmp_path / name`` and give the clone a commit identity.

    Args:
        tmp_path: Parent directory for the new working copy.
        bare: Path of the repository to clone from.
        name: Directory name of the working copy inside ``tmp_path``.

    Returns:
        Path of the new working copy.

    Raises:
        subprocess.CalledProcessError: If any of the git commands exits non-zero.
    """
    working_copy = tmp_path / name
    subprocess.run(
        ["git", "clone", str(bare), str(working_copy)],
        check=True, capture_output=True,
    )
    # Set a repository-local identity, email first and then name.
    identity = (
        ("user.email", "test@test.com"),
        ("user.name", "Test User"),
    )
    for key, value in identity:
        subprocess.run(
            ["git", "config", key, value],
            cwd=working_copy, check=True, capture_output=True,
        )
    return working_copy
|
|
|
|
|
|
class TestGitFetch:
    """Tests for git_fetch function."""

    async def test_fetch_success(self, temp_git_repo: Path, tmp_path: Path) -> None:
        """Test fetching from a valid remote."""
        # Only the side effect (an ``origin`` remote on the repo) is needed here;
        # the returned bare path is not used.
        _make_bare_remote(tmp_path, temp_git_repo)
        result = await git_fetch(temp_git_repo, "origin")
        assert result is True

    async def test_fetch_no_remote(self, temp_git_repo: Path) -> None:
        """Test fetch with no remote configured (graceful failure)."""
        result = await git_fetch(temp_git_repo, "origin")
        assert result is False
|
|
|
|
|
|
class TestPreCycleSync:
    """Tests for pre_cycle_sync function."""

    @staticmethod
    def _push_commit_from_other_clone(tmp_path: Path, bare: Path) -> Path:
        """Advance the remote by one commit made from a second working copy.

        Clones ``bare`` into ``tmp_path / "other-clone"``, commits a new
        ``other.txt`` there and pushes it to ``origin master``, so any
        repository that previously pushed to ``bare`` is now one commit behind.

        Args:
            tmp_path: Parent directory for the second working copy.
            bare: Path of the bare remote to clone from and push to.

        Returns:
            Path of the second working copy.

        Raises:
            subprocess.CalledProcessError: If any git command exits non-zero.
        """
        other = _clone_from_bare(tmp_path, bare, "other-clone")
        (other / "other.txt").write_text("from other host")
        subprocess.run(["git", "add", "."], cwd=other, check=True, capture_output=True)
        subprocess.run(
            ["git", "commit", "-m", "commit from other host"],
            cwd=other, check=True, capture_output=True,
        )
        subprocess.run(
            ["git", "push", "origin", "master"],
            cwd=other, check=True, capture_output=True,
        )
        return other

    async def test_not_behind(self, temp_git_repo: Path, tmp_path: Path) -> None:
        """Test sync when already up-to-date."""
        _make_bare_remote(tmp_path, temp_git_repo)

        result = await pre_cycle_sync(temp_git_repo, "origin", "master")

        assert result["fetched"] is True
        assert result["pulled"] is False
        assert result["behind_count"] == 0
        assert result["error"] is None

    async def test_behind_clean_tree(self, temp_git_repo: Path, tmp_path: Path) -> None:
        """Test sync when behind with clean working tree — pulls without stash."""
        bare = _make_bare_remote(tmp_path, temp_git_repo)

        # A second clone commits and pushes, leaving temp_git_repo one commit behind.
        self._push_commit_from_other_clone(tmp_path, bare)

        result = await pre_cycle_sync(temp_git_repo, "origin", "master")

        assert result["fetched"] is True
        assert result["pulled"] is True
        assert result["behind_count"] == 1
        assert result["skipped_dirty"] is False
        assert result["error"] is None
        # Verify the file arrived
        assert (temp_git_repo / "other.txt").exists()

    async def test_behind_dirty_tree_skips_pull(
        self, temp_git_repo: Path, tmp_path: Path
    ) -> None:
        """Test sync when behind with dirty working tree — skips pull entirely."""
        bare = _make_bare_remote(tmp_path, temp_git_repo)

        # Push a commit from another clone
        self._push_commit_from_other_clone(tmp_path, bare)

        # Dirty the local working tree
        (temp_git_repo / "local_wip.txt").write_text("work in progress")

        result = await pre_cycle_sync(temp_git_repo, "origin", "master")

        assert result["fetched"] is True
        assert result["pulled"] is False
        assert result["skipped_dirty"] is True
        assert result["behind_count"] == 1
        assert result["error"] is None
        # Remote file NOT pulled (skipped due to dirty tree)
        assert not (temp_git_repo / "other.txt").exists()
        # Local WIP untouched
        assert (temp_git_repo / "local_wip.txt").exists()

    async def test_fetch_failure(self, temp_git_repo: Path) -> None:
        """Test sync with no remote — fetch fails gracefully."""
        result = await pre_cycle_sync(temp_git_repo, "origin", "master")

        assert result["fetched"] is False
        assert result["pulled"] is False
        assert result["error"] == "fetch_failed"
|