incremental scd2 with merge_key #1818

Merged
merged 19 commits into from
Sep 29, 2024

Conversation

jorritsandbrink
Collaborator

@jorritsandbrink jorritsandbrink commented Sep 16, 2024

Description

Adds merge_key support for scd2 to enable incremental extracts.

When a merge_key is provided, absent records are only retired if their merge_key value is present in the source extract.

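import dlt

# scd2 merge: absent records are retired only for merge_key values present in the extract
@dlt.resource(
    merge_key="my_natural_key",
    write_disposition={"disposition": "merge", "strategy": "scd2"}
)
def my_incremental_dim_data():
    ...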
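
To make the retirement rule above concrete, here is a minimal pure-Python sketch. It is a toy model, not dlt's implementation: `rows_to_retire` and the sample rows are hypothetical, and row identity is approximated by comparing full row contents rather than a stored row hash.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def rows_to_retire(active_rows: List[Row], extract: List[Row], merge_key: str) -> List[Row]:
    """Return the active destination rows a load would retire (simplified model)."""
    # A row counts as "absent" when no identical row appears in the extract.
    extract_versions = {tuple(sorted(row.items())) for row in extract}
    # Only merge_key values seen in the extract are eligible for retirement.
    extract_keys = {row[merge_key] for row in extract}
    return [
        row
        for row in active_rows
        if tuple(sorted(row.items())) not in extract_versions
        and row[merge_key] in extract_keys
    ]


active = [
    {"my_natural_key": 1, "value": "a"},
    {"my_natural_key": 2, "value": "b"},
    {"my_natural_key": 3, "value": "c"},
]
# Incremental extract: key 1 changed, key 2 unchanged, key 3 not extracted at all.
extract = [
    {"my_natural_key": 1, "value": "a2"},
    {"my_natural_key": 2, "value": "b"},
]
retired = rows_to_retire(active, extract, "my_natural_key")
```

In this model only the old version of key 1 is retired. Key 3 stays active because its merge_key value never appears in the extract.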

Related Issues

Closes #1789

@jorritsandbrink jorritsandbrink added the enhancement New feature or request label Sep 16, 2024
@jorritsandbrink jorritsandbrink linked an issue Sep 16, 2024 that may be closed by this pull request

netlify bot commented Sep 16, 2024

Deploy Preview for dlt-hub-docs ready!

Name Link
🔨 Latest commit 84c101b
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/66f7cfb2d32b140008588a65
😎 Deploy Preview https://deploy-preview-1818--dlt-hub-docs.netlify.app

@jorritsandbrink jorritsandbrink marked this pull request as ready for review September 17, 2024 15:09
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 3
boundary_ts = get_load_package_created_at(p, info)
# natural key 1 should now have two records (one retired, one active)
Collaborator

We should also test here that when the row with natural key 1 reverts to its first version (and so has the same row hash as in the first load), a new entry is still created.

Collaborator Author

done
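
The revert case requested above can be illustrated with a small pure-Python model of an scd2 table. This is only a sketch under assumptions: `scd2_load`, the `version` tuple standing in for a row hash, and the integer load timestamps are all hypothetical and do not reflect dlt's internals.

```python
from typing import Any, Dict, List


def scd2_load(table: List[Dict[str, Any]], extract: List[Dict[str, Any]], load_ts: int) -> None:
    """Apply one full-extract scd2 load to `table` in place (toy model)."""

    def version(row: Dict[str, Any]) -> tuple:
        # Stand-in for a row hash computed over the user columns.
        return tuple(sorted(row.items()))

    incoming = {version(row) for row in extract}
    # Snapshot the active versions before this load retires anything.
    active = {rec["version"] for rec in table if rec["valid_to"] is None}
    # Retire active records whose version is absent from the extract.
    for rec in table:
        if rec["valid_to"] is None and rec["version"] not in incoming:
            rec["valid_to"] = load_ts
    # Insert versions that are not currently active, even if an identical
    # version already exists among the retired records.
    for row in extract:
        if version(row) not in active:
            table.append({"version": version(row), "valid_from": load_ts, "valid_to": None, **row})


table: List[Dict[str, Any]] = []
scd2_load(table, [{"nk": 1, "c": "foo"}], load_ts=1)  # first version
scd2_load(table, [{"nk": 1, "c": "bar"}], load_ts=2)  # changed
scd2_load(table, [{"nk": 1, "c": "foo"}], load_ts=3)  # reverted to first version
history = [(rec["c"], rec["valid_from"], rec["valid_to"]) for rec in table]
```

Because the comparison only considers active records, the reverted row is inserted again even though a retired record carries the same version.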

validity_column_names: Optional[List[str]]
active_record_timestamp: Optional[TAnyDateTime]
boundary_timestamp: Optional[TAnyDateTime]
row_version_column_name: Optional[str]
retire_if_absent: Optional[bool]
Collaborator

maybe call this "retire_absent_rows"? then it is a bit more clear just from the naming what it does. wdyt?

Collaborator Author

Yes! Changed it.

@jorritsandbrink jorritsandbrink changed the title incremental scd2 with retire_if_absent flag incremental scd2 with retire_absent_rows flag Sep 19, 2024
validity_column_names: Optional[List[str]]
active_record_timestamp: Optional[TAnyDateTime]
boundary_timestamp: Optional[TAnyDateTime]
row_version_column_name: Optional[str]
retire_absent_rows: Optional[bool]
natural_key: Optional[str]
Collaborator

this is no longer needed, is it?

Collaborator Author

Whoops. Removed now.

dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
if "retire_absent_rows" in md_dict:
    dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"]
if "natural_key" in md_dict:
Collaborator

also no longer needed

Collaborator Author

also removed

...
...
```
Using this setting, records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. You need to specify the natural key as `merge_key` when `retire_absent_rows` is `False`. Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`.
Collaborator

@rudolfix we are using only the merge_key as opposed to a combination of primary key and merge key here, let us know if this is ok.

Collaborator

@jorritsandbrink @sh-rp I think there's some conceptual confusion. merge_key is correct, but it is not a natural key for the source data. It is the same merge_key we have in the delete-insert strategy.

Example 1: I load data day by day and set the merge key to the day column. If I load the same day twice, only records from that day are retired.
Example 2: I set the merge key to the same value as the primary (natural) key (or to the content hash if no natural key is present). Then I get the same behavior as if I'd set retire_absent_rows to False (which is not needed anymore, btw).
Example 3: I update chunked documents and use doc_id as the merge key. Then I retire all the missing chunks for those doc ids, without touching the others.

My take:

  1. drop the flag
  2. if the merge key is present, always limit the set of records to retire
  3. update the documentation to explain two basic cases (do not retire deleted records at all, retire deleted records only for given partition)

WDYT? IMO this is really powerful functionality if done this way
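
The partition semantics described in Examples 2 and 3 can be sketched in plain Python. This is a toy model and not dlt code: `retired_rows`, the `chunk` and `text` columns, and the sample data are hypothetical. Only the idea that merge_key scopes which records may be retired comes from the comment above.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def retired_rows(active: List[Row], extract: List[Row], merge_key: Sequence[str]) -> List[Row]:
    """Active rows a load would retire when retirement is scoped by merge_key (toy model)."""

    def scope(row: Row) -> tuple:
        # A compound merge key is modelled as a tuple of column values.
        return tuple(row[col] for col in merge_key)

    touched = {scope(row) for row in extract}  # partitions present in this extract
    # Retire only rows inside a touched partition that are missing from the extract.
    return [row for row in active if scope(row) in touched and row not in extract]


active = [
    {"doc_id": "A", "chunk": 0, "text": "intro"},
    {"doc_id": "A", "chunk": 1, "text": "body"},
    {"doc_id": "B", "chunk": 0, "text": "other"},
]
# Document A was re-chunked and now has a single chunk; document B was not extracted.
extract = [{"doc_id": "A", "chunk": 0, "text": "intro"}]

# Example 3: merge key is doc_id, so missing chunks of document A are retired.
by_doc = retired_rows(active, extract, ["doc_id"])
# Example 2: merge key equals the natural key, so nothing absent is retired.
by_chunk = retired_rows(active, extract, ["doc_id", "chunk"])
```

Example 1 follows the same pattern with a day column as the merge key: reloading one day can only retire records from that day.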

Collaborator Author

I think you're right :) I'll adapt per your suggestions.

Collaborator

@sh-rp sh-rp left a comment

Small changes requested and one question that @rudolfix should answer.

@jorritsandbrink jorritsandbrink changed the title incremental scd2 with retire_absent_rows flag incremental scd2 with merge_key Sep 28, 2024
@jorritsandbrink
Collaborator Author

@rudolfix I've adapted the code per your suggestions. Can you review?

Collaborator

@rudolfix rudolfix left a comment

this is so good! thanks!

@rudolfix rudolfix merged commit cb45046 into devel Sep 29, 2024
60 of 61 checks passed
@rudolfix rudolfix deleted the feat/1789-scd2-incremental branch September 29, 2024 19:48
Labels
enhancement New feature or request
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Support incremental loading for scd2
3 participants