Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

incremental scd2 with merge_key #1818

Merged
merged 19 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,22 @@ class TWriteDispositionDict(TypedDict):
disposition: TWriteDisposition


class TMergeDispositionDict(TWriteDispositionDict):
    """Write disposition configuration for the `merge` disposition.

    `strategy` selects the merge strategy (one of the project's
    `TLoaderMergeStrategy` literals); strategy-specific options live in
    subclasses such as `TScd2StrategyDict`.
    """

    strategy: Optional[TLoaderMergeStrategy]


class TScd2StrategyDict(TMergeDispositionDict, total=False):
    """Extra options accepted when `strategy` is "scd2".

    All keys here are optional (`total=False`):
    - `validity_column_names`: names for the validity ("from"/"to") columns.
    - `active_record_timestamp`: literal used to mark active records
      (`None` means SQL NULL).
    - `boundary_timestamp`: timestamp used when retiring/creating records.
    - `row_version_column_name`: name of the row-hash column.
    - `retire_if_absent`: when False, records absent from a load are NOT
      retired; validation elsewhere requires `natural_key` in that case.
    - `natural_key`: column identifying a logical record across versions.
    """

    validity_column_names: Optional[List[str]]
    active_record_timestamp: Optional[TAnyDateTime]
    boundary_timestamp: Optional[TAnyDateTime]
    row_version_column_name: Optional[str]
    retire_if_absent: Optional[bool]
    natural_key: Optional[str]


# `TScd2StrategyDict` is part of the union so scd2-specific keys validate.
TWriteDispositionConfig = Union[
    TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
]


class _TTableSchemaBase(TTableProcessingHints, total=False):
Expand Down
22 changes: 15 additions & 7 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,19 +755,27 @@ def gen_scd2_sql(
active_record_timestamp = get_active_record_timestamp(root_table)
if active_record_timestamp is None:
active_record_literal = "NULL"
is_active_clause = f"{to} IS NULL"
is_active = f"{to} IS NULL"
else: # it's a datetime
active_record_literal = format_datetime_literal(
active_record_timestamp, caps.timestamp_precision
)
is_active_clause = f"{to} = {active_record_literal}"
is_active = f"{to} = {active_record_literal}"

# retire updated and deleted records
sql.append(f"""
# retire records
# always retire updated records, retire deleted records only if `retire_if_absent`
retire_sql = f"""
{cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_literal}
WHERE {is_active_clause}
WHERE {is_active}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")
"""
retire_if_absent = root_table.get("x-retire-if-absent", True)
if not retire_if_absent:
retire_sql = retire_sql.rstrip()[:-1] # remove semicolon
nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key"))
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
retire_sql += f" AND {nk_present};"
sql.append(retire_sql)

# insert new active records in root table
columns = map(escape_column_id, list(root_table["columns"].keys()))
Expand All @@ -776,7 +784,7 @@ def gen_scd2_sql(
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, {boundary_literal} AS {from_}, {active_record_literal} AS {to}
FROM {staging_root_table_name} AS s
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active});
""")

# insert list elements for new active records in nested tables
Expand Down
48 changes: 35 additions & 13 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TTableSchemaColumns,
TWriteDispositionConfig,
TMergeDispositionDict,
TScd2StrategyDict,
TAnySchemaColumns,
TTableFormat,
TSchemaContract,
Expand Down Expand Up @@ -452,10 +453,19 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
md_dict: TMergeDispositionDict = dict_.pop("write_disposition")
if merge_strategy := md_dict.get("strategy"):
dict_["x-merge-strategy"] = merge_strategy
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
# add columns for `scd2` merge strategy

if merge_strategy == "scd2":
md_dict = cast(TScd2StrategyDict, md_dict)
if "boundary_timestamp" in md_dict:
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
if "retire_if_absent" in md_dict:
dict_["x-retire-if-absent"] = md_dict["retire_if_absent"]
if "natural_key" in md_dict:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also no longer needed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also removed

nk = md_dict["natural_key"]
if nk in dict_["columns"]:
dict_["columns"][nk]["x-natural-key"] = True
else:
dict_["columns"][nk] = {"name": nk, "x-natural-key": True}
if md_dict.get("validity_column_names") is None:
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
else:
Expand Down Expand Up @@ -523,13 +533,25 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)

for ts in ("active_record_timestamp", "boundary_timestamp"):
if ts == "active_record_timestamp" and wd.get("active_record_timestamp") is None:
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)
if wd.get("strategy") == "scd2":
wd = cast(TScd2StrategyDict, wd)
for ts in ("active_record_timestamp", "boundary_timestamp"):
if (
ts == "active_record_timestamp"
and wd.get("active_record_timestamp") is None
):
continue # None is allowed for active_record_timestamp
if ts in wd:
try:
ensure_pendulum_datetime(wd[ts]) # type: ignore[literal-required]
except Exception:
raise ValueError(
f'could not parse `{ts}` value "{wd[ts]}"' # type: ignore[literal-required]
)

if (
"retire_if_absent" in wd
and not wd["retire_if_absent"]
and "natural_key" not in wd
):
raise ValueError("`natural_key` is required when `retire_if_absent=False`")
8 changes: 4 additions & 4 deletions tests/common/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,17 @@ def test_typeddict_friendly_exceptions() -> None:
wrong_dict["write_disposition"] = {"strategy": "scd2"}
validate_dict(EndpointResource, wrong_dict, ".")
print(e.value)
# Union of 3 types and callable
assert len(e.value.nested_exceptions) == 4
# Union of 4 types and callable
assert len(e.value.nested_exceptions) == 5

# this has wrong disposition string
with pytest.raises(DictValidationException) as e:
wrong_dict = deepcopy(valid_dict)
wrong_dict["write_disposition"] = "unknown" # type: ignore[assignment]
validate_dict(EndpointResource, wrong_dict, ".")
print(e.value)
# Union of 3 types and callable
assert len(e.value.nested_exceptions) == 4
# Union of 4 types and callable
assert len(e.value.nested_exceptions) == 5

# this has wrong nested type
with pytest.raises(DictValidationException) as e:
Expand Down
98 changes: 96 additions & 2 deletions tests/load/pipeline/test_scd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
from dlt.common.typing import TAnyDateTime
from dlt.common.pendulum import pendulum
from dlt.common.pipeline import LoadInfo
from dlt.common.schema.exceptions import ColumnNameConflictException
from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES
from dlt.common.normalizers.json.relational import DataItemNormalizer
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
from dlt.common.time import ensure_pendulum_datetime, reduce_pendulum_datetime_precision
from dlt.extract.resource import DltResource
from dlt.pipeline.exceptions import PipelineStepFailed

from tests.cases import arrow_table_all_data_types
from tests.load.utils import (
Expand Down Expand Up @@ -717,6 +715,102 @@ def r(data):
)


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
    ids=lambda x: x.name,
)
def test_retire_if_absent(
    destination_config: DestinationTestConfiguration,
) -> None:
    """With `retire_if_absent=False`, absent natural keys stay active.

    Runs three loads against an scd2 `dim_test` table and checks record
    counts and the validity ("to") column, then exercises hint validation:
    `natural_key` is required when `retire_if_absent=False`, and
    user-provided hints on the natural-key column are preserved.
    """
    p = destination_config.setup_pipeline("abstract", dev_mode=True)

    @dlt.resource(
        table_name="dim_test",
        write_disposition={
            "disposition": "merge",
            "strategy": "scd2",
            "retire_if_absent": False,
            "natural_key": "nk",
        },
    )
    def r(data):
        yield data

    # load 1 — initial load
    dim_snap = [
        {"nk": 1, "foo": "foo"},
        {"nk": 2, "foo": "foo"},
    ]
    info = p.run(r(dim_snap), **destination_config.run_kwargs)
    assert_load_info(info)
    assert load_table_counts(p, "dim_test")["dim_test"] == 2
    _, to = DEFAULT_VALIDITY_COLUMN_NAMES
    # both records should be active (i.e. not retired)
    assert [row[to] for row in get_table(p, "dim_test")] == [None, None]

    # load 2 — natural key 2 is absent, natural key 1 is unchanged
    dim_snap = [
        {"nk": 1, "foo": "foo"},
    ]
    info = p.run(r(dim_snap), **destination_config.run_kwargs)
    assert_load_info(info)
    assert load_table_counts(p, "dim_test")["dim_test"] == 2
    # both records should still be active: absent nk 2 is not retired
    assert [row[to] for row in get_table(p, "dim_test")] == [None, None]

    # load 3 — natural key 2 is absent, natural key 1 has changed
    dim_snap = [
        {"nk": 1, "foo": "bar"},
    ]
    info = p.run(r(dim_snap), **destination_config.run_kwargs)
    assert_load_info(info)
    assert load_table_counts(p, "dim_test")["dim_test"] == 3
    boundary_ts = get_load_package_created_at(p, info)
    # natural key 1 should now have two records (one retired, one active)
    actual = [
        {k: v for k, v in row.items() if k in ("nk", to)}
        for row in get_table(p, "dim_test")
    ]
    expected = [{"nk": 1, to: boundary_ts}, {"nk": 1, to: None}, {"nk": 2, to: None}]
    assert_records_as_set(actual, expected)  # type: ignore[arg-type]

    # now test various configs

    with pytest.raises(ValueError):
        # should raise because `natural_key` is required when `retire_if_absent=False`
        r.apply_hints(
            write_disposition={
                "disposition": "merge",
                "strategy": "scd2",
                "retire_if_absent": False,
            }
        )

    # `retire_if_absent=True` does not require `natural_key`
    r.apply_hints(
        write_disposition={
            "disposition": "merge",
            "strategy": "scd2",
            "retire_if_absent": True,
        }
    )
    assert r.compute_table_schema()["x-retire-if-absent"]  # type: ignore[typeddict-item]

    # user-provided hints for `natural_key` column should be respected
    r.apply_hints(
        columns={"nk": {"x-foo": "foo"}},  # type: ignore[typeddict-unknown-key]
        write_disposition={
            "disposition": "merge",
            "strategy": "scd2",
            "retire_if_absent": False,
            "natural_key": "nk",
        },
    )
    assert r.compute_table_schema()["columns"]["nk"] == {
        "x-foo": "foo",
        "name": "nk",
        "x-natural-key": True,
    }


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
Expand Down
Loading