Incremental transforms

Warning

Incremental computation is an advanced feature. Ensure that you understand the rest of the user guide before making use of this feature.

The transforms shown so far in the user guide recompute their entire output datasets every time they are run. This can lead to a lot of unnecessary work. Consider the following example:

package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class FilterTransform {

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        Dataset<Row> inputDf = myInput.asDataFrame().read();
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
    }
}

If any new data is added to the /examples/students_hair_eye_color input dataset, the filter() is performed over the entire input rather than just the newly added data. This wastes compute resources and time.

If a transform can become aware of its build history, it can be smarter about how to compute its output. More specifically, it can use the changes made to the inputs to modify the output dataset. This process of using already materialized data when re-materializing tables is called incremental computation. Without incremental computation, the output dataset is always replaced by the latest output of the transform.

Let's go back to the example transform shown above. The transform performs a filter() over the students dataset to write out students with brown eyes. With incremental computation, if data about two new students is appended to students, the transform can use information about its build history to append only the new brown-eyed students to the output:

RAW                              DERIVED
+---+-----+-----+                  +---+-----+-----+
| id| hair|  eye|                  | id| hair|  eye|
+---+-----+-----+     Build 1      +---+-----+-----+
| 17|Black|Brown|    --------->    | 17|Black|Brown|
| 18|Brown|Brown|                  | 18|Brown|Brown|
| 19|  Red|Brown|                  | 19|  Red|Brown|
+---+-----+-----+                  +---+-----+-----+
   ...                                ...
+---+-----+-----+     Build 2      +---+-----+-----+
| 20|Brown|Brown|    --------->    | 20|Brown|Brown|
| 21|Black|Blue |                  +---+-----+-----+
+---+-----+-----+

Write an incremental transform

This section guides you step by step through writing an incremental transform with transforms-java. Unlike transforms-python, transforms-java does not use annotations to automatically verify incrementality and apply transformations incrementally. Instead, the process of writing incremental transforms in Java is controlled directly by the user, who explicitly decides when a transformation should act incrementally and when it should not. By interpreting how the input dataset was modified, the user can decide whether to update the output dataset incrementally or as a new snapshot.

Interpret your input

The first step is to interpret your input. The input dataset can be modified in multiple ways, and incremental transformations can only be applied in specific circumstances. DataFrameModificationType (or FilesModificationType) expresses the different ways a dataset can be modified. The modification types are:

  • APPENDED
  • UPDATED
  • NEW_VIEW
  • UNCHANGED

Based on how the input has changed, we can decide what to read from the input dataset and what to write to the output dataset.
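
For instance, the modification type can be inspected directly on the input. The following minimal sketch reuses the myInput parameter from the transform above:

// Inspect how the input dataset changed since the last build of this transform.
DataFrameModificationType modificationType = myInput.asDataFrame().modificationType();

if (modificationType == DataFrameModificationType.APPENDED) {
    // Only new rows were appended, so it is safe to process just the new data.
}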

Read the input

Knowing how the input was modified allows us to read it accordingly. If a transaction only appended data, we can safely act incrementally and read only what was modified. If, instead, the change to the input dataset includes modifications to already existing rows, we may want to re-read the whole view. The transforms-java API supports different read modes for an input dataset through the readForRange() method. ReadRange exposes the possible read ranges:

  • UNPROCESSED
  • PROCESSED
  • ENTIRE_VIEW
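
For example, reading only the data that has not yet been processed by this transform looks like the following sketch, based on the input from the example above:

// Read only the portion of the input that has not been processed by a previous build.
Dataset<Row> unprocessedDf = myInput.asDataFrame().readForRange(ReadRange.UNPROCESSED);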

By interpreting the input modification type we can then decide how to read our data, as shown in the example below.

private ReadRange getReadRange(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        case NEW_VIEW:
            LOG.info("New view in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}

We can then modify our compute method accordingly.

@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
}
Warning

At this point, we are only reading different portions of the input dataset; we are not yet acting differently on the output dataset. Running the code shown so far will always result in a snapshot transaction on the output, no matter which portion of the input is read. Work through the rest of this tutorial before applying your incremental transforms in order to understand how to correctly modify the output dataset.

Transform the data

In this step, it is up to the user to apply whatever transformation of the data is needed. Remember that the data read will differ depending on how the input was modified. In our case, the transformation is a simple filter for brown eyes, which we can isolate as:

inputDf = inputDf.filter("eye = 'Brown'");

Write the output

Once we have interpreted the modifications to the input dataset, read the desired portion of the input, and transformed the data according to our transformation logic, we can write our output accordingly. WriteMode exposes the available write modes:

  • SNAPSHOT
  • UPDATE
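
As a quick illustration, appending the filtered rows to the existing output instead of replacing it looks like the following sketch, based on the compute function above:

// Append the filtered rows to the existing output dataset instead of replacing it.
myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(WriteMode.UPDATE);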

For example, in our case, we can choose the write mode based on the input modification type.

private WriteMode getWriteMode(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        case NEW_VIEW:
            LOG.info("new view in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}
Warning

Do not confuse WriteMode.UPDATE with DataFrameModificationType.UPDATED. The former is an incremental modification of the output dataset, which downstream datasets will see as a DataFrameModificationType.APPENDED change. The latter is a modification of the input dataset that includes both appends and changes to existing rows.

Finally, the write() call can be modified to include a write mode:

@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
}

Putting it all together

We can build a simple incremental filtering transform by putting the pieces together.

package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FilterTransform {

    private static final Logger LOG = LoggerFactory.getLogger(FilterTransform.class);

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
    }

    private ReadRange getReadRange(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            case NEW_VIEW:
                LOG.info("New view in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }

    private WriteMode getWriteMode(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            case NEW_VIEW:
                LOG.info("new view in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }
}

As introduced above, we evaluate the input modification type and read the input accordingly. We then decide whether to incrementally update the output dataset or start a new snapshot transaction.

Best practices

Switch between snapshot and incremental

Let's say you want to run your transform incrementally most of the time, but occasionally need to rebuild your dataset as a snapshot.

To avoid manually hardcoding the desired behavior, you can add a new input and write the output using the SNAPSHOT write mode whenever this input is modified. This new input then essentially acts as a snapshot trigger dataset, as in the sketch below. Note that you will also have to adapt the read range of the transform's other inputs based on the modification type of this snapshot trigger dataset.
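
A minimal sketch of this pattern is shown below. The /examples/snapshot_trigger path is a hypothetical trigger dataset, and getReadRange() and getWriteMode() are the helpers defined earlier; whenever the trigger dataset changes, the transform re-reads the entire input view and writes a new snapshot:

@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        // Hypothetical trigger dataset: modify it to force a snapshot rebuild.
        @Input("/examples/snapshot_trigger") FoundryInput snapshotTrigger,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    boolean triggerChanged =
            snapshotTrigger.asDataFrame().modificationType() != DataFrameModificationType.UNCHANGED;

    // If the trigger changed, re-read the whole view and write a snapshot;
    // otherwise, fall back to the incremental logic shown earlier.
    ReadRange readRange = triggerChanged ? ReadRange.ENTIRE_VIEW : getReadRange(myInput);
    WriteMode writeMode = triggerChanged ? WriteMode.SNAPSHOT : getWriteMode(myInput);

    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(readRange);
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(writeMode);
}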

It is also possible to externally force a snapshot by creating an empty append transaction without provenance. However, transforms-java does not expose such functionality, and it is therefore out of the scope of this guide.

Advanced features

Warning

The advanced features in this section can have serious negative effects if used incorrectly. Do not use these features if you are not absolutely certain of the implications. If run without appropriate care and caution, there is a high risk of unwanted consequences. Contact your Palantir representative if you have any questions.

Warning

Advanced features are usually enabled with annotations on top of your @Compute function. However, if your transform is manually registered, you will need to set the corresponding properties on the transform builder instead.

Ignore incremental deletes

If an incremental build depends on an append-only dataset that grows indefinitely and there is insufficient disk space for that growth, it may become necessary to delete parts of the upstream dataset. However, this may break incrementality, as the modification of the original dataset will no longer result in an APPENDED modification type. IncrementalOptions.IGNORE_INCREMENTAL_DELETES avoids this by not treating deletions in the upstream dataset as breaking changes.

Warning

It is only possible to ignore incremental deletes in low-level transforms.

@Compute
@UseIncrementalOptions(IncrementalOptions.IGNORE_INCREMENTAL_DELETES)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...

If your transform is manually registered, add the property to the builder as in the following code block.

LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .ignoreIncrementalDeletes(true)
        .build();

Ignore schema change

Warning

Note that a schema modification in the input dataset may have unexpected consequences when combined with incremental transforms.

Read all of the documentation below and ensure that you understand all potential implications before using this feature.

Warning

It is only possible to ignore schema changes in low-level transforms.

If the schema of a dataset that an incremental build depends on changes, the change will result in a DataFrameModificationType.NEW_VIEW, possibly breaking incrementality.

However, if the IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE option is set, a schema change will not result in a new view. Instead, the schema change in the input dataset will be interpreted as DataFrameModificationType.UNCHANGED, and the SchemaModificationType flag will be set to SchemaModificationType.NEW_SCHEMA, allowing the user to handle this special case explicitly.

@Compute
@UseIncrementalOptions(IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...
}

If your transform is manually registered, add the property to the builder as in the following code block.

LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .useSchemaModificationType(true)
        .build();
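
With the option enabled (via the annotation or the builder above), the NEW_SCHEMA flag can then be handled explicitly in the compute function. The sketch below assumes a schemaModificationType() accessor on the data frame reader; the exact method name may differ in your version of the transforms-java API:

// Hypothetical accessor: the exact way to read the SchemaModificationType flag
// may differ in your version of the transforms-java API.
if (myInput.asDataFrame().schemaModificationType() == SchemaModificationType.NEW_SCHEMA) {
    LOG.info("Schema change detected in input dataset; handling it explicitly");
    // For example, read the entire view and write a new snapshot for this build.
}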

The build will succeed or fail depending on how the transformation depends on the input dataset. More precisely, if the transformation depends on columns involved in the schema change, modifications to those columns will make the incremental transformation fail. In these cases, a new snapshot is required before incremental transforms can be used again.

The transformation depends on a certain column if:

  • Its logic explicitly depends on that column (for example, if we have filter("eye = 'Brown'") and the column eye is renamed or deleted in the RAW dataset, re-triggering our FilterTransform will make the incremental update fail).
  • The modified columns appear in the output dataset (for example, if we remove the column hair from our example RAW dataset, our FilterTransform will fail).

If the transformation doesn't depend on the schema changes, the incremental build will succeed.

For example, if we first add a select statement for id and eye to our transformation and trigger an initial snapshot build from the RAW dataset, and then remove the column hair from the RAW dataset, the incremental build will succeed and the schema change will not have any effect on the incremental transformation. The build will also always succeed in the case of additive changes to the schema (for example, adding a new column).