diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py
index 9221cca7ff..dbea3026b1 100644
--- a/dlt/common/schema/typing.py
+++ b/dlt/common/schema/typing.py
@@ -231,15 +231,20 @@ class TWriteDispositionDict(TypedDict):
     disposition: TWriteDisposition


-class TMergeDispositionDict(TWriteDispositionDict, total=False):
+class TMergeDispositionDict(TWriteDispositionDict):
     strategy: Optional[TLoaderMergeStrategy]
+
+
+class TScd2StrategyDict(TMergeDispositionDict, total=False):
     validity_column_names: Optional[List[str]]
     active_record_timestamp: Optional[TAnyDateTime]
     boundary_timestamp: Optional[TAnyDateTime]
     row_version_column_name: Optional[str]


-TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
+TWriteDispositionConfig = Union[
+    TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
+]


 class _TTableSchemaBase(TTableProcessingHints, total=False):
diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py
index 04078dd510..72611a9568 100644
--- a/dlt/destinations/impl/athena/athena.py
+++ b/dlt/destinations/impl/athena/athena.py
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
         sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
         return sql, temp_table_name

+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        # Athena requires explicit casting
+        columns = [f"CAST({c} AS VARCHAR)" for c in columns]
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
         return True
diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index 2407d2db62..ae27213a7c 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -339,6 +339,10 @@ def gen_delete_from_sql(
             );
         """

+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
         """Trims identifier to max length supported by sql_client. Used for dynamically constructed table names"""
@@ -755,19 +759,35 @@ def gen_scd2_sql(
         active_record_timestamp = get_active_record_timestamp(root_table)
         if active_record_timestamp is None:
             active_record_literal = "NULL"
-            is_active_clause = f"{to} IS NULL"
+            is_active = f"{to} IS NULL"
         else:  # it's a datetime
             active_record_literal = format_datetime_literal(
                 active_record_timestamp, caps.timestamp_precision
             )
-            is_active_clause = f"{to} = {active_record_literal}"
+            is_active = f"{to} = {active_record_literal}"

-        # retire updated and deleted records
-        sql.append(f"""
+        # retire records:
+        # - no `merge_key`: retire all absent records
+        # - yes `merge_key`: retire those absent records whose `merge_key`
+        #   is present in staging data
+        retire_sql = f"""
             {cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal}
-            WHERE {is_active_clause}
+            WHERE {is_active}
             AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
-        """)
+        """
+        merge_keys = cls._escape_list(
+            get_columns_names_with_prop(root_table, "merge_key"),
+            escape_column_id,
+        )
+        if len(merge_keys) > 0:
+            if len(merge_keys) == 1:
+                key = merge_keys[0]
+            else:
+                key = cls.gen_concat_sql(merge_keys)  # compound key
+            key_present = f"{key} IN (SELECT {key} FROM {staging_root_table_name})"
+            retire_sql = retire_sql.rstrip()[:-1]  # remove semicolon
+            retire_sql += f" AND {key_present};"
+        sql.append(retire_sql)

         # insert new active records in root table
         columns = map(escape_column_id, list(root_table["columns"].keys()))
@@ -776,7 +796,7 @@ def gen_scd2_sql(
             INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
             SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to}
             FROM {staging_root_table_name} AS s
-            WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
+            WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active});
         """)

         # insert list elements for new active records in nested tables
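As a reading aid for the merge-job changes above, here is a minimal sketch of the SQL fragments the new code paths are intended to emit. The column and table names are hypothetical and only illustrate `gen_concat_sql` and the compound `merge_key` retire condition; they are not taken from the patch itself.

```py
# Illustrative only — hypothetical columns and staging table name.
columns = ["first_name", "last_name"]

generic_key = f"CONCAT({', '.join(columns)})"
# -> CONCAT(first_name, last_name)

athena_key = f"CONCAT({', '.join(f'CAST({c} AS VARCHAR)' for c in columns)})"
# -> CONCAT(CAST(first_name AS VARCHAR), CAST(last_name AS VARCHAR))

# condition appended to the retire UPDATE when a merge_key is defined:
retire_condition = f"AND {generic_key} IN (SELECT {generic_key} FROM staging_dim_test)"
print(retire_condition)
```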
diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index 037ebbddf9..2774e17353 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -12,6 +12,7 @@
     TTableSchemaColumns,
     TWriteDispositionConfig,
     TMergeDispositionDict,
+    TScd2StrategyDict,
     TAnySchemaColumns,
     TTableFormat,
     TSchemaContract,
@@ -352,7 +353,7 @@ def _set_hints(
         self, hints_template: TResourceHints, create_table_variant: bool = False
     ) -> None:
         DltResourceHints.validate_dynamic_hints(hints_template)
-        DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
+        DltResourceHints.validate_write_disposition_hint(hints_template)
         if create_table_variant:
             table_name: str = hints_template["name"]  # type: ignore[assignment]
             # incremental cannot be specified in variant
@@ -452,10 +453,11 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
         md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
         if merge_strategy := md_dict.get("strategy"):
             dict_["x-merge-strategy"] = merge_strategy
-        if "boundary_timestamp" in md_dict:
-            dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
-        # add columns for `scd2` merge strategy
+        if merge_strategy == "scd2":
+            md_dict = cast(TScd2StrategyDict, md_dict)
+            if "boundary_timestamp" in md_dict:
+                dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
         if md_dict.get("validity_column_names") is None:
             from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
         else:
@@ -514,7 +516,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
             )

     @staticmethod
-    def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
+    def validate_write_disposition_hint(template: TResourceHints) -> None:
+        wd = template.get("write_disposition")
         if isinstance(wd, dict) and wd["disposition"] == "merge":
             wd = cast(TMergeDispositionDict, wd)
             if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES:
@@ -523,13 +526,18 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
                     f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
                 )

-            for ts in ("active_record_timestamp", "boundary_timestamp"):
-                if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
-                    continue  # None is allowed for active_record_timestamp
-                if ts in wd:
-                    try:
-                        ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
-                    except Exception:
-                        raise ValueError(
-                            f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
-                        )
+            if wd.get("strategy") == "scd2":
+                wd = cast(TScd2StrategyDict, wd)
+                for ts in ("active_record_timestamp", "boundary_timestamp"):
+                    if (
+                        ts == "active_record_timestamp"
+                        and wd.get("active_record_timestamp") is None
+                    ):
+                        continue  # None is allowed for active_record_timestamp
+                    if ts in wd:
+                        try:
+                            ensure_pendulum_datetime(wd[ts])  # type: ignore[literal-required]
+                        except Exception:
+                            raise ValueError(
+                                f'could not parse `{ts}` value "{wd[ts]}"'  # type: ignore[literal-required]
+                            )
diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 88f009e3c2..c8f92cf154 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -223,7 +223,7 @@ info = pipeline.run(fb_ads.with_resources("ads"), write_disposition="merge")
 In the example above, we enforce the root key propagation with `fb_ads.root_key = True`. This ensures that the correct data is propagated on the initial `replace` load so the future `merge` load can be executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

 ### `scd2` strategy
-`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.
+`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. By default, the resource is expected to provide a full extract of the source table each run, but [incremental extracts](#example-incremental-scd2) are also possible. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g., 9999-12-31 00:00:00.000000) instead.

 :::note
 The `unique` hint for `_dlt_id` in the root table is set to `false` when using `scd2`. This differs from [default behavior](./destination-tables.md#child-and-parent-tables). The reason is that the surrogate key stored in `_dlt_id` contains duplicates after an _insert-delete-reinsert_ pattern:
@@ -300,6 +300,131 @@ pipeline.run(dim_customer())  # third run — 2024-04-10 06:45:22.847403
 | 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 |
 | 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 |

+#### Example: incremental `scd2`
+A `merge_key` can be provided to work with incremental extracts instead of full extracts. The `merge_key` lets you define which absent rows are considered "deleted". Compound natural keys are allowed and can be specified by providing a list of column names as `merge_key`.
+
+*Case 1: do not retire absent records*
+
+You can set the natural key as `merge_key` to prevent retirement of absent rows. In this case, no absent row is considered deleted: records are not retired in the destination if their corresponding natural keys are not present in the source extract. This allows for incremental extracts that contain only updated records.
+
+```py
+@dlt.resource(
+    merge_key="customer_key",
+    write_disposition={"disposition": "merge", "strategy": "scd2"}
+)
+def dim_customer():
+    # initial load
+    yield [
+        {"customer_key": 1, "c1": "foo", "c2": 1},
+        {"customer_key": 2, "c1": "bar", "c2": 2}
+    ]
+
+pipeline.run(dim_customer())  # first run — 2024-04-09 18:27:53.734235
+...
+```
+
+*`dim_customer` destination table after first run:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
+| -- | -- | -- | -- | -- |
+| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 |
+| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |
+
+```py
+...
+def dim_customer():
+    # second load — record for customer_key 1 got updated, customer_key 2 absent
+    yield [
+        {"customer_key": 1, "c1": "foo_updated", "c2": 1},
+    ]
+
+pipeline.run(dim_customer())  # second run — 2024-04-09 22:13:07.943703
+```
+
+*`dim_customer` destination table after second run—customer key 2 was not retired:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
+| -- | -- | -- | -- | -- |
+| 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 |
+| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |
+| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** |
+
+*Case 2: only retire records for given partitions*
+
+:::note
+Technically this is not SCD2 because the key used to merge records is not a natural key.
+:::
+
+You can set a "partition" column as `merge_key` to retire absent rows for given partitions. In this case, an absent row is only considered deleted if its partition value is present in the extract. Physical partitioning of the table is not required—the word "partition" is used conceptually here.
+
+```py
+@dlt.resource(
+    merge_key="date",
+    write_disposition={"disposition": "merge", "strategy": "scd2"}
+)
+def some_data():
+    # load 1 — "2024-01-01" partition
+    yield [
+        {"date": "2024-01-01", "name": "a"},
+        {"date": "2024-01-01", "name": "b"},
+    ]
+
+pipeline.run(some_data())  # first run — 2024-01-02 03:03:35.854305
+...
+```
+
+*`some_data` destination table after first run:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
+| -- | -- | -- | -- |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b |
+
+```py
+...
+def some_data():
+    # load 2 — "2024-01-02" partition
+    yield [
+        {"date": "2024-01-02", "name": "c"},
+        {"date": "2024-01-02", "name": "d"},
+    ]
+
+pipeline.run(some_data())  # second run — 2024-01-03 03:01:11.943703
+...
+```
+
+*`some_data` destination table after second run—added 2024-01-02 records, did not touch 2024-01-01 records:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
+| -- | -- | -- | -- |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | b |
+| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **c** |
+| **2024-01-03 03:01:11.943703** | **NULL** | **2024-01-02** | **d** |
+
+```py
+...
+def some_data():
+    # load 3 — reload "2024-01-01" partition
+    yield [
+        {"date": "2024-01-01", "name": "a"},  # unchanged
+        {"date": "2024-01-01", "name": "bb"},  # new
+    ]
+
+pipeline.run(some_data())  # third run — 2024-01-03 10:30:05.750356
+...
+```
+
+*`some_data` destination table after third run—retired b, added bb, did not touch 2024-01-02 partition:*
+
+| `_dlt_valid_from` | `_dlt_valid_to` | `date` | `name` |
+| -- | -- | -- | -- |
+| 2024-01-02 03:03:35.854305 | NULL | 2024-01-01 | a |
+| 2024-01-02 03:03:35.854305 | **2024-01-03 10:30:05.750356** | 2024-01-01 | b |
+| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | c |
+| 2024-01-03 03:01:11.943703 | NULL | 2024-01-02 | d |
+| **2024-01-03 10:30:05.750356** | **NULL** | **2024-01-01** | **bb** |
+
+
 #### Example: configure validity column names
 `_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows:
 ```py
diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py
index 0ecbbea89d..3f8ccfc20f 100644
--- a/tests/common/test_validation.py
+++ b/tests/common/test_validation.py
@@ -334,8 +334,8 @@ def test_typeddict_friendly_exceptions() -> None:
         wrong_dict["write_disposition"] = {"strategy": "scd2"}
         validate_dict(EndpointResource, wrong_dict, ".")
     print(e.value)
-    # Union of 3 types and callable
-    assert len(e.value.nested_exceptions) == 4
+    # Union of 4 types and callable
+    assert len(e.value.nested_exceptions) == 5

     # this has wrong disposition string
     with pytest.raises(DictValidationException) as e:
@@ -343,8 +343,8 @@ def test_typeddict_friendly_exceptions() -> None:
         wrong_dict["write_disposition"] = "unknown"  # type: ignore[assignment]
         validate_dict(EndpointResource, wrong_dict, ".")
     print(e.value)
-    # Union of 3 types and callable
-    assert len(e.value.nested_exceptions) == 4
+    # Union of 4 types and callable
+    assert len(e.value.nested_exceptions) == 5

     # this has wrong nested type
     with pytest.raises(DictValidationException) as e:
diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py
index c75ff4d3e6..3e08b792ed 100644
--- a/tests/load/pipeline/test_scd2.py
+++ b/tests/load/pipeline/test_scd2.py
@@ -9,13 +9,12 @@ from dlt.common.typing import TAnyDateTime
 from dlt.common.pendulum import pendulum
 from dlt.common.pipeline import LoadInfo
-from dlt.common.schema.exceptions import ColumnNameConflictException
+from dlt.common.data_types.typing import TDataType
 from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES
 from dlt.common.normalizers.json.relational import DataItemNormalizer
 from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
 from dlt.common.time import ensure_pendulum_datetime, reduce_pendulum_datetime_precision
 from dlt.extract.resource import DltResource
-from dlt.pipeline.exceptions import PipelineStepFailed

 from tests.cases import arrow_table_all_data_types
 from tests.load.utils import (
@@ -32,6 +31,7 @@ from tests.utils import TPythonTableFormat

 get_row_hash = DataItemNormalizer.get_row_hash
+FROM, TO = DEFAULT_VALIDITY_COLUMN_NAMES


 def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) -> datetime:
@@ -74,40 +74,21 @@ def get_table(

 @pytest.mark.essential
 @pytest.mark.parametrize(
-    "destination_config,simple,validity_column_names,active_record_timestamp",
-    # test basic cases for alle SQL destinations supporting merge
-    [
-        (dconf, True, None, None)
-        for dconf in destinations_configs(default_sql_configs=True, supports_merge=True)
-    ]
-    + [
-        (dconf, True, None, pendulum.DateTime(2099, 12, 31, 22, 2, 59))  # arbitrary timestamp
-        for dconf in destinations_configs(default_sql_configs=True, supports_merge=True)
-    ]
-    + [  # test nested columns and validity column name configuration only for postgres and duckdb
-        (dconf, False, ["from", "to"], None)
-        for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"])
-    ]
-    + [
-        (dconf, False, ["ValidFrom", "ValidTo"], None)
-        for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"])
-    ],
-    ids=lambda x: (
-        x.name
-        if isinstance(x, DestinationTestConfiguration)
-        else (x[0] + "-" + x[1] if isinstance(x, list) else x)
-    ),
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+@pytest.mark.parametrize(
+    "validity_column_names",
+    [None, ["from", "to"], ["ValidFrom", "ValidTo"]],
+    ids=lambda x: x[0] + "-" + x[1] if isinstance(x, list) else x,
 )
 def test_core_functionality(
     destination_config: DestinationTestConfiguration,
-    simple: bool,
     validity_column_names: List[str],
-    active_record_timestamp: Optional[pendulum.DateTime],
 ) -> None:
-    # somehow destination_config comes through as ParameterSet instead of
-    # DestinationTestConfiguration
-    destination_config = destination_config.values[0]  # type: ignore[attr-defined]
-
+    if validity_column_names is not None and destination_config.destination_type != "postgres":
+        pytest.skip("test `validity_column_names` configuration only for `postgres`")
     p = destination_config.setup_pipeline("abstract", dev_mode=True)

     @dlt.resource(
@@ -116,7 +97,6 @@ def test_core_functionality(
             "disposition": "merge",
             "strategy": "scd2",
             "validity_column_names": validity_column_names,
-            "active_record_timestamp": active_record_timestamp,
         },
     )
     def r(data):
@@ -131,8 +111,8 @@ def r(data):

     # load 1 — initial load
     dim_snap = [
-        {"nk": 1, "c1": "foo", "c2": "foo" if simple else {"nc1": "foo"}},
-        {"nk": 2, "c1": "bar", "c2": "bar" if simple else {"nc1": "bar"}},
+        {"nk": 1, "c1": "foo", "c2": {"nc1": "foo"}},
+        {"nk": 2, "c1": "bar", "c2": {"nc1": "bar"}},
     ]
     info = p.run(r(dim_snap), **destination_config.run_kwargs)
     assert_load_info(info)
@@ -148,93 +128,92 @@ def r(data):
     # assert load results
     ts_1 = get_load_package_created_at(p, info)
     assert_load_info(info)
-    cname = "c2" if simple else "c2__nc1"
-    assert get_table(p, "dim_test", cname) == [
+    assert get_table(p, "dim_test", "c2__nc1") == [
         {
             from_: ts_1,
-            to: active_record_timestamp,
+            to: None,
             "nk": 2,
             "c1": "bar",
-            cname: "bar",
+            "c2__nc1": "bar",
         },
         {
             from_: ts_1,
-            to: active_record_timestamp,
+            to: None,
             "nk": 1,
             "c1": "foo",
-            cname: "foo",
+            "c2__nc1": "foo",
         },
     ]

     # load 2 — update a record
     dim_snap = [
-        {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}},
-        {"nk": 2, "c1": "bar", "c2": "bar" if simple else {"nc1": "bar"}},
+        {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}},
+        {"nk": 2, "c1": "bar", "c2": {"nc1": "bar"}},
     ]
     info = p.run(r(dim_snap), **destination_config.run_kwargs)
     ts_2 = get_load_package_created_at(p, info)
     assert_load_info(info)
-    assert get_table(p, "dim_test", cname) == [
+    assert get_table(p, "dim_test", "c2__nc1") == [
         {
             from_: ts_1,
-            to: active_record_timestamp,
+            to: None,
             "nk": 2,
             "c1": "bar",
-            cname: "bar",
+            "c2__nc1": "bar",
         },
-        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"},
         {
             from_: ts_2,
-            to: active_record_timestamp,
+            to: None,
             "nk": 1,
             "c1": "foo",
-            cname: "foo_updated",
+            "c2__nc1": "foo_updated",
         },
     ]

     # load 3 — delete a record
     dim_snap = [
-        {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}},
+        {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}},
     ]
     info = p.run(r(dim_snap), **destination_config.run_kwargs)
     ts_3 = get_load_package_created_at(p, info)
     assert_load_info(info)
-    assert get_table(p, "dim_test", cname) == [
-        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"},
-        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+    assert get_table(p, "dim_test", "c2__nc1") == [
+        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", "c2__nc1": "bar"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"},
         {
             from_: ts_2,
-            to: active_record_timestamp,
+            to: None,
             "nk": 1,
             "c1": "foo",
-            cname: "foo_updated",
+            "c2__nc1": "foo_updated",
         },
     ]

     # load 4 — insert a record
     dim_snap = [
-        {"nk": 1, "c1": "foo", "c2": "foo_updated" if simple else {"nc1": "foo_updated"}},
-        {"nk": 3, "c1": "baz", "c2": "baz" if simple else {"nc1": "baz"}},
+        {"nk": 1, "c1": "foo", "c2": {"nc1": "foo_updated"}},
+        {"nk": 3, "c1": "baz", "c2": {"nc1": "baz"}},
     ]
     info = p.run(r(dim_snap), **destination_config.run_kwargs)
     ts_4 = get_load_package_created_at(p, info)
     assert_load_info(info)
-    assert get_table(p, "dim_test", cname) == [
-        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"},
+    assert get_table(p, "dim_test", "c2__nc1") == [
+        {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", "c2__nc1": "bar"},
         {
             from_: ts_4,
-            to: active_record_timestamp,
+            to: None,
             "nk": 3,
             "c1": "baz",
-            cname: "baz",
+            "c2__nc1": "baz",
         },
-        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"},
+        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "c2__nc1": "foo"},
         {
             from_: ts_2,
-            to: active_record_timestamp,
+            to: None,
             "nk": 1,
             "c1": "foo",
-            cname: "foo_updated",
+            "c2__nc1": "foo_updated",
         },
     ]
@@ -255,9 +234,6 @@ def test_child_table(destination_config: DestinationTestConfiguration, simple: b
     def r(data):
         yield data

-    # get validity column names
-    from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
-
     # load 1 — initial load
     dim_snap: List[Dict[str, Any]] = [
         l1_1 := {"nk": 1, "c1": "foo", "c2": [1] if simple else [{"cc1": 1}]},
@@ -267,8 +243,8 @@ def r(data):
     ts_1 = get_load_package_created_at(p, info)
     assert_load_info(info)
     assert get_table(p, "dim_test", "c1") == [
-        {from_: ts_1, to: None, "nk": 2, "c1": "bar"},
-        {from_: ts_1, to: None, "nk": 1, "c1": "foo"},
+        {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
+        {FROM: ts_1, TO: None, "nk": 1, "c1": "foo"},
     ]
     cname = "value" if simple else "cc1"
     assert get_table(p, "dim_test__c2", cname) == [
@@ -286,9 +262,9 @@ def r(data):
     ts_2 = get_load_package_created_at(p, info)
     assert_load_info(info)
     assert get_table(p, "dim_test", "c1") == [
-        {from_: ts_1, to: None, "nk": 2, "c1": "bar"},
-        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},  # updated
-        {from_: ts_2, to: None, "nk": 1, "c1": "foo_updated"},  # new
+        {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
+        {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},  # updated
+        {FROM: ts_2, TO: None, "nk": 1, "c1": "foo_updated"},  # new
     ]
     assert_records_as_set(
         get_table(p, "dim_test__c2"),
@@ -315,10 +291,10 @@ def r(data):
     assert_records_as_set(
         get_table(p, "dim_test"),
         [
-            {from_: ts_1, to: None, "nk": 2, "c1": "bar"},
-            {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
-            {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"},  # updated
-            {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"},  # new
+            {FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
+            {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
+            {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"},  # updated
+            {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"},  # new
         ],
     )
     exp_3 = [
@@ -341,10 +317,10 @@ def r(data):
     assert_records_as_set(
         get_table(p, "dim_test"),
         [
-            {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"},  # updated
-            {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
-            {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"},
-            {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"},
+            {FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"},  # updated
+            {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
+            {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"},
+            {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"},
         ],
     )
     assert_records_as_set(
@@ -362,11 +338,11 @@ def r(data):
     assert_records_as_set(
         get_table(p, "dim_test"),
         [
-            {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"},
-            {from_: ts_5, to: None, "nk": 3, "c1": "baz"},  # new
-            {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
-            {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"},
-            {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"},
+            {FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"},
+            {FROM: ts_5, TO: None, "nk": 3, "c1": "baz"},  # new
+            {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
+            {FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"},
+            {FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"},
         ],
     )
     assert_records_as_set(
@@ -519,13 +495,12 @@ def r(data):
     ts_3 = get_load_package_created_at(p, info)

     # assert parent records
-    from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
     r1_no_child = {k: v for k, v in r1.items() if k != "child"}
     r2_no_child = {k: v for k, v in r2.items() if k != "child"}
     expected = [
-        {**{from_: ts_1, to: ts_2}, **r1_no_child},
-        {**{from_: ts_3, to: None}, **r1_no_child},
-        {**{from_: ts_1, to: None}, **r2_no_child},
+        {**{FROM: ts_1, TO: ts_2}, **r1_no_child},
+        {**{FROM: ts_3, TO: None}, **r1_no_child},
+        {**{FROM: ts_1, TO: None}, **r2_no_child},
     ]
     assert_records_as_set(get_table(p, "dim_test"), expected)
@@ -653,10 +628,9 @@ def r(data):
     info = p.run(r(dim_snap), **destination_config.run_kwargs)
     assert_load_info(info)
     assert load_table_counts(p, "dim_test")["dim_test"] == 2
-    from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
     expected = [
-        {**{from_: strip_timezone(ts1), to: None}, **l1_1},
-        {**{from_: strip_timezone(ts1), to: None}, **l1_2},
+        {**{FROM: strip_timezone(ts1), TO: None}, **l1_1},
+        {**{FROM: strip_timezone(ts1), TO: None}, **l1_2},
     ]
     assert get_table(p, "dim_test", "nk") == expected
@@ -677,10 +651,10 @@ def r(data):
     assert_load_info(info)
     assert load_table_counts(p, "dim_test")["dim_test"] == 4
     expected = [
-        {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1},  # retired
-        {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2},  # retired
-        {**{from_: strip_timezone(ts2), to: None}, **l2_1},  # new
-        {**{from_: strip_timezone(ts2), to: None}, **l2_3},  # new
+        {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1},  # retired
+        {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2},  # retired
+        {**{FROM: strip_timezone(ts2), TO: None}, **l2_1},  # new
+        {**{FROM: strip_timezone(ts2), TO: None}, **l2_3},  # new
     ]
     assert_records_as_set(get_table(p, "dim_test"), expected)
@@ -699,10 +673,10 @@ def r(data):
     assert_load_info(info)
     assert load_table_counts(p, "dim_test")["dim_test"] == 4
     expected = [
-        {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1},  # unchanged
-        {**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2},  # unchanged
-        {**{from_: strip_timezone(ts2), to: None}, **l2_1},  # unchanged
-        {**{from_: strip_timezone(ts2), to: strip_timezone(ts3)}, **l2_3},  # retired
+        {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1},  # unchanged
+        {**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2},  # unchanged
+        {**{FROM: strip_timezone(ts2), TO: None}, **l2_1},  # unchanged
+        {**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)}, **l2_3},  # retired
     ]
     assert_records_as_set(get_table(p, "dim_test"), expected)
@@ -717,6 +691,196 @@ def r(data):
     )

+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+def test_merge_key_natural_key(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    p = destination_config.setup_pipeline("abstract", dev_mode=True)
+
+    @dlt.resource(
+        merge_key="nk",
+        write_disposition={"disposition": "merge", "strategy": "scd2"},
+    )
+    def dim_test(data):
+        yield data
+
+    # load 1 — initial load
+    dim_snap = [
+        {"nk": 1, "foo": "foo"},
+        {"nk": 2, "foo": "foo"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 2
+    # both records should be active (i.e. not retired)
+    assert [row[TO] for row in get_table(p, "dim_test")] == [None, None]
+
+    # load 2 — natural key 2 is absent, natural key 1 is unchanged
+    dim_snap = [
+        {"nk": 1, "foo": "foo"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 2
+    # both records should still be active
+    assert [row[TO] for row in get_table(p, "dim_test")] == [None, None]
+
+    # load 3 — natural key 2 is absent, natural key 1 has changed
+    dim_snap = [
+        {"nk": 1, "foo": "bar"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 3
+    ts3 = get_load_package_created_at(p, info)
+    # natural key 1 should now have two records (one retired, one active)
+    actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")]
+    expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: None}, {"nk": 2, TO: None}]
+    assert_records_as_set(actual, expected)  # type: ignore[arg-type]
+
+    # load 4 — natural key 2 is absent, natural key 1 has changed back to
+    # initial version
+    dim_snap = [
+        {"nk": 1, "foo": "foo"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 4
+    ts4 = get_load_package_created_at(p, info)
+    # natural key 1 should now have three records (two retired, one active)
+    actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")]
+    expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: ts4}, {"nk": 1, TO: None}, {"nk": 2, TO: None}]
+    assert_records_as_set(actual, expected)  # type: ignore[arg-type]
+
+
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+@pytest.mark.parametrize("key_type", ("text", "bigint"))
+def test_merge_key_compound_natural_key(
+    destination_config: DestinationTestConfiguration,
+    key_type: TDataType,
+) -> None:
+    p = destination_config.setup_pipeline("abstract", dev_mode=True)
+
+    @dlt.resource(
+        merge_key=["first_name", "last_name"],
+        write_disposition={"disposition": "merge", "strategy": "scd2"},
+    )
+    def dim_test_compound(data):
+        yield data
+
+    # vary `first_name` type to test mixed compound `merge_key`
+    if key_type == "text":
+        first_name = "John"
+    elif key_type == "bigint":
+        first_name = 1  # type: ignore[assignment]
+
+    # load 1 — initial load
+    dim_snap = [
+        {"first_name": first_name, "last_name": "Doe", "age": 20},
+        {"first_name": first_name, "last_name": "Dodo", "age": 20},
+    ]
+    info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2
+    # both records should be active (i.e. not retired)
+    assert [row[TO] for row in get_table(p, "dim_test_compound")] == [None, None]
+
+    # load 2 — "Dodo" is absent, "Doe" has changed
+    dim_snap = [
+        {"first_name": first_name, "last_name": "Doe", "age": 30},
+    ]
+    info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3
+    ts3 = get_load_package_created_at(p, info)
+    # "Doe" should now have two records (one retired, one active)
+    actual = [
+        {k: v for k, v in row.items() if k in ("first_name", "last_name", TO)}
+        for row in get_table(p, "dim_test_compound")
+    ]
+    expected = [
+        {"first_name": first_name, "last_name": "Doe", TO: ts3},
+        {"first_name": first_name, "last_name": "Doe", TO: None},
+        {"first_name": first_name, "last_name": "Dodo", TO: None},
+    ]
+    assert_records_as_set(actual, expected)  # type: ignore[arg-type]
+
+
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+def test_merge_key_partition(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    p = destination_config.setup_pipeline("abstract", dev_mode=True)
+
+    @dlt.resource(
+        merge_key="date",
+        write_disposition={"disposition": "merge", "strategy": "scd2"},
+    )
+    def dim_test(data):
+        yield data
+
+    # load 1 — "2024-01-01" partition
+    dim_snap = [
+        {"date": "2024-01-01", "name": "a"},
+        {"date": "2024-01-01", "name": "b"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 2
+    # both records should be active (i.e. not retired)
+    assert [row[TO] for row in get_table(p, "dim_test")] == [None, None]
+
+    # load 2 — "2024-01-02" partition
+    dim_snap = [
+        {"date": "2024-01-02", "name": "c"},
+        {"date": "2024-01-02", "name": "d"},
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    assert load_table_counts(p, "dim_test")["dim_test"] == 4
+    # two "2024-01-01" records should be untouched, two "2024-01-02" records should
+    # be added
+    assert [row[TO] for row in get_table(p, "dim_test")] == [None, None, None, None]
+
+    # load 3 — reload "2024-01-01" partition
+    dim_snap = [
+        {"date": "2024-01-01", "name": "a"},  # unchanged
+        {"date": "2024-01-01", "name": "bb"},  # new
+    ]
+    info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
+    assert_load_info(info)
+    # "b" should be retired, "bb" should be added, "2024-01-02" partition
+    # should be untouched
+    assert load_table_counts(p, "dim_test")["dim_test"] == 5
+    ts2 = get_load_package_created_at(p, info)
+    actual = [
+        {k: v for k, v in row.items() if k in ("date", "name", TO)}
+        for row in get_table(p, "dim_test")
+    ]
+    expected = [
+        {"date": "2024-01-01", "name": "a", TO: None},
+        {"date": "2024-01-01", "name": "b", TO: ts2},
+        {"date": "2024-01-01", "name": "bb", TO: None},
+        {"date": "2024-01-02", "name": "c", TO: None},
+        {"date": "2024-01-02", "name": "d", TO: None},
+    ]
+    assert_records_as_set(actual, expected)  # type: ignore[arg-type]
+
+
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(default_sql_configs=True, subset=["duckdb"]),
     ids=lambda x: x.name,
 )
@@ -750,9 +914,8 @@ def _make_scd2_r(table_: Any) -> DltResource:
     # make sure we have scd2 columns in schema
     table_schema = p.default_schema.get_table("tabular")
     assert table_schema["x-merge-strategy"] == "scd2"  # type: ignore[typeddict-item]
-    from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
-    assert table_schema["columns"][from_]["x-valid-from"]  # type: ignore[typeddict-item]
-    assert table_schema["columns"][to]["x-valid-to"]  # type: ignore[typeddict-item]
+    assert table_schema["columns"][FROM]["x-valid-from"]  # type: ignore[typeddict-item]
+    assert table_schema["columns"][TO]["x-valid-to"]  # type: ignore[typeddict-item]
     assert table_schema["columns"]["row_hash"]["x-row-version"]  # type: ignore[typeddict-item]
     # 100 items in destination
     assert load_table_counts(p, "tabular")["tabular"] == 100
@@ -816,13 +979,12 @@ def r(data):
     ts_2 = get_load_package_created_at(p, info)

     # assert load results
-    from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
     assert get_table(p, "dim_test", "c1") == [
-        {from_: ts_1, to: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"},
-        {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"},
+        {FROM: ts_1, TO: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"},
+        {FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"},
         {
-            from_: ts_2,
-            to: None,
+            FROM: ts_2,
+            TO: None,
             "nk": 1,
             "c1": "foo_upd",
             "row_hash": "mocked_hash_1_upd",
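For completeness, a minimal end-to-end sketch of the behavior the new compound-key tests exercise. The pipeline name, dataset name, destination, and record values are illustrative assumptions, not part of the patch; any merge-capable destination should behave the same way.

```py
import dlt

@dlt.resource(
    merge_key=["first_name", "last_name"],  # compound natural key
    write_disposition={"disposition": "merge", "strategy": "scd2"},
)
def dim_customer(data):
    yield data

# assumed local setup for the sketch
pipeline = dlt.pipeline("scd2_demo", destination="duckdb", dataset_name="scd2_demo_data")

# load 1: two active records
pipeline.run(dim_customer([
    {"first_name": "John", "last_name": "Doe", "age": 20},
    {"first_name": "John", "last_name": "Dodo", "age": 20},
]))

# load 2: "Dodo" is absent and its key is not in the extract, so it stays active;
# "Doe" changed, so its old version is retired and a new active version is inserted
pipeline.run(dim_customer([
    {"first_name": "John", "last_name": "Doe", "age": 30},
]))
```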