Each Python transforms sub-project in a repository exposes a single `transforms.api.Pipeline` object. This `Pipeline` object is used to register the `@transform` logic responsible for building a given dataset during a Foundry build.

In most cases, the default repository setup will register transforms automatically, and the `Pipeline` object will not require special configuration.
When a transform that is associated with your project's pipeline declares a dataset as an `Output`, you can build this dataset in Foundry. The two recommended ways to add transforms to a `Pipeline` object are automatic registration and manual registration.
If you have a more advanced workflow and/or want to explicitly add each transform to your project's pipeline, you can use manual registration. Otherwise, it is highly recommended that you use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the `discover_transforms` method recursively discovers any transforms defined at the module level. Refer to the sections below for more information.
The `discover_transforms` method imports every module it finds. As a result, any code in your imported modules will be executed.
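If a module in the discovered package runs code at module level, that code executes during discovery. A minimal, generic Python sketch (not part of the transforms API; the module name is hypothetical) showing how to keep such code out of the import path:

```python
# side_effects.py - a hypothetical module inside the discovered package

print('Runs as soon as discover_transforms imports this module')  # module-level code executes on import


def main():
    print('Runs only when this file is executed directly')


if __name__ == '__main__':
    main()  # skipped during discovery, which only imports the module
```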
As the complexity of a project grows, manually adding transforms to a `Pipeline` object can become unwieldy. To remedy this, the `Pipeline` object provides the `discover_transforms()` method to recursively discover all transforms in a Python module or package.

```python
from transforms.api import Pipeline

import my_module  # This is where your transform definition lives

my_pipeline = Pipeline()
my_pipeline.discover_transforms(my_module)
```
Transforms can be manually added to a `Pipeline` object using the `add_transforms()` function. This function takes any number of transforms and adds them to the pipeline. It also checks whether any two transforms declare the same output dataset.
```python
from transforms.api import transform, Pipeline, Input, Output


@transform.using(
    my_output=Output('/path/to/output/dataset'),
    my_input=Input('/path/to/input/dataset')
)
def my_compute_function(my_output, my_input):
    my_output.write_table(my_input.polars())


my_pipeline = Pipeline()
my_pipeline.add_transforms(my_compute_function)
If you want to define a data transformation that creates multiple outputs, you can either use transform generation or define a multiple-output transform. With transform generation, it may be necessary to read and process the same input once for each output. With a multiple-output transform, it is possible to read and process the input just once. For more information, review the documentation on optimizing multi-output transforms.
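As an illustration, a multiple-output transform reads the input a single time and writes each slice to its own output dataset. Below is a minimal sketch in the style of the examples in this section, assuming `@transform.using` accepts multiple `Output` parameters; the dataset paths and the boolean `is_valid` column are hypothetical:

```python
import polars as pl
from transforms.api import transform, Input, Output


# A single transform declaring two outputs: the input is read and processed
# once, then each filtered slice is written to its own dataset.
@transform.using(
    clean_output=Output('/path/to/clean/output'),
    error_output=Output('/path/to/error/output'),
    my_input=Input('/path/to/input/dataset')
)
def split_clean_and_errors(clean_output, error_output, my_input):
    df = my_input.polars()
    clean_output.write_table(df.filter(pl.col('is_valid')))
    error_output.write_table(df.filter(~pl.col('is_valid')))
```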
You may want to re-use the same data transformation logic across multiple transforms. For instance, consider the following scenarios:

- You want to apply the same transformation to multiple input datasets, producing one output per input.
- You want to generate multiple output datasets from a single input dataset.

In both cases, it would be useful to use the same data transformation code across multiple transforms. Instead of separately defining a transform object for each of your outputs, you can generate transform objects using a for-loop and register them in bulk to your project's pipeline.
Below is an example of a transform generator:
```python
import polars as pl
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple
    # outputs from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a
            # keyword argument with a default value.
            df = my_input.polars()
            filtered = df.filter(pl.col('source') == source)
            output.write_table(filtered)

        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```
The same generator written against the pandas API:

```python
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple
    # outputs from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a
            # keyword argument with a default value.
            df = my_input.pandas()
            filtered = df[df['source'] == source]
            output.write_table(filtered)

        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```
If using manual registration, you can then add the generated transforms to the pipeline. If you are unfamiliar with the `*` syntax, refer to the Python documentation.
```python
from transforms.api import Pipeline

import my_module

my_pipeline = Pipeline()
my_pipeline.add_transforms(*my_module.TRANSFORMS)
```
Read the following consideration carefully to avoid errors and failures: because each compute function is defined inside a loop, you must capture the loop variable by passing it as a `source` keyword argument with a default value in your compute function.
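A minimal, pure-Python sketch of the late-binding behavior this guards against (generic Python, not part of the transforms API):

```python
# Without a default value, every closure sees the final value of the loop variable.
funcs = [lambda: source for source in ['src1', 'src2', 'src3']]
print([f() for f in funcs])  # ['src3', 'src3', 'src3']

# A keyword argument with a default value binds the current value at definition time.
funcs = [lambda source=source: source for source in ['src1', 'src2', 'src3']]
print([f() for f in funcs])  # ['src1', 'src2', 'src3']
```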
The default entry point and `Pipeline` object setup is sufficient for most use cases. Configuring the entry point is only recommended for multi-pipeline repositories or differing directory structures.
The runtime responsible for executing a Python transformation needs to be able to find the project's `Pipeline` object. To export a `Pipeline` object, add it to the `entry_points` argument in the `setup.py` file in a transforms Python sub-project. For more information about entry points, refer to the setuptools library documentation ↗.
By default, each Python sub-project is required to export a `transforms.pipelines` entry point named `root`. This entry point references the module name and the `Pipeline` attribute.
For example, if you have a `Pipeline` called `my_pipeline` defined in `myproject/pipeline.py` as shown below:
```python
from transforms.api import Pipeline

my_pipeline = Pipeline()
```
You can register this `Pipeline` in `setup.py` as follows:
```python
from setuptools import find_packages, setup

setup(
    packages=find_packages(),
    entry_points={
        'transforms.pipelines': [
            'root = myproject.pipeline:my_pipeline'
        ]
    }
)
```
In the code above, `root` refers to the name of the `Pipeline` object you are exporting, `myproject.pipeline` refers to the module containing your `Pipeline`, and `my_pipeline` refers to the `Pipeline` attribute defined in that module.
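To confirm that the entry point is visible once the package is installed, you can list it with the standard library's `importlib.metadata` (Python 3.10+ shown; this is a generic packaging check, not part of the transforms API):

```python
from importlib.metadata import entry_points

# Lists every transforms.pipelines entry point installed in the environment.
for ep in entry_points(group='transforms.pipelines'):
    print(ep.name, '->', ep.value)  # e.g. root -> myproject.pipeline:my_pipeline
```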