From 2b93af6ad0f960e4bb4da6acd87c2d2c0ecfbfde Mon Sep 17 00:00:00 2001 From: Lilith Date: Tue, 13 Jan 2026 05:49:04 -0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E2=9C=A8=20add=20rag+cot=20?= =?UTF-8?q?pipeline=20processor=20integration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/auto_commit_service/scheduler/daemon.py | 40 +++++++++++++++---- .../scheduler/pipeline_processor.py | 38 ++++++++---------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/auto_commit_service/scheduler/daemon.py b/src/auto_commit_service/scheduler/daemon.py index b5ea208..6f02239 100644 --- a/src/auto_commit_service/scheduler/daemon.py +++ b/src/auto_commit_service/scheduler/daemon.py @@ -61,13 +61,7 @@ class CommitDaemon: from .pipeline_processor import PipelineCommitProcessor logger.info("Using new pipeline processor with RAG+CoT") - self.processor = PipelineCommitProcessor( - settings=settings, - ml_provider=None, # TODO: Initialize from ML provider registry - semantic_search=None, # TODO: Initialize from agent-ml/knowledge - knowledge_graph=None, # TODO: Initialize from agent-ml/knowledge - error_handler=error_handler, - ) + self.processor = PipelineCommitProcessor(settings=settings) else: logger.info("Using legacy commit processor") self.processor = CommitProcessor( @@ -499,6 +493,28 @@ class CommitDaemon: logger.warning("Daemon is already running") return + # Initialize ML providers if using pipeline processor + if self.settings.use_pipeline_processor: + from lilith_auto_commit_pipeline import init_ml_providers + + logger.info("Initializing ML providers for pipeline processor...") + try: + await init_ml_providers( + llama_service_url=self.settings.llama_service_url, + redis_url="redis://localhost:6379", # TODO: Make configurable + enable_rag=self.settings.enable_rag, + ) + logger.info("ML providers initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize ML providers: {e}") + logger.warning("Falling back to legacy processor") + # Reinitialize with legacy processor + self.processor = CommitProcessor( + llm_client=self.llm_client, + settings=self.settings, + error_handler=self.error_handler, + ) + logger.info( f"Starting commit daemon with {len(self.repos)} repos, " f"cycle interval: {self.settings.cycle_interval_seconds}s" @@ -549,6 +565,16 @@ class CommitDaemon: except asyncio.CancelledError: pass + # Shutdown ML providers if using pipeline processor + if self.settings.use_pipeline_processor: + from lilith_auto_commit_pipeline import shutdown_ml_providers + + try: + await shutdown_ml_providers() + logger.info("ML providers shutdown complete") + except Exception as e: + logger.error(f"Error shutting down ML providers: {e}") + async def run_cycle(self, force: bool = False) -> CycleResult: """Process all repositories in a single cycle. diff --git a/src/auto_commit_service/scheduler/pipeline_processor.py b/src/auto_commit_service/scheduler/pipeline_processor.py index 1aea1b1..367b09d 100644 --- a/src/auto_commit_service/scheduler/pipeline_processor.py +++ b/src/auto_commit_service/scheduler/pipeline_processor.py @@ -30,44 +30,40 @@ class PipelineCommitProcessor: def __init__( self, settings: AutoCommitSettings, - ml_provider: Any | None = None, - semantic_search: Any | None = None, - knowledge_graph: Any | None = None, - error_handler: Any | None = None, ): """Initialize pipeline processor. Args: settings: Service settings - ml_provider: ML provider for grouping and reasoning - semantic_search: Semantic search for RAG (optional) - knowledge_graph: Knowledge graph for RAG (optional) - error_handler: Error handler for recovery (optional) + + Note: + ML providers must be initialized globally via init_ml_providers() + before creating commits. Stages access these globals directly. """ self.settings = settings - self.ml_provider = ml_provider - self.semantic_search = semantic_search - self.knowledge_graph = knowledge_graph - self.error_handler = error_handler - # Create orchestrator (lazy initialization) + # Create orchestrator (stages access global ML providers) self._orchestrator = None @property def orchestrator(self): - """Get or create pipeline orchestrator.""" + """Get or create pipeline orchestrator. + + Raises: + RuntimeError: If ML providers not initialized + """ if self._orchestrator is None: - if self.ml_provider is None: - raise ValueError( - "ML provider is required for pipeline processor. " + from lilith_auto_commit_pipeline import is_initialized + + if not is_initialized(): + raise RuntimeError( + "ML providers not initialized. Call init_ml_providers() first. " "Set USE_PIPELINE_PROCESSOR=false to use legacy processor." ) self._orchestrator = create_auto_commit_orchestrator( - ml_provider=self.ml_provider, - semantic_search=self.semantic_search, - knowledge_graph=self.knowledge_graph, - error_handler=self.error_handler, + enable_rag=self.settings.enable_rag, + enable_recovery=self.settings.claude_fallback_enabled, ) return self._orchestrator