Each Python transforms sub-project in a repository exposes a single transforms.api.Pipeline object. This Pipeline object is used to locate the @transform logic responsible for building a given dataset during a Foundry build.

In most cases, the default repository setup will register transforms automatically, and the Pipeline object will not require special configuration.
When a transform that is associated with your project’s pipeline declares a dataset as an Output, you can build this dataset in Foundry. The two recommended ways to add transforms to a Pipeline object are automatic registration and manual registration.
If you have a more advanced workflow, or want to explicitly add each transform to your project's pipeline, you can use manual registration. Otherwise, automatic registration is highly recommended to keep your registration code concise and contained. With automatic registration, the discover_transforms method recursively discovers any transforms defined at the module level. Refer to the sections below for more information.
As the complexity of a project grows, manually adding transforms to a Pipeline object can become unwieldy. To remedy this, the Pipeline object provides the discover_transforms() method to recursively discover all transforms defined at the module level of a Python module or package.

Note that discover_transforms imports every module it finds; as a result, any code in your imported modules will be executed.
```python
from transforms.api import Pipeline
import my_module  # This is where your transform definition lives

my_pipeline = Pipeline()
my_pipeline.discover_transforms(my_module)
```
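Because discovery works by importing modules, any top-level statement in a discovered module runs at import time. Below is a minimal sketch of what this means; the module name and dataset paths are hypothetical:

```python
# my_module/my_transforms.py -- a hypothetical module inside my_module.
from transforms.api import transform, Input, Output

# Module-level code: this line executes as soon as discover_transforms()
# imports the module, before any dataset build runs.
print('my_transforms imported')


# Transforms defined at the module level, like this one, are discovered.
@transform.using(
    out=Output('/examples/discovered/output'),  # placeholder paths
    src=Input('/examples/discovered/input')
)
def discovered_transform(out, src):
    out.write_table(src.polars())
```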
Transforms can be manually added to a Pipeline object using the add_transforms() function. This function takes any number of transforms and adds them to the pipeline. It also checks whether any two transforms declare the same output dataset.
```python
from transforms.api import transform, Pipeline, Input, Output


@transform.using(
    my_output=Output('/path/to/output/dataset'),
    my_input=Input('/path/to/input/dataset')
)
def my_compute_function(my_output, my_input):
    my_output.write_table(my_input.polars())


my_pipeline = Pipeline()
my_pipeline.add_transforms(my_compute_function)
```
If you want to define a data transformation that creates multiple outputs, you can either use transform generation or define a multiple-output transform. With transform generation, it may be necessary to read and process the same input once for each output. With a multiple-output transform, it is possible to read and process the input just once. For more information, review the documentation on optimizing multi-output transforms.
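A multiple-output transform is not shown elsewhere in this section, so here is a minimal sketch in the same decorator style used below; the dataset paths and the 'kind' column are illustrative, not part of any real project:

```python
import polars as pl
from transforms.api import transform, Input, Output


# A single transform declaring two outputs: the shared input is read and
# processed once, and each filtered result is written to its own output.
@transform.using(
    output_a=Output('/examples/output_a'),  # placeholder paths
    output_b=Output('/examples/output_b'),
    my_input=Input('/examples/input')
)
def split_by_kind(output_a, output_b, my_input):
    df = my_input.polars()  # read the input a single time
    output_a.write_table(df.filter(pl.col('kind') == 'a'))
    output_b.write_table(df.filter(pl.col('kind') == 'b'))
```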
You may want to re-use the same data transformation logic across multiple transforms; for example, several source datasets may share a structure and need identical processing, or a single input may need to be filtered into one output per source. In such cases, instead of separately defining a transform object for each of your outputs, you can generate transform objects in a for-loop and register them in bulk to your project's pipeline.
Below is an example of a transform generator:
```python
import polars as pl
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate
    # multiple outputs from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a
            # keyword argument with a default value.
            df = my_input.polars()
            filtered = df.filter(pl.col('source') == source)
            output.write_table(filtered)

        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```
The same generator, written against the pandas API instead of Polars:

```python
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate
    # multiple outputs from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a
            # keyword argument with a default value.
            df = my_input.pandas()
            filtered = df[df['source'] == source]
            output.write_table(filtered)

        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```
If using manual registration, you can then add the generated transforms to the pipeline. If you are unfamiliar with the * syntax, refer to the Python documentation ↗.
```python
from transforms.api import Pipeline

import my_module

my_pipeline = Pipeline()
my_pipeline.add_transforms(*my_module.TRANSFORMS)
```
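If it helps, here is a plain-Python sketch of what the * unpacking does; register is a hypothetical stand-in with the same variadic signature style as add_transforms():

```python
def register(*transforms):
    # *transforms collects all positional arguments into a tuple.
    for t in transforms:
        print(t)


generated = ['t1', 't2', 't3']
register(*generated)  # equivalent to register('t1', 't2', 't3')
```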
Read the following consideration carefully for information on how to avoid errors and failures: because Python closures bind loop variables late, you must pass the source keyword argument with a default value in your compute function. Without the default value, every generated transform would see the final value of source from the loop rather than the value from the iteration in which it was defined.
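The underlying behavior is ordinary Python late binding, not anything specific to transforms. A minimal standalone demonstration:

```python
# Without a default value, each closure looks up `source` when it is
# called, so every function sees the loop's final value.
funcs = [lambda: source for source in ['src1', 'src2', 'src3']]
print([f() for f in funcs])  # ['src3', 'src3', 'src3']

# A keyword argument with a default value captures the value at definition
# time, which is what the transform generator above relies on.
funcs = [lambda source=source: source for source in ['src1', 'src2', 'src3']]
print([f() for f in funcs])  # ['src1', 'src2', 'src3']
```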
The default entry point and Pipeline object setup is sufficient for most use cases; configuring the entry point is only recommended for multi-pipeline repositories or differing directory structures.

The runtime responsible for executing a Python transformation needs to be able to find the project's Pipeline object. To export a Pipeline object, add it to the entry_points argument in the setup.py file in a Python transforms sub-project. For more information about entry points, refer to the setuptools library documentation ↗.
By default, it is required that each Python sub-project exports a transforms.pipelines entry point named root. This entry point references the module name and the Pipeline attribute.
For example, if you have a Pipeline called “my_pipeline” defined in myproject/pipeline.py as shown below:
```python
from transforms.api import Pipeline

my_pipeline = Pipeline()
```
You can register this Pipeline in setup.py as follows:
```python
from setuptools import find_packages, setup

setup(
    entry_points={
        'transforms.pipelines': [
            'root = myproject.pipeline:my_pipeline'
        ]
    }
)
```
In the code above, root refers to the name of the Pipeline object you are exporting. myproject.pipeline refers to the module containing your Pipeline, and my_pipeline refers to the Pipeline attribute defined in that module.
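To see how such an entry point resolves, here is a hedged sketch using the standard library (Python 3.10+ for the keyword-based selection); Foundry's actual runtime lookup is internal and may differ:

```python
from importlib.metadata import entry_points

# Select the entry point named 'root' in the 'transforms.pipelines' group.
(ep,) = entry_points(group='transforms.pipelines', name='root')

# load() imports myproject.pipeline and returns its my_pipeline attribute.
pipeline = ep.load()
```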