This documentation assumes the following prerequisite working knowledge:
Spark sidecar transforms allow you to deploy containerized code while leveraging the existing infrastructure provided by Spark and transforms.
Containerizing code allows you to package any code and any dependencies to run in Foundry. The containerization workflow is integrated with transforms, meaning scheduling, branching, and data health are all seamlessly integrated. Since containerized logic runs alongside Spark executors, you can scale your containerized logic with your input data.
In short, any logic that can run in a container can be used to process, generate, or consume data in Foundry.
If you are familiar with containerization concepts, use the sections below to learn more about using Spark sidecar transforms:
Learn more about containerization in Foundry.
Transforms in Foundry can send data to and from datasets using a Spark driver to distribute processing across multiple executors, as shown in the diagram below:
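For orientation, such a standard (non-sidecar) transform can be written as the following minimal sketch, where the dataset RIDs are placeholders:

```python
from transforms.api import transform_df, Input, Output


# A standard transform: the Spark driver partitions the input dataset and
# distributes the DataFrame computation across executors.
@transform_df(
    Output("<output dataset rid>"),
    source=Input("<input dataset rid>"),
)
def compute(source):
    return source
```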
Annotating a transform using the @sidecar
decorator (provided in the transforms-sidecar
library) allows you to specify exactly one container that launches alongside each executor in a PySpark transform. The user-provided container, made with custom logic and running with each executor, is called the sidecar container.
In a simple use case with one executor, the data flow would look like the following:
If you write a transform that partitions an input dataset across many executors, the data flow would look like this:
The interface between each executor and its sidecar container is a shared volume (a directory) used to communicate information such as input data, output data, and the flag files that coordinate execution.
These shared volumes are specified using the Volume
argument to the @sidecar
decorator and will be subfolders within the path /opt/palantir/sidecars/shared-volumes/
.
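As a preview of the examples later in this guide, declaring Volume("shared") on the @sidecar decorator exposes the directory below to both the executor and the sidecar container; the image name, tag, and dataset RID are placeholders:

```python
from transforms.api import transform, Output
from transforms.sidecar import sidecar, Volume

# The directory created for Volume("shared"), visible to both the executor
# and the sidecar container.
SHARED_DIR = "/opt/palantir/sidecars/shared-volumes/shared"


@sidecar(image='simple-example', tag='0.0.1', volumes=[Volume("shared")])
@transform(output=Output("<output dataset rid>"))
def compute(output, ctx):
    # Files written under SHARED_DIR by the transform are visible to the
    # sidecar, and files the sidecar writes there can be copied to `output`.
    pass
```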
The next sections will guide you through preparing for and writing your Spark sidecar transforms.
To build an image compatible with Spark sidecar transforms, the image must meet the image requirements. The image must also include the critical components described below, which are included in the example Docker image. To build this example image, you will need the Python script entrypoint.py.
You will need Docker installed on your local computer and must have access to the docker
CLI command (official documentation ↗).
To push an image, create a new Artifacts repository and follow the instructions to tag and push your image to the relevant Docker repository.
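As a rough sketch, pushing generally reduces to the command below once the image has been built with the fully qualified tag shown in the build command that follows; the authoritative commands, including any required docker login step, are displayed in your Artifacts repository:

```bash
# Authenticate against the Foundry container registry first (the Artifacts
# repository shows the exact docker login command), then push the built image.
docker push <container_registry>/<image_name>:<image_tag>
```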
Build the image with the following command:

```bash
docker build . --tag <container_registry>/<image_name>:<image_tag> --platform linux/amd64
```
where the following is true:

- container_registry represents the address of your Foundry instance's container registry, which you can locate as part of the last command in the instructions for pushing a Docker image to an Artifacts repository.
- image_name and image_tag are at your discretion. This example uses simple-example:0.0.1.

When following the push instructions, replace <image_name>:<image_version> in the last command with the image_name and image_tag used in the image building step above.

In your Foundry Python code repository, add the transforms-sidecar library and commit the change.

In a folder on your local computer, add the following contents to a file called Dockerfile:
```docker
# Use the official Python image from the Docker Hub
FROM python:3.8-slim

# Set the working directory
WORKDIR /usr/src/app

# Copy application dependency manifest
COPY requirements.txt ./

# Install application dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the port the app runs on
EXPOSE 1234

# Run the application
CMD ["python", "app.py"]

# Run as a numeric non-root user, as required by the image requirements
USER 1234
```
In the same folder, add a new file called requirements.txt
and list the required Python dependencies:
```
flask
```
In the same local folder as your Dockerfile
, copy the following code snippet into a file named app.py
.
```python
from flask import Flask

app = Flask(__name__)


@app.route('/hello')
def hello():
    return 'Hello World'


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=1234)
```
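Before pushing, you can optionally sanity-check the image on your local machine; this sketch assumes the image was built and tagged as described above and that port 1234 is free:

```bash
# Run the container locally, then call the /hello endpoint from another terminal.
docker run --rm -p 1234:1234 <container_registry>/simple-example:0.0.1
curl http://localhost:1234/hello   # expected response: Hello World
```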
In your Foundry Python code repository, write the following example transform to call the /hello
endpoint on the sidecar and save the response to the output:
```python
import requests

from transforms.api import Output, transform_df
from transforms.sidecar import sidecar


@sidecar(image='simple-example', tag='0.0.1')
@transform_df(
    Output("<output dataset rid>"),
)
def compute(ctx):
    response = requests.get('http://localhost:1234/hello')
    data = [(response.text,)]
    columns = ["response_text"]
    return ctx.spark_session.createDataFrame(data, columns)
```
In a folder on your local computer, add the following contents to a file called Dockerfile
:
(Line numbers are shown for reference in the notes below; do not include them in the file.)

```
 1  FROM fedora:38
 2
 3  ADD entrypoint.py /usr/bin/entrypoint
 4  RUN chmod +x /usr/bin/entrypoint
 5
 6  RUN mkdir -p /opt/palantir/sidecars/shared-volumes/shared/
 7  RUN chown 5001 /opt/palantir/sidecars/shared-volumes/shared/
 8  ENV SHARED_DIR=/opt/palantir/sidecars/shared-volumes/shared
 9
10  USER 5001
11
12  ENTRYPOINT entrypoint -c "dd if=$SHARED_DIR/infile.csv of=$SHARED_DIR/outfile.csv"
```
You can build your own Dockerfile, as above, but make sure to cover the following:
Specify a numeric non-root user on line 10. This is one of the image requirements and helps to maintain a proper security posture where containers are not given privileged execution.
Next, create the shared volume, as shown on lines 6-8. As discussed in the architecture section above, shared volumes that are subdirectories of /opt/palantir/sidecars/shared-volumes/
are the primary means by which input and output data are shared between the PySpark transform and the sidecar container.
Finally, add a simple entrypoint
script to the container on line 3 and set it as the ENTRYPOINT
on line 12. This step is critical, as Spark sidecar transforms do not natively instruct the sidecar container to wait for input data to be available before the container launches. Additionally, sidecar transforms do not tell the container to stay alive and wait for the output data to be copied off. The provided entrypoint
script uses Python to tell the container to wait for a start_flag
file to be written to the shared volume before the specified logic executes. When the specified logic finishes, it writes a done_flag
to the same directory. The container will wait for a close_flag
to be written to the shared volume before it will stop itself and be automatically cleaned up.
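To make the handshake concrete, the transform side of this protocol reduces to roughly the following sketch; the full utility functions used by the examples below (such as copy_start_flag and wait_for_done_flag) appear in utils.py at the end of this guide:

```python
import os
import time

SHARED = "/opt/palantir/sidecars/shared-volumes/shared"

# 1. Write the input files into SHARED, then signal the sidecar to start.
open(os.path.join(SHARED, "start_flag"), "w").close()

# 2. Poll until the sidecar's entrypoint writes the done flag.
while not os.path.exists(os.path.join(SHARED, "done_flag")):
    time.sleep(1)

# 3. Copy the output files out of SHARED, then let the sidecar shut down.
open(os.path.join(SHARED, "close_flag"), "w").close()
```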
As shown in the example above, the containerized logic uses the POSIX dd (disk dump) utility to copy an input CSV file from the shared directory to an output file in the same directory. This “command”, which is passed into the entrypoint
script, could be any logic that can execute in the container.
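For instance, a hypothetical image that also bundled a script at /usr/bin/my_model.py (not part of the example image) could replace line 12 with its own command:

```docker
ENTRYPOINT entrypoint -c "python3 /usr/bin/my_model.py $SHARED_DIR/infile.csv $SHARED_DIR/outfile.csv"
```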
In the same local folder as your Dockerfile
, copy the following code snippet into a file named entrypoint.py
.
```python
#!/usr/bin/env python3

import os
import time
import subprocess
from datetime import datetime

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-c", "--command", type=str, help="model command to execute")
args = parser.parse_args()
the_command = args.command.split(" ")


def run_process(exe):
    "Define a function for running commands and capturing stdout line by line"
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b"")


start_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/start_flag"
done_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/done_flag"
close_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/close_flag"

# Wait for start flag
print(f"{datetime.utcnow().isoformat()}: waiting for start flag")
while not os.path.exists(start_flag_fname):
    time.sleep(1)
print(f"{datetime.utcnow().isoformat()}: start flag detected")

# Execute model, logging output to file
with open("/opt/palantir/sidecars/shared-volumes/shared/logfile", "w") as logfile:
    for item in run_process(the_command):
        my_string = f"{datetime.utcnow().isoformat()}: {item}"
        print(my_string)
        logfile.write(my_string)
        logfile.flush()
print(f"{datetime.utcnow().isoformat()}: execution finished writing output file")

# Write out the done flag
open(done_flag_fname, "w")
print(f"{datetime.utcnow().isoformat()}: done flag file written")

# Wait for close flag before allowing the script to finish
while not os.path.exists(close_flag_fname):
    time.sleep(1)
print(f"{datetime.utcnow().isoformat()}: close flag detected. shutting down")
```
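This image is built and pushed in the same way as described earlier; for example, assuming you keep the simple-example name and 0.0.1 tag used by the transforms below:

```bash
docker build . --tag <container_registry>/simple-example:0.0.1 --platform linux/amd64
docker push <container_registry>/simple-example:0.0.1
```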
The following examples walk through the key information required to get started with sidecar transforms. Both examples use the same utilities file (utils.py, included at the end of this guide) that you can add to your code repository and import as shown below.
The transform below imports the @sidecar
decorator and the Volume
primitive from the transforms-sidecar
library.
The transform uses both items for annotation so that one instance of the simple-example:0.0.1
container is launched with each executor. Each executor/sidecar pair will have a shared volume at /opt/palantir/sidecars/shared-volumes/shared
.
This first example launches one instance of the container with one executor and follows the architecture shown in the image below:
The transform then uses the utility function launch_udf_once
to launch one instance of the user_defined_function
. That user-defined function will run on one executor and communicate with one instance of the sidecar container. The user-defined function will invoke the imported utility functions to copy the input files to the shared directory, write the start flag, wait for the done flag, copy the output files to the output dataset, and write the close flag:
```python
from transforms.api import transform, Input, Output
from transforms.sidecar import sidecar, Volume
from myproject.datasets.utils import copy_files_to_shared_directory, copy_start_flag, wait_for_done_flag
from myproject.datasets.utils import copy_output_files, copy_close_flag, launch_udf_once


@sidecar(image='simple-example', tag='0.0.1', volumes=[Volume("shared")])
@transform(
    output=Output("<output dataset rid>"),
    source=Input("<input dataset rid>"),
)
def compute(output, source, ctx):
    def user_defined_function(row):
        # Copy files from source to the shared directory.
        copy_files_to_shared_directory(source)

        # Send the start flag so the container knows it has all the input files.
        copy_start_flag()

        # Iterate until the done flag is written or the maximum time limit is reached.
        wait_for_done_flag()

        # Copy output files from the container to the output dataset.
        output_fnames = [
            "start_flag",
            "outfile.csv",
            "logfile",
            "done_flag",
        ]
        copy_output_files(output, output_fnames)

        # Write the close flag so the container knows you have extracted the data.
        copy_close_flag()

        # The user-defined function must return something.
        return (row.ExecutionID, "success")

    # This spawns one task, which maps to one executor, and launches one sidecar container.
    launch_udf_once(ctx, user_defined_function)
```
This example launches many instances of the sidecar container, each processing a subset of the input data. The information is then collected and saved to output datasets. This example more closely resembles the architecture shown below:
The following transform uses different utility functions to partition the input data and send individual files to each container, performing the same execution on different chunks of input data. The utility functions are written to save the output files as both individual files and as a tabular output dataset.
You will see the same parameters configured for the @sidecar
decorator and Volume
specification, as in Example 1.
The @configure decorator is applied to ensure that only one task launches per executor and that exactly four executors can launch in total. This configuration, combined with the fact that the input dataset has exactly four rows of data and the input repartition is set to 4
, means that four instances of the user-defined function will launch on four executors. Therefore, exactly four instances of the sidecar container will launch and process their segment of the input data.
Ensure that your repository has the two Spark profiles imported under Settings > Spark.
```python
from transforms.api import transform, Input, Output, configure
from transforms.sidecar import sidecar, Volume
import uuid
from myproject.datasets.utils import copy_start_flag, wait_for_done_flag, copy_close_flag
from myproject.datasets.utils import write_this_row_as_a_csv_with_one_row
from myproject.datasets.utils import copy_output_files_with_prefix, copy_out_a_row_from_the_output_csv


@configure(["EXECUTOR_CORES_EXTRA_SMALL", "NUM_EXECUTORS_4"])
@sidecar(image='simple-example', tag='0.0.1', volumes=[Volume("shared")])
@transform(
    output=Output("<first output dataset rid>"),
    output_rows=Output("<second output dataset rid>"),
    source=Input("<input dataset rid>"),
)
def compute(output, output_rows, source, ctx):

    def user_defined_function(row):
        # Write this row to the shared directory as a one-row CSV.
        write_this_row_as_a_csv_with_one_row(row)

        # Send the start flag so the container knows it has all the input files.
        copy_start_flag()

        # Iterate until the done flag is written or you hit the maximum time limit.
        wait_for_done_flag()

        # Copy output files from the container to the output datasets.
        output_fnames = [
            "start_flag",
            "infile.csv",
            "outfile.csv",
            "logfile",
            "done_flag",
        ]
        random_unique_prefix = f'{uuid.uuid4()}'[:8]
        copy_output_files_with_prefix(output, output_fnames, random_unique_prefix)

        outdata1, outdata2, outdata3 = copy_out_a_row_from_the_output_csv()

        # Write the close flag so the container knows you have extracted the data.
        copy_close_flag()

        # The user-defined function must return something.
        return (row.data1, row.data2, row.data3, "success", outdata1, outdata2, outdata3)

    results = source.dataframe().repartition(4).rdd.map(user_defined_function)
    columns = ["data1", "data2", "data3", "success", "outdata1", "outdata2", "outdata3"]
    output_rows.write_dataframe(results.toDF(columns))
```
utils.py
```python
import os
import shutil
import time
import csv
import pyspark.sql.types as T

VOLUME_PATH = "/opt/palantir/sidecars/shared-volumes/shared"
MAX_RUN_MINUTES = 10


def write_this_row_as_a_csv_with_one_row(row):
    in_path = "/opt/palantir/sidecars/shared-volumes/shared/infile.csv"
    with open(in_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(['data1', 'data2', 'data3'])
        writer.writerow([row.data1, row.data2, row.data3])


def copy_out_a_row_from_the_output_csv():
    out_path = "/opt/palantir/sidecars/shared-volumes/shared/outfile.csv"
    with open(out_path, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        values = "", "", ""
        for myrow in reader:
            values = myrow[0], myrow[1], myrow[2]
    return values


def copy_output_files_with_prefix(output, output_fnames, prefix):
    for file_path in output_fnames:
        output_fs = output.filesystem()
        out_path = os.path.join(VOLUME_PATH, file_path)
        try:
            with open(out_path, "rb") as shared_file:
                with output_fs.open(f'{prefix}_{file_path}', "wb") as output_file:
                    shutil.copyfileobj(shared_file, output_file)
        except FileNotFoundError as err:
            print(err)


def copy_files_to_shared_directory(source):
    source_fs = source.filesystem()
    for item in source_fs.ls():
        file_path = item.path
        with source_fs.open(file_path, "rb") as source_file:
            dest_path = os.path.join(VOLUME_PATH, file_path)
            with open(dest_path, "wb") as shared_file:
                shutil.copyfileobj(source_file, shared_file)


def copy_start_flag():
    open(os.path.join(VOLUME_PATH, 'start_flag'), 'w')
    time.sleep(1)


def wait_for_done_flag():
    i = 0
    while i < 60 * MAX_RUN_MINUTES and not os.path.exists(os.path.join(VOLUME_PATH, 'done_flag')):
        i += 1
        time.sleep(1)


def copy_output_files(output, output_fnames):
    for file_path in output_fnames:
        output_fs = output.filesystem()
        out_path = os.path.join(VOLUME_PATH, file_path)
        try:
            with open(out_path, "rb") as shared_file:
                with output_fs.open(file_path, "wb") as output_file:
                    shutil.copyfileobj(shared_file, output_file)
        except FileNotFoundError as err:
            print(err)


def copy_close_flag():
    time.sleep(5)
    open(os.path.join(VOLUME_PATH, 'close_flag'), 'w')  # send the close flag


def launch_udf_once(ctx, user_defined_function):
    # Using a dataframe with a single row, launch user_defined_function once on that row
    schema = T.StructType([T.StructField("ExecutionID", T.IntegerType())])
    ctx.spark_session.createDataFrame([{"ExecutionID": 1}], schema=schema).rdd.foreach(user_defined_function)
```