feat(scheduler): Implement new scheduler algorithm for processing tasks

This commit is contained in:
Lilith 2026-01-13 08:26:14 -08:00
parent 868b566af8
commit b72ac58178

View file

@ -1,447 +0,0 @@
"""Per-repository commit processor."""
import logging
from datetime import datetime
from ..config import AutoCommitSettings
from ..git import (
Repository,
git_status,
git_diff,
git_add_all,
git_commit,
git_push,
summarize_diff,
)
from ..git.operations import (
GitError,
MergeConflictError,
PushRejectedError,
git_pull_rebase,
git_add_specific,
)
from ..grouping import FileGroupingStrategy
from ..llm import LlamaCommitClient
from ..llm.client import LlamaServiceError, LlamaServiceUnavailable
from ..models import ProcessStatus, RepoProcessResult
from ..recovery import ErrorHandler
logger = logging.getLogger(__name__)
# Infrastructure error patterns that Claude cannot fix
_NON_RECOVERABLE_PATTERNS = [
"couldn't find remote ref",
"remote not found",
"repository not found",
"connection refused",
"network is unreachable",
"could not resolve host",
"authentication failed",
"permission denied",
"unable to access",
"fatal: could not read",
"no such device or address",
]
def _is_recoverable_error(error: str) -> bool:
"""Determine if an error can potentially be recovered by Claude.
Claude can help with:
- Merge conflicts
- Pre-commit hook failures
- Rebase conflicts
Claude cannot help with:
- Network/connectivity issues
- Authentication failures
- Missing remotes/refs
- Permission issues
Args:
error: The error message to analyze
Returns:
True if the error might be recoverable, False for infrastructure errors
"""
error_lower = error.lower()
return not any(pattern in error_lower for pattern in _NON_RECOVERABLE_PATTERNS)
class CommitProcessor:
"""Processes commits for a single repository."""
def __init__(
self,
llm_client: LlamaCommitClient,
settings: AutoCommitSettings,
error_handler: "ErrorHandler | None" = None,
):
"""Initialize the processor.
Args:
llm_client: LLM client for message generation
settings: Service settings
error_handler: Optional error handler for recovery
"""
self.llm_client = llm_client
self.settings = settings
self.error_handler = error_handler
self.grouping_strategy = FileGroupingStrategy(llm_client, settings)
async def commit_repo(self, repo: Repository) -> RepoProcessResult:
"""Check for changes and commit if found (no push).
Uses intelligent file grouping to create multiple logical commits per repo.
Args:
repo: Repository to process
Returns:
Processing result with COMMITTED status if successful
"""
logger.info(f"Processing repository: {repo.name}")
try:
# Step 1: Check if repo exists
if not repo.exists:
logger.warning(f"Repository does not exist: {repo.path}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.ERROR,
error=f"Repository not found: {repo.path}",
)
# Step 2: Check for changes
status = await git_status(repo.path)
if not status.has_changes:
logger.debug(f"No changes in {repo.name}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.NO_CHANGES,
)
all_files = status.staged + status.modified + status.untracked + status.deleted
logger.info(
f"Changes found in {repo.name}: {len(all_files)} files total "
f"({len(status.staged)} staged, {len(status.modified)} modified, "
f"{len(status.untracked)} untracked, {len(status.deleted)} deleted)"
)
# Step 3: Get full diff for grouping analysis
diff = await git_diff(repo.path)
if not diff and not status.untracked:
logger.debug(f"No diff in {repo.name}")
# Step 4: Group files using Mistral 3 14B
file_groups = await self.grouping_strategy.group_files(
repo=repo,
changed_files=all_files,
diff=diff,
)
logger.info(f"Created {len(file_groups)} commit group(s) for {repo.name}")
# Step 5: Create commits for each group
commit_results = []
for idx, group in enumerate(file_groups, 1):
logger.info(
f"Processing group {idx}/{len(file_groups)} for {repo.name}: "
f"{len(group.files)} files - {group.reasoning}"
)
try:
# Stage only files in this group
await git_add_specific(repo.path, group.files)
# Generate commit message for this group
summary = summarize_diff(diff) if diff else summarize_diff("")
summary.files_added = len([f for f in group.files if f in status.untracked])
summary.files_modified = len([f for f in group.files if f in status.modified])
summary.files_deleted = len([f for f in group.files if f in status.deleted])
try:
message = await self.llm_client.generate_commit_message(
diff_summary=summary,
repo_name=repo.name,
branch=status.branch,
)
# Enhance message with group reasoning if useful
logger.info(f"Generated message for group {idx}: {message}")
except LlamaServiceUnavailable as e:
logger.error(f"LLM service unavailable for {repo.name}: {e}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.ERROR,
error=f"LLM service unavailable: {e}",
)
except LlamaServiceError as e:
logger.error(f"LLM error for {repo.name}: {e}")
# Use fallback message
message = f"chore({repo.name}): 🔧 {group.reasoning[:50]}"
logger.info(f"Using fallback message for group {idx}: {message}")
# Create commit
commit_result = await git_commit(repo.path, message)
if not commit_result.success:
logger.error(f"Commit failed for group {idx} in {repo.name}: {commit_result.error}")
# Don't fail entire repo - continue with other groups
continue
logger.info(f"Committed {commit_result.commit_hash} for group {idx} in {repo.name}")
commit_results.append(commit_result)
except GitError as e:
logger.error(f"Git error for group {idx} in {repo.name}: {e}")
# Continue with other groups
continue
# Step 6: Check if any commits succeeded
if not commit_results:
logger.error(f"No commits succeeded for {repo.name}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.ERROR,
error="All commit groups failed",
)
# Return COMMITTED status with info about all commits
commit_hashes = ", ".join(r.commit_hash for r in commit_results)
logger.info(f"Completed {len(commit_results)} commit(s) in {repo.name}: {commit_hashes}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.COMMITTED,
commit_hash=commit_results[0].commit_hash, # First commit hash
commit_message=f"{len(commit_results)} commits created",
)
except MergeConflictError as e:
logger.error(f"Merge conflict in {repo.name}: {e}")
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Merge conflict: {e}",
)
except GitError as e:
logger.error(f"Git error in {repo.name}: {e}")
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
str(e),
)
except Exception as e:
logger.exception(f"Unexpected error processing {repo.name}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.ERROR,
error=f"Unexpected error: {e}",
)
async def push_repo(
self,
repo: Repository,
result: RepoProcessResult,
branch: str,
max_retries: int = 3,
) -> RepoProcessResult:
"""Push a committed repo to remote with retry logic.
Args:
repo: Repository to push
result: The commit result to update
branch: The actual branch to push (from git_status)
max_retries: Maximum number of push attempts (default 3)
Returns:
Updated result with SUCCESS or ERROR status
"""
if result.status != ProcessStatus.COMMITTED:
return result # Nothing to push
last_error = None
for attempt in range(1, max_retries + 1):
try:
# If not first attempt, pull-rebase first
if attempt > 1:
logger.info(f"Attempt {attempt}/{max_retries}: Pulling latest for {repo.name}")
await git_pull_rebase(
repo.path,
remote=self.settings.git_remote,
branch=branch,
)
push_result = await git_push(
repo.path,
remote=self.settings.git_remote,
branch=branch,
)
if not push_result.success:
raise GitError(f"Push failed: {push_result.error}")
logger.info(f"Pushed {repo.name} to {push_result.remote}/{push_result.branch}")
return RepoProcessResult(
repo_name=result.repo_name,
status=ProcessStatus.SUCCESS,
commit_hash=result.commit_hash,
commit_message=result.commit_message,
)
except PushRejectedError as e:
last_error = str(e)
logger.warning(f"Push rejected for {repo.name} (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
continue # Will pull-rebase on next attempt
# Fall through to error handling after all retries
except MergeConflictError as e:
# Conflicts need Claude, don't retry
logger.error(f"Merge conflict in {repo.name}: {e}")
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Merge conflict: {e}",
)
except GitError as e:
last_error = str(e)
# Check if it's a push rejection that wasn't raised as PushRejectedError
if "failed to push some refs" in str(e).lower():
logger.warning(f"Push failed for {repo.name} (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
continue # Will pull-rebase on next attempt
else:
# Other git errors - don't retry
logger.error(f"Push error in {repo.name}: {e}")
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
str(e),
)
# All retries exhausted
logger.error(f"Push failed for {repo.name} after {max_retries} attempts")
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Push failed after {max_retries} attempts: {last_error}",
)
async def _handle_push_rejected(
self,
repo: Repository,
result: RepoProcessResult,
branch: str,
) -> RepoProcessResult:
"""Handle push rejection by rebasing and retrying."""
try:
# Pull with rebase
await git_pull_rebase(
repo.path,
remote=self.settings.git_remote,
branch=branch,
)
# Retry push
push_result = await git_push(
repo.path,
remote=self.settings.git_remote,
branch=branch,
)
if push_result.success:
logger.info(f"Push succeeded after rebase for {repo.name}")
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.SUCCESS,
commit_hash=result.commit_hash,
commit_message=result.commit_message,
)
else:
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Push failed after rebase: {push_result.error}",
)
except MergeConflictError as e:
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Conflict during rebase: {e}",
)
except GitError as e:
return await self._handle_with_recovery(
repo,
ProcessStatus.ERROR,
f"Error during rebase/push: {e}",
)
async def _handle_with_recovery(
self,
repo: Repository,
status: ProcessStatus,
error: str,
) -> RepoProcessResult:
"""Attempt recovery using error handler (Claude fallback)."""
if self.error_handler and self.settings.claude_fallback_enabled:
# Check if this is an infrastructure error Claude can't fix
if not _is_recoverable_error(error):
logger.warning(
f"Skipping Claude recovery for {repo.name}: "
f"infrastructure error cannot be auto-resolved: {error[:100]}"
)
return RepoProcessResult(
repo_name=repo.name,
status=status,
error=error,
)
logger.info(f"Attempting recovery for {repo.name} via error handler")
recovery_result = await self.error_handler.handle(repo, error)
if recovery_result.success:
return RepoProcessResult(
repo_name=repo.name,
status=ProcessStatus.RECOVERED,
commit_hash=recovery_result.commit_hash,
commit_message=recovery_result.message,
recovered_by_claude=True,
)
# Recovery failed
return RepoProcessResult(
repo_name=repo.name,
status=status,
error=f"{error} (recovery failed: {recovery_result.error})",
)
return RepoProcessResult(
repo_name=repo.name,
status=status,
error=error,
)
def _fallback_message(self, status: "GitStatus") -> str:
"""Generate a fallback commit message when LLM is unavailable."""
from ..git.repository import GitStatus
# Determine the type of changes
if status.untracked:
return "✨ Add new files"
elif status.deleted:
return "🔥 Remove files"
elif status.modified:
return "🔧 Update files"
else:
return "🔧 Update repository"