This guide explains how to use changelogs in Python transforms with Iceberg tables. The guide covers the two main changelog modes, their mechanics, and best practices for avoiding common pitfalls.
A changelog is a view of the changes (inserts, updates, and deletes) that have occurred in a table between two points in time (as represented by Iceberg snapshots ↗). Changelogs enable efficient incremental processing, allowing you to process only what has changed since the last run.
There are two primary ways to generate a changelog in Foundry:
Use the "identifier changelog" mode when you have one or more columns in your input data that uniquely identify each row, together constituting a primary key. This mode is more performant than the "net changes changelog" mode and provides richer semantics, including update-before and update-after records.
To generate an identifier changelog, call `source.changelog` and provide an array of primary keys:

```python
changelog_df = source.changelog(["id"])
```
Foundry's "identifier changelog" mode is implemented using the `compute_updates` option in Iceberg's `create_changelog_view` ↗ procedure. With `compute_updates`:

- Insertions and deletions are represented as `INSERT` and `DELETE` changes.
- Updates are represented as pairs of `UPDATE_BEFORE` and `UPDATE_AFTER` rows, also known as "pre/post update images" ↗.

Foundry's API implements an additional step on top of Iceberg's `create_changelog_view`
to collapse intermediate updates across a transaction range. For example, if a row has its value updated from `1` to `2` in one snapshot, and then from `2` to `3` in a subsequent snapshot, the resulting identifier changelog will show only an `UPDATE_BEFORE` of `1` and an `UPDATE_AFTER` of `3`; the intermediate value `2` does not appear.
The Iceberg table specification supports identifier fields ↗ in metadata but does not enforce uniqueness. Because of that, Transforms changelogs do not currently rely on identifier fields in metadata.
The schema for the identifier changelog:
| Column name | Type | Description |
|---|---|---|
| All data columns | various | All columns from the source table |
| `_change_type` | string | One of `INSERT`, `DELETE`, `UPDATE_BEFORE`, `UPDATE_AFTER` |
| `_change_ordinal` | int | Monotonically increasing number for ordering changes |
The change types for the identifier changelog:
| Change type | Description |
|---|---|
| `UPDATE_BEFORE` | The old version of the row for the identifier. |
| `UPDATE_AFTER` | The new version of the row for the identifier. |
| `INSERT` | New row for this identifier. |
| `DELETE` | Row removed for this identifier. |
An example identifier changelog:
| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | UPDATE_BEFORE | 1 |
| id1 | val2 | UPDATE_AFTER | 1 |
| id2 | val2 | INSERT | 2 |
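To make these semantics concrete, here is a minimal plain-Python sketch (not the Foundry or Iceberg API; the helper name is made up for illustration) that applies identifier-changelog rows like the ones above to an in-memory table keyed by `id`:

```python
# Illustrative only: apply identifier-changelog rows to a dict mapping
# id -> value, mimicking the semantics of the four change types.
def apply_identifier_changelog(table, changelog):
    # Process changes in ordinal order so later changes win.
    for row in sorted(changelog, key=lambda r: r["_change_ordinal"]):
        change = row["_change_type"]
        if change in ("INSERT", "UPDATE_AFTER"):
            table[row["id"]] = row["value"]  # new row / post-update image
        elif change == "DELETE":
            table.pop(row["id"], None)       # row removed
        # UPDATE_BEFORE carries the pre-update image; nothing to apply.
    return table

table = {"id1": "val1"}
changelog = [
    {"id": "id1", "value": "val1", "_change_type": "UPDATE_BEFORE", "_change_ordinal": 1},
    {"id": "id1", "value": "val2", "_change_type": "UPDATE_AFTER", "_change_ordinal": 1},
    {"id": "id2", "value": "val2", "_change_type": "INSERT", "_change_ordinal": 2},
]
print(apply_identifier_changelog(table, changelog))
# {'id1': 'val2', 'id2': 'val2'}
```

The `UPDATE_BEFORE` row is informational for a consumer like this; only the `UPDATE_AFTER` image needs to be written.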
Use the "net changes changelog" mode when there is no set of identifier columns that uniquely identify entries in your input data. This approach provides more flexibility than the "identifier changelog" mode, but comes at the cost of performance.
To generate a net changes changelog:

```python
changelog_df = source.changelog()
```
Hint: Use the "net changes changelog" mode when you cannot trust your primary keys or when the source contains duplicate rows.
Foundry's "net changes changelog" mode is implemented using the `net_changes` option in Iceberg's `create_changelog_view` ↗ procedure.
In Iceberg, updating a row is implemented as a deletion followed by an insertion; there is no inherent notion of a row update, since nothing connects the deletion and the insertion as referring to the same logical row.
The "net changes changelog" is the simplest type of changelog in that it represents a row update as a pair of rows: one with a `_change_type` value of `DELETE` and the other with a `_change_type` value of `INSERT`. (Plain deletions and insertions are represented by a single row with `DELETE` or `INSERT`, respectively.)
The "net" in "net changes" refers to row changes (insertions, deletions, and updates) being combined across a range of snapshots. For example, if a row has its value updated from `1` to `2` in one snapshot, and from `2` to `3` in a subsequent snapshot, the net changes changelog will show a `DELETE` of `1` and an `INSERT` of `3`; the intermediate value `2` is skipped.
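The netting behavior can be reproduced by diffing the two endpoint snapshots as multisets of rows. The following plain-Python sketch (illustrative only, not the Foundry API; the helper name is made up) shows that the intermediate value never appears:

```python
from collections import Counter

def net_changes(before, after):
    """Diff two snapshots (lists of (id, value) rows, treated as multisets)
    into DELETE rows (present only before) and INSERT rows (present only after)."""
    before_counts, after_counts = Counter(before), Counter(after)
    deletes = list((before_counts - after_counts).elements())
    inserts = list((after_counts - before_counts).elements())
    return deletes, inserts

# The row goes 1 -> 2 -> 3 across snapshots; diffing only the endpoints
# skips the intermediate value 2 entirely.
deletes, inserts = net_changes([("id1", 1)], [("id1", 3)])
print(deletes, inserts)
# [('id1', 1)] [('id1', 3)]
```

Treating snapshots as multisets (rather than sets) matters because, unlike the identifier changelog, net changes must tolerate duplicate rows.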
The schema for the net changes changelog:
| Column name | Type | Description |
|---|---|---|
| All data columns | various | All columns from the source table |
| `_change_type` | string | `INSERT` or `DELETE` |
| `_change_ordinal` | int | Monotonically increasing number for ordering changes |
The change types for the net changes changelog:
| Change type | Description |
|---|---|
| `INSERT` | Row insertion |
| `DELETE` | Row deletion |
An example net changes changelog:
| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| id1 | val2 | INSERT | 1 |
This example demonstrates how an Iceberg table evolves across snapshots `V0` to `V3` and how changelogs are produced.

| V0 | V1 | V2 | V3 |
|---|---|---|---|
| [id1, val1] | [id1, val2] | [id1, val3] | [id1, val3] |
| | | [id2, val2] | [id1, val3] |
The following tables show how the identifier changelog evolves across snapshots for the example Iceberg table.

V0 → V1:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | UPDATE_BEFORE | 1 |
| id1 | val2 | UPDATE_AFTER | 1 |

V1 → V2:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val2 | UPDATE_BEFORE | 2 |
| id1 | val3 | UPDATE_AFTER | 2 |
| id2 | val2 | INSERT | 2 |

V0 → V2:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | UPDATE_BEFORE | 1 |
| id1 | val3 | UPDATE_AFTER | 2 |
| id2 | val2 | INSERT | 2 |

In this example, generating the identifier changelog for V0 → V3 fails because V3 contains duplicate rows for the primary-key value `id1`. This highlights the importance of unique identifier columns for the identifier changelog mode.
The following tables show how the net changes changelog evolves across snapshots for the example Iceberg table.

V0 → V1:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| id1 | val2 | INSERT | 1 |

V1 → V2:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val2 | DELETE | 2 |
| id1 | val3 | INSERT | 2 |
| id2 | val2 | INSERT | 2 |

V0 → V2, before netting. Rows with strikethrough cancel out and are removed to build the net changes changelog:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| ~~id1~~ | ~~val2~~ | ~~INSERT~~ | ~~1~~ |
| ~~id1~~ | ~~val2~~ | ~~DELETE~~ | ~~2~~ |
| id1 | val3 | INSERT | 2 |
| id2 | val2 | INSERT | 2 |

Net changes for V0 → V2 after the strikethrough rows are removed:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| id1 | val3 | INSERT | 2 |
| id2 | val2 | INSERT | 2 |

V0 → V3, before netting. The intermediate `val2` rows for `id1` cancel out, as does the insert/delete pair for `id2`:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| ~~id1~~ | ~~val2~~ | ~~INSERT~~ | ~~1~~ |
| ~~id1~~ | ~~val2~~ | ~~DELETE~~ | ~~2~~ |
| id1 | val3 | INSERT | 2 |
| ~~id2~~ | ~~val2~~ | ~~INSERT~~ | ~~2~~ |
| ~~id2~~ | ~~val2~~ | ~~DELETE~~ | ~~3~~ |
| id1 | val3 | INSERT | 3 |

Net changes for V0 → V3 after the strikethrough rows are removed:

| id | value | _change_type | _change_ordinal |
|---|---|---|---|
| id1 | val1 | DELETE | 1 |
| id1 | val3 | INSERT | 2 |
| id1 | val3 | INSERT | 3 |
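As a sanity check, the V0 → V3 net result can be reproduced by diffing the two snapshots as multisets (ordinals aside), which also shows why the duplicate `[id1, val3]` rows yield two `INSERT` rows. This is a plain-Python sketch, not the Foundry API:

```python
from collections import Counter

v0 = Counter([("id1", "val1")])
v3 = Counter([("id1", "val3"), ("id1", "val3")])  # duplicate rows in V3

deletes = list((v0 - v3).elements())  # rows present only in V0
inserts = list((v3 - v0).elements())  # rows present only in V3
print(deletes)
# [('id1', 'val1')]
print(inserts)
# [('id1', 'val3'), ('id1', 'val3')]
```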
When working with changelogs, you must ensure uniqueness of identifier columns at two stages: the source table and the destination table.
**Source table (`source.changelog([identifier_cols])`)**

When you call:
```python
changelog_df = source.changelog(["id"])
```

the API expects that the identifier columns (here, `id`) uniquely identify each row in your source table.
If there are duplicate rows for the same identifier in the source, changelog creation will fail:

```
ValueError: Duplicate rows found for identifier columns ['id'] in changelog view.
```
If you cannot guarantee uniqueness in your source, use the net changes changelog instead:

```python
changelog_df = source.changelog()  # No identifier columns
```
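Before committing to the identifier changelog mode, it can help to verify that your candidate key is actually unique. A plain-Python sketch of such a check (the helper below is hypothetical, not part of the Foundry API):

```python
from collections import Counter

def duplicate_keys(rows, key_cols):
    """Return key tuples that appear more than once in `rows` (a list of dicts)."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return [key for key, n in counts.items() if n > 1]

rows = [{"id": "id1", "value": "a"}, {"id": "id1", "value": "b"}]
print(duplicate_keys(rows, ["id"]))
# [('id1',)]
```

If this returns a non-empty list for your chosen columns, prefer the net changes changelog or deduplicate first.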
**Destination table (`output.apply_changelog(changelog_df, keys)`)**

When you call:
```python
output.apply_changelog(changelog_df, ["id"])
```

the `keys` argument specifies the identifier columns for the destination table. These columns must match the identifier columns in your changelog dataframe. The destination table must also be uniquely keyed by these columns.
If there are multiple rows in the destination table that match the same key, or if the changelog dataframe contains duplicates for the key, `apply_changelog` will fail:

```
Exception: apply_changelog failed: duplicate keys detected in changelog dataframe for identifier columns ['id']
```

or

```
Exception: apply_changelog failed: multiple rows in destination table match key ['id']
```
Always ensure both your changelog dataframe and your destination table are deduplicated on the identifier columns before calling `apply_changelog`.
If you want to use the identifier changelog for its performance and semantics, you must deduplicate your data before calling `apply_changelog`. For example:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Group rows by identifiers and order by change ordinal such that the latest
# row comes first (`_row_number == 1`). This allows us to filter a net changes
# changelog to only keep the latest row per identifier.
window = Window.partitionBy("id").orderBy(F.col("_change_ordinal").desc())

deduped_df = (
    changelog_df
    .withColumn("_row_number", F.row_number().over(window))
    .filter("_row_number = 1")
    .drop("_row_number")
)

output.apply_changelog(deduped_df, ["id"])
```
If you skip this step and there are duplicates, `apply_changelog` will fail as shown above.
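The effect of the window-function deduplication can be illustrated without Spark. This plain-Python sketch (hypothetical helper, not the Foundry API) keeps only the highest-ordinal row per key, just as the window query above does:

```python
def dedupe_latest(rows, key_cols):
    """Keep only the row with the highest _change_ordinal per key."""
    best = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in best or row["_change_ordinal"] > best[key]["_change_ordinal"]:
            best[key] = row
    return list(best.values())

rows = [
    {"id": "id1", "value": "val1", "_change_type": "DELETE", "_change_ordinal": 1},
    {"id": "id1", "value": "val3", "_change_type": "INSERT", "_change_ordinal": 2},
    {"id": "id2", "value": "val2", "_change_type": "INSERT", "_change_ordinal": 2},
]
print(dedupe_latest(rows, ["id"]))
# Only the latest row per id survives.
```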
Hint: Always deduplicate on the identifier columns before calling `apply_changelog` to avoid errors.

For further reading, see the Iceberg documentation on changelog views ↗.