In Python, transforms.api.Transform
is a description of how to compute a dataset. It describes the following:
configure()
decorator (this includes custom Transforms profiles to use at runtime)The input and output datasets, as well as the transformation code, are wrapped up in a Transform
object and then registered to a Pipeline
. You should not construct a Transform
object by hand. Instead, you should use one of the decorators described below.
It’s important to keep in mind that data transformations can be expressed in terms of pyspark.sql.DataFrame
↗ objects as well as files. For transformations that rely on DataFrame
↗ objects, you can either use the transform decorator and explicitly call a method to access a DataFrame
↗ containing your input dataset or you can simply use the DataFrame transform decorator). For transformations that rely on files, you can use the transform decorator and then access files within your datasets. If your data transformation is going to be exclusively using the Pandas library, you can use the Pandas transform decorator.
You can define multiple Transform
objects in a single Python file. Also, all transformations currently run with transaction type SNAPSHOT.
The transforms.api.transform()
decorator can be used if you’re writing data transformations that depend on DataFrame
↗ objects or files. This decorator accepts as keyword arguments a number of transforms.api.Input
and transforms.api.Output
specifications. During a Foundry build, these specifications are resolved into transforms.api.TransformInput
and transforms.api.TransformOutput
objects, respectively. These TransformInput
and TransformOutput
objects provide access to the dataset within the compute function.
The keyword names used for the inputs and outputs must correspond to the parameter names of the wrapped compute function.
Let’s step through a simple example for creating a Transform
object using the transform()
decorator. We will use a small sample dataset called /examples/students_hair_eye_color
. Here is a preview of the dataset:
>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows
Now, we can define a Transform
that takes /examples/students_hair_eye_color
as its input and creates /examples/hair_eye_color_processed
as its output:
Copied!1 2 3 4 5 6 7 8 9
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_hair_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None filtered_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') processed.write_dataframe(filtered_df)
Note that the input name of “hair_eye_color” and the output name of “processed” are used as the parameter names in the filter_hair_color
compute function.
Furthermore, filter_hair_color
reads a DataFrame
↗ from the TransformInput
using the dataframe()
method. The DataFrame
is then filtered using filter()
↗, which is a regular PySpark function. This filtered DataFrame
↗ is then written to the output named “processed” using the write_dataframe()
method.
The DataFrame
↗ objects returned by a TransformInput
are regular PySpark DataFrames. For more information about working with PySpark, you can refer to the Spark Python API documentation ↗ available online.
If your data transformation relies on access to files, rather than DataFrame
↗ objects, refer to the section on file access.
Transforms with multiple outputs are useful when a single input dataset needs to be broken into several parts. Multiple-output Transforms are only supported with the transforms.api.transform()
decorator.
Recall the /examples/students_hair_eye_color
dataset:
>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows
We can now pass multiple Output
specifications to the transform()
decorator in order to split the input:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), males=Output('/examples/hair_eye_color_males'), females=Output('/examples/hair_eye_color_females'), ) def brown_hair_by_sex(hair_eye_color, males, females): # type: (TransformInput, TransformOutput, TransformOutput) -> None brown_hair_df = hair_eye_color.dataframe().filter(hair_eye_color.dataframe().hair == 'Brown') males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male')) females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female'))
Notice that we only had to filter down to brown hair once, after which we could share the filtered dataset to generate multiple output datasets.
It’s common for data transformations in Python to read, process, and write DataFrame
↗ objects. If your data transformation depends on DataFrame
↗ objects, you can use the transforms.api.transform_df()
decorator. This decorator injects DataFrame
↗ objects and expects the compute function to return a DataFrame
↗.
Alternatively, you can use the more general transform()
decorator and explicitly call the dataframe()
method to access a DataFrame
↗ containing your input dataset. Recall that the transform()
decorator injects the more powerful transforms.api.TransformInput
and transforms.api.TransformOutput
objects, rather than DataFrame
↗ objects.
The transform_df()
decorator accepts as keyword arguments a number of transforms.api.Input
specifications, and it accepts as a positional argument a single transforms.api.Output
specification. As required by Python, the positional Output
argument must appear first, followed by the keyword Input
argument.
Let’s step through a simple example for creating a Transform
object using the transform_df()
decorator. We will use the small sample dataset from above called /examples/students_hair_eye_color
. Here is a preview of the dataset:
>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows
Now, we will modify the example from the Transform decorator section above to use the transform_df()
decorator. We define a Transform
that takes /examples/students_hair_eye_color
as its input and creates /examples/hair_eye_color_processed
as its output:
Copied!1 2 3 4 5 6 7 8 9
from transforms.api import transform_df, Input, Output @transform_df( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pyspark.sql.DataFrame) -> pyspark.sql.DataFrame return hair_eye_color.filter(hair_eye_color.hair == 'Brown')
Note that the input name of “hair_eye_color” is used as the parameter name in the filter_hair_color
compute function. Furthermore, since Python requires positional arguments to come before keyword arguments, the Output
argument appears before any Input
arguments.
The transform_pandas
decorator should only be used on datasets that can fit into memory. Otherwise, you should write your data transformation using the transform_df
decorator and filter your input datasets before converting them to Pandas DataFrames using the toPandas
↗ method.
We recommend using the toPandas
↗ method with PyArrow added as a dependency in your meta.yaml
. This enables Pandas DataFrame conversion optimization with Arrow ↗.
If your data transformation depends exclusively on the Pandas library, you can use the transforms.api.transform_pandas()
decorator. To use the Pandas library, you must add pandas
as a run dependency in your meta.yml
file . For more information, refer to the section describing the meta.yml file.
The transform_pandas()
decorator is similar to the transform_df()
decorator, but transform_pandas()
converts the input datasets into pandas.DataFrame
↗ objects and accepts a return type of pandas.DataFrame
↗.
The transform_pandas()
decorator accepts as keyword arguments a number of transforms.api.Input
specifications, and it accepts as a positional argument a single transforms.api.Output
specification. As required by Python, the positional Output
argument must appear first, followed by the keyword Input
argument.
Let’s step through a simple example for creating a Transform
object using the transform_pandas()
decorator. We will use the same sample dataset from above called /examples/students_hair_eye_color
. Here is a preview of the dataset:
>>> students_input = foundry.input('/examples/students_hair_eye_color')
>>> students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
only showing top 3 rows
Now, we can define a Transform
that takes /examples/students_hair_eye_color
as its input and creates /examples/hair_eye_color_processed
as its output:
Copied!1 2 3 4 5 6 7 8 9
from transforms.api import transform_pandas, Input, Output @transform_pandas( Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color') ) def filter_hair_color(hair_eye_color): # type: (pandas.DataFrame) -> pandas.DataFrame return hair_eye_color[hair_eye_color['hair'] == 'Brown']
Note that the input name of “hair_eye_color” is used as the parameter name in the filter_hair_color
compute function. Furthermore, since Python requires positional arguments to come before keyword arguments, the Output
argument appears before any Input
arguments.
This example creates a Transform
from a compute function that accepts and returns pandas.DataFrame
↗, rather than pyspark.sql.DataFrame
↗ objects like in the example in the DataFrame Transform decorator section above. Note that you can convert Pandas DataFrames to PySpark DataFrames using the createDataFrame()
↗ method—call this method on the spark_session
attribute of your Transform context.
There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may be required to access the current Spark session or to contact an external service. In such cases, you can inject a transforms.api.TransformContext
object into the transformation.
To inject a TransformContext
object, your compute function must accept a parameter called ctx
. Note that this also means that no inputs or outputs may be named ctx
. For instance, you can use a TransformContext
object if you want to create a DataFrame
↗ from Python data structures:
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform, Output @transform( out=Output('/examples/context') ) def generate_dataframe(ctx, out): # type: (TransformContext) -> pyspark.sql.DataFrame df = ctx.spark_session.createDataFrame([ ['a', 1], ['b', 2], ['c', 3] ], schema=['letter', 'number']) out.write_dataframe(df)
For TLLV to function correctly, your code must declare all imports at the module level and should not attempt to patch or otherwise modify objects in another module. If this is not the case in your project, you must disable TLLV. Refer to the code example below for more information.
TLLV is enabled by default. To disable TLLV set tllv in transformsPython configuration to false. This configuration is inside the build.gradle
file in your Transforms Python subproject.
transformsPython {
tllv false
}
Transform’s version is a string that is used to compare two versions of a transform when considering logic staleness. Transform’s output is up-to-date if its inputs are unchanged and if the transform’s version is unchanged. If the version changes, the transform’s output will be invalidated and recomputed.
By default, transform’s version includes the following:
If any of these change, the version string will be changed. If you want to invalidate outputs if change happens in file not covered by listed parts, set tllvFiles in transformsPython configuration. Example usecase is if you’re reading configuration for a file and you want to invalidate outputs when configuration is changed.
transformsPython {
tllvFiles = [
'path/to/file/you/want/to/include/relative/to/project/directory'
]
}
If you want to avoid invalidating outputs when any project dependency version is changed set tllvIncludeDeps to false.
transformsPython {
tllvIncludeDeps false
}
Consider the following code example of valid and invalid imports:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
# If you're only importing at the module top, you have nothing to worry about. from transforms.api import transform_df, Input, Output from myproject.datasets import utils from myproject.testing import test_mock as tmock import importlib # Using `__import__` of `importlib` is fine as long as it happens in module scope. logging = __import__('logging') my_compute = importlib.import_module('myproject.compute') def helper(x): # This is invalid, you must disable TLLV if you import in functions or class methods. # All imports must be in module scope. import myproject.helpers as myhelp return myhelp.round(x) @transform_df( Output("/path/to/output/dataset"), my_input=Input("/path/to/input/dataset"), ) def my_compute_function(my_input): # This is invalid, you must disable TLLV if you want to use any way of importing in functions! ihelper = __import__('myproject.init_helper') my_functions = importlib.import_module('myproject.functions') return my_input
You must disable TLLV if you’re using extension modules ↗.
Each Transforms Python subproject within a repository exposes a single transforms.api.Pipeline
object. This Pipeline
object is used to:
transforms.api.Transform
object responsible for building a given dataset during a Foundry build.The runtime responsible for executing a Python transformation needs to be able to find the project’s Pipeline
. To export a Pipeline
, you add it to the entry_points
argument in the setup.py
file in a Transforms Python subproject. For more information about entry points, you can refer to the setuptools library documentation ↗.
By default, it’s required that each Python subproject exports a transforms.pipelines
entry point named root
. Recall that an entry point is defined in the setup.py
file located in the root directory of a Python subproject. The entry point references the module name as well as the Pipeline
attribute.
For instance, say you have a Pipeline
called “my_pipeline” defined in myproject/pipeline.py
:
Copied!1 2 3
from transforms.api import Pipeline my_pipeline = Pipeline()
You can register this Pipeline
in setup.py
by doing the following:
Copied!1 2 3 4 5 6 7 8 9 10
import os from setuptools import find_packages, setup setup( entry_points={ 'transforms.pipelines': [ 'root = myproject.pipeline:my_pipeline' ] } )
In the code above, root
refers to the name of the Pipeline
you’re exporting, myproject.pipeline
refers to the module containing your Pipeline
, and my_pipeline
refers to the Pipeline
attribute defined in that module.
Once a Transform
object associated with your project’s Pipeline declares a dataset as an Output
you can build this dataset in Foundry. The two recommended ways to add Transform
objects to a Pipeline
are manual registration and automatic registration.
If you have a more advanced workflow and/or want to explicitly add each Transform
object to your project’s Pipeline, you can use manual registration. Otherwise, it’s highly recommended to use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the discover_transforms
method recursively discovers any Transform
objects defined at the module-level. Refer to the sections below for more information.
The discover_transforms
method imports every module it finds. As a result, any code within your imported modules will be executed.
As the complexity of a project grows, manually adding Transform
objects to a Pipeline
can become unwieldy. Thus, the Pipeline
object provides the discover_transforms()
method to recursively discover all Transform
objects within a Python module or package.
Copied!1 2 3 4 5
from transforms.api import Pipeline import my_module my_pipeline = Pipeline() my_pipeline.discover_transforms(my_module)
Transform
objects can be manually added to a Pipeline
using the add_transforms()
function. This function takes any number of Transform
objects and adds them to the Pipeline. It also checks that no two Transform
objects declare the same output dataset.
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform_df, Pipeline, Input, Output @transform_df( Output('/path/to/output/dataset'), my_input=Input('/path/to/input/dataset') ) def my_compute_function(my_input): return my_input my_pipeline = Pipeline() my_pipeline.add_transforms(my_compute_function)
If you want to define a data transformation that creates multiple outputs, you can either use Transform generation or define a multiple-output Transform. With Transform generation, it may be necessary to read and process the same input once for every output. With a multiple-output Transform, it is possible to read and process the input just once.
You may want to re-use the same data transformation logic across multiple Transform objects. For instance, consider the following scenarios:
In both cases, it would be useful to use the same data transformation code across multiple Transforms. Instead of separately defining a Transform object for each of your outputs, you can generate Transform objects using a for-loop and then register them in bulk to your project’s Pipeline. Here is an example for generating Transforms:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from transforms.api import transform_df, Input, Output def transform_generator(sources): # type: (List[str]) -> List[transforms.api.Transform] transforms = [] # This example uses multiple input datasets. You can also generate multiple outputs # from a single input dataset. for source in sources: @transform_df( Output('/sources/{source}/output'.format(source=source)), my_input=Input('/sources/{source}/input'.format(source=source)) ) def compute_function(my_input, source=source): # To capture the source variable in the function, you pass it as a defaulted keyword argument. return my_input.filter(my_input.source == source) transforms.append(compute_function) return transforms TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
To capture the source variable in the function, you must pass it as with the defaulted keyword argument source
in your compute function.
When using a loop to generate Transforms, the loop for generating your Transform objects must be within a function, since Python for-loops don’t create new scopes. If a function is not used, automatic registration will mistakenly only discover the final Transform object defined in your for-loop. This function should return a list of the generated Transforms objects and the return value should be set equal to a variable. Following these criteria within a module that is set up to be discovered via automatic registration will allow you to use automatic registration with generated Transforms. Alternatively, you can use manual registration.
If the list of input datasets changes between builds (for example, if the list of input datasets is read from a file that is modified between builds), the build will fail because the new dataset references will not be found in the job specification for the build. If this could be a problem for you, consider using Logic Flow instead.
Dynamic input or output naming is not possible in Transforms. When the CI job runs, all the relations between inputs and outputs are determined including the links between unique identifiers and dataset names. Output datasets that do not exist are created, and a JobSpec is added to them.
Whenever a dataset is built, the reference to the repository, source file, and the entry point of the function that creates the dataset is obtained from the JobSpec. Following this, the build process is initiated and your function is called to generate the final result. Therefore, if there are changes in your inputs or outputs and the build process is launched, it will lead to an error because the JobSpecs are no longer valid. This disrupts the connection between the unique identifier and the dataset name.
If using manual registration, you can then add the generated transforms into the pipeline. If you are unfamiliar with the *
syntax, refer to this tutorial.
Copied!1 2 3 4
import my_module my_pipeline = Pipeline() my_pipeline.add_transforms(*my_module.TRANSFORMS)
Note that the Build button in Code Repositories may not work for manual registration and will present a No transforms discovered in the pipeline from the requested file error. You can still build these datasets with the Data Lineage or the Dataset Preview applications.