Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

incremental scd2 with merge_key #1818

Merged
merged 19 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,22 @@ class TWriteDispositionDict(TypedDict):
disposition: TWriteDisposition


class TMergeDispositionDict(TWriteDispositionDict, total=False):
class TMergeDispositionDict(TWriteDispositionDict):
    """Write disposition dict for the `merge` disposition with a merge strategy."""

    # NOTE(review): the diff dropped `total=False` from this class, which makes
    # `strategy` a *required* key despite its Optional value type — confirm intended.
    strategy: Optional[TLoaderMergeStrategy]


class TScd2StrategyDict(TMergeDispositionDict, total=False):
    """Extra configuration keys for the `scd2` merge strategy.

    `total=False` makes every key declared here optional.
    """

    # names of the validity-window columns (valid-from / valid-to)
    validity_column_names: Optional[List[str]]
    # literal used in the valid-to column to mark a record as active
    active_record_timestamp: Optional[TAnyDateTime]
    # timestamp used to retire/activate records in this load
    boundary_timestamp: Optional[TAnyDateTime]
    row_version_column_name: Optional[str]
    retire_absent_rows: Optional[bool]
    # NOTE(review): per the PR discussion this key is no longer needed (the
    # `merge_key` hint acts as the natural key) and is slated for removal —
    # do not rely on it. TODO confirm against the merged version.
    natural_key: Optional[str]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer needed, is it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops. Removed now.



TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
TWriteDispositionConfig = Union[
TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
]


class _TTableSchemaBase(TTableProcessingHints, total=False):
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
return sql, temp_table_name

@classmethod
def gen_concat_sql(cls, columns: Sequence[str]) -> str:
# Athena requires explicit casting
columns = [f"CAST({c} AS VARCHAR)" for c in columns]
return f"CONCAT({', '.join(columns)})"

    @classmethod
    def requires_temp_table_for_delete(cls) -> bool:
        """Return True: this destination needs a temp table to stage deletes.

        NOTE(review): presumably because the engine cannot reference the
        staging dataset directly in a DELETE subquery — confirm against the
        base class docs.
        """
        return True
Expand Down
34 changes: 27 additions & 7 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ def gen_delete_from_sql(
);
"""

@classmethod
def gen_concat_sql(cls, columns: Sequence[str]) -> str:
return f"CONCAT({', '.join(columns)})"

@classmethod
def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
"""Trims identifier to max length supported by sql_client. Used for dynamically constructed table names"""
Expand Down Expand Up @@ -755,19 +759,35 @@ def gen_scd2_sql(
active_record_timestamp = get_active_record_timestamp(root_table)
if active_record_timestamp is None:
active_record_literal = "NULL"
is_active_clause = f"{to} IS NULL"
is_active = f"{to} IS NULL"
else: # it's a datetime
active_record_literal = format_datetime_literal(
active_record_timestamp, caps.timestamp_precision
)
is_active_clause = f"{to} = {active_record_literal}"
is_active = f"{to} = {active_record_literal}"

# retire updated and deleted records
sql.append(f"""
# retire records
# always retire updated records, retire deleted records only if `retire_absent_rows`
retire_sql = f"""
{cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal}
WHERE {is_active_clause}
WHERE {is_active}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")
"""
retire_absent_rows = root_table.get("x-retire-absent-rows", True)
if not retire_absent_rows:
retire_sql = retire_sql.rstrip()[:-1] # remove semicolon
# merge keys act as natural key
merge_keys = cls._escape_list(
get_columns_names_with_prop(root_table, "merge_key"),
escape_column_id,
)
if len(merge_keys) == 1:
nk = merge_keys[0]
else:
nk = cls.gen_concat_sql(merge_keys) # compound key
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
retire_sql += f" AND {nk_present};"
sql.append(retire_sql)

# insert new active records in root table
columns = map(escape_column_id, list(root_table["columns"].keys()))
Expand All @@ -776,7 +796,7 @@ def gen_scd2_sql(
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to}
FROM {staging_root_table_name} AS s
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active});
""")

# insert list elements for new active records in nested tables
Expand Down
53 changes: 38 additions & 15 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TTableSchemaColumns,
TWriteDispositionConfig,
TMergeDispositionDict,
TScd2StrategyDict,
TAnySchemaColumns,
TTableFormat,
TSchemaContract,
Expand Down Expand Up @@ -352,7 +353,7 @@ def _set_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
DltResourceHints.validate_dynamic_hints(hints_template)
DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
DltResourceHints.validate_write_disposition_hint(hints_template)
if create_table_variant:
table_name: str = hints_template["name"] # type: ignore[assignment]
# incremental cannot be specified in variant
Expand Down Expand Up @@ -452,10 +453,19 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
if merge_strategy := md_dict.get("strategy"):
dict_["x-merge-strategy"] = merge_strategy
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
# add columns for `scd2` merge strategy

if merge_strategy == "scd2":
md_dict = cast(TScd2StrategyDict, md_dict)
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
if "retire_absent_rows" in md_dict:
dict_["x-retire-absent-rows"] = md_dict["retire_absent_rows"]
if "natural_key" in md_dict:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also no longer needed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also removed

nk = md_dict["natural_key"]
if nk in dict_["columns"]:
dict_["columns"][nk]["x-natural-key"] = True
else:
dict_["columns"][nk] = {"name": nk, "x-natural-key": True}
if md_dict.get("validity_column_names") is None:
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
else:
Expand Down Expand Up @@ -514,7 +524,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
)

@staticmethod
def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
def validate_write_disposition_hint(template: TResourceHints) -> None:
wd = template.get("write_disposition")
if isinstance(wd, dict) and wd["disposition"] == "merge":
wd = cast(TMergeDispositionDict, wd)
if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES:
Expand All @@ -523,13 +534,25 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)

for ts in ("active_record_timestamp", "boundary_timestamp"):
if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)
if wd.get("strategy") == "scd2":
wd = cast(TScd2StrategyDict, wd)
for ts in ("active_record_timestamp", "boundary_timestamp"):
if (
ts == "active_record_timestamp"
and wd.get("active_record_timestamp") is None
):
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)

if (
"retire_absent_rows" in wd
and not wd["retire_absent_rows"]
and template.get("merge_key") is None
):
raise ValueError("`merge_key` is required when `retire_absent_rows=False`")
19 changes: 18 additions & 1 deletion docs/website/docs/general-usage/incremental-loading.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ info = pipeline.run(fb_ads.with_resources("ads"), write_disposition="merge")
In the example above, we enforce the root key propagation with `fb_ads.root_key = True`. This ensures that the correct data is propagated on the initial `replace` load so the future `merge` load can be executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

### `scd2` strategy
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. By default, the resource is expected to provide a full extract of the source table each run, but [incremental extracts](#example-incremental-scd2) are also possible. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.

:::note
The `unique` hint for `_dlt_id` in the root table is set to `false` when using `scd2`. This differs from [default behavior](./destination-tables.md#child-and-parent-tables). The reason is that the surrogate key stored in `_dlt_id` contains duplicates after an _insert-delete-reinsert_ pattern:
Expand Down Expand Up @@ -300,6 +300,23 @@ pipeline.run(dim_customer()) # third run — 2024-04-10 06:45:22.847403
| 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 |
| 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 |

#### Example: incremental `scd2`
`retire_absent_rows` can be set to `False` to work with incremental extracts instead of full extracts:
```py
@dlt.resource(
merge_key="my_natural_key",
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_absent_rows": False,
}
)
def dim_customer():
...
...
```
Using this setting, records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that only contain updated records. You need to specify the natural key as `merge_key` when `retire_absent_rows` is `False`. Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rudolfix we are using only the merge_key as opposed to a combination of primary key and merge key here, let us know if this is ok.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorritsandbrink @sh-rp I think there's some kind of conceptual confusion. merge_key is correct but it is not a natural key for the source data. it is the same merge_key we have in delete-insert strategy.

Example 1: I load data day by day. I set the merge key to day column. I load the same day twice. Then I retire records only from the given day.
Example 2: I set the merge key to the same value as primary (natural) key (or to the content hash if no natural key is present). Then I have the behavior as I'd set the retire_absent_rows to False. (which is not needed anymore btw.)
Example 3: I update chunked documents and use merge key to be doc_id. Then I retire all the missing chunks for those doc ids, not touching the others.

My take:

  1. drop the flag
  2. if the merge key is present, always limit the set of records to retire
  3. update the documentation to explain two basic cases (do not retire deleted records at all, retire deleted records only for given partition)

WDYT? IMO this is really powerful functionality if done this way

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right :) I'll adapt per your suggestions.


#### Example: configure validity column names
`_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows:
```py
Expand Down
8 changes: 4 additions & 4 deletions tests/common/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,17 @@ def test_typeddict_friendly_exceptions() -> None:
wrong_dict["write_disposition"] = {"strategy": "scd2"}
validate_dict(EndpointResource, wrong_dict, ".")
print(e.value)
# Union of 3 types and callable
assert len(e.value.nested_exceptions) == 4
# Union of 4 types and callable
assert len(e.value.nested_exceptions) == 5

# this has wrong disposition string
with pytest.raises(DictValidationException) as e:
wrong_dict = deepcopy(valid_dict)
wrong_dict["write_disposition"] = "unknown" # type: ignore[assignment]
validate_dict(EndpointResource, wrong_dict, ".")
print(e.value)
# Union of 3 types and callable
assert len(e.value.nested_exceptions) == 4
# Union of 4 types and callable
assert len(e.value.nested_exceptions) == 5

# this has wrong nested type
with pytest.raises(DictValidationException) as e:
Expand Down
Loading
Loading