DuckDB API

Foundry offers support for streamed, lazy execution using DuckDB, similar to the Polars lazy API.

To access a preconfigured DuckDB connection in your transform, use the Context object as shown below:

```python
from transforms.api import transform, Input, Output

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn
    query = conn.sql("""SELECT * FROM my_input""")
    output.write_table(query)
```

The DuckDB connection comes preconfigured with all input datasets registered as unmaterialized DuckDB tables, using the input names as table names. This allows easy access to input data without manual configuration.
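
For example, a transform with multiple inputs can query and join them directly by name. The sketch below assumes two hypothetical inputs, orders and customers, sharing a customer_id column; it follows the same pattern as the example above.

```python
from transforms.api import transform, Input, Output

@transform.using(
    output=Output("/Users/jsmith/output"),
    # Hypothetical inputs; each is registered as a DuckDB table named after its parameter
    orders=Input("/Users/jsmith/orders"),
    customers=Input("/Users/jsmith/customers"),
)
def compute(ctx, output, orders, customers):
    conn = ctx.duckdb().conn
    # The parameter names "orders" and "customers" are usable directly as table names in SQL
    query = conn.sql("""
        SELECT c.customer_id, COUNT(*) AS order_count
        FROM orders AS o
        JOIN customers AS c ON o.customer_id = c.customer_id
        GROUP BY c.customer_id
    """)
    output.write_table(query)
```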

When to use DuckDB

DuckDB is a highly performant, efficient query engine that often outperforms other single-node compute engines like Pandas or Polars. Consider using DuckDB for:

  • Single-node transforms that operate on very large-scale data.
  • Transforms that need strict control over memory usage.

However, DuckDB does not yet support a Python DataFrame API like Polars or Pandas, instead requiring raw SQL queries for data manipulation.
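
For illustration, a filter-and-aggregate step that would be a chain of DataFrame method calls in Pandas or Polars is expressed as a single SQL query against the registered input table. The snippet below is a minimal sketch inside a transform body like the one above; the category and price columns are hypothetical.

```python
# Inside a transform body, as in the first example; "category" and "price" are hypothetical columns
conn = ctx.duckdb().conn
result = conn.sql("""
    SELECT category, AVG(price) AS avg_price
    FROM my_input
    WHERE price IS NOT NULL
    GROUP BY category
""")
output.write_table(result)
```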

Incremental workflows

Similar to other compute engines, the DuckDB API lets you configure incremental read modes on inputs.

As with other compute engines, the read mode defaults to added when running incrementally, and to current when running non-incrementally.

```python
from transforms.api import transform, Input, Output, incremental

@incremental()
@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    ddb = ctx.duckdb(read_modes={my_input: "current"})
    result = ddb.conn.sql("""SELECT * FROM my_input""")
    output.write_table(result)
```

Reading from a .sql file

Some DuckDB workflows may involve complex SQL queries that are better managed in separate .sql files. You can reference such files using the sql_from_file method on the DuckDB connection. The paths are relative to the file containing the call to sql_from_file.

```sql
-- query.sql
SELECT * FROM my_input WHERE column_a > 100;
```
```python
from transforms.api import transform, Input, Output, incremental

@incremental()
@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn
    query = conn.sql_from_file("query.sql")
    output.write_table(query)
```

DuckDB configuration

Unlike many other single-node compute engines, DuckDB supports resource configuration to control memory usage and parallelism, allowing fine-grained optimization for different workloads. This is especially important in memory-constrained contexts, where DuckDB can self-limit its memory consumption to avoid out-of-memory errors, at the cost of performance.

You can set these options after obtaining the DuckDB connection via the duckdb method on the Context object, as shown below. A full list of configuration options can be found in the official documentation ↗.

```python
conn = ctx.duckdb().conn
conn.execute("SET memory_limit='2GB';")
conn.execute("SET threads=4;")
```
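
For memory-constrained transforms, it can also help to point DuckDB at a temporary directory so that larger-than-memory operators can spill to disk. The settings in the sketch below are standard DuckDB options; the values and path shown are illustrative, not recommendations.

```python
conn = ctx.duckdb().conn
conn.execute("SET memory_limit='4GB';")                   # cap DuckDB's memory usage (illustrative value)
conn.execute("SET threads=2;")                            # reduce parallelism to lower peak memory
conn.execute("SET temp_directory='/tmp/duckdb_spill';")   # allow operators to spill to disk (illustrative path)
```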

Advanced use cases

Some DuckDB workflows require access to lower-level APIs than simple references to input datasets, for example reading an input's underlying Parquet or CSV files with custom read parameters.

For these use cases, you can read your dataset's raw Parquet or CSV files with a preconfigured DuckDB UDF called <input_name>_files(). This function returns a list of file paths for the underlying data files of the input dataset, which can then be passed to DuckDB's read functions with custom parameters.
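
For example, if the input is backed by CSV files, the file list returned by the UDF can be passed to read_csv with custom parsing options. The sketch below runs inside a transform body as in the examples above; the delimiter and header settings are illustrative.

```python
conn = ctx.duckdb().conn
# my_input_files() returns the input's underlying file paths; the parsing options here are illustrative
query = conn.sql("""
    SELECT *
    FROM read_csv(my_input_files(), delim='|', header=true)
""")
output.write_table(query)
```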

You can then copy your output to an intermediate location on disk, or directly stream the output back to Foundry using DuckDB's COPY TO command.

To infer a schema for files written directly with COPY TO, without calling write_table, use the put_metadata() method on the Output object.

```python
from transforms.api import transform, Input, Output

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn

    # Write to disk, then upload the dataset from disk
    conn.sql(f"""
        COPY (
            SELECT *
            FROM read_parquet(my_input_files(), file_row_number=True)
            WHERE my_column = 'abc'
            LIMIT 10
        ) TO '{output.path_for_write_table}' (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
    """)
    output.write_table(output.path_for_write_table)

    # Directly stream inputs and outputs
    # (the doubled braces emit DuckDB's literal {uuid} placeholder from the Python f-string)
    conn.execute(f"""
        COPY (
            SELECT *
            FROM read_parquet(my_input_files(), file_row_number=True)
            WHERE my_column = 'abc'
            LIMIT 10
        ) TO '{output.path_for_lazy_write_table}'
        (FORMAT PARQUET, PARTITION_BY (date), WRITE_PARTITION_COLUMNS true, FILENAME_PATTERN 'file_{{uuid}}')
    """)
    output.put_metadata()
```

Incremental outputs

When using COPY TO syntax, users have direct control over the files written to their output datasets, including their file names. For incremental workflows, ensure that sequential writes do not share file names with previous transactions, to avoid conflicts and overwrites between files. DuckDB lets you provide a filename pattern ↗ for outputs; appending a UUID to file names, as in the example above, ensures uniqueness.
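
As a minimal sketch of that pattern (the query is illustrative, path_for_lazy_write_table and put_metadata follow the advanced example above, and the doubled braces are needed so the Python f-string emits DuckDB's literal {uuid} placeholder):

```python
# Inside a transform body, as in the advanced example above
conn.execute(f"""
    COPY (SELECT * FROM my_input)
    TO '{output.path_for_lazy_write_table}'
    (FORMAT PARQUET, FILENAME_PATTERN 'file_{{uuid}}')
""")
output.put_metadata()
```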

Partitioned outputs

Note that DuckDB's partitioning behavior differs from the defaults in common libraries like Polars or Spark. When writing partitioned datasets, you are strongly encouraged to set WRITE_PARTITION_COLUMNS to true, as in the example above, to ensure compatibility with other transforms. Also note that DuckDB uses the string null in Hive file paths for missing values, instead of the Hive standard __HIVE_DEFAULT_PARTITION__; take special care that downstream transforms parse these partitions properly, generally by filling in nulls on the partition column before writing outputs with DuckDB (see the sketch below).
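
The sketch below illustrates filling nulls inside the COPY statement before a partitioned write; the date partition column and the sentinel value are hypothetical choices, and the output path follows the advanced example above.

```python
# Inside a transform body, as in the advanced example above
conn = ctx.duckdb().conn
conn.execute(f"""
    COPY (
        -- Replace missing partition values with an explicit sentinel (illustrative choice)
        SELECT * REPLACE (COALESCE(date, DATE '1970-01-01') AS date)
        FROM my_input
    ) TO '{output.path_for_lazy_write_table}'
    (FORMAT PARQUET, PARTITION_BY (date), WRITE_PARTITION_COLUMNS true, FILENAME_PATTERN 'file_{{uuid}}')
""")
output.put_metadata()
```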

Eager download

By default, Foundry's DuckDB integration uses lazy, streamed downloading of input datasets to optimize performance and resource usage. However, in some scenarios, you may want to eagerly download all input data to disk at the start of the transform. To do so, call path() on the input to pull it to disk, then query the downloaded files:

```python
import duckdb
from transforms.api import transform, Output, Input

@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_duckdb_transform(my_input, my_output):
    duckdb.connect(database=':memory:').execute(f"""
        COPY (
            SELECT *
            FROM parquet_scan('{my_input.path()}/**/*.parquet')
            WHERE Name LIKE 'John%'
        ) TO '{my_output.path_for_write_table}' (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
    """)
    # Optimize performance by writing a separate Parquet file per thread in parallel
    my_output.write_table(my_output.path_for_write_table)
```