Foundry Transforms

Transforms are the core building blocks of data processing pipelines. They define how data flows from inputs to outputs, encapsulating both the computational logic and resource requirements in a single, declarative unit. Transforms are registered in Pipelines, which can be made up of multiple transform definitions.
Every transform consists of three key components:

- Inputs: the datasets the transform reads from, declared with Input
- Outputs: the datasets the transform writes to, declared with Output
- Compute logic: the Python function that turns the inputs into the outputs

Below is the basic pattern used to define transforms:
from transforms.api import transform, Input, Output

@transform.using(                                                # decorator
    hair_eye_color=Input('/examples/students_hair_eye_color'),  # inputs
    processed=Output('/examples/hair_eye_color_processed')      # outputs
)
def filter_hair_color(hair_eye_color, processed):  # function signature matches inputs/outputs
    ...  # compute logic
By default, you can load the data as either a Polars ↗ or pandas ↗ DataFrame. Polars is a DataFrame library for transforming tabular data, known for its performance, stability, and ease of use. Pandas is a widely adopted and easy-to-use data analysis tool.
To retrieve a DataFrame, call the appropriate method on your input as shown below:
import polars as pl
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Load data as a Polars DataFrame
    df = hair_eye_color.polars()

    # Apply filtering logic
    filtered = df.filter(pl.col('hair') == 'Brown')

    # Write the result
    processed.write_table(filtered)
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Load data as a pandas DataFrame
    df = hair_eye_color.pandas()

    # Apply filtering logic
    filtered = df[df['hair'] == 'Brown']

    # Write the result
    processed.write_table(filtered)
The example below demonstrates that transforms can take multiple inputs and outputs.
from transforms.api import transform, Input, Output

@transform.using(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    ...
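To make this concrete, here is one hypothetical way the body could be completed with Polars. The shared 'id' join key and the 'sex' column are assumptions for illustration, not part of the original example:

import polars as pl
from transforms.api import transform, Input, Output

@transform.using(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    # Join the two inputs on an assumed shared 'id' column
    joined = hair_color.polars().join(eye_color.polars(), on='id')

    # Split the rows on an assumed 'sex' column and write each half to its own output
    males.write_table(joined.filter(pl.col('sex') == 'M'))
    females.write_table(joined.filter(pl.col('sex') == 'F'))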
The lightweight transforms API supports loading data in a variety of formats, as demonstrated in the example below:
from transforms.api import transform, Input, Output

@transform.using(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'))
def compute(output, dataset):
    polars_df = dataset.polars()         # polars_df is a polars.DataFrame object
    lazy_df = dataset.polars(lazy=True)  # activates streaming mode; lazy_df is a polars.LazyFrame
    pandas_df = dataset.pandas()         # pandas_df is a pandas.DataFrame
    arrow_table = dataset.arrow()        # arrow_table is a pyarrow.Table
    output.write_table(lazy_df)          # any of the formats above can be passed to write_table
Note that calling dataset.pandas() requires pandas to be installed in your environment. Likewise, dataset.polars(...) requires Polars to be available.
You can control the compute resources allocated to your transforms using the .with_resources() method.
from transforms.api import transform, Input, Output

@transform.using(
    large_dataset=Input('/datasets/large_file'),
    processed=Output('/datasets/processed')
).with_resources(
    cpu_cores=8,    # use 8 CPU cores
    memory_gb=16,   # allocate 16 GB of memory
)
def process_large_dataset(large_dataset, processed):
    ...
- cpu_cores: The number of CPU cores. This can be fractional, for example, 0.5; defaults to 2.
- memory_mb or memory_gb: Memory allocation; only use one, not both (see the sketch below this list).
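For example, a lighter-weight job might request fractional cores and specify its memory in megabytes instead. This is a minimal sketch; the dataset paths and values are illustrative:

from transforms.api import transform, Input, Output

@transform.using(
    small_dataset=Input('/datasets/small_file'),
    processed=Output('/datasets/processed_small')
).with_resources(
    cpu_cores=0.5,   # fractional cores are allowed; the default is 2
    memory_mb=512,   # specify either memory_mb or memory_gb, never both
)
def process_small_dataset(small_dataset, processed):
    ...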
There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may be required to access the current Spark session or to contact an external service. In such cases, you can inject a TransformContext object into the transformation.
To inject a TransformContext object, your compute function must accept a parameter called ctx. Note that this also means that no inputs or outputs may be named ctx. Refer to the API reference ↗ for the full set of TransformContext capabilities.
from transforms.api import transform, Output

@transform.using(
    out=Output('/examples/context')
)
def generate_dataframe(ctx, out):
    ...
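As one hypothetical completion, the injected context's Spark session could be used to build a DataFrame from scratch. Whether write_table accepts a Spark DataFrame directly may depend on your environment, so this sketch converts to pandas first:

from transforms.api import transform, Output

@transform.using(
    out=Output('/examples/context')
)
def generate_dataframe(ctx, out):
    # Build a small DataFrame using the Spark session from the injected context
    df = ctx.spark_session.createDataFrame(
        [('a', 1), ('b', 2)],
        schema=['letter', 'number']
    )

    # Convert to pandas, one of the formats write_table is shown to accept above
    out.write_table(df.toPandas())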
By default, transforms run on a single compute node and leverage data processing libraries like pandas or Polars. For big data compute workflows, PySpark ↗ can be leveraged in Python transforms.
PySpark is a wrapper language that allows you to interface with an Apache Spark backend to quickly process data. Spark can operate on very large datasets across a distributed network of servers, which provides scaling and reliability benefits when used correctly. However, it also comes with higher overhead and resource usage, making it a poor choice for small to medium scale data.
While a high-level overview of the PySpark transforms API is given here, refer to the Python transforms (Spark) documentation for more details.
PySpark transforms are defined with similar syntax to standard transforms.
from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    ...  # transform logic goes here
As with Polars or pandas transforms, a DataFrame object can be retrieved from the transform input.
from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Retrieve the input as a PySpark DataFrame and filter it
    df = hair_eye_color.dataframe()
    filtered_df = df.filter(df.hair == 'Brown')

    # Write the result
    processed.write_dataframe(filtered_df)
The DataFrame ↗ objects returned by transform inputs are PySpark DataFrames. For more information about working with PySpark, refer to the Spark Python API documentation ↗.
PySpark transforms can also take multiple inputs and outputs:
from transforms.api import transform, Input, Output

@transform(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    ...
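A hypothetical completion might join the inputs and route rows to each output. As before, the shared 'id' key and 'sex' column are assumed for illustration only:

from transforms.api import transform, Input, Output

@transform(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    # Join the inputs on an assumed shared 'id' column
    joined = hair_color.dataframe().join(eye_color.dataframe(), on='id')

    # Route rows to each output based on an assumed 'sex' column
    males.write_dataframe(joined.filter(joined.sex == 'M'))
    females.write_dataframe(joined.filter(joined.sex == 'F'))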
Since PySpark transforms only support one data processing library, the transform_df decorator can be used to directly access the DataFrame ↗ for convenience.
from transforms.api import transform_df, Input, Output

@transform_df(
    Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color')
)
def filter_hair_color(hair_eye_color):
    return hair_eye_color.filter(hair_eye_color.hair == 'Brown')
In PySpark transforms, resources are configured with Spark profiles, which allow strict access controls over large-scale compute usage.
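For instance, profiles can be attached to a transform with the configure decorator. The profile name below is a placeholder; the profiles actually available depend on what your enrollment exposes:

from transforms.api import configure, transform, Input, Output

@configure(profile=['EXECUTOR_MEMORY_MEDIUM'])  # hypothetical profile name
@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    ...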
For TLLV to function correctly, your code must declare all imports at the module level and should not attempt to patch or otherwise modify objects in another module. If this is not the case in your project, you must disable TLLV, as shown in the configuration snippet further below.
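To illustrate the import requirement, the hypothetical sketch below contrasts the two patterns; only the first declares its dependency where TLLV can see it:

# Module-level import: declared where TLLV's analysis can track it
import polars as pl

def brown_hair_only(df):
    return df.filter(pl.col('hair') == 'Brown')

# Function-level import: not declared at module level, so changes in the
# imported module may not be reflected in the transform's version
def brown_hair_only_hidden(df):
    import polars as pl  # avoid this pattern when TLLV is enabled
    return df.filter(pl.col('hair') == 'Brown')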
TLLV is enabled by default. To disable TLLV, set tllv to false in the transformsPython configuration. This configuration is inside the build.gradle file in your Python transforms subproject.
transformsPython {
    tllv false
}
A transform’s version is a string that is used to compare two versions of a transform when considering logic staleness. A transform’s output is up to date if its inputs are unchanged and if the transform’s version is unchanged. If the version changes, the transform’s output will be invalidated and recomputed.
By default, a transform’s version includes the following:
If any of these change, the version string will be changed.
If you want to invalidate outputs when a change happens in a file that is not covered by the parts listed above, set tllvFiles in the transformsPython configuration. An example use case is if you are reading configuration from a file and want to invalidate outputs when that configuration changes.
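For instance, the setting might look like the following. The exact value syntax and the path are assumptions for illustration; check your platform documentation for the expected format:

transformsPython {
    tllvFiles = ['src/myproject/config.yaml']
}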