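Iceberg tables can be used as inputs and outputs in Python transforms through the `transforms.tables` API, which is imported from the `transforms-tables` package.
This page provides code examples for the fundamentals of working with Iceberg table inputs and outputs in Python transforms. Each scenario — writing a new table, copying a table to a table, writing a dataset to a table, and writing a table to a dataset — is shown with Polars, pandas, and Spark.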
Example: write a Polars DataFrame to an Iceberg table output (lightweight transform):
import polars as pl
from transforms.api import transform
from transforms.tables import TableLightweightOutput, TableOutput


@transform.using(
    output=TableOutput("/.../Output"),
)
def compute(output: TableLightweightOutput):
    """Write a two-row Polars DataFrame to an Iceberg table output.

    The DataFrame has a single ``phrase`` column. It is converted to a
    PyArrow table before being passed to ``output.iceberg().write``.
    """
    greetings = pl.DataFrame({"phrase": ["Hello", "World"]})
    output.iceberg().write(greetings.to_arrow())
Example: write a pandas DataFrame to an Iceberg table output (lightweight transform):
import pandas as pd
import pyarrow as pa
from transforms.api import transform
from transforms.tables import TableLightweightOutput, TableOutput


@transform.using(
    output=TableOutput("/.../Output"),
)
def compute(output: TableLightweightOutput):
    """Write a two-row pandas DataFrame to an Iceberg table output.

    The DataFrame has a single ``phrase`` column. It is converted to a
    PyArrow table with ``pa.Table.from_pandas`` and then passed to
    ``output.iceberg().write``.
    """
    df_custom = pd.DataFrame({"phrase": ["Hello", "World"]})
    # One consistent name for the Arrow table, so the write below refers to
    # the object that was actually created.
    table_arrow = pa.Table.from_pandas(df_custom)
    output.iceberg().write(table_arrow)
Example: write a Spark DataFrame to an Iceberg table output (Spark transform):
from transforms.api import transform, TransformContext
from transforms.tables import TableOutput, TableTransformOutput


@transform(
    output=TableOutput("/.../Output"),
)
def compute(ctx: TransformContext, output: TableTransformOutput):
    """Write a two-row Spark DataFrame to an Iceberg table output.

    The DataFrame is built from the transform context's Spark session and
    has a single ``phrase`` column.
    """
    rows = [["Hello"], ["World"]]
    greetings = ctx.spark_session.createDataFrame(rows, schema=["phrase"])
    output.write_dataframe(greetings)
Example: copy an Iceberg table input to an Iceberg table output with Polars:
import polars as pl
from transforms.api import transform
from transforms.tables import (
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(
    source_table: TableLightweightInput,
    output_table: TableLightweightOutput,
):
    """Copy an Iceberg table to another Iceberg table via Polars.

    The source table is scanned into a Polars DataFrame, converted to a
    PyArrow table, and written to the output table.
    """
    scan = source_table.iceberg().table().scan()
    frame = scan.to_polars()
    output_table.iceberg().write(frame.to_arrow())
Example: copy an Iceberg table input to an Iceberg table output with pandas:
import pandas as pd
import pyarrow as pa
from transforms.api import transform
from transforms.tables import (
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)


@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(
    source_table: TableLightweightInput,
    output_table: TableLightweightOutput,
):
    """Copy an Iceberg table to another Iceberg table via pandas.

    The source table is scanned into a pandas DataFrame, converted to a
    PyArrow table with ``pa.Table.from_pandas``, and written to the output.
    """
    scan = source_table.iceberg().table().scan()
    frame = scan.to_pandas()
    output_table.iceberg().write(pa.Table.from_pandas(frame))
Example: copy an Iceberg table input to an Iceberg table output with Spark:
from transforms.api import transform
from transforms.tables import (
    TableInput,
    TableOutput,
    TableTransformInput,
    TableTransformOutput,
)


@transform(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    """Copy an Iceberg table to another Iceberg table using Spark."""
    spark_df = source_table.dataframe()
    output_table.write_dataframe(spark_df)
Example: write a dataset input to an Iceberg table output with Polars:
import polars as pl
from transforms.api import transform, Input, LightweightInput
from transforms.tables import TableLightweightOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: TableLightweightOutput):
    """Write a dataset to an Iceberg table via Polars.

    The dataset is read as a Polars DataFrame, converted to a PyArrow table,
    and written to the output table.
    """
    frame = source_dataset.polars()
    output_table.iceberg().write(frame.to_arrow())
Example: write a dataset input to an Iceberg table output with pandas:
import pandas as pd
import pyarrow as pa
from transforms.api import transform, Input, LightweightInput
from transforms.tables import TableLightweightOutput, TableOutput


@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: LightweightInput, output_table: TableLightweightOutput):
    """Write a dataset to an Iceberg table via pandas.

    The dataset is read as a pandas DataFrame, converted to a PyArrow table
    with ``pa.Table.from_pandas``, and written to the output table.
    """
    frame = source_dataset.pandas()
    output_table.iceberg().write(pa.Table.from_pandas(frame))
Example: write a dataset input to an Iceberg table output with Spark:
from transforms.api import transform, Input, TransformInput
from transforms.tables import TableOutput, TableTransformOutput


@transform(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_dataset: TransformInput, output_table: TableTransformOutput):
    """Write a dataset to an Iceberg table using Spark."""
    spark_df = source_dataset.dataframe()
    output_table.write_dataframe(spark_df)
Example: write an Iceberg table input to a dataset output with Polars:
import polars as pl
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import TableLightweightInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: TableLightweightInput, output_dataset: LightweightOutput):
    """Write an Iceberg table to a dataset via Polars.

    The Iceberg table is scanned into a Polars DataFrame, which is passed
    directly to ``output_dataset.write_table``.
    """
    iceberg_table = source_table.iceberg().table()
    output_dataset.write_table(iceberg_table.scan().to_polars())
Example: write an Iceberg table input to a dataset output with pandas:
import pandas as pd
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import TableLightweightInput, TableInput


@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output"),
)
def compute(source_table: TableLightweightInput, output_dataset: LightweightOutput):
    """Write an Iceberg table to a dataset via pandas.

    The Iceberg table is scanned into a pandas DataFrame, which is passed
    directly to ``output_dataset.write_table``.
    """
    iceberg_table = source_table.iceberg().table()
    output_dataset.write_table(iceberg_table.scan().to_pandas())
Example: write an Iceberg table input to a dataset output with Spark:
from transforms.api import transform, Output, TransformOutput
from transforms.tables import TableInput, TableTransformInput


@transform(
    source_table=TableInput("/.../Input"),
    output=Output("/.../Output"),
)
def compute(source_table: TableTransformInput, output: TransformOutput):
    """Write an Iceberg table to a dataset using Spark."""
    spark_df = source_table.dataframe()
    output.write_dataframe(spark_df)