Pipelines

Pipelines provide a versatile API for automating tasks efficiently. Below are some key features and best practices:

1. Reproducibility

  • Initialization and Configuration: Pipelines are initialized through the __init__ method, which configures common elements. All parameters passed to the class constructor are stored in the self.hparams dictionary, facilitating reproducibility and serialization. The ignore parameter of __init__ excludes specific parameters from this dictionary, which is useful for non-essential or large objects that should not be stored. For example:

    pipeline = Pipeline(ignore=["large_param"])
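
    A minimal sketch of the effect on hparams (the Training subclass below is hypothetical, for illustration only):

    class Training(Pipeline):
        def __init__(self, lr=0.01, dataset=None):
            # Keep the potentially large `dataset` out of hparams.
            super().__init__(ignore=["dataset"])
            self.lr = lr
            self.dataset = dataset

        def _run(self):
            return self.lr

    training = Training(lr=0.1, dataset=list(range(1_000_000)))
    print(training.hparams)
    # Expected: {'lr': 0.1} -- 'dataset' is excluded via `ignore`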
    
  • ID and Working Directory: Each pipeline instance is assigned a unique identifier (id) upon initialization, aiding in tracking and identification. Additionally, pipelines have a designated working directory for organizing generated files, though it doesn’t alter Python’s working directory. Example:

    print(f"Pipeline ID: {pipeline.pipeline_id}")
    print(f"Working Directory: {pipeline.working_dir}")
    
  • Public Interface: Pipelines offer the run method as the public interface for execution. The run method encapsulates the pipeline’s logic and returns the output. Note that run is the only method users should call directly. To implement your own pipeline, override the _run method, which run calls internally. Example:

    result = pipeline.run(argument=value)
    

    Besides returning a result, the run method can also set public attributes of the pipeline instance. These attributes are implemented as read-only properties, ensuring a consistent state during execution. For instance, consider the code below:

    import time

    import numpy as np


    class Example(Pipeline):
        def __init__(self, start=0, end=100):
            # cache_result=True keeps the return value of the `run` method in
            # the `_result` attribute, accessible through the public `result`
            # property.
            super().__init__(cache_result=True)
            self._seed = None
            self._start = start
            self._end = end

        # Read-only, public attribute (property).
        # This attribute is set during pipeline execution and can be accessed
        # before and after execution. Only these properties and the `run`
        # method should be accessed directly by users.
        @property
        def seed(self):
            return self._seed

        def _run(self, argument):
            # Pipeline logic here.

            # Set the seed attribute.
            self._seed = int(time.time())
            np.random.seed(self._seed)
            return np.random.randint(self._start, self._end) + argument
    
    
    pipeline = Example()
    
    print(pipeline.hparams)
    # Output: {'start': 0, 'end': 100}
    
    print(pipeline.status)
    # {'status': 'NOT STARTED',
    #  'working_dir': '/workspaces/seismic',
    #  'id': 'ae87a62731c04604bb35c0b9d4626982',
    #  'count': 0,
    #  'created': 1715128966.1678915,
    #  'start_time': None,
    #  'end_time': None,
    #  'exception_info': None,
    #  'cached': False}
    
    result = pipeline.run(argument=10)
    print(result)
    # Output: 91 (varies with the time-based seed)
    
    print(pipeline.result)
    # Output: 91 (the cached result of the last run)
    

    The public attributes are result, which caches the return value of run, and seed, which is set during the pipeline run.

2. Composition

  • Combining Pipelines: Pipelines can be composed of other pipelines, allowing the creation of complex workflows from simpler components. This modularity enhances flexibility and scalability in pipeline design.

For instance, consider the following minimal example:

from typing import List, Tuple


class Distance(Pipeline):
    def __init__(self, norm: int):
        super().__init__(cache_result=False)
        self.norm = norm

    def _run(self, x, y):
        # Norm of the point (x, y) with a configurable exponent.
        return (x**self.norm + y**self.norm) ** (1 / self.norm)


class SumOfDistances(Pipeline):
    def __init__(self, constant: int, distance_pipeline: Distance):
        # `distance_pipeline` is kept out of hparams via `ignore`.
        super().__init__(ignore="distance_pipeline", cache_result=True)
        self.constant = constant
        self.distance_pipeline = distance_pipeline

    def _run(self, items: List[Tuple[float, float]]):
        return (
            sum(self.distance_pipeline.run(x, y) for x, y in items)
            + self.constant
        )

In this example, we have two pipelines: Distance and SumOfDistances. The Distance pipeline computes the norm of a point (x, y) with a configurable exponent. The SumOfDistances pipeline sums these norms over multiple points and adds a constant value. SumOfDistances uses Distance as a component, demonstrating pipeline composition.

distance_pipeline = Distance(norm=2)
sum_of_distances_pipeline = SumOfDistances(constant=10, distance_pipeline=distance_pipeline)
sum_of_distances_pipeline.run([(1, 2), (3, 4), (5, 6)])
# Output: 25.046317653406444

3. Integration with CLI

  • Seamless CLI Integration: Pipelines integrate seamlessly with jsonargparse, enabling the creation of command-line interfaces (CLIs) for easy configuration and execution. Configuration can be provided via YAML files or directly through CLI arguments. For instance, we can use the CLI class to run a pipeline with arguments:

    from jsonargparse import CLI

    # Example CLI usage
    args = [
        "--constant",
        "10",
        "--distance_pipeline",
        '{"class_path": "Distance", "init_args": {"norm": "2"}}',
        "run",
        "--items",
        '[["1", "2"], ["3", "4"], ["5", "6"]]',
    ]

    result = CLI(SumOfDistances, as_positional=False, args=args)
    

Alternatively, write a YAML file for some of the parameters:

# config.yaml
constant: 10
distance_pipeline:
  class_path: Distance
  init_args:
    norm: 2

And run the pipeline with the YAML file:

# Example CLI usage with a YAML file
result = CLI(
    SumOfDistances,
    as_positional=False,
    args=["--config", "config.yaml", "run", "--items", '[["1", "2"], ["3", "4"], ["5", "6"]]'],
)

Or write a YAML file for all the parameters:

# config.yaml
constant: 10
distance_pipeline:
  class_path: Distance
  init_args:
    norm: 2
run:
  items:
    - [1, 2]
    - [3, 4]
    - [5, 6]

And run the pipeline with the YAML file:

# Example CLI usage with a YAML file
result = CLI(SumOfDistances, as_positional=False, args=["--config", "config.yaml"])

We can also run from the shell:

python script.py --constant 10 --distance_pipeline '{"class_path": "Distance", "init_args": {"norm": "2"}}' run --items '[["1", "2"], ["3", "4"], ["5", "6"]]'

Or with the YAML file:

python script.py --config config.yaml

4. Logging and Monitoring

  • Execution Log: Pipelines maintain a log of their executions, providing a comprehensive record of activities. The status property offers insights into the pipeline’s state, from creation to completion, facilitating monitoring and troubleshooting. Example:

    print(f"Pipeline Status: {pipeline.status}")
    

5. Clonability

  • Cloning Pipelines: Pipelines are cloneable, enabling the creation of independent instances from existing ones. The clone method initializes a deep copy, providing a clean slate for each clone. Example:

    cloned_pipeline = Pipeline.clone(pipeline)
    

Note that some attributes, such as id, are unique to each pipeline instance and are updated during cloning to maintain uniqueness.
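
For instance, a quick check (sketch, using the pipeline_id attribute shown in section 1):

cloned_pipeline = Pipeline.clone(pipeline)

# The clone is an independent instance with a fresh identifier.
assert cloned_pipeline.pipeline_id != pipeline.pipeline_id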

6. Parallel and Distributed Environments

  • Parallel Execution: Pipelines support parallel execution, enabling faster processing of tasks and efficient resource utilization (see the sketch after this list).

  • Distributed Execution: Pipelines can be executed in a distributed manner, suitable for deployment on clusters to leverage distributed computing resources effectively. This scalability enhances performance in large-scale processing environments.
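
As a minimal sketch of the parallel case, independent clones can be dispatched with Python's standard concurrent.futures. This is one possible approach, not a built-in scheduler; it assumes pipeline instances are picklable and reuses the Example pipeline from section 1:

from concurrent.futures import ProcessPoolExecutor


def run_clone(pipeline, argument):
    # Each worker executes its own independent clone, so ids, logs,
    # and cached results do not clash between processes.
    return pipeline.run(argument=argument)


if __name__ == "__main__":
    base = Example()
    clones = [Pipeline.clone(base) for _ in range(4)]
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(run_clone, clones, [10, 20, 30, 40]))
    print(results)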