Python transform basics

Iceberg tables can be used as inputs and outputs in Python transforms through the transforms.tables API, which is provided by the transforms-tables package.

This page provides code examples for the fundamentals of working with Iceberg table inputs and outputs in Python transforms.

Example: Generate a simple Iceberg table

Polars:
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(output: IcebergOutput):
    """Write a two-row Polars DataFrame with a single "phrase" column to the Iceberg output."""
    output.write_table(pl.DataFrame({"phrase": ["Hello", "World"]}))
DuckDB:
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(ctx: LightweightContext, output: IcebergOutput):
    """Build a two-row "phrase" table with DuckDB SQL and write it to the Iceberg output."""
    relation = ctx.duckdb().conn.sql(
        "SELECT * FROM (VALUES ('Hello'), ('World')) AS t(phrase);"
    )
    # Convert the DuckDB relation to a PyArrow table before handing it to write_table.
    output.write_table(relation.to_arrow_table())
pandas:
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(output: IcebergOutput):
    """Write a small pandas DataFrame with a single "phrase" column to the Iceberg output."""
    rows = {"phrase": ["Hello", "World"]}
    output.write_table(pd.DataFrame(rows))
PySpark:
from transforms.api import TransformContext, transform
from transforms.tables import TableOutput, TableTransformOutput


@transform(output=TableOutput("/.../Output"))
def compute(ctx: TransformContext, output: TableTransformOutput):
    """Create a two-row Spark DataFrame with a "phrase" column and write it to the table output."""
    spark = ctx.spark_session
    greetings = spark.createDataFrame([["Hello"], ["World"]], schema=["phrase"])
    output.write_dataframe(greetings)

Example: Iceberg table output, Iceberg table input

Polars:
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Read the Iceberg input via .polars() and write it unchanged to the Iceberg output."""
    frame = source_table.polars()
    output_table.write_table(frame)
DuckDB:
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_table: IcebergOutput):
    """Query the Iceberg input with DuckDB SQL and write the result to the Iceberg output."""
    conn = ctx.duckdb().conn
    # Scan the underlying PyIceberg table as an Arrow batch reader and register
    # it with DuckDB under the name used in the SQL below.
    batches = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", batches)

    result = conn.sql("SELECT * FROM source_tbl").to_arrow_table()
    output_table.write_table(result)
pandas:
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Read the Iceberg input via .pandas() and write it unchanged to the Iceberg output."""
    frame = source_table.pandas()
    output_table.write_table(frame)
PySpark:
from transforms.api import transform
from transforms.tables import (
    TableInput,
    TableOutput,
    TableTransformInput,
    TableTransformOutput,
)


@transform(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    """Copy the input table to the output table through a Spark DataFrame."""
    df = source_table.dataframe()
    output_table.write_dataframe(df)

Example: Iceberg table output, dataset input

Polars:
import polars as pl
from transforms.api import Input, LightweightInput, transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Read a dataset input via .polars() and write it to the Iceberg output."""
    frame = source_dataset.polars()
    output_table.write_table(frame)
DuckDB:
from transforms.api import Input, LightweightContext, LightweightInput, transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(ctx: LightweightContext, source_dataset: LightweightInput, output_table: IcebergOutput):
    """Select all rows of a dataset input with DuckDB SQL and write them to the Iceberg output."""
    # NOTE(review): unlike the Iceberg-input examples, nothing is registered with
    # DuckDB here; the SQL relies on the name "source_dataset" being resolvable
    # (presumably via Foundry's DuckDB integration) — confirm.
    relation = ctx.duckdb().conn.sql("SELECT * FROM source_dataset")
    output_table.write_table(relation.to_arrow_table())
pandas:
import pandas as pd
from transforms.api import Input, LightweightInput, transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Read a dataset input via .pandas() and write it to the Iceberg output."""
    frame = source_dataset.pandas()
    output_table.write_table(frame)
PySpark:
from transforms.api import Input, TransformInput, transform
from transforms.tables import TableOutput, TableTransformOutput


@transform(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: TransformInput, output_table: TableTransformOutput):
    """Copy a dataset input into the table output through a Spark DataFrame."""
    df = source_dataset.dataframe()
    output_table.write_dataframe(df)

Example: Dataset output, Iceberg table input

Polars:
import polars as pl
from transforms.api import LightweightOutput, Output, transform
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Read the Iceberg input via .polars() and write it to a dataset output."""
    frame = source_table.polars()
    output_dataset.write_table(frame)
DuckDB:
from transforms.api import LightweightContext, LightweightOutput, Output, transform
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_dataset: LightweightOutput):
    """Query the Iceberg input with DuckDB SQL and write the result to a dataset output."""
    conn = ctx.duckdb().conn
    # Expose the PyIceberg scan to DuckDB as an Arrow batch reader named "source_tbl".
    batches = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", batches)

    # The DuckDB relation is passed to write_table as-is; unlike the
    # Iceberg-output examples, no to_arrow_table() conversion is done here.
    output_dataset.write_table(conn.sql("SELECT * FROM source_tbl"))
pandas:
import pandas as pd
from transforms.api import LightweightOutput, Output, transform
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Read the Iceberg input via .pandas() and write it to a dataset output."""
    frame = source_table.pandas()
    output_dataset.write_table(frame)
PySpark:
from transforms.api import Output, TransformOutput, transform
from transforms.tables import TableInput, TableTransformInput


@transform(
    source_table=TableInput("/.../Input"),
    output=Output("/.../Output"),
)
def compute(source_table: TableTransformInput, output: TransformOutput):
    """Copy the input table into a dataset output through a Spark DataFrame."""
    df = source_table.dataframe()
    output.write_dataframe(df)

Advanced: PyIceberg native scans

Most of the examples above use Foundry's streamlined shortcuts on IcebergInput and IcebergOutput (such as .polars(), .pandas(), and .write_table()), which read and write the full current snapshot of a table. These streamlined APIs offer concise syntax for most standard pipelines. However, they materialize the entire table in memory and do not expose some lower-level functionality, such as predicate pushdown, column projection, or snapshot selection.

To access these finer-grained controls, call .table() on your IcebergInput. It returns the underlying PyIceberg table, which exposes the full PyIceberg scan API.

Filtered, column-projected scan (Polars output):
from pyiceberg.expressions import And, EqualTo, GreaterThan
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Write a filtered, column-projected, row-limited slice of an Iceberg table.

    The row filter, column selection, and row limit are all passed to the
    PyIceberg ``scan()`` call on the underlying table rather than being applied
    to an already-materialized DataFrame.
    """
    iceberg_table = source_table.table()

    # Keep rows where region == "EMEA" AND score > 0.5.
    predicate = And(EqualTo("region", "EMEA"), GreaterThan("score", 0.5))
    columns = ("customer_id", "score", "region")

    scan = iceberg_table.scan(
        row_filter=predicate,
        selected_fields=columns,
        # At most 10,000 matching rows; no ordering is requested, so which
        # rows are returned is not controlled by this code.
        limit=10_000,
    )
    output_table.write_table(scan.to_polars())