Lightweight transforms can process files of datasets without schemas. To process the files of a dataset without a schema, list the files via my_input.filesystem().ls().
The .filesystem().ls() statement is available for datasets without schemas, but the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() statements are only available on datasets with schemas.
The following code shows an example lightweight transform that processes files of a dataset without a schema.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from transforms.api import incremental, Input, Output, transform @incremental() @transform.using(my_input=Input("my-input"), my_output=Output('my-output')) def my_incremental_transform(my_input, my_output): fs = my_input.filesystem() files = [f.path for f in fs.ls()] polars_dataframes = [] for file_path in files: # Access the file with fs.open(file_path, "rb") as f: # <do something with the file> # append some data as a dataframe to polars_dataframes # Union all the DFs into one combined_df = union_polars_dataframes(polars_dataframes) out.write_table(combined_df)
The following code shows an example of parsing Excel files:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
from transforms.api import transform, Input, Output import tempfile import shutil import polars as pl import pandas as pd @transform( my_output=Output("/path/tabular_output_dataset"), my_input=Input("/path/input_dataset_without_schema"), ) def compute(my_input, my_output): # Parse each file # Open the Excel file at the provided path, using the provided filesystem def read_excel_to_polars(fs, file_path): with fs.open(file_path, "rb") as f: with tempfile.TemporaryFile() as tmp: # Copy paste the file from the source dataset to the local filesystem shutil.copyfileobj(f, tmp) tmp.flush() # shutil.copyfileobj does not flush # read the excel file (the file is now seekable) pandas_df = pd.read_excel(tmp) # Convert eventual integer columns to string columns pandas_df = pandas_df.astype(str) # Convert the pandas dataframe to a polars dataframe return pl.from_pandas(pandas_df) fs = my_input.filesystem() # List all files in the input dataset files = [f.path for f in fs.ls()] polars_dataframes = [] for curr_file_as_row in files: # print(curr_file_as_row) polars_dataframes.append(read_excel_to_polars(fs, curr_file_as_row)) def union_polars_dataframes(dfs): return pl.concat(dfs) # Union all the DFs into one combined_df = union_polars_dataframes(polars_dataframes) my_output.write_table(combined_df)