Pipelines
Pipelines provide a versatile API for automating tasks efficiently. Below are some key features and best practices:
1. Reproducibility
Initialization and Configuration: Pipelines are initialized through the `__init__` method, allowing configuration of common elements. All parameters passed to the class constructor are stored in the `self.hparams` dictionary, facilitating reproducibility and serialization. Additionally, the `ignore` parameter of `__init__` excludes specific parameters from `self.hparams`, avoiding the storage of non-essential or large objects. For example:

```python
pipeline = Pipeline(ignore=["large_param"])
```
ID and Working Directory: Each pipeline instance is assigned a unique identifier (`id`) upon initialization, aiding in tracking and identification. Additionally, each pipeline has a designated working directory for organizing generated files, though it does not alter Python's current working directory. Example:

```python
print(f"Pipeline ID: {pipeline.pipeline_id}")
print(f"Working Directory: {pipeline.working_dir}")
```
Public Interface: Pipelines offer the `run` method as the public interface for execution. The `run` method encapsulates the pipeline's logic and returns its output. Note that `run` is the only method users should call directly. To create your own pipeline, override the `_run` method (which is called from `run`). Example:

```python
result = pipeline.run(argument=value)
```
Besides a result, the `run` method can also set public attributes of the pipeline instance. These attributes are implemented as read-only properties, ensuring a consistent state during execution. For instance:

```python
import time

import numpy as np


class Example(Pipeline):
    def __init__(self, start=0, end=100):
        # cache_result=True keeps the return value of the `run` method in the
        # `_result` attribute, accessible through the public `result` property.
        super().__init__(cache_result=True)
        self._seed = None
        self._start = start
        self._end = end

    # Read-only public attribute (property).
    # It is set during pipeline execution and can be accessed before and
    # after execution. Only these properties and the `run` method should be
    # accessed directly by users.
    @property
    def seed(self):
        return self._seed

    def _run(self, argument):
        # Pipeline logic here: set the seed attribute and draw a random value.
        self._seed = int(time.time())
        np.random.seed(self._seed)
        return np.random.randint(self._start, self._end) + argument


pipeline = Example()
print(pipeline.hparams)
# Output: {'start': 0, 'end': 100}
print(pipeline.status)
# {'status': 'NOT STARTED',
#  'working_dir': '/workspaces/seismic',
#  'id': 'ae87a62731c04604bb35c0b9d4626982',
#  'count': 0,
#  'created': 1715128966.1678915,
#  'start_time': None,
#  'end_time': None,
#  'exception_info': None,
#  'cached': False}

result = pipeline.run(argument=10)
print(result)
# Output: 91
print(pipeline.result)
# Output: 91
```
The only public attribute here is `seed`, which is set during the pipeline run.
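To make these behaviors concrete, here is a minimal, self-contained sketch of how such a base class could work. This is an illustrative approximation (the class name `MiniPipeline` and its internals are hypothetical), not the library's actual implementation, which also manages working directories, status tracking, and logging:

```python
import inspect
import uuid


class MiniPipeline:
    """Illustrative sketch of the Pipeline behaviors described above."""

    def __init__(self, ignore=(), cache_result=False):
        # Collect the subclass constructor arguments into `hparams`,
        # skipping anything listed in `ignore` (e.g., large objects).
        frame = inspect.currentframe().f_back
        args, _, _, values = inspect.getargvalues(frame)
        ignore = {ignore} if isinstance(ignore, str) else set(ignore)
        self.hparams = {
            name: values[name]
            for name in args
            if name != "self" and name not in ignore
        }
        self.pipeline_id = uuid.uuid4().hex  # unique per instance
        self._cache_result = cache_result
        self._result = None

    @property
    def result(self):
        return self._result

    def run(self, *args, **kwargs):
        # Public entry point: delegates to the subclass's `_run`.
        output = self._run(*args, **kwargs)
        if self._cache_result:
            self._result = output
        return output

    def _run(self, *args, **kwargs):
        raise NotImplementedError


class Add(MiniPipeline):
    def __init__(self, offset=1, big_data=None):
        super().__init__(ignore="big_data", cache_result=True)
        self.offset = offset

    def _run(self, x):
        return x + self.offset


pipeline = Add(offset=5, big_data=[0] * 1000)
print(pipeline.hparams)  # {'offset': 5} -- big_data was ignored
print(pipeline.run(10))  # 15
```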
2. Composition
Combining Pipelines: Pipelines can be composed of other pipelines, allowing the creation of complex workflows from simpler components. This modularity enhances flexibility and scalability in pipeline design.
For instance, consider the following minimal example:
```python
from typing import List, Tuple


class Distance(Pipeline):
    def __init__(self, norm: int):
        super().__init__(cache_result=False)
        self.norm = norm

    def _run(self, x, y):
        return (x**self.norm + y**self.norm) ** (1 / self.norm)


class SumOfDistances(Pipeline):
    def __init__(self, constant: int, distance_pipeline: Distance):
        super().__init__(ignore="distance_pipeline", cache_result=True)
        self.constant = constant
        self.distance_pipeline = distance_pipeline

    def _run(self, items: List[Tuple[float, float]]):
        return (
            sum(self.distance_pipeline.run(x, y) for x, y in items)
            + self.constant
        )
```
In this example, we have two pipelines: `Distance` and `SumOfDistances`. The `Distance` pipeline calculates the distance between two points based on a specified norm. The `SumOfDistances` pipeline calculates the sum of the distances between multiple pairs of points and adds a constant value; it uses the `Distance` pipeline as a component, demonstrating pipeline composition.
```python
distance_pipeline = Distance(norm=2)
sum_of_distances_pipeline = SumOfDistances(constant=10, distance_pipeline=distance_pipeline)
sum_of_distances_pipeline.run([(1, 2), (3, 4), (5, 6)])
# Output: 25.046317653406444
```
3. Integration with CLI
Seamless CLI Integration: Pipelines integrate seamlessly with `jsonargparse`, enabling the creation of command-line interfaces (CLIs) for easy configuration and execution. Configuration can be provided via YAML files or directly through CLI arguments, enhancing accessibility. For instance, we can use the `CLI` class to run a pipeline with arguments:

```python
# Example CLI usage
args = [
    "--constant", "10",
    "--distance_pipeline", '{"class_path": "Distance", "init_args": {"norm": "2"}}',
    "run",
    "--items", '[["1", "2"], ["3", "4"], ["5", "6"]]',
]
result = CLI(SumOfDistances, as_positional=False, args=args)
```
Or write a YAML file for some of the parameters:
```yaml
# config.yaml
constant: 10
distance_pipeline:
  class_path: Distance
  init_args:
    norm: 2
```
And run the pipeline with the YAML file:
```python
# Example CLI usage with a YAML file
result = CLI(
    SumOfDistances,
    as_positional=False,
    args=["--config", "config.yaml", "run", "--items", '[["1", "2"], ["3", "4"], ["5", "6"]]'],
)
```
Or write a YAML file for all the parameters:
```yaml
# config.yaml
constant: 10
distance_pipeline:
  class_path: Distance
  init_args:
    norm: 2
run:
  items:
    - [1, 2]
    - [3, 4]
    - [5, 6]
```
And run the pipeline with the YAML file:
```python
# Example CLI usage with a YAML file
result = CLI(SumOfDistances, as_positional=False, args=["--config", "config.yaml"])
```
We can also run from the shell:

```shell
python script.py --constant 10 --distance_pipeline '{"class_path": "Distance", "init_args": {"norm": "2"}}' run --items '[["1", "2"], ["3", "4"], ["5", "6"]]'
```
Or with the YAML file:

```shell
python script.py --config config.yaml
```
4. Logging and Monitoring
Execution Log: Pipelines maintain a log of their executions, providing a comprehensive record of activities. The `status` property offers insight into the pipeline's state, from creation to completion, facilitating monitoring and troubleshooting. Example:

```python
print(f"Pipeline Status: {pipeline.status}")
```
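For example, the `start_time` and `end_time` entries of the status dictionary (shown in section 1) can be combined to compute a run's wall-clock duration. The helper below is hypothetical, not part of the API; it only assumes the timestamp fields shown earlier:

```python
def run_duration(status: dict):
    """Return elapsed seconds for a finished run, or None otherwise."""
    start, end = status.get("start_time"), status.get("end_time")
    if start is None or end is None:
        return None  # the pipeline has not started or not finished yet
    return end - start


# Timestamps in the same format as the status dict shown above.
print(run_duration({"start_time": 1715128966.0, "end_time": 1715128970.5}))  # 4.5
print(run_duration({"start_time": None, "end_time": None}))  # None
```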
5. Clonability
Cloning Pipelines: Pipelines are cloneable, enabling the creation of independent instances from existing ones. The `clone` method initializes a deep copy, providing a clean slate for each clone. Example:

```python
cloned_pipeline = Pipeline.clone(pipeline)
```

Note that some attributes, such as `id`, are unique to each pipeline instance and are regenerated during cloning to maintain uniqueness.
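The cloning behavior described above can be sketched with a toy stand-in class; this is a generic illustration of "deep copy plus fresh identifier" under the assumptions stated here, not the library's actual `clone` implementation:

```python
import copy
import uuid


class Cloneable:
    """Toy stand-in illustrating the cloning behavior described above."""

    def __init__(self):
        self.pipeline_id = uuid.uuid4().hex
        self.history = []  # mutable state that must not be shared

    @classmethod
    def clone(cls, pipeline):
        # Deep copy so mutable state is independent, then refresh the id
        # so each instance remains uniquely identifiable.
        new = copy.deepcopy(pipeline)
        new.pipeline_id = uuid.uuid4().hex
        return new


original = Cloneable()
original.history.append("ran once")
clone = Cloneable.clone(original)
print(clone.history)                             # ['ran once'] -- copied state
print(clone.pipeline_id != original.pipeline_id) # True -- fresh id
```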
6. Parallel and Distributed Environments
Parallel Execution: Pipelines support parallel execution, enabling faster processing of tasks and efficient resource utilization.
Distributed Execution: Pipelines can be executed in a distributed manner, suitable for deployment on clusters to leverage distributed computing resources effectively. This scalability enhances performance in large-scale processing environments.
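As an illustration of the parallel use case, independent instances can be dispatched to a thread or process pool. The snippet below is a generic sketch using a toy pipeline-like class and the standard library, not the library's own parallel API; creating one instance per task mirrors the role of `clone` in avoiding shared mutable state:

```python
from concurrent.futures import ThreadPoolExecutor


class Square:
    """Toy pipeline-like class: `run` is the public entry point."""

    def run(self, x):
        return self._run(x)

    def _run(self, x):
        return x * x


def run_parallel(pipeline_factory, inputs, max_workers=4):
    # One independent instance per task avoids shared mutable state.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(pipeline_factory().run, x) for x in inputs]
        return [f.result() for f in futures]


results = run_parallel(Square, [1, 2, 3, 4])
print(results)  # [1, 4, 9, 16]
```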