incremental scd2 with merge_key #1818

Merged · 19 commits · Sep 29, 2024

dlt/common/schema/typing.py (9 changes: 7 additions & 2 deletions)

```diff
@@ -231,15 +231,20 @@ class TWriteDispositionDict(TypedDict):
     disposition: TWriteDisposition
 
 
-class TMergeDispositionDict(TWriteDispositionDict, total=False):
+class TMergeDispositionDict(TWriteDispositionDict):
     strategy: Optional[TLoaderMergeStrategy]
+
+
+class TScd2StrategyDict(TMergeDispositionDict, total=False):
     validity_column_names: Optional[List[str]]
     active_record_timestamp: Optional[TAnyDateTime]
     boundary_timestamp: Optional[TAnyDateTime]
     row_version_column_name: Optional[str]
 
 
-TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
+TWriteDispositionConfig = Union[
+    TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
+]
 
 
 class _TTableSchemaBase(TTableProcessingHints, total=False):
```
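
For orientation, a write-disposition dict matching the new `TScd2StrategyDict` shape might look like the sketch below; the resource name and all option values are illustrative, not part of this PR:

```py
import dlt

# hypothetical resource using the scd2-specific options typed above
@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        "validity_column_names": ["valid_from", "valid_to"],  # Optional[List[str]]
        "active_record_timestamp": "9999-12-31",              # Optional[TAnyDateTime]
        "row_version_column_name": "row_hash",                # Optional[str]
    },
)
def dim_customer():
    yield [{"customer_key": 1, "c1": "foo", "c2": 1}]
```
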
dlt/destinations/impl/athena/athena.py (6 changes: 6 additions & 0 deletions)

```diff
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
         sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
         return sql, temp_table_name
 
+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        # Athena requires explicit casting
+        columns = [f"CAST({c} AS VARCHAR)" for c in columns]
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
         return True
```
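
Side by side, the base and Athena variants of `gen_concat_sql` produce the following; this is a standalone re-implementation for illustration, with column quoting left out since it is destination-specific:

```py
def base_concat(columns: list) -> str:
    # base implementation: plain CONCAT over the column list
    return f"CONCAT({', '.join(columns)})"

def athena_concat(columns: list) -> str:
    # Athena override: wrap each column in an explicit cast first
    columns = [f"CAST({c} AS VARCHAR)" for c in columns]
    return f"CONCAT({', '.join(columns)})"

print(base_concat(["first_name", "last_name"]))
# CONCAT(first_name, last_name)
print(athena_concat(["first_name", "last_name"]))
# CONCAT(CAST(first_name AS VARCHAR), CAST(last_name AS VARCHAR))
```
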
dlt/destinations/sql_jobs.py (34 changes: 27 additions & 7 deletions)

```diff
@@ -339,6 +339,10 @@ def gen_delete_from_sql(
         );
         """
 
+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
         """Trims identifier to max length supported by sql_client. Used for dynamically constructed table names"""
@@ -755,19 +759,35 @@ def gen_scd2_sql(
         active_record_timestamp = get_active_record_timestamp(root_table)
         if active_record_timestamp is None:
             active_record_literal = "NULL"
-            is_active_clause = f"{to} IS NULL"
+            is_active = f"{to} IS NULL"
         else:  # it's a datetime
             active_record_literal = format_datetime_literal(
                 active_record_timestamp, caps.timestamp_precision
             )
-            is_active_clause = f"{to} = {active_record_literal}"
+            is_active = f"{to} = {active_record_literal}"
 
-        # retire updated and deleted records
-        sql.append(f"""
+        # retire records:
+        # - no `merge_key`: retire all absent records
+        # - yes `merge_key`: retire those absent records whose `merge_key`
+        #   is present in staging data
+        retire_sql = f"""
             {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal}
-            WHERE {is_active_clause}
+            WHERE {is_active}
             AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
-        """)
+        """
+        merge_keys = cls._escape_list(
+            get_columns_names_with_prop(root_table, "merge_key"),
+            escape_column_id,
+        )
+        if len(merge_keys) > 0:
+            if len(merge_keys) == 1:
+                key = merge_keys[0]
+            else:
+                key = cls.gen_concat_sql(merge_keys)  # compound key
+            key_present = f"{key} IN (SELECT {key} FROM {staging_root_table_name})"
+            retire_sql = retire_sql.rstrip()[:-1]  # remove semicolon
+            retire_sql += f" AND {key_present};"
+        sql.append(retire_sql)
 
         # insert new active records in root table
         columns = map(escape_column_id, list(root_table["columns"].keys()))
@@ -776,7 +796,7 @@ def gen_scd2_sql(
             INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
             SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to}
             FROM {staging_root_table_name} AS s
-            WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
+            WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active});
         """)
 
         # insert list elements for new active records in nested tables
```
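
To make the retire logic concrete, here is a standalone sketch of the statement it assembles for a single `merge_key`; all identifiers, the staging table name, and the boundary literal are illustrative, and real quoting and update prefixes are destination-specific:

```py
# illustrative values; the real code derives these from the schema
root_table_name = "some_data"
staging_root_table_name = "some_data__staging"  # hypothetical staging name
to, hash_ = "_dlt_valid_to", "_dlt_id"
boundary_literal = "'2024-01-03 10:30:05.750356'"
is_active = f"{to} IS NULL"
merge_keys = ["date"]

retire_sql = f"""
    UPDATE {root_table_name} SET {to} = {boundary_literal}
    WHERE {is_active}
    AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
"""
if merge_keys:
    # a single key is used directly; compound keys go through gen_concat_sql
    key = merge_keys[0] if len(merge_keys) == 1 else f"CONCAT({', '.join(merge_keys)})"
    retire_sql = retire_sql.rstrip()[:-1]  # drop the trailing semicolon
    retire_sql += f" AND {key} IN (SELECT {key} FROM {staging_root_table_name});"
print(retire_sql)
```
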
dlt/extract/hints.py (38 changes: 23 additions & 15 deletions)

```diff
@@ -12,6 +12,7 @@
     TTableSchemaColumns,
     TWriteDispositionConfig,
     TMergeDispositionDict,
+    TScd2StrategyDict,
     TAnySchemaColumns,
     TTableFormat,
     TSchemaContract,
@@ -352,7 +353,7 @@ def _set_hints(
         self, hints_template: TResourceHints, create_table_variant: bool = False
     ) -> None:
         DltResourceHints.validate_dynamic_hints(hints_template)
-        DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
+        DltResourceHints.validate_write_disposition_hint(hints_template)
         if create_table_variant:
             table_name: str = hints_template["name"]  # type: ignore[assignment]
             # incremental cannot be specified in variant
@@ -452,10 +453,11 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
         md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
         if merge_strategy := md_dict.get("strategy"):
             dict_["x-merge-strategy"] = merge_strategy
-        if "boundary_timestamp" in md_dict:
-            dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
-        # add columns for `scd2` merge strategy
+
         if merge_strategy == "scd2":
+            md_dict = cast(TScd2StrategyDict, md_dict)
+            if "boundary_timestamp" in md_dict:
+                dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
             if md_dict.get("validity_column_names") is None:
                 from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
             else:
@@ -514,7 +516,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
             )
 
     @staticmethod
-    def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
+    def validate_write_disposition_hint(template: TResourceHints) -> None:
+        wd = template.get("write_disposition")
         if isinstance(wd, dict) and wd["disposition"] == "merge":
             wd = cast(TMergeDispositionDict, wd)
             if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES:
@@ -523,13 +526,18 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
                     f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
                 )
 
-            for ts in ("active_record_timestamp", "boundary_timestamp"):
-                if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
-                    continue  # None is allowed for active_record_timestamp
-                if ts in wd:
-                    try:
-                        ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
-                    except Exception:
-                        raise ValueError(
-                            f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
-                        )
+            if wd.get("strategy") == "scd2":
+                wd = cast(TScd2StrategyDict, wd)
+                for ts in ("active_record_timestamp", "boundary_timestamp"):
+                    if (
+                        ts == "active_record_timestamp"
+                        and wd.get("active_record_timestamp") is None
+                    ):
+                        continue  # None is allowed for active_record_timestamp
+                    if ts in wd:
+                        try:
+                            ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
+                        except Exception:
+                            raise ValueError(
+                                f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
+                            )
```
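
After this change, the timestamp parsing runs only for the `scd2` strategy. A hedged sketch of the observable behavior, assuming hint validation fires when the resource hints are set (resource names and values are illustrative):

```py
import dlt

# fine: boundary_timestamp is parseable and strategy is "scd2"
@dlt.resource(write_disposition={
    "disposition": "merge",
    "strategy": "scd2",
    "boundary_timestamp": "2024-08-21T12:15:00+00:00",
})
def customers():
    yield {"id": 1}

# raises ValueError during hint validation:
# the value cannot be parsed as a datetime
@dlt.resource(write_disposition={
    "disposition": "merge",
    "strategy": "scd2",
    "boundary_timestamp": "not a timestamp",
})
def broken_customers():
    yield {"id": 1}
```
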
docs/website/docs/general-usage/incremental-loading.md (127 changes: 126 additions & 1 deletion)

```diff
@@ -223,7 +223,7 @@ info = pipeline.run(fb_ads.with_resources("ads"), write_disposition="merge")
 In the example above, we enforce the root key propagation with `fb_ads.root_key = True`. This ensures that the correct data is propagated on the initial `replace` load so the future `merge` load can be executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.
 
 ### `scd2` strategy
-`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.
+`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. By default, the resource is expected to provide a full extract of the source table each run, but [incremental extracts](#example-incremental-scd2) are also possible. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.
 
 :::note
 The `unique` hint for `_dlt_id` in the root table is set to `false` when using `scd2`. This differs from [default behavior](./destination-tables.md#child-and-parent-tables). The reason is that the surrogate key stored in `_dlt_id` contains duplicates after an _insert-delete-reinsert_ pattern:
```

The `@@ -300,6 +300,131 @@` hunk inserts the following new subsection after the existing third-run `dim_customer` example table:

#### Example: incremental `scd2`
A `merge_key` can be provided to work with incremental extracts instead of full extracts. The `merge_key` lets you define which absent rows are considered "deleted". Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`, as in the sketch below.
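
A hedged sketch of a compound key (hypothetical column names, pipeline wiring omitted):

```py
@dlt.resource(
    # compound natural key passed as a list of column names
    merge_key=["first_name", "last_name"],
    write_disposition={"disposition": "merge", "strategy": "scd2"},
)
def dim_customer_compound():
    yield [{"first_name": "Jane", "last_name": "Doe", "city": "Berlin"}]
```
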

*Case 1: do not retire absent records*

You can set the natural key as `merge_key` to prevent retirement of absent rows. In this case, no absent row is considered deleted: records are not retired in the destination if their natural keys are not present in the source extract. This allows for incremental extracts that contain only updated records.

```py
@dlt.resource(
merge_key="customer_key",
write_disposition={"disposition": "merge", "strategy": "scd2"}
)
def dim_customer():
# initial load
yield [
{"customer_key": 1, "c1": "foo", "c2": 1},
{"customer_key": 2, "c1": "bar", "c2": 2}
]

pipeline.run(dim_customer()) # first run — 2024-04-09 18:27:53.734235
...
```
*`dim_customer` destination table after first run:*

| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |

```py
...
def dim_customer():
# second load — record for customer_key 1 got updated, customer_key 2 absent
yield [
{"customer_key": 1, "c1": "foo_updated", "c2": 1},
]

pipeline.run(dim_customer()) # second run — 2024-04-09 22:13:07.943703
```

*`dim_customer` destination table after second run—customer key 2 was not retired:*

| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |
| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** |

*Case 2: only retire records for given partitions*

:::note
Technically this is not SCD2 because the key used to merge records is not a natural key.
:::

You can set a "partition" column as `merge_key` to retire absent rows for given partitions. In this case you only consider absent rows deleted if their partition value is present in the extract. Physical partitioning of the table is not required—the word "partition" is used conceptually here.

```py
@dlt.resource(
merge_key="date",
write_disposition={"disposition": "merge", "strategy": "scd2"}
)
def some_data():
# load 1 — "2024-01-01" partition
yield [
{"date": "2024-01-01", "name": "a"},
{"date": "2024-01-01", "name": "b"},
]

pipeline.run(some_data()) # first run — 2024-01-02 03:03:35.854305
...
```

*`some_data` destination table after first run:*

| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
| -- | -- | -- | -- |
| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b |

```py
...
def some_data():
# load 2 — "2024-01-02" partition
yield [
{"date": "2024-01-02", "name": "c"},
{"date": "2024-01-02", "name": "d"},
]

pipeline.run(some_data()) # second run — 2024-01-03 03:01:11.943703
...
```

*`some_data` destination table after second run—added 2024-01-02 records, did not touch 2024-01-01 records:*

| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
| -- | -- | -- | -- |
| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b |
| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **c** |
| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **d** |

```py
...
def some_data():
# load 3 — reload "2024-01-01" partition
yield [
{"date": "2024-01-01", "name": "a"}, # unchanged
{"date": "2024-01-01", "name": "bb"}, # new
]

pipeline.run(some_data()) # third run — 2024-01-03 10:30:05.750356
...
```

*`some_data` destination table after third run—retired b, added bb, did not touch 2024-01-02 partition:*

| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
| -- | -- | -- | -- |
| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
| 2024-01-02 03:03:35.854305 | **2024-01-03 10:30:05.750356** | 2024-01-01 | b |
| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | c |
| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | d |
| **2024-01-03 10:30:05.750356** | **NULL** | **2024-01-01** | **bb** |


#### Example: configure validity column names
`_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured with the `validity_column_names` option, roughly as sketched below.
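
The PR's exact docs snippet is collapsed in this view; a configuration consistent with the `validity_column_names` option defined in `TScd2StrategyDict` above would look roughly like:

```py
# hedged sketch; column names are illustrative
@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        "validity_column_names": ["from", "to"],  # ["valid-from name", "valid-to name"]
    }
)
def dim_customer():
    ...
```
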
tests/common/test_validation.py (8 changes: 4 additions & 4 deletions)

Since `TWriteDispositionConfig` now unions four types, the validator surfaces one more nested exception:

```diff
@@ -334,17 +334,17 @@ def test_typeddict_friendly_exceptions() -> None:
         wrong_dict["write_disposition"] = {"strategy": "scd2"}
         validate_dict(EndpointResource, wrong_dict, ".")
     print(e.value)
-    # Union of 3 types and callable
-    assert len(e.value.nested_exceptions) == 4
+    # Union of 4 types and callable
+    assert len(e.value.nested_exceptions) == 5
 
     # this has wrong disposition string
     with pytest.raises(DictValidationException) as e:
         wrong_dict = deepcopy(valid_dict)
         wrong_dict["write_disposition"] = "unknown"  # type: ignore[assignment]
         validate_dict(EndpointResource, wrong_dict, ".")
     print(e.value)
-    # Union of 3 types and callable
-    assert len(e.value.nested_exceptions) == 4
+    # Union of 4 types and callable
+    assert len(e.value.nested_exceptions) == 5
 
     # this has wrong nested type
     with pytest.raises(DictValidationException) as e:
```