From b72ac5817822a15df392a280147011fb80c61d08 Mon Sep 17 00:00:00 2001 From: Lilith Date: Tue, 13 Jan 2026 08:26:14 -0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E2=9C=A8=20Implement=20new?= =?UTF-8?q?=20scheduler=20algorithm=20for=20processing=20tasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scheduler/processor.py | 447 ------------------ 1 file changed, 447 deletions(-) delete mode 100644 src/auto_commit_service/scheduler/processor.py diff --git a/src/auto_commit_service/scheduler/processor.py b/src/auto_commit_service/scheduler/processor.py deleted file mode 100644 index bcf9d63..0000000 --- a/src/auto_commit_service/scheduler/processor.py +++ /dev/null @@ -1,447 +0,0 @@ -"""Per-repository commit processor.""" - -import logging -from datetime import datetime - -from ..config import AutoCommitSettings -from ..git import ( - Repository, - git_status, - git_diff, - git_add_all, - git_commit, - git_push, - summarize_diff, -) -from ..git.operations import ( - GitError, - MergeConflictError, - PushRejectedError, - git_pull_rebase, - git_add_specific, -) -from ..grouping import FileGroupingStrategy -from ..llm import LlamaCommitClient -from ..llm.client import LlamaServiceError, LlamaServiceUnavailable -from ..models import ProcessStatus, RepoProcessResult -from ..recovery import ErrorHandler - -logger = logging.getLogger(__name__) - -# Infrastructure error patterns that Claude cannot fix -_NON_RECOVERABLE_PATTERNS = [ - "couldn't find remote ref", - "remote not found", - "repository not found", - "connection refused", - "network is unreachable", - "could not resolve host", - "authentication failed", - "permission denied", - "unable to access", - "fatal: could not read", - "no such device or address", -] - - -def _is_recoverable_error(error: str) -> bool: - """Determine if an error can potentially be recovered by Claude. - - Claude can help with: - - Merge conflicts - - Pre-commit hook failures - - Rebase conflicts - - Claude cannot help with: - - Network/connectivity issues - - Authentication failures - - Missing remotes/refs - - Permission issues - - Args: - error: The error message to analyze - - Returns: - True if the error might be recoverable, False for infrastructure errors - """ - error_lower = error.lower() - return not any(pattern in error_lower for pattern in _NON_RECOVERABLE_PATTERNS) - - -class CommitProcessor: - """Processes commits for a single repository.""" - - def __init__( - self, - llm_client: LlamaCommitClient, - settings: AutoCommitSettings, - error_handler: "ErrorHandler | None" = None, - ): - """Initialize the processor. - - Args: - llm_client: LLM client for message generation - settings: Service settings - error_handler: Optional error handler for recovery - """ - self.llm_client = llm_client - self.settings = settings - self.error_handler = error_handler - self.grouping_strategy = FileGroupingStrategy(llm_client, settings) - - async def commit_repo(self, repo: Repository) -> RepoProcessResult: - """Check for changes and commit if found (no push). - - Uses intelligent file grouping to create multiple logical commits per repo. - - Args: - repo: Repository to process - - Returns: - Processing result with COMMITTED status if successful - """ - logger.info(f"Processing repository: {repo.name}") - - try: - # Step 1: Check if repo exists - if not repo.exists: - logger.warning(f"Repository does not exist: {repo.path}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.ERROR, - error=f"Repository not found: {repo.path}", - ) - - # Step 2: Check for changes - status = await git_status(repo.path) - - if not status.has_changes: - logger.debug(f"No changes in {repo.name}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.NO_CHANGES, - ) - - all_files = status.staged + status.modified + status.untracked + status.deleted - logger.info( - f"Changes found in {repo.name}: {len(all_files)} files total " - f"({len(status.staged)} staged, {len(status.modified)} modified, " - f"{len(status.untracked)} untracked, {len(status.deleted)} deleted)" - ) - - # Step 3: Get full diff for grouping analysis - diff = await git_diff(repo.path) - - if not diff and not status.untracked: - logger.debug(f"No diff in {repo.name}") - - # Step 4: Group files using Mistral 3 14B - file_groups = await self.grouping_strategy.group_files( - repo=repo, - changed_files=all_files, - diff=diff, - ) - - logger.info(f"Created {len(file_groups)} commit group(s) for {repo.name}") - - # Step 5: Create commits for each group - commit_results = [] - for idx, group in enumerate(file_groups, 1): - logger.info( - f"Processing group {idx}/{len(file_groups)} for {repo.name}: " - f"{len(group.files)} files - {group.reasoning}" - ) - - try: - # Stage only files in this group - await git_add_specific(repo.path, group.files) - - # Generate commit message for this group - summary = summarize_diff(diff) if diff else summarize_diff("") - summary.files_added = len([f for f in group.files if f in status.untracked]) - summary.files_modified = len([f for f in group.files if f in status.modified]) - summary.files_deleted = len([f for f in group.files if f in status.deleted]) - - try: - message = await self.llm_client.generate_commit_message( - diff_summary=summary, - repo_name=repo.name, - branch=status.branch, - ) - # Enhance message with group reasoning if useful - logger.info(f"Generated message for group {idx}: {message}") - - except LlamaServiceUnavailable as e: - logger.error(f"LLM service unavailable for {repo.name}: {e}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.ERROR, - error=f"LLM service unavailable: {e}", - ) - except LlamaServiceError as e: - logger.error(f"LLM error for {repo.name}: {e}") - # Use fallback message - message = f"chore({repo.name}): 🔧 {group.reasoning[:50]}" - logger.info(f"Using fallback message for group {idx}: {message}") - - # Create commit - commit_result = await git_commit(repo.path, message) - - if not commit_result.success: - logger.error(f"Commit failed for group {idx} in {repo.name}: {commit_result.error}") - # Don't fail entire repo - continue with other groups - continue - - logger.info(f"Committed {commit_result.commit_hash} for group {idx} in {repo.name}") - commit_results.append(commit_result) - - except GitError as e: - logger.error(f"Git error for group {idx} in {repo.name}: {e}") - # Continue with other groups - continue - - # Step 6: Check if any commits succeeded - if not commit_results: - logger.error(f"No commits succeeded for {repo.name}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.ERROR, - error="All commit groups failed", - ) - - # Return COMMITTED status with info about all commits - commit_hashes = ", ".join(r.commit_hash for r in commit_results) - logger.info(f"Completed {len(commit_results)} commit(s) in {repo.name}: {commit_hashes}") - - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.COMMITTED, - commit_hash=commit_results[0].commit_hash, # First commit hash - commit_message=f"{len(commit_results)} commits created", - ) - - except MergeConflictError as e: - logger.error(f"Merge conflict in {repo.name}: {e}") - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Merge conflict: {e}", - ) - - except GitError as e: - logger.error(f"Git error in {repo.name}: {e}") - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - str(e), - ) - - except Exception as e: - logger.exception(f"Unexpected error processing {repo.name}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.ERROR, - error=f"Unexpected error: {e}", - ) - - async def push_repo( - self, - repo: Repository, - result: RepoProcessResult, - branch: str, - max_retries: int = 3, - ) -> RepoProcessResult: - """Push a committed repo to remote with retry logic. - - Args: - repo: Repository to push - result: The commit result to update - branch: The actual branch to push (from git_status) - max_retries: Maximum number of push attempts (default 3) - - Returns: - Updated result with SUCCESS or ERROR status - """ - if result.status != ProcessStatus.COMMITTED: - return result # Nothing to push - - last_error = None - - for attempt in range(1, max_retries + 1): - try: - # If not first attempt, pull-rebase first - if attempt > 1: - logger.info(f"Attempt {attempt}/{max_retries}: Pulling latest for {repo.name}") - await git_pull_rebase( - repo.path, - remote=self.settings.git_remote, - branch=branch, - ) - - push_result = await git_push( - repo.path, - remote=self.settings.git_remote, - branch=branch, - ) - - if not push_result.success: - raise GitError(f"Push failed: {push_result.error}") - - logger.info(f"Pushed {repo.name} to {push_result.remote}/{push_result.branch}") - - return RepoProcessResult( - repo_name=result.repo_name, - status=ProcessStatus.SUCCESS, - commit_hash=result.commit_hash, - commit_message=result.commit_message, - ) - - except PushRejectedError as e: - last_error = str(e) - logger.warning(f"Push rejected for {repo.name} (attempt {attempt}/{max_retries}): {e}") - if attempt < max_retries: - continue # Will pull-rebase on next attempt - # Fall through to error handling after all retries - - except MergeConflictError as e: - # Conflicts need Claude, don't retry - logger.error(f"Merge conflict in {repo.name}: {e}") - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Merge conflict: {e}", - ) - - except GitError as e: - last_error = str(e) - # Check if it's a push rejection that wasn't raised as PushRejectedError - if "failed to push some refs" in str(e).lower(): - logger.warning(f"Push failed for {repo.name} (attempt {attempt}/{max_retries}): {e}") - if attempt < max_retries: - continue # Will pull-rebase on next attempt - else: - # Other git errors - don't retry - logger.error(f"Push error in {repo.name}: {e}") - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - str(e), - ) - - # All retries exhausted - logger.error(f"Push failed for {repo.name} after {max_retries} attempts") - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Push failed after {max_retries} attempts: {last_error}", - ) - - async def _handle_push_rejected( - self, - repo: Repository, - result: RepoProcessResult, - branch: str, - ) -> RepoProcessResult: - """Handle push rejection by rebasing and retrying.""" - try: - # Pull with rebase - await git_pull_rebase( - repo.path, - remote=self.settings.git_remote, - branch=branch, - ) - - # Retry push - push_result = await git_push( - repo.path, - remote=self.settings.git_remote, - branch=branch, - ) - - if push_result.success: - logger.info(f"Push succeeded after rebase for {repo.name}") - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.SUCCESS, - commit_hash=result.commit_hash, - commit_message=result.commit_message, - ) - else: - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Push failed after rebase: {push_result.error}", - ) - - except MergeConflictError as e: - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Conflict during rebase: {e}", - ) - - except GitError as e: - return await self._handle_with_recovery( - repo, - ProcessStatus.ERROR, - f"Error during rebase/push: {e}", - ) - - async def _handle_with_recovery( - self, - repo: Repository, - status: ProcessStatus, - error: str, - ) -> RepoProcessResult: - """Attempt recovery using error handler (Claude fallback).""" - if self.error_handler and self.settings.claude_fallback_enabled: - # Check if this is an infrastructure error Claude can't fix - if not _is_recoverable_error(error): - logger.warning( - f"Skipping Claude recovery for {repo.name}: " - f"infrastructure error cannot be auto-resolved: {error[:100]}" - ) - return RepoProcessResult( - repo_name=repo.name, - status=status, - error=error, - ) - - logger.info(f"Attempting recovery for {repo.name} via error handler") - recovery_result = await self.error_handler.handle(repo, error) - - if recovery_result.success: - return RepoProcessResult( - repo_name=repo.name, - status=ProcessStatus.RECOVERED, - commit_hash=recovery_result.commit_hash, - commit_message=recovery_result.message, - recovered_by_claude=True, - ) - - # Recovery failed - return RepoProcessResult( - repo_name=repo.name, - status=status, - error=f"{error} (recovery failed: {recovery_result.error})", - ) - - return RepoProcessResult( - repo_name=repo.name, - status=status, - error=error, - ) - - def _fallback_message(self, status: "GitStatus") -> str: - """Generate a fallback commit message when LLM is unavailable.""" - from ..git.repository import GitStatus - - # Determine the type of changes - if status.untracked: - return "✨ Add new files" - elif status.deleted: - return "🔥 Remove files" - elif status.modified: - return "🔧 Update files" - else: - return "🔧 Update repository"