Merge loading
Merge loading allows you to update existing data in your destination tables, rather than replacing all data. This approach is ideal when you want to update only specific records without replacing entire tables or to keep the history of data changes.
To perform a merge load, you need to specify the write_disposition as merge on your resource and provide a primary_key or merge_key.
Depending on your use case, you can choose from four different merge strategies.
Merge strategies
delete-insert strategy
The default delete-insert strategy is used in two scenarios:
- You want to keep only one instance of a certain record, e.g., you receive updates of the user state from an API and want to keep just one record per user_id.
- You receive data in daily batches, and you want to make sure that you always keep just a single instance of a record for each batch, even if you load an old batch or load the current batch several times a day (e.g., to receive "live" updates).
The delete-insert strategy loads data to a staging dataset, deduplicates the staging data if a primary_key is provided, deletes the data from the destination using merge_key and primary_key, and then inserts the new records. All of this occurs within a single atomic transaction for the root and all nested tables.
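The sequence described above can be sketched with a small in-memory model. This is only an illustration of the order of operations (deduplicate staging, delete matching keys, insert), assuming a single-column primary key and plain Python lists standing in for tables. The function name `delete_insert` and the sample rows are hypothetical; dlt itself performs these steps as SQL inside the destination.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def delete_insert(destination: List[Row], staging: List[Row], primary_key: str) -> List[Row]:
    """Toy model of the delete-insert steps; not dlt's implementation."""
    # 1. Deduplicate the staging data: keep one row per primary key value.
    #    Which duplicate survives is arbitrary in dlt by default;
    #    this model simply keeps the last one it sees.
    deduplicated: Dict[Any, Row] = {}
    for row in staging:
        deduplicated[row[primary_key]] = row

    # 2. Delete destination rows whose key appears in the staging data.
    kept = [row for row in destination if row[primary_key] not in deduplicated]

    # 3. Insert the new rows.
    return kept + list(deduplicated.values())


destination = [{"id": 1, "state": "old"}, {"id": 2, "state": "old"}]
staging = [
    {"id": 2, "state": "new"},
    {"id": 3, "state": "new"},
    {"id": 3, "state": "newer"},  # duplicate key within the same load
]
merged = delete_insert(destination, staging, "id")
print(merged)
```

In this model, the row with id 1 is untouched, id 2 is replaced, and only one of the two staged rows with id 3 ends up in the result.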
The example below loads all the GitHub events and updates them in the destination using "id" as the primary key, making sure that only a single copy of the event is present in the github_repo_events table:
@dlt.resource(primary_key="id", write_disposition="merge")
def github_repo_events():
    yield from _get_event_pages()
Primary keys can be compound. To define a composite primary key, provide multiple column names:
@dlt.resource(primary_key=("id", "url"), write_disposition="merge")
def resource():
    ...
Merge keys can also be compound. The example below merges on the single batch_day column, which holds the day for which the given record is valid:
@dlt.resource(merge_key="batch_day", write_disposition="merge")
def get_daily_batch(day):
    yield _get_batch_from_bucket(day)
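To make the batch semantics concrete, here is a minimal in-memory sketch. Every destination row whose merge key value appears in the incoming batch is removed, and the incoming rows are inserted as they are; because no primary key is given, nothing is deduplicated. The helper `replace_batches` and the sample rows are hypothetical, and dlt performs the equivalent work in the destination rather than in Python.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def replace_batches(destination: List[Row], batch: List[Row], merge_key: str) -> List[Row]:
    """Toy model: replace all rows that share a merge key value with the incoming batch."""
    incoming_keys = {row[merge_key] for row in batch}
    kept = [row for row in destination if row[merge_key] not in incoming_keys]
    return kept + batch


table = [
    {"batch_day": "2024-01-01", "value": 10},
    {"batch_day": "2024-01-02", "value": 20},
    {"batch_day": "2024-01-02", "value": 21},
]
# Reloading the 2024-01-02 batch replaces both existing rows for that day
# and leaves the 2024-01-01 batch alone.
reloaded = [{"batch_day": "2024-01-02", "value": 25}]
table = replace_batches(table, reloaded, "batch_day")
print(table)
```

Loading the same batch again would produce the same table, which is why reloading an old batch or the current batch several times a day is safe under this strategy.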
As with any other write disposition, you can use it to load data ad hoc. Below, we load issues with the top reactions for the duckdb repo. The lists obviously contain many overlapping issues, but we want to keep only one instance of each.
import dlt
import requests

# REPO_NAME and headers are assumed to be defined elsewhere.
p = dlt.pipeline(destination="bigquery", dataset_name="github")
issues = []
reactions = ["%2B1", "-1", "smile", "tada", "thinking_face", "heart", "rocket", "eyes"]
for reaction in reactions:
    for page_no in range(1, 3):
        page = requests.get(f"https://api.github.com/repos/{REPO_NAME}/issues?state=all&sort=reactions-{reaction}&per_page=100&page={page_no}", headers=headers)
        print(f"got page for {reaction} page {page_no}, requests left", page.headers["x-ratelimit-remaining"])
        issues.extend(page.json())
# Merge on "id" so that overlapping issues are stored only once.
p.run(issues, write_disposition="merge", primary_key="id", table_name="issues")
The example below dispatches GitHub events to several tables by event type, keeps one copy of each event by "id", and skips loading past records using a “last value” incremental. As you can see, all of this can be declared directly in the resource.
@dlt.resource(primary_key="id", write_disposition="merge", table_name=lambda i: i['type'])
def github_repo_events(last_created_at = dlt.sources.incremental("created_at", "1970-01-01T00:00:00Z")):
    """A resource taking a stream of github events and dispatching them to tables named by event type. Deduplicates by 'id'. Loads incrementally by 'created_at' """
    yield from _get_rest_pages("events")
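The dispatching part of this resource can be pictured with a short sketch: the callable passed as table_name is evaluated for each item, and its return value selects the target table. The `dispatch` helper and the sample events below are hypothetical and model only the routing. In dlt, the resulting table names are additionally normalized according to the naming convention, and the incremental filtering is not modeled here.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def dispatch(items: List[Row], table_name: Callable[[Row], str]) -> Dict[str, List[Row]]:
    """Toy model: group items by the table name computed for each item."""
    tables: Dict[str, List[Row]] = defaultdict(list)
    for item in items:
        tables[table_name(item)].append(item)
    return dict(tables)


events = [
    {"id": 1, "type": "PushEvent"},
    {"id": 2, "type": "WatchEvent"},
    {"id": 3, "type": "PushEvent"},
]
# Same callable as in the resource above: the event type names the table.
tables = dispatch(events, lambda i: i["type"])
print(sorted(tables))
```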
If you use the merge write disposition but do not specify merge or primary keys, merge will fall back to append.
In this case, the appended data will be inserted from a staging table in one transaction for most destinations.
Control deduplication of staging data
By default, primary_key deduplication is arbitrary. You can pass the dedup_sort column hint with a value of desc or asc to control which record remains after deduplication. With desc, records sharing the same primary_key are sorted in descending order before deduplication, ensuring that the record with the highest value for the column with the dedup_sort hint remains. The asc option applies the opposite behavior.
@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    columns={"created_at": {"dedup_sort": "desc"}}  # select "latest" record
)
def resource():
    ...
Example: deduplication with timestamp-based sorting
# Sample data
data = [
    {"id": 1, "metadata_modified": "2024-01-01", "value": "A"},
    {"id": 1, "metadata_modified": "2024-01-02", "value": "B"},
    {"id": 2, "metadata_modified": "2024-01-01", "value": "C"},
    {"id": 2, "metadata_modified": "2024-01-01", "value": "D"},  # Same metadata_modified as above
]
# Define the resource with dedup_sort configuration
@dlt.resource(
    primary_key='id',
    write_disposition='merge',
    columns={
        "metadata_modified": {"dedup_sort": "desc"}
    }
)
def sample_data():
    for item in data:
        yield item
Output:
| id | metadata_modified | value |
|---|---|---|
| 1 | 2024-01-02 | B |
| 2 | 2024-01-01 | C |
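The output above can be reproduced with a small in-memory model of dedup_sort: sort the rows by the hinted column, then keep the first row seen for each primary key. The `deduplicate` function is a hypothetical stand-in, not dlt's implementation; in dlt the deduplication is executed by the database backend. The model relies on Python's stable sort, so rows with equal sort values keep their input order.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def deduplicate(rows: List[Row], primary_key: str, sort_column: str, order: str = "desc") -> List[Row]:
    """Toy model of dedup_sort: sort, then keep the first row per primary key."""
    # sorted() is stable, also with reverse=True, so ties keep their input order.
    ordered = sorted(rows, key=lambda r: r[sort_column], reverse=(order == "desc"))
    first_per_key: Dict[Any, Row] = {}
    for row in ordered:
        first_per_key.setdefault(row[primary_key], row)
    # Order the result by key so it is easy to compare with the table.
    return sorted(first_per_key.values(), key=lambda r: r[primary_key])


data = [
    {"id": 1, "metadata_modified": "2024-01-01", "value": "A"},
    {"id": 1, "metadata_modified": "2024-01-02", "value": "B"},
    {"id": 2, "metadata_modified": "2024-01-01", "value": "C"},
    {"id": 2, "metadata_modified": "2024-01-01", "value": "D"},
]
result = deduplicate(data, "id", "metadata_modified", "desc")
print([(r["id"], r["metadata_modified"], r["value"]) for r in result])
```

Switching the last argument to "asc" keeps the row with value "A" for id 1 instead, while id 2 still resolves to "C" because both candidates share the same sort value.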
When this resource is executed, the following deduplication rules are applied:
- For records with different values in the dedup_sort column:
  - The record with the highest value is kept when using desc.
  - For example, among records with id=1, the one with "metadata_modified"="2024-01-02" is kept.
- For records with identical values in the dedup_sort column:
  - The first occurrence encountered is kept.
  - For example, among records with id=2 and identical "metadata_modified"="2024-01-01", the first record (value="C") is kept.
Disable deduplication
If the staging data is already deduplicated (or was always clean), you can disable deduplication. Deduplication is performed by the database backend, so skipping it may save some costs:
@dlt.resource(primary_key="id", write_disposition={"disposition": "merge", "strategy": "delete-insert", "deduplicated": True})
def github_repo_events():
    yield from _get_event_pages()
Delete records
The hard_delete column hint can be used to delete records from the destination dataset. The behavior of the delete mechanism depends on the data type of the column marked with the hint:
- bool type: only True leads to a delete; None and False values are disregarded.
- Other types: each not None value leads to a delete.
If the incoming data contains a record marked as deleted, then any existing record in the destination table with the same primary_key or merge_key will be removed.
Deletes are propagated to any nested table that might exist. For each record that gets deleted in the root table, all corresponding records in the nested table(s) will also be deleted. Records in parent and nested tables are linked through the root key that is explained in the next section.
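The type-dependent rule and the key-based removal described above can be sketched as follows. This is a simplified in-memory model under stated assumptions: `is_delete_marker` and `apply_deletes` are hypothetical helpers, a single key column is used, and only the deletion itself is modeled (inserts, updates, and nested tables are left out).

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def is_delete_marker(value: Any, is_bool_column: bool) -> bool:
    """bool columns: only True deletes. Other types: any non-None value deletes."""
    if is_bool_column:
        return value is True
    return value is not None


def apply_deletes(destination: List[Row], incoming: List[Row], key: str,
                  delete_column: str, is_bool_column: bool) -> List[Row]:
    """Toy model: drop destination rows whose key matches a delete-marked incoming row."""
    delete_keys = {
        row[key]
        for row in incoming
        if is_delete_marker(row.get(delete_column), is_bool_column)
    }
    return [row for row in destination if row[key] not in delete_keys]


destination = [
    {"id": 1, "val": "foo"},
    {"id": 1, "val": "bar"},
    {"id": 2, "val": "baz"},
]
incoming = [
    {"id": 1, "deleted_at_ts": "2024-02-22T12:34:56Z"},  # non-None value: delete
    {"id": 2, "deleted_at_ts": None},                    # None: not a delete
]
remaining = apply_deletes(destination, incoming, "id", "deleted_at_ts", is_bool_column=False)
print(remaining)
```

Because the key here is not unique, the single delete-marked record removes both destination rows with id 1, which mirrors how a merge_key behaves.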
Example: with primary key and boolean delete column
@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    columns={"deleted_flag": {"hard_delete": True}}
)
def resource():
    # This will insert a record (assuming a record with id = 1 does not yet exist).
    yield {"id": 1, "val": "foo", "deleted_flag": False}
    # This will update the record.
    yield {"id": 1, "val": "bar", "deleted_flag": None}
    # This will delete the record.
    yield {"id": 1, "val": "foo", "deleted_flag": True}
    # Similarly, this would have also deleted the record.
    # Only the key and the column marked with the "hard_delete" hint suffice to delete records.
    yield {"id": 1, "deleted_flag": True}
    ...
Example: with merge key and non-boolean delete column
@dlt.resource(
    merge_key="id",
    write_disposition="merge",
    columns={"deleted_at_ts": {"hard_delete": True}})
def resource():
    # This will insert two records.
    yield [
        {"id": 1, "val": "foo", "deleted_at_ts": None},
        {"id": 1, "val": "bar", "deleted_at_ts": None}
    ]
    # This will delete two records.
    yield {"id": 1, "val": "foo", "deleted_at_ts": "2024-02-22T12:34:56Z"}
    ...