Foundry can push compute down to Databricks when using virtual tables. When Databricks virtual tables registered to the same source are used as the inputs and outputs of a pipeline, compute can be fully federated to Databricks.
This documentation walks you through the process of authoring a Python transform in Code Repositories that can be executed entirely in your Databricks environment. This capability leverages Databricks Connect. Refer to the official Databricks documentation ↗ for more information on the features and limitations of Databricks Connect.
This example shows how to use a Databricks transform in a Python transform pipeline. Suppose we have the following Spark pipeline using PySpark via @transform:
```python
from pyspark.sql.functions import col
from transforms.api import transform, Input, Output, TransformInput, TransformOutput


@transform(
    source_table=Input('/Project/folder/input'),
    output_table=Output('/Project/folder/output'),
)
def compute(source_table: TransformInput, output_table: TransformOutput):
    df = source_table.dataframe()
    df = df.filter(col('id') > 1)
    output_table.write_dataframe(df)
```
To turn this into a Databricks transform, you must:

- Add the transforms-tables and databricks-connect libraries from the Libraries tab.
- Add the @databricks decorator to your transform.

For more details, consult the setup section of the documentation below.
```python
from pyspark.sql.functions import col
from transforms.api import transform
from transforms.tables import (
    databricks,
    DatabricksInput,
    DatabricksOutput,
    TableInput,
    TableOutput,
)


@databricks
@transform(
    source_table=TableInput('/Project/folder/input'),
    output_table=TableOutput(
        '/Project/folder/output',  # Where to register the Databricks output as a virtual table in Foundry
        "ri.magritte..source.1234",  # Your Databricks source connection
        "CATALOG.SCHEMA.TABLE",  # The location to which the transform output will be written in Databricks
    ),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
    df = source_table.dataframe()
    df = df.filter(col('id') > 1)
    output_table.write_dataframe(df)
```
Because Databricks Connect uses the pyspark.sql.DataFrame ↗ API, the logic of the code itself is largely unchanged. The primary difference is that the transform uses Spark compute running in Databricks.
TableInput and TableOutput parameters are used instead of Input and Output to reference the input and output Databricks virtual tables of the transform. The transform function is passed DatabricksInput and DatabricksOutput parameters that can be used to read from and write to tables in Databricks.
The @databricks decorator, as shown above, is only compatible with TableInput and TableOutput parameters. The tables referenced as inputs and outputs to the transform must be registered on the same Databricks source.
To use compute pushdown with Databricks:

- Add the transforms-tables library.
- Install a version of the databricks-connect library ↗ that is compatible with your Databricks cluster version. Refer to the official Databricks documentation ↗ for more information.

The databricks-connect library can be installed using the Libraries tab of your Code Repository environment. Alternatively, you can manually add the library under the pip requirements block in the conda_recipe/meta.yaml file. For example, to install version 15.4.6 of databricks-connect, add:
```yaml
requirements:
  pip:
    - databricks-connect==15.4.6
```
The following sections highlight the differences between the Databricks transform API and a regular Python transform. Before using @databricks, ensure you have performed the setup steps above.
A Databricks transform uses transforms.tables.TableInput and transforms.tables.TableOutput parameters to reference the input and output Databricks virtual tables. A TableInput can reference a virtual table by file path or RID. A TableOutput requires:

- The Foundry path or RID at which the output will be registered as a virtual table.
- The RID of the Databricks source connection.
- The target table in Databricks, specified using DatabricksTable("<catalog>", "<schema>", "<table>") or "<catalog>.<schema>.<table>" syntax. catalog, schema, and table correspond to the three-level namespace structure of a table identifier in Unity Catalog. Refer to the official Databricks documentation ↗ for more information.

The API of DatabricksInput and DatabricksOutput is similar to that of a regular Python transform: .dataframe() loads the input table as a pyspark.sql.DataFrame, and .write_dataframe() writes a pyspark.sql.DataFrame to the output table. The transform logic itself can be expressed using PySpark transformations.
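To make the two target-table syntaxes concrete, the sketch below adapts the earlier example to pass a DatabricksTable instead of a "CATALOG.SCHEMA.TABLE" string. The source RID and table names are placeholders, and the import location of DatabricksTable is assumed to match the other transforms.tables classes shown above.

```python
from transforms.api import transform
from transforms.tables import (  # DatabricksTable import location is assumed
    databricks,
    DatabricksInput,
    DatabricksOutput,
    DatabricksTable,
    TableInput,
    TableOutput,
)


@databricks
@transform(
    # Inputs can reference a registered virtual table by Foundry path (as here) or by RID.
    source_table=TableInput('/Project/folder/input'),
    output_table=TableOutput(
        '/Project/folder/output',    # Foundry path at which the output virtual table is registered
        "ri.magritte..source.1234",  # placeholder Databricks source RID
        # Equivalent to the string form "CATALOG.SCHEMA.TABLE"
        DatabricksTable("CATALOG", "SCHEMA", "TABLE"),
    ),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
    # .dataframe() returns a pyspark.sql.DataFrame evaluated on Databricks compute
    df = source_table.dataframe()
    # .write_dataframe() writes the result to the Databricks table configured above
    output_table.write_dataframe(df)
```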
Not all features of PySpark are supported in Databricks Connect. Refer to the official Databricks documentation ↗ for details on feature availability and limitations.
By default, a Databricks Connect session will be established to the compute cluster configured in the connection details of the source. This is identified by the HTTP path configuration option.
Alternatively, you can use:

- @databricks(cluster_id="<cluster-id>") to configure a connection to a specific compute cluster, or
- @databricks(serverless=True) to configure a connection to serverless compute.

For information on compute configuration for Databricks Connect, review the official Databricks documentation ↗.
If your Databricks source is configured to use a SQL warehouse rather than a Databricks compute cluster, you must specify one of the cluster_id or serverless options.
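As an illustrative sketch (the source RID and table names below are placeholders), the compute options described above are passed directly to the @databricks decorator:

```python
from transforms.api import transform
from transforms.tables import (
    databricks,
    DatabricksInput,
    DatabricksOutput,
    TableInput,
    TableOutput,
)


# Target serverless compute rather than the cluster configured on the source;
# use @databricks(cluster_id="<cluster-id>") instead to pin a specific cluster.
@databricks(serverless=True)
@transform(
    source_table=TableInput('/Project/folder/input'),
    output_table=TableOutput(
        '/Project/folder/output',
        "ri.magritte..source.1234",  # placeholder Databricks source RID
        "CATALOG.SCHEMA.TABLE",      # placeholder Unity Catalog location
    ),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
    output_table.write_dataframe(source_table.dataframe())
```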