To use compute pushdown with Snowflake, create a Python repository configured for lightweight transforms and install the most recent version of the transforms-tables library. A Snowpark ↗ session is configured automatically from the connection details of the Snowflake tables used as inputs and/or outputs to the transform. The data can then be transformed using the Snowpark DataFrame API; for full guidance on the Snowpark API, consult the Snowpark documentation ↗.
An example of a Snowpark transform is shown below:
```python
import snowflake.snowpark as snow  # needed for the snow.DataFrame / snow.Session annotations below
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import StringType

from transforms.api import lightweight, transform
from transforms.tables import (
    SnowflakeTable,
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)

ID_PREFIX = "CUSTOMER-NO-"


@lightweight
@transform(
    input_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "ri.tables.main.table.5678",
        "ri.magritte..source.1234",
        SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED"),
    ),
)
def compute_in_snowflake(input_table: TableLightweightInput, output_table: TableLightweightOutput):
    """
    With Snowflake tables, you can perform lightweight transforms using the
    Snowpark APIs. All compute is pushed down to the underlying Snowflake
    instance, so this can tackle big data workloads.

    In a setup like this, all data must live in the same Snowflake instance
    and be accessible through the same connection.
    """
    # get a Snowpark DataFrame instance
    df: snow.DataFrame = input_table.snowpark().dataframe()
    session: snow.Session = df.session

    # define a UDF to apply to our data
    @udf(session=session, return_type=StringType())
    def fix_id_col(ident: int) -> str:
        """UDF to convert the ID to a string and prepend "CUSTOMER-NO-"."""
        return ID_PREFIX + str(ident)

    # apply the UDF to the ID column
    df = df.with_column("ID", fix_id_col(col("ID")))

    # write back to the new table
    output_table.snowpark().write(df)
```
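A UDF is not always necessary: Snowpark's built-in functions and DataFrame operations are pushed down to Snowflake in the same way. The sketch below aggregates customers by state using only built-in functions; the output table RID, the Snowflake table name, and the STATE and BALANCE columns are illustrative assumptions, not part of the example above.

```python
from snowflake.snowpark.functions import avg, col

from transforms.api import lightweight, transform
from transforms.tables import (
    SnowflakeTable,
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)


@lightweight
@transform(
    input_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "ri.tables.main.table.9999",  # hypothetical output table RID
        "ri.magritte..source.1234",
        SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMER_BALANCES_BY_STATE"),
    ),
)
def aggregate_in_snowflake(input_table: TableLightweightInput, output_table: TableLightweightOutput):
    # get a Snowpark DataFrame; the operations below execute lazily in Snowflake
    df = input_table.snowpark().dataframe()

    # filter and aggregate with built-in Snowpark functions, no UDF required
    # (STATE and BALANCE are assumed example columns)
    summary = (
        df.filter(col("BALANCE") > 0)
        .group_by("STATE")
        .agg(avg(col("BALANCE")).alias("AVG_BALANCE"))
    )

    # write the aggregated result back to the Snowflake output table
    output_table.snowpark().write(summary)
```

Because no Python code runs row by row, this form avoids registering a UDF in Snowflake entirely and is usually the first option to reach for.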
The Snowpark API allows data to be converted into a pandas DataFrame. If the scale of your data is small enough, this can be used to bring the data from Snowflake into Foundry lightweight compute. This enables the use of transforms beyond the capabilities of the Snowpark APIs, and allows Snowflake tables to be combined with other Foundry data.
```python
import hashlib

from transforms.api import lightweight, transform
from transforms.tables import (
    SnowflakeTable,
    TableInput,
    TableLightweightInput,
    TableLightweightOutput,
    TableOutput,
)


@lightweight
@transform(
    input_table=TableInput("ri.tables.main.table.1234"),
    output_table=TableOutput(
        "ri.tables.main.table.5678",
        "ri.magritte..source.1234",
        SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
    ),
)
def compute_local(input_table: TableLightweightInput, output_table: TableLightweightOutput):
    """
    Snowpark also supports conversion to pandas DataFrames, meaning that you
    can use lightweight transforms on top of Snowflake tables to conduct
    in-container compute work. You can use this to go beyond the scope of
    what is supported in Snowpark.
    """
    # get a Snowpark DataFrame instance
    df = input_table.snowpark().dataframe()
    session = df.session

    # convert to pandas
    pd_df = df.to_pandas()

    # create ANON_CODE by hashing the concatenation of CITY, STATE, and ZIP_CODE
    def generate_anon_code(row):
        concatenated = f"{row['CITY']}{row['STATE']}{row['ZIP_CODE']}"
        return hashlib.sha256(concatenated.encode("utf-8")).hexdigest()

    # apply the function to create the ANON_CODE column
    pd_df["ANON_CODE"] = pd_df.apply(generate_anon_code, axis=1)

    # select the ID and ANON_CODE columns
    result_data = pd_df[["ID", "ANON_CODE"]]

    # write back to the new table
    new_df = session.create_dataframe(result_data, schema=["ID", "ANON_CODE"])
    output_table.snowpark().write(new_df)
```
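Once the data is in pandas, it can also be joined against ordinary Foundry datasets within the same transform, which is how Snowflake tables can be combined with other Foundry data. The sketch below is an assumption-laden illustration: it presumes that a table input can be mixed with regular dataset inputs and outputs in a single lightweight transform and that the standard lightweight accessors Input(...).pandas() and write_pandas(...) are available; the dataset RIDs and the STATE join column are hypothetical.

```python
from transforms.api import Input, Output, lightweight, transform
from transforms.tables import TableInput, TableLightweightInput


@lightweight
@transform(
    customers=TableInput("ri.tables.main.table.1234"),
    regions=Input("ri.foundry.main.dataset.1111"),    # hypothetical Foundry dataset RID
    enriched=Output("ri.foundry.main.dataset.2222"),  # hypothetical Foundry dataset RID
)
def combine_with_foundry_data(customers: TableLightweightInput, regions, enriched):
    # pull the Snowflake table into pandas via Snowpark
    customers_df = customers.snowpark().dataframe().to_pandas()

    # read an ordinary Foundry dataset as pandas
    regions_df = regions.pandas()

    # join on an assumed shared STATE column
    joined = customers_df.merge(regions_df, on="STATE", how="left")

    # write the combined result to a Foundry dataset output
    enriched.write_pandas(joined)
```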