feat(scheduler): add rag+cot pipeline processor integration

This commit is contained in:
Lilith 2026-01-13 05:49:04 -08:00
parent 27267eb6e8
commit 2b93af6ad0
2 changed files with 50 additions and 28 deletions

View file

@ -61,13 +61,7 @@ class CommitDaemon:
from .pipeline_processor import PipelineCommitProcessor
logger.info("Using new pipeline processor with RAG+CoT")
self.processor = PipelineCommitProcessor(
settings=settings,
ml_provider=None, # TODO: Initialize from ML provider registry
semantic_search=None, # TODO: Initialize from agent-ml/knowledge
knowledge_graph=None, # TODO: Initialize from agent-ml/knowledge
error_handler=error_handler,
)
self.processor = PipelineCommitProcessor(settings=settings)
else:
logger.info("Using legacy commit processor")
self.processor = CommitProcessor(
@ -499,6 +493,28 @@ class CommitDaemon:
logger.warning("Daemon is already running")
return
# Initialize ML providers if using pipeline processor
if self.settings.use_pipeline_processor:
from lilith_auto_commit_pipeline import init_ml_providers
logger.info("Initializing ML providers for pipeline processor...")
try:
await init_ml_providers(
llama_service_url=self.settings.llama_service_url,
redis_url="redis://localhost:6379", # TODO: Make configurable
enable_rag=self.settings.enable_rag,
)
logger.info("ML providers initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize ML providers: {e}")
logger.warning("Falling back to legacy processor")
# Reinitialize with legacy processor
self.processor = CommitProcessor(
llm_client=self.llm_client,
settings=self.settings,
error_handler=self.error_handler,
)
logger.info(
f"Starting commit daemon with {len(self.repos)} repos, "
f"cycle interval: {self.settings.cycle_interval_seconds}s"
@ -549,6 +565,16 @@ class CommitDaemon:
except asyncio.CancelledError:
pass
# Shutdown ML providers if using pipeline processor
if self.settings.use_pipeline_processor:
from lilith_auto_commit_pipeline import shutdown_ml_providers
try:
await shutdown_ml_providers()
logger.info("ML providers shutdown complete")
except Exception as e:
logger.error(f"Error shutting down ML providers: {e}")
async def run_cycle(self, force: bool = False) -> CycleResult:
"""Process all repositories in a single cycle.

View file

@ -30,44 +30,40 @@ class PipelineCommitProcessor:
def __init__(
    self,
    settings: AutoCommitSettings,
    error_handler: Any | None = None,
):
    """Initialize pipeline processor.

    Args:
        settings: Service settings
        error_handler: Error handler for recovery (optional)

    Note:
        ML providers must be initialized globally via init_ml_providers()
        before creating commits. Stages access these globals directly.
    """
    self.settings = settings
    self.error_handler = error_handler

    # The orchestrator is not built here; the `orchestrator` property
    # creates it on first access (stages access global ML providers).
    self._orchestrator = None
@property
def orchestrator(self):
    """Get or create pipeline orchestrator.

    The orchestrator is created on first access and cached in
    ``self._orchestrator``; later accesses return the cached instance.

    Returns:
        The object returned by ``create_auto_commit_orchestrator``.

    Raises:
        RuntimeError: If ML providers not initialized
    """
    if self._orchestrator is None:
        # Function-scope import, kept where the commit placed it.
        from lilith_auto_commit_pipeline import is_initialized

        # Fail with an actionable message instead of building an
        # orchestrator before the global ML providers are initialized.
        if not is_initialized():
            raise RuntimeError(
                "ML providers not initialized. Call init_ml_providers() first. "
                "Set USE_PIPELINE_PROCESSOR=false to use legacy processor."
            )

        self._orchestrator = create_auto_commit_orchestrator(
            error_handler=self.error_handler,
            enable_rag=self.settings.enable_rag,
            enable_recovery=self.settings.claude_fallback_enabled,
        )
    return self._orchestrator