No description
Find a file
autocommit 9add1c48c0
Some checks failed
Publish / publish (push) Failing after 0s
Publish to PyPI / Build and Publish (push) Failing after 45s
deps-upgrade(deps): ⬆️ Update dependencies to latest stable versions in pyproject.toml
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-04-12 00:21:52 -07:00
.forgejo/workflows chore: initial commit with DRY workflow 2026-01-21 12:49:02 -08:00
src/lilith_pipeline_framework chore(lilith_pipeline_fram): 🔧 Update base.py configuration for pipeline dependency updates 2026-01-24 22:40:05 -08:00
.gitignore chore(gitignore): Add missing patterns 2026-01-24 22:39:56 -08:00
pyproject.toml deps-upgrade(deps): ⬆️ Update dependencies to latest stable versions in pyproject.toml 2026-04-12 00:21:52 -07:00
README.md chore: initial commit with DRY workflow 2026-01-21 12:49:02 -08:00

Lilith Pipeline Framework

Domain-agnostic pipeline orchestration for ML workflows. Build stage-based pipelines that can process any type of data through a sequence of processing stages.

Features

  • Generic Architecture: Works with any data type (images, audio, video, text, etc.)
  • Type-Safe: Full generic type support with PipelineContext[TRequest, TData]
  • Async Job Management: Background execution with status polling
  • Pluggable Storage: In-memory or Redis storage for job state
  • Comprehensive Error Handling: Automatic failure detection and optional stage support
  • Production-Ready: Stable v1.0.0 API, mypy strict mode, comprehensive tests

Installation

pip install lilith-pipeline-framework

Optional Redis support:

pip install lilith-pipeline-framework[redis]

Quick Start

Define Processing Stages

from lilith_pipeline_framework import (
    PipelineStage,
    PipelineContext,
    StageResult,
    StageStatus,
)

class ValidateStage(PipelineStage):
    @property
    def name(self) -> str:
        return "validate"

    async def execute(self, context: PipelineContext) -> StageResult:
        # Validate request
        if not context.request.prompt:
            return StageResult(
                stage_name=self.name,
                status=StageStatus.FAILED,
                duration_ms=0,
                summary="Invalid request",
                error="Prompt is required"
            )

        return StageResult(
            stage_name=self.name,
            status=StageStatus.SUCCESS,
            duration_ms=0,
            summary="Validation passed"
        )

class ProcessStage(PipelineStage):
    @property
    def name(self) -> str:
        return "process"

    async def execute(self, context: PipelineContext) -> StageResult:
        # Process data
        context.data = process_data(context.request)

        return StageResult(
            stage_name=self.name,
            status=StageStatus.SUCCESS,
            duration_ms=0,
            summary="Processing complete"
        )

Execute Pipeline

from lilith_pipeline_framework import PipelineOrchestrator, PipelineContext

# Create orchestrator
orchestrator = PipelineOrchestrator(stages=[
    ValidateStage(),
    ProcessStage(),
])

# Sync execution
context = PipelineContext(request=my_request)
result = await orchestrator.execute(context)

if result.status == JobStatus.COMPLETED:
    print(f"Success! Processed data: {result.data}")

Async Job Execution

import asyncio

# Create background job
job_id = await orchestrator.create_job(context)

# Start execution in background
asyncio.create_task(orchestrator.execute_job(job_id))

# Poll for status
while True:
    status = await orchestrator.get_job_status(job_id)
    if status["status"] in ("completed", "failed"):
        break
    await asyncio.sleep(1)

# Get result
result = await orchestrator.get_job_result(job_id)

Architecture

Generic Types

The framework uses Python generics for type safety:

TRequest = TypeVar('TRequest')  # Your request type
TData = TypeVar('TData')        # Your data type (Image, Audio, etc.)

class PipelineContext(Generic[TRequest, TData]):
    request: TRequest
    data: Optional[TData] = None
    # ... generic fields

Domain-Specific Extensions

Extend the base context for domain-specific fields:

from dataclasses import dataclass
from PIL import Image

@dataclass
class ImagePipelineContext(PipelineContext[ImageRequest, Image.Image]):
    """Image-specific context."""
    width: int = 1024
    height: int = 1024
    quality_score: Optional[float] = None

Storage Backends

Default in-memory storage:

orchestrator = PipelineOrchestrator(
    stages=stages,
    storage=InMemoryJobStorage()  # Default
)

Redis storage (future):

orchestrator = PipelineOrchestrator(
    stages=stages,
    storage=RedisJobStorage(redis_client)
)

SOLID Principles

  • Single Responsibility: Orchestrator only manages execution, stages handle processing
  • Open/Closed: Extend via PipelineStage interface without modifying framework
  • Liskov Substitution: All stages are substitutable via common interface
  • Interface Segregation: Minimal PipelineStage interface
  • Dependency Inversion: Depend on PipelineStage abstraction, not concrete implementations

Examples

See examples/ directory for:

  • Simple pipeline
  • Async job pipeline
  • Custom storage backend
  • Domain-specific extensions

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/

# Type checking
mypy --strict src/

# Linting
ruff check src/

Used By

  • @applications/@image/image-pipeline - Image generation pipelines
  • (Future: audio, video, text pipelines)

License

MIT License - see LICENSE file for details.

Contributing

This is an internal Lilith package. For issues or contributions, contact the ML team.