Iceberg tables can be used as inputs and outputs in Python transforms through the transforms.tables API, which is provided by the transforms-tables package.
This page provides code examples for the fundamentals of working with Iceberg table inputs and outputs in Python transforms.
# Example: write a Polars DataFrame to an Iceberg table output.
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(output: IcebergOutput):
    """Write a hand-built Polars DataFrame to an Iceberg table output.

    The frame has a single ``phrase`` column with the two rows
    ``"Hello"`` and ``"World"``.
    """
    phrases = ["Hello", "World"]
    greetings = pl.DataFrame({"phrase": phrases})
    output.write_table(greetings)
# Example: build rows with DuckDB SQL and write them to an Iceberg table output.
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(ctx: LightweightContext, output: IcebergOutput):
    """Produce two rows with a DuckDB query and write them to an Iceberg table.

    The DuckDB relation is converted to an Arrow table before it is passed
    to ``write_table``.
    """
    duck = ctx.duckdb().conn
    relation = duck.sql("SELECT * FROM (VALUES ('Hello'), ('World')) AS t(phrase);")
    arrow_rows = relation.to_arrow_table()
    output.write_table(arrow_rows)
# Example: write a pandas DataFrame to an Iceberg table output.
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput


@transform.using(output=TableOutput("/.../Output"))
def compute(output: IcebergOutput):
    """Write a hand-built pandas DataFrame to an Iceberg table output.

    The frame has a single ``phrase`` column with the two rows
    ``"Hello"`` and ``"World"``.
    """
    phrases = ["Hello", "World"]
    greetings = pd.DataFrame({"phrase": phrases})
    output.write_table(greetings)
# Example: write a Spark DataFrame to a table output.
from transforms.api import transform, TransformContext
from transforms.tables import TableOutput, TableTransformOutput


@transform(output=TableOutput("/.../Output"))
def compute(ctx: TransformContext, output: TableTransformOutput):
    """Create a two-row Spark DataFrame and write it to a table output.

    The single ``phrase`` column holds ``"Hello"`` and ``"World"``.
    """
    spark = ctx.spark_session
    rows = [["Hello"], ["World"]]
    greetings = spark.createDataFrame(rows, schema=["phrase"])
    output.write_dataframe(greetings)
# Example: copy an Iceberg table input to an Iceberg table output using Polars.
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Read the input table as a Polars DataFrame and write it unchanged."""
    frame = source_table.polars()
    output_table.write_table(frame)
# Example: query an Iceberg table input with DuckDB and write to an Iceberg table output.
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_table: IcebergOutput):
    """Run a DuckDB query over an Iceberg table and write the result to another table.

    The input is scanned through PyIceberg into an Arrow batch reader. That
    reader is registered with DuckDB under the name ``source_tbl`` so the SQL
    can refer to it. The query result is converted to an Arrow table before
    it is written.
    """
    duck = ctx.duckdb().conn
    iceberg_scan = source_table.table().scan()
    batches = iceberg_scan.to_arrow_batch_reader()
    duck.register("source_tbl", batches)
    relation = duck.sql("SELECT * FROM source_tbl")
    output_table.write_table(relation.to_arrow_table())
# Example: copy an Iceberg table input to an Iceberg table output using pandas.
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Read the input table as a pandas DataFrame and write it unchanged."""
    frame = source_table.pandas()
    output_table.write_table(frame)
# Example: copy a table input to a table output using Spark.
from transforms.api import transform
from transforms.tables import TableInput, TableOutput, TableTransformInput, TableTransformOutput


@transform(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    """Read the input table as a Spark DataFrame and write it unchanged."""
    spark_df = source_table.dataframe()
    output_table.write_dataframe(spark_df)
# Example: copy a dataset input to an Iceberg table output using Polars.
import polars as pl
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Read the dataset as a Polars DataFrame and write it to an Iceberg table."""
    frame = source_dataset.polars()
    output_table.write_table(frame)
# Example: query a dataset input with DuckDB and write to an Iceberg table output.
from transforms.api import LightweightContext, transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(ctx: LightweightContext, source_dataset: LightweightInput, output_table: IcebergOutput):
    """Select every row of the dataset with DuckDB and write it to an Iceberg table.

    The DuckDB relation is converted to an Arrow table before it is written.
    """
    duck = ctx.duckdb().conn
    # NOTE(review): the SQL names the input by its parameter name and, unlike
    # the Iceberg-input examples, never calls register(). This presumes the
    # DuckDB connection already exposes dataset inputs under that name; confirm.
    relation = duck.sql("SELECT * FROM source_dataset")
    output_table.write_table(relation.to_arrow_table())
# Example: copy a dataset input to an Iceberg table output using pandas.
import pandas as pd
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Read the dataset as a pandas DataFrame and write it to an Iceberg table."""
    frame = source_dataset.pandas()
    output_table.write_table(frame)
# Example: copy a dataset input to a table output using Spark.
from transforms.api import transform, Input, TransformInput
from transforms.tables import TableOutput, TableTransformOutput


@transform(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: TransformInput, output_table: TableTransformOutput):
    """Read the dataset as a Spark DataFrame and write it to a table output."""
    spark_df = source_dataset.dataframe()
    output_table.write_dataframe(spark_df)
# Example: copy an Iceberg table input to a dataset output using Polars.
import polars as pl
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Read the Iceberg table as a Polars DataFrame and write it to a dataset."""
    frame = source_table.polars()
    output_dataset.write_table(frame)
# Example: query an Iceberg table input with DuckDB and write to a dataset output.
from transforms.api import LightweightContext, transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_dataset: LightweightOutput):
    """Run a DuckDB query over an Iceberg table and write the result to a dataset.

    The input is scanned through PyIceberg into an Arrow batch reader and
    registered with DuckDB as ``source_tbl``. Unlike the Iceberg-output
    examples, the DuckDB relation is passed to ``write_table`` as is, without
    first converting it to an Arrow table.
    """
    duck = ctx.duckdb().conn
    iceberg_scan = source_table.table().scan()
    batches = iceberg_scan.to_arrow_batch_reader()
    duck.register("source_tbl", batches)
    relation = duck.sql("SELECT * FROM source_tbl")
    output_dataset.write_table(relation)
# Example: copy an Iceberg table input to a dataset output using pandas.
import pandas as pd
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Read the Iceberg table as a pandas DataFrame and write it to a dataset."""
    frame = source_table.pandas()
    output_dataset.write_table(frame)
# Example: copy a table input to a dataset output using Spark.
from transforms.api import transform, Output, TransformOutput
from transforms.tables import TableInput, TableTransformInput


@transform(
    source_table=TableInput("/.../Input"),
    output=Output("/.../Output"),
)
def compute(source_table: TableTransformInput, output: TransformOutput):
    """Read the table as a Spark DataFrame and write it to a dataset output."""
    spark_df = source_table.dataframe()
    output.write_dataframe(spark_df)
Most of the lightweight examples above use Foundry's streamlined shortcuts on IcebergInput and IcebergOutput (such as .polars(), .pandas(), and write_table()), which read and write the full current snapshot of a table. These streamlined APIs offer concise syntax for most standard pipelines. However, they materialize the entire table in memory and do not expose some underlying functionality, such as predicate pushdown, column projection, or snapshot selection.
To access these finer-grained controls, call .table() on your IcebergInput to get the underlying PyIceberg table, which exposes the full PyIceberg scan API. The example below uses it to filter rows, select specific columns, and cap the number of rows read.
# Example: use the PyIceberg scan API for row filtering, column projection, and a row limit.
from pyiceberg.expressions import And, EqualTo, GreaterThan
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Write a filtered, column-projected, row-limited slice of an Iceberg table.

    Instead of loading the whole table through the streamlined shortcuts, this
    example gets the underlying PyIceberg table via ``.table()`` and passes
    the row filter, column selection, and limit directly to ``scan``. The scan
    result is materialized as a Polars DataFrame and written to the output.
    """
    pyiceberg_table = source_table.table()

    # Keep only rows where region == "EMEA" AND score > 0.5.
    emea_high_scores = And(
        EqualTo("region", "EMEA"),
        GreaterThan("score", 0.5),
    )
    wanted_columns = ("customer_id", "score", "region")

    filtered_scan = pyiceberg_table.scan(
        row_filter=emea_high_scores,
        selected_fields=wanted_columns,
        # Caps the number of rows returned. No ordering is specified here,
        # so which rows survive the cap is not determined by this code.
        limit=10_000,
    )
    output_table.write_table(filtered_scan.to_polars())