"""Reap pass: a dead worker's assignment is deactivated so its task re-surfaces.

When a worker dies its tmux pane vanishes and the liveness pass marks the
session `closed`, but nothing ever flipped the assignment inactive — so the
task kept *looking* assigned and the scheduler never re-offered it. The reap
pass deactivates active assignments whose session is closed-and-stale, gated
on a grace window so a momentarily-misflagged live worker is never reaped.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from uuid import UUID
|
|
|
|
from claire.domain import TaskStatus
|
|
from claire.pull import pull
|
|
from claire.rclaude import SessionRow, TmuxRow
|
|
from claire.web import service
|
|
|
|
# Working directory reported by every fake session built with `_session`.
_CWD = "/var/home/lilith/Code/@projects/@claire"
# Workspace slug embedded in the fake tmux session name. It looks like `_CWD`
# with the path separators and `@` flattened to dashes — TODO confirm the exact
# derivation against the rclaude naming code.
_SLUG = "var-home-lilith-Code--projects--claire"
|
|
|
|
|
|
class _FakeRclaude:
    """In-memory stand-in for the rclaude client handed to ``pull``.

    It serves the canned session and tmux rows given at construction time and
    reports an empty triage list. The lists are stored by reference; each
    ``list_*`` call hands back a fresh shallow copy.
    """

    def __init__(self, sessions: list[SessionRow], tmux: list[TmuxRow]):
        """Remember the rows that the listing methods will serve."""
        self._sessions = sessions
        self._tmux = tmux

    def list_sessions(self) -> list[SessionRow]:
        """Return a shallow copy of the canned session rows."""
        return [*self._sessions]

    def list_tmux(self) -> list[TmuxRow]:
        """Return a shallow copy of the canned tmux rows."""
        return [*self._tmux]

    def triage(self) -> list:
        """Return a new empty list — this fake never has triage entries."""
        return list()
|
|
|
|
|
|
def _session(uuid_hex: str, mtime: int) -> SessionRow:
    """Build a fake session row located in the test workspace.

    ``uuid_hex`` is parsed into the row's UUID and ``mtime`` becomes its
    ``mtime_epoch``; host, snippet and cwd are fixed test values.
    """
    session_id = UUID(uuid_hex)
    return SessionRow(
        host="apricot",
        uuid=session_id,
        snippet="x",
        cwd=_CWD,
        mtime_epoch=mtime,
    )
|
|
|
|
|
|
def _active(conn, assignment_id: UUID) -> bool:
    """Return whether the assignment row with ``assignment_id`` is active.

    Reads the ``active`` column straight from the ``assignments`` table so the
    tests observe the stored flag rather than a service-layer view.

    Raises:
        AssertionError: if no row has that id. Without this check the lookup
            would fail with an opaque ``TypeError`` on ``None``.
    """
    row = conn.execute(
        "SELECT active FROM assignments WHERE id = ?", (str(assignment_id),)
    ).fetchone()
    # fetchone() yields None when nothing matched; fail with a message that
    # names the missing assignment instead of a subscript error.
    if row is None:
        raise AssertionError(f"no assignment row with id {assignment_id}")
    return bool(row["active"])
|
|
|
|
|
|
def _setup_in_progress_task(conn, gen) -> UUID:
    """Create project ``proj`` with a single task and move it to IN_PROGRESS.

    Returns the id of the newly added task.
    """
    service.create_project(conn, gen, name="proj")
    created = service.add_task(conn, gen, project="proj", title="t", priority=1)
    new_task_id = created.id
    service.transition_task_state(
        conn,
        gen,
        task_id=new_task_id,
        to_state=TaskStatus.IN_PROGRESS,
    )
    return new_task_id
|
|
|
|
|
|
def test_reaps_only_closed_and_stale_assignment(conn, gen) -> None:
    """Exactly one of three assignments on the same task is reaped.

    The three sessions differ only in age and liveness: one is old with no
    pane (reaped), one is closed but recent enough to sit inside the grace
    window (kept), and one is the freshest session and so claims the single
    live pane (kept).
    """
    now_epoch = int(datetime.now(timezone.utc).timestamp())
    # label -> (session uuid hex, session mtime), in assignment-creation order.
    workers = {
        # years old → closed + stale
        "dead": ("11111111-1111-1111-1111-111111111111", 1_700_000_000),
        # 5 min ago — closed, but inside the 30-min grace
        "fresh_closed": ("22222222-2222-2222-2222-222222222222", now_epoch - 300),
        # freshest → claims the one live pane
        "alive": ("33333333-3333-3333-3333-333333333333", now_epoch - 60),
    }

    task_id = _setup_in_progress_task(conn, gen)
    assignments = {
        label: service.create_assignment(
            conn, gen, task_id=task_id, session_uuid=UUID(hex_id)
        )
        for label, (hex_id, _mtime) in workers.items()
    }

    session_rows = [_session(hex_id, mtime) for hex_id, mtime in workers.values()]
    # One live pane at the workspace → only the freshest session is alive.
    live_pane = TmuxRow(
        host="apricot",
        session_name=f"claude-natalie-{_SLUG}-1779320000",
        detail="",
    )
    fake = _FakeRclaude(sessions=session_rows, tmux=[live_pane])

    stats = pull(conn, gen, rclaude=fake)

    assert stats.assignments_reaped == 1
    # dead + stale → reaped
    assert _active(conn, assignments["dead"].id) is False
    # closed but fresh → protected
    assert _active(conn, assignments["fresh_closed"].id) is True
    # alive → protected
    assert _active(conn, assignments["alive"].id) is True
|
|
|
|
|
|
def test_reap_is_idempotent(conn, gen) -> None:
    """Pulling twice against the same fleet reaps once, then nothing.

    The first pass deactivates the dead worker's assignment, so the second
    pass over the unchanged fleet has nothing left to reap.
    """
    dead_hex = "44444444-4444-4444-4444-444444444444"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead_hex))
    # A single old session and no tmux panes at all.
    stale_session = _session(dead_hex, 1_700_000_000)
    fake = _FakeRclaude(sessions=[stale_session], tmux=[])

    first_pass = pull(conn, gen, rclaude=fake)
    second_pass = pull(conn, gen, rclaude=fake)

    reaped = (first_pass.assignments_reaped, second_pass.assignments_reaped)
    assert reaped == (1, 0)
|
|
|
|
|
|
def test_reaped_task_becomes_dispatchable_again(conn, gen) -> None:
    """A reaped in-progress task is left with no active assignment.

    Having no active assignment is the condition the scheduler treats as
    unassigned open work, so the task can be offered again.
    """
    dead_hex = "55555555-5555-5555-5555-555555555555"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead_hex))
    # A single old session and no tmux panes at all.
    stale_session = _session(dead_hex, 1_700_000_000)
    fake = _FakeRclaude(sessions=[stale_session], tmux=[])

    pull(conn, gen, rclaude=fake)

    remaining = service.read.list_assignments(
        conn, task_id=task_id, active_only=True
    )
    assert remaining == []
|