Advanced single-node compute options

The basic principle behind the following integrations is that you can access tabular Foundry datasets in multiple formats, such as a pandas DataFrame, an Arrow Table, a Polars DataFrame, or even as raw Parquet or CSV files. This is also shown in the transforms overview. When saving tables from memory back to Foundry, you can pass them in any of the formats in which they were read.
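For orientation, here is a minimal sketch of what these accessors look like inside a transform. The .pandas(), .arrow(), and .path() calls all appear in the examples below; the .polars() accessor is an assumption based on the formats listed above, so verify it against your API version.

from transforms.api import transform, Output, Input

@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_formats_transform(my_input, my_output):
    df = my_input.pandas()      # pandas.DataFrame
    table = my_input.arrow()    # pyarrow.Table
    pl_df = my_input.polars()   # polars.DataFrame (assumed accessor, per the formats above)
    files = my_input.path()     # folder containing the raw Parquet/CSV files on disk
    # write_table accepts any of the formats read above
    my_output.write_table(df)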

Using Ibis

Most modern compute engines embrace the notion of disaggregated data systems and thus build on industry-standard open-source components. The de facto standard for storing tables in memory is Arrow ↗. To start, we will use Ibis ↗ with a DuckDB backend, which uses the Arrow format internally. In this case, we can read the Foundry dataset as a pyarrow.Table ↗ object through a my_input.arrow() call, transform it, and write the transformed object back into Foundry with my_output.write_table(...). Consider the following example:

import ibis
from transforms.api import transform, Output, Input

@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_ibis_transform(my_input, my_output):
    ibis_client = ibis.duckdb.connect(':memory:')
    # Get the Foundry dataset as a pyarrow.Table object
    table = ibis_client.read_in_memory(my_input.arrow())
    # Execute the data transformation; execute() materializes the result as a pandas DataFrame
    results = (
        table
        .filter(table['name'].like('John%'))
        .execute()
    )
    # Save the resulting DataFrame to a Foundry dataset
    my_output.write_table(results)

Working with Apache DataFusion and DuckDB

Sometimes it is easier to cut out the deserialization step and pass the raw underlying dataset files directly to our compute engine. Calling my_input.path() returns the path to the files on disk; the files are downloaded on demand. When it comes to writing raw files back to Foundry, there are two limitations to keep in mind:

  • Only Parquet files can be stored in Foundry datasets through this API.
  • Files must be placed in the folder whose path is given by my_output.path_for_write_table.

When both criteria are met, we can call write_table with the path to this folder: my_output.write_table(my_output.path_for_write_table).
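As a minimal sketch of this pattern, assuming pyarrow is available in the environment (the file name part-0.parquet is arbitrary; only the Parquet format and the target folder matter):

import pyarrow.parquet as pq
from transforms.api import transform, Output, Input

@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_raw_write_transform(my_input, my_output):
    # Write a Parquet file into the designated output folder
    pq.write_table(my_input.arrow(), f'{my_output.path_for_write_table}/part-0.parquet')
    # Register the folder's contents as the output dataset
    my_output.write_table(my_output.path_for_write_table)

To see this in action with a full query engine, consider the following snippet demonstrating how to use DataFusion ↗ in the platform.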

import datafusion
from datafusion import col, lit
from datafusion.functions import starts_with
from transforms.api import transform, Output, Input

@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_datafusion_transform(my_input, my_output):
    ctx = datafusion.SessionContext()
    # Read the raw Parquet files backing the input dataset
    table = ctx.read_parquet(my_input.path())
    # Filter the rows and write the result back as a pyarrow.Table
    my_output.write_table(
        table
        .filter(starts_with(col("name"), lit("John")))
        .to_arrow_table()
    )

We can use the same approach with DuckDB ↗, as shown in the following example. Note that we must ensure all Parquet files in the folder are read, which is why the query uses a recursive glob pattern.

import duckdb
from transforms.api import transform, Output, Input

@transform(my_input=Input('my-input'), my_output=Output('my-output'))
def my_duckdb_transform(my_input, my_output):
    # PER_THREAD_OUTPUT improves performance by writing a separate
    # Parquet file per thread, in parallel
    duckdb.connect(database=':memory:').execute(f"""
        COPY (
            SELECT *
            FROM parquet_scan('{my_input.path()}/**/*.parquet')
            WHERE name LIKE 'John%'
        ) TO '{my_output.path_for_write_table}'
        (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
    """)
    my_output.write_table(my_output.path_for_write_table)

Utilizing cuDF

Integration through pandas.DataFrame is also possible. The following snippet shows cuDF ↗ in a lightweight transform; cuDF essentially runs pandas code in a highly parallelized manner on the GPU where possible.

from transforms.api import transform, Output, Input

@transform.using(
    my_input=Input('my-input'),
    my_output=Output('my-output')
).with_resources(
    cpu_cores=4,
    memory_gb=32,
    gpu_type='NVIDIA_T4'
)
def my_cudf_transform(my_input, my_output):
    import cudf  # Only import cuDF at runtime, not during CI
    # Load the 'name' column into GPU memory
    df = cudf.from_pandas(my_input.pandas()[['name']])
    filtered_df = df[df['name'].str.startswith('John')]
    sorted_df = filtered_df.sort_values(by='name')
    # Convert back to pandas before saving to Foundry
    my_output.write_table(sorted_df.to_pandas())

The above snippet assumes that your Foundry enrollment is equipped with an NVIDIA T4 GPU and that the GPU is available to your project through a resource queue.