Lightweight transforms can process the files of a dataset that has no schema. To do so, list the files with my_input.filesystem().ls().
The .filesystem().ls() method is available for datasets without schemas, but the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() methods are only available on datasets with schemas.
The following code shows an example lightweight transform that processes files of a dataset without a schema.
from transforms.api import incremental, Input, Output, transform


@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    """Process every file of a schemaless input dataset and write one table.

    Illustrative skeleton: the per-file parsing and the
    `union_polars_dataframes` helper are placeholders to fill in.
    """
    fs = my_input.filesystem()
    # .ls() works even when the input dataset has no schema
    files = [f.path for f in fs.ls()]
    polars_dataframes = []
    for file_path in files:
        # Access the file
        with fs.open(file_path, "rb") as f:
            # <do something with the file>
            # append some data as a dataframe to polars_dataframes
            ...
    # Union all the DFs into one
    combined_df = union_polars_dataframes(polars_dataframes)
    # fixed: original called `out.write_table(...)`, but `out` is undefined —
    # the output parameter is named `my_output`
    my_output.write_table(combined_df)
The following example demonstrates how to parse Excel files:
from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd


@transform.spark.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    """Parse every Excel file of a schemaless input dataset into one table."""

    def _excel_to_polars(filesystem, path):
        # Stage the dataset file in a local temporary file so the Excel
        # reader gets a seekable handle.
        with filesystem.open(path, "rb") as source:
            with tempfile.TemporaryFile() as local_copy:
                shutil.copyfileobj(source, local_copy)
                local_copy.flush()  # copyfileobj does not flush for us
                frame = pd.read_excel(local_copy)
                # Cast every column (e.g. integer columns) to string before
                # handing the frame over to polars.
                frame = frame.astype(str)
                return pl.from_pandas(frame)

    filesystem = my_input.filesystem()
    # .ls() lists the files even though the dataset has no schema.
    frames = [
        _excel_to_polars(filesystem, entry.path) for entry in filesystem.ls()
    ]
    # Stack the per-file frames into one table and write it out.
    my_output.write_table(pl.concat(frames))