Typically, when an output dataset is built incrementally, all unprocessed transactions of each input dataset are processed in the same job. However, in some situations, the number of transactions processed by a job can vary significantly: for instance, when a build runs in SNAPSHOT mode, the entire input is read from the beginning (for example, because the semantic version of the transform was increased).

You can configure a transaction limit on each incremental input of a transform to constrain the amount of data read in each job.
The example below configures an incremental transform with per-input transaction limits:
from transforms.api import transform, Input, Output, incremental

@incremental(
    v2_semantics=True,
    strict_append=True,
    snapshot_inputs=["snapshot_input"]
)
@transform(
    # Incremental input configured to read a maximum of 3 transactions
    input_1=Input("/examples/input_1", transaction_limit=3),
    # Incremental input configured to read a maximum of 2 transactions
    input_2=Input("/examples/input_2", transaction_limit=2),
    # Incremental input without a transaction limit
    input_3=Input("/examples/input_3"),
    # Snapshot input whose entire view is read each time
    snapshot_input=Input("/examples/input_4"),
    output=Output("/examples/output")
)
def compute(input_1, input_2, input_3, snapshot_input, output):
    ...
When transaction limits are enabled, a dataset may still be out of date with the latest upstream data after a successful build, since only a portion of the data will have been processed. You can configure a schedule to keep building the output dataset until it is up to date with its inputs.
You can verify the ranges of transactions read per input in an incremental job by following the steps below:
On this page, ranges of transactions are reported per input, displaying which part of each input was processed in both the current and previous job:
Select a transaction to navigate to the History page of the input, where the corresponding transaction will be highlighted.
Though the same added, current, and previous read ranges are offered whether the input is configured with or without a transaction limit, they behave slightly differently.
In the example below, consider an incremental transform where you already processed transactions A to C. Now, assume that a relatively large number of transactions, D to J, are added to the input.
If you read the input without a transaction limit, the range of transactions for each read mode in the next job would be as follows:

- Added: D to J
- Previous: A to C
- Current: A to J
However, if you read the input with a transaction limit of three, you would need three jobs to catch up to the input. The range of transactions for each read mode per job would be as follows:

First job:
- Added: D to F (the next three unprocessed transactions)
- Previous: A to C (all transactions that were processed in the previous job)
- Current: A to F (all transactions that were processed up to and including this batch)

Second job:
- Added: G to I
- Previous: A to F
- Current: A to I

Third job:
- Added: J
- Previous: A to I
- Current: A to J
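The catch-up behavior above can be sketched with a short simulation. This is illustrative only, not the transforms API: plan_jobs is a hypothetical helper that batches unprocessed transactions and reports the added, previous, and current ranges each job would read.

```python
import string

def plan_jobs(transactions, processed, limit=None):
    """Return (added, previous, current) transaction lists for each job
    needed to catch up, reading at most `limit` transactions per job."""
    jobs = []
    while processed < len(transactions):
        remaining = len(transactions) - processed
        batch = remaining if limit is None else min(limit, remaining)
        added = transactions[processed:processed + batch]      # read this job
        previous = transactions[:processed]                    # read last job
        processed += batch
        current = transactions[:processed]                     # view so far
        jobs.append((added, previous, current))
    return jobs

txns = list(string.ascii_uppercase[:10])  # transactions A through J
# A through C were already processed; a limit of 3 needs three catch-up jobs:
for added, previous, current in plan_jobs(txns, processed=3, limit=3):
    print(f"added={added[0]}-{added[-1]}, previous={previous[0]}-{previous[-1]}, "
          f"current={current[0]}-{current[-1]}")
```

With `limit=None`, the same helper resolves everything in a single job (added D to J), matching the no-limit case above.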
Now, if the output was snapshotted (for example, if the semantic version was changed), transactions would be processed again from the start transaction of the input, resulting in the resolved ranges below:
Without a transaction limit:
- Added: A to J (all transactions are now "unprocessed")
- Current: A to J
With a transaction limit of three:

First job:
- Added: A to C
- Current: A to C (all transactions that were processed up to and including this batch)

Second job:
- Added: D to F
- Previous: A to C
- Current: A to F

Third job:
- Added: G to I
- Previous: A to F
- Current: A to I

Fourth job:
- Added: J
- Previous: A to I
- Current: A to J
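The job count after a snapshot follows directly from the limit: with all ten transactions (A through J) unprocessed again and at most three read per job, catching up takes ceil(10 / 3) = 4 jobs. A quick arithmetic check:

```python
import math
import string

# All ten transactions (A through J) are unprocessed again after the snapshot.
txns = list(string.ascii_uppercase[:10])

# With a transaction limit of 3, the number of catch-up jobs is ceil(10 / 3):
jobs_needed = math.ceil(len(txns) / 3)
print(jobs_needed)  # 4

# The per-job batches of added transactions:
batches = ["".join(txns[i:i + 3]) for i in range(0, len(txns), 3)]
print(batches)  # ['ABC', 'DEF', 'GHI', 'J']
```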
To use transaction limits in an incremental transform, ensure you have access to the necessary tools and services and that the transforms and datasets meet the requirements below.
The transform must meet the following conditions:

- The v2_semantics argument is set to True.
- Python transforms version 3.25.0 or higher is used. Configure a job with module pinning to use a specific version of Python transforms.

Note that enabling v2_semantics on an existing incremental transform will cause the subsequent build to run as SNAPSHOT. This only happens once.
Input datasets must meet the following conditions to be configured with a transaction limit:

- The dataset's view must contain only APPEND transactions; however, the starting transaction can be a SNAPSHOT. If any transaction in the current view is a DELETE or UPDATE transaction, the job will fail with a Build2:InvalidTransactionTypeForBatchInputResolution error.
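The validation rule above can be sketched as a small check. This is an illustrative stand-in, not Foundry's actual implementation: validate_view is a hypothetical helper that accepts a view starting with an optional SNAPSHOT followed only by APPEND transactions, and rejects anything else with an error message echoing the one described above.

```python
def validate_view(transaction_types):
    """Reject a transaction view that a transaction-limited input cannot read:
    a leading SNAPSHOT is allowed, every other transaction must be an APPEND."""
    for i, txn_type in enumerate(transaction_types):
        if i == 0 and txn_type == "SNAPSHOT":
            continue  # a SNAPSHOT is only allowed as the starting transaction
        if txn_type != "APPEND":
            raise ValueError(
                "Build2:InvalidTransactionTypeForBatchInputResolution: "
                f"found {txn_type} transaction at position {i}"
            )

validate_view(["SNAPSHOT", "APPEND", "APPEND"])  # accepted
# validate_view(["SNAPSHOT", "UPDATE"])  # would raise ValueError
```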