Lilith Pipeline Framework
Domain-agnostic pipeline orchestration for ML workflows. Build stage-based pipelines that can process any type of data through a sequence of processing stages.
Features
- Generic Architecture: Works with any data type (images, audio, video, text, etc.)
- Type-Safe: Full generic type support with `PipelineContext[TRequest, TData]`
- Async Job Management: Background execution with status polling
- Pluggable Storage: In-memory or Redis storage for job state
- Comprehensive Error Handling: Automatic failure detection and optional stage support
- Production-Ready: Stable v1.0.0 API, mypy strict mode, comprehensive tests
Installation
```bash
pip install lilith-pipeline-framework
```
Optional Redis support (quote the extra so shells such as zsh do not expand the brackets):
```bash
pip install "lilith-pipeline-framework[redis]"
```
Quick Start
Define Processing Stages
```python
from lilith_pipeline_framework import (
    PipelineStage,
    PipelineContext,
    StageResult,
    StageStatus,
)


class ValidateStage(PipelineStage):
    @property
    def name(self) -> str:
        return "validate"

    async def execute(self, context: PipelineContext) -> StageResult:
        # Validate the request (this example assumes it has a `prompt` attribute)
        if not context.request.prompt:
            return StageResult(
                stage_name=self.name,
                status=StageStatus.FAILED,
                duration_ms=0,
                summary="Invalid request",
                error="Prompt is required",
            )
        return StageResult(
            stage_name=self.name,
            status=StageStatus.SUCCESS,
            duration_ms=0,
            summary="Validation passed",
        )


class ProcessStage(PipelineStage):
    @property
    def name(self) -> str:
        return "process"

    async def execute(self, context: PipelineContext) -> StageResult:
        # Process data; process_data is your own domain-specific function
        context.data = process_data(context.request)
        return StageResult(
            stage_name=self.name,
            status=StageStatus.SUCCESS,
            duration_ms=0,
            summary="Processing complete",
        )
```
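The stages above hard-code `duration_ms=0`. One way to report real timings is to measure the stage's work with `time.perf_counter`, which is monotonic and high-resolution. The sketch below is self-contained, so `StageStatus` and `StageResult` are hypothetical stand-ins that only mirror the fields used in the Quick Start (their enum values are assumptions), and `timed_result` is an illustrative helper, not part of the framework; in real code you would import the framework's types instead.

```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


# Hypothetical stand-ins mirroring the Quick Start fields; in real code,
# import StageStatus and StageResult from lilith_pipeline_framework.
class StageStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class StageResult:
    stage_name: str
    status: StageStatus
    duration_ms: float
    summary: str
    error: Optional[str] = None


def timed_result(stage_name: str, work: Callable[[], str]) -> StageResult:
    """Run `work`, time it, and wrap the outcome in a StageResult.

    `work` returns a human-readable summary on success and raises on failure.
    """
    start = time.perf_counter()
    try:
        summary = work()
        status, error = StageStatus.SUCCESS, None
    except Exception as exc:  # report the failure instead of propagating it
        summary, status, error = "Stage raised an exception", StageStatus.FAILED, str(exc)
    duration_ms = (time.perf_counter() - start) * 1000
    return StageResult(stage_name, status, duration_ms, summary, error)
```

For example, `timed_result("validate", lambda: "Validation passed")` returns a successful result whose `duration_ms` reflects the actual elapsed time.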
Execute Pipeline
```python
import asyncio

from lilith_pipeline_framework import JobStatus, PipelineContext, PipelineOrchestrator

# Create orchestrator
orchestrator = PipelineOrchestrator(stages=[
    ValidateStage(),
    ProcessStage(),
])


async def main() -> None:
    # Direct execution: run all stages and wait for the result
    context = PipelineContext(request=my_request)  # my_request: your request object
    result = await orchestrator.execute(context)
    if result.status == JobStatus.COMPLETED:
        print(f"Success! Processed data: {result.data}")


asyncio.run(main())
```
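Conceptually, executing a pipeline means running the stages in order against a shared context and stopping at the first failure of a required stage, while the Features list also mentions optional stages. The sketch below illustrates that control flow with hypothetical, self-contained types (`Stage`, `RunResult`, `run_stages`, the `optional` flag, and the string statuses are all assumptions); it is not the framework's actual `PipelineOrchestrator` implementation.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Optional


# Hypothetical stage shape (not the framework's PipelineStage): an async
# function that mutates a shared dict context and returns True on success.
@dataclass
class Stage:
    name: str
    run: Callable[[dict], Awaitable[bool]]
    optional: bool = False


@dataclass
class RunResult:
    status: str = "completed"  # "completed" or "failed"
    completed: List[str] = field(default_factory=list)
    skipped: List[str] = field(default_factory=list)
    failed_stage: Optional[str] = None


async def run_stages(stages: List[Stage], context: dict) -> RunResult:
    """Run stages in order; skip failed optional stages, stop at the first failed required one."""
    result = RunResult()
    for stage in stages:
        try:
            ok = await stage.run(context)
        except Exception:
            ok = False  # treat an exception as a stage failure
        if ok:
            result.completed.append(stage.name)
        elif stage.optional:
            result.skipped.append(stage.name)  # optional failure: keep going
        else:
            result.status = "failed"
            result.failed_stage = stage.name  # required failure: stop here
            break
    return result


if __name__ == "__main__":
    async def validate(ctx: dict) -> bool:
        return bool(ctx.get("prompt"))

    async def process(ctx: dict) -> bool:
        ctx["data"] = ctx["prompt"].upper()
        return True

    demo_ctx = {"prompt": "hello"}
    print(asyncio.run(run_stages([Stage("validate", validate), Stage("process", process)], demo_ctx)))
```

Short-circuiting on a required failure means later stages never see a half-processed context; a failed optional stage is only recorded as skipped.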
Async Job Execution
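```python
import asyncio


async def run_in_background(orchestrator, context):
    # Create background job
    job_id = await orchestrator.create_job(context)

    # Start execution in the background; keep a reference to the task so
    # it is not garbage-collected before it finishes
    task = asyncio.create_task(orchestrator.execute_job(job_id))

    # Poll for status
    while True:
        status = await orchestrator.get_job_status(job_id)
        if status["status"] in ("completed", "failed"):
            break
        await asyncio.sleep(1)

    await task  # make sure the background task has finished

    # Get result
    return await orchestrator.get_job_result(job_id)
```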
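The polling loop above waits forever if a job never reaches a terminal state. A bounded variant can wrap the loop in `asyncio.wait_for`, which cancels the polling coroutine and raises `asyncio.TimeoutError` once the timeout expires. The `wait_for_job` helper below is a hypothetical sketch; it only assumes that `get_job_status` returns a dict with a `"status"` key, as in the example above.

```python
import asyncio
from typing import Any, Dict, Protocol


class SupportsJobStatus(Protocol):
    async def get_job_status(self, job_id: str) -> Dict[str, Any]: ...


async def wait_for_job(
    orchestrator: SupportsJobStatus,
    job_id: str,
    poll_interval: float = 1.0,
    timeout: float = 300.0,
) -> Dict[str, Any]:
    """Poll until the job is completed or failed.

    Raises asyncio.TimeoutError if that takes longer than `timeout` seconds.
    """

    async def poll() -> Dict[str, Any]:
        while True:
            status = await orchestrator.get_job_status(job_id)
            if status["status"] in ("completed", "failed"):
                return status
            await asyncio.sleep(poll_interval)

    # wait_for cancels poll() and raises TimeoutError when the deadline passes
    return await asyncio.wait_for(poll(), timeout=timeout)
```

In the background-job example, `status = await wait_for_job(orchestrator, job_id, timeout=60)` could replace the `while True` loop.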
Architecture
Generic Types
The framework uses Python generics for type safety:
```python
from typing import Generic, Optional, TypeVar

TRequest = TypeVar("TRequest")  # Your request type
TData = TypeVar("TData")  # Your data type (Image, Audio, etc.)


class PipelineContext(Generic[TRequest, TData]):
    request: TRequest
    data: Optional[TData] = None
    # ... generic fields
```
Domain-Specific Extensions
Extend the base context for domain-specific fields:
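```python
from dataclasses import dataclass
from typing import Optional

from PIL import Image

from lilith_pipeline_framework import PipelineContext

# ImageRequest is your own request type, defined elsewhere


@dataclass
class ImagePipelineContext(PipelineContext[ImageRequest, Image.Image]):
    """Image-specific context."""

    width: int = 1024
    height: int = 1024
    quality_score: Optional[float] = None
```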
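Assuming the base context is a dataclass whose `data` field defaults to `None` (as the Generic Types sketch suggests), dataclass inheritance places the base fields first, so every field a subclass adds needs a default. The self-contained sketch below shows the same extension pattern for a text domain; `BaseContext`, `TextRequest`, and `TextPipelineContext` are hypothetical stand-ins so the snippet runs without the framework or Pillow.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

TRequest = TypeVar("TRequest")
TData = TypeVar("TData")


# Hypothetical stand-in for PipelineContext, assumed to be a dataclass.
@dataclass
class BaseContext(Generic[TRequest, TData]):
    request: TRequest
    data: Optional[TData] = None


@dataclass
class TextRequest:
    prompt: str


# Domain-specific context: binds TRequest=TextRequest and TData=str and adds
# text-specific fields. Each new field needs a default because the inherited
# `data` field already has one.
@dataclass
class TextPipelineContext(BaseContext[TextRequest, str]):
    max_tokens: int = 256
    language: Optional[str] = None
```

With this binding, a type checker such as mypy knows that `ctx.request.prompt` is a `str` and that `ctx.data` is `Optional[str]`.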
Storage Backends
Default in-memory storage:
```python
orchestrator = PipelineOrchestrator(
    stages=stages,
    storage=InMemoryJobStorage(),  # Default
)
```
Redis storage (future):
```python
orchestrator = PipelineOrchestrator(
    stages=stages,
    storage=RedisJobStorage(redis_client),
)
```
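The Examples section mentions a custom storage backend. The storage interface is not documented here, so the sketch below assumes a hypothetical async interface (`save`, `get`, `update`) and implements it with a dict guarded by an `asyncio.Lock`; `DictJobStorage` and its method names are illustrative only, so check the real `InMemoryJobStorage` for the actual interface before writing your own backend.

```python
import asyncio
import copy
from typing import Any, Dict, Optional


class DictJobStorage:
    """Hypothetical job storage backend; method names are assumptions, not the framework's API."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()  # serialize access from concurrent tasks

    async def save(self, job_id: str, state: Dict[str, Any]) -> None:
        async with self._lock:
            self._jobs[job_id] = copy.deepcopy(state)

    async def get(self, job_id: str) -> Optional[Dict[str, Any]]:
        async with self._lock:
            state = self._jobs.get(job_id)
            # Return a copy so callers cannot mutate stored state by accident
            return copy.deepcopy(state) if state is not None else None

    async def update(self, job_id: str, **changes: Any) -> None:
        async with self._lock:
            if job_id not in self._jobs:
                raise KeyError(job_id)
            self._jobs[job_id].update(changes)
```

Copying on `save` and `get` mimics an external store such as Redis, where callers only ever see snapshots of the job state, so switching backends does not change behavior.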
SOLID Principles
- Single Responsibility: The orchestrator only manages execution; stages handle processing
- Open/Closed: Extend via the `PipelineStage` interface without modifying the framework
- Liskov Substitution: All stages are substitutable via the common interface
- Interface Segregation: Minimal `PipelineStage` interface
- Dependency Inversion: Depend on the `PipelineStage` abstraction, not concrete implementations
Examples
See the examples/ directory for:
- Simple pipeline
- Async job pipeline
- Custom storage backend
- Domain-specific extensions
Development
```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/

# Type checking
mypy --strict src/

# Linting
ruff check src/
```
Used By
- @applications/@image/image-pipeline: Image generation pipelines
- (Future: audio, video, text pipelines)
License
MIT License - see LICENSE file for details.
Contributing
This is an internal Lilith package. For issues or contributions, contact the ML team.