Commit
adds tests for tables without jobs
rudolfix committed Sep 11, 2024
1 parent 52e3f1e commit cd8f08c
Showing 6 changed files with 192 additions and 8 deletions.
13 changes: 10 additions & 3 deletions dlt/common/libs/deltalake.py
@@ -35,6 +35,7 @@ def ensure_delta_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
pa.types.is_time: pa.string(),
pa.types.is_decimal256: pa.string(), # pyarrow does not allow downcasting to decimal128
}
# NOTE: also consider calling _convert_pa_schema_to_delta() from delta.schema which casts unsigned types
return cast_arrow_schema_types(schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP)
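A quick illustration of what the compatibility cast does (a sketch added for this write-up, not part of the commit; the column names are invented and it assumes dlt is installed with its pyarrow and deltalake dependencies): time and decimal256 columns are mapped to strings, while already-compatible types pass through unchanged.

import pyarrow as pa
from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_schema

# hypothetical schema with types Delta cannot store natively
schema = pa.schema(
    [
        pa.field("event_time", pa.time64("us")),
        pa.field("big_amount", pa.decimal256(50, 10)),
        pa.field("id", pa.int64()),
    ]
)
compat = ensure_delta_compatible_arrow_schema(schema)
assert compat.field("event_time").type == pa.string()  # time cast to string
assert compat.field("big_amount").type == pa.string()  # decimal256 cast to string
assert compat.field("id").type == pa.int64()  # compatible types pass through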


@@ -192,14 +193,20 @@ def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
return {**creds, **extra_options}


def _evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) -> None:
def _evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) -> DeltaTable:
"""Evolves `delta_table` schema if different from `arrow_schema`.
Fields are compared by name; actual types and nullability are ignored. This is
how schemas are evolved for other destinations. Existing columns are never modified.
Variant columns are created.
Adds column(s) to `delta_table` present in `arrow_schema` but not in `delta_table`.
"""
new_fields = [
deltalake.Field.from_pyarrow(field)
for field in ensure_delta_compatible_arrow_schema(arrow_schema)
if field not in delta_table.to_pyarrow_dataset().schema
if field.name not in delta_table.schema().to_pyarrow().names
]
delta_table.alter.add_columns(new_fields)
if new_fields:
delta_table.alter.add_columns(new_fields)
return delta_table
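
A minimal usage sketch of the evolved helper (not part of the commit; the table path and the added column are invented, and it assumes the deltalake and pyarrow packages are available):

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake
from dlt.common.libs.deltalake import _evolve_delta_table_schema

# create a small Delta table with a single `id` column (path is illustrative)
write_deltalake("/tmp/evolve_demo", pa.table({"id": [1, 2, 3]}))
dt = DeltaTable("/tmp/evolve_demo")

# `name` is present in the arrow schema but not in the Delta table, so it gets added;
# `id` already exists and is left untouched
evolved = _evolve_delta_table_schema(
    dt, pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
)
assert "name" in evolved.schema().to_pyarrow().names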
8 changes: 5 additions & 3 deletions dlt/extract/hints.py
@@ -1,5 +1,5 @@
from copy import deepcopy
from typing import TypedDict, cast, Any, Optional, Dict
from typing_extensions import Self

from dlt.common import logger
from dlt.common.schema.typing import (
@@ -222,7 +222,7 @@ def apply_hints(
table_format: TTableHintTemplate[TTableFormat] = None,
file_format: TTableHintTemplate[TFileFormat] = None,
create_table_variant: bool = False,
) -> None:
) -> Self:
"""Creates or modifies existing table schema by setting provided hints. Accepts both static and dynamic hints based on data.
If `create_table_variant` is specified, the `table_name` must be a string and hints will be used to create a separate set of hints
@@ -238,6 +238,8 @@ def apply_hints(
Please note that for efficient incremental loading, the resource must be aware of the Incremental by accepting it as one of its arguments and then using it to skip already loaded data.
In non-aware resources, `dlt` will filter out the loaded values; however, the resource will yield all the values again.
Returns: self for chaining
"""
if create_table_variant:
if not isinstance(table_name, str):
@@ -344,6 +346,7 @@ def apply_hints(
t["incremental"] = incremental

self._set_hints(t, create_table_variant)
return self

def _set_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
@@ -403,7 +406,6 @@ def _clone_hints(hints_template: TResourceHints) -> TResourceHints:
if hints_template is None:
return None
# creates a deep copy of dict structure without actually copying the objects
# deepcopy(hints_template) #
return clone_dict_nested(hints_template) # type: ignore[type-var]

@staticmethod
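Because `apply_hints` now returns the resource itself, hints can be applied fluently. A minimal sketch (not part of the commit; the resource, pipeline name, and duckdb destination are illustrative):

import dlt

# illustrative resource; apply_hints now returns self, so hints can be chained straight into run()
@dlt.resource(name="users")
def users():
    yield [{"id": 1, "name": "anna"}, {"id": 2, "name": "bob"}]

pipeline = dlt.pipeline("apply_hints_chaining", destination="duckdb")
pipeline.run(users().apply_hints(primary_key="id", write_disposition="merge"))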
13 changes: 13 additions & 0 deletions tests/destinations/test_custom_destination.py
@@ -186,6 +186,19 @@ def local_sink_func(items: TDataItems, table: TTableSchema, my_val=dlt.config.va
# local func does not create entry in destinations
assert "local_sink_func" not in _DESTINATIONS

def local_sink_func_no_params(items: TDataItems, table: TTableSchema) -> None:
# consume data
pass

p = dlt.pipeline(
"sink_test",
destination=Destination.from_reference(
"destination", destination_callable=local_sink_func_no_params
),
dev_mode=True,
)
p.run([1, 2, 3], table_name="items")

# test passing string reference
global global_calls
global_calls = []
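The new test wires a parameterless callable through `Destination.from_reference`. The same kind of sink is more often declared with the `@dlt.destination` decorator; a minimal sketch under that assumption (not part of the commit; names and the batch size are illustrative):

import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

# a tiny sink that just prints what it receives
@dlt.destination(batch_size=5)
def print_sink(items: TDataItems, table: TTableSchema) -> None:
    print(f"{table['name']}: {len(items)} items")

p = dlt.pipeline("sink_decorator_test", destination=print_sink, dev_mode=True)
p.run([1, 2, 3], table_name="items")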
24 changes: 23 additions & 1 deletion tests/load/pipeline/test_filesystem_pipeline.py
@@ -489,6 +489,16 @@ def nested_table():
assert len(rows_dict["nested_table__child"]) == 3
assert len(rows_dict["nested_table__child__grandchild"]) == 5

# now drop children and grandchildren; use merge write disposition to create and pass the full table chain,
# also for tables that do not have jobs
info = pipeline.run(
[{"foo": 3}] * 10000,
table_name="nested_table",
primary_key="foo",
write_disposition="merge",
)
assert_load_info(info)


@pytest.mark.parametrize(
"destination_config",
@@ -735,9 +745,21 @@ def delta_table(data):
ensure_delta_compatible_arrow_data(empty_arrow_table).schema
)

# now run the empty frame again
info = pipeline.run(delta_table(empty_arrow_table))
assert_load_info(info)

# use materialized list
# NOTE: this will create an empty parquet file with a schema taken from the dlt schema.
# the original parquet file had a nested (struct) type in the `json` field that is now
# in the delta table schema. the empty parquet file loses this information and has
# a string type (converted from the dlt `json` type)
info = pipeline.run([dlt.mark.materialize_table_schema()], table_name="delta_table")
assert_load_info(info)

# test `dlt.mark.materialize_table_schema()`
users_materialize_table_schema.apply_hints(table_format="delta")
info = pipeline.run(users_materialize_table_schema())
info = pipeline.run(users_materialize_table_schema(), loader_file_format="parquet")
assert_load_info(info)
dt = get_delta_tables(pipeline, "users")["users"]
assert dt.version() == 0
140 changes: 140 additions & 0 deletions tests/load/pipeline/test_merge_disposition.py
@@ -9,6 +9,7 @@

from dlt.common import json, pendulum
from dlt.common.configuration.container import Container
from dlt.common.destination.utils import resolve_merge_strategy
from dlt.common.pipeline import StateInjectableContext
from dlt.common.schema.utils import has_table_seen_data
from dlt.common.schema.exceptions import (
@@ -46,6 +47,7 @@ def skip_if_not_supported(
merge_strategy: TLoaderMergeStrategy,
destination: TDestination,
) -> None:
# resolve_merge_strategy
if merge_strategy not in destination.capabilities().supported_merge_strategies:
pytest.skip(
f"`{merge_strategy}` merge strategy not supported for `{destination.destination_name}`"
@@ -70,6 +72,7 @@ def test_merge_on_keys_in_schema(
destination_config: DestinationTestConfiguration,
merge_strategy: TLoaderMergeStrategy,
) -> None:
"""Tests merge disposition on an annotated schema, no annotations on resource"""
p = destination_config.setup_pipeline("eth_2", dev_mode=True)

skip_if_not_supported(merge_strategy, p.destination)
@@ -261,6 +264,143 @@ def r(data):
)


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True,
local_filesystem_configs=True,
table_format_filesystem_configs=True,
supports_merge=True,
bucket_subset=(FILE_BUCKET),
),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("merge_strategy", ("delete-insert", "upsert"))
def test_merge_nested_records_inserted_deleted(
destination_config: DestinationTestConfiguration,
merge_strategy: TLoaderMergeStrategy,
) -> None:
p = destination_config.setup_pipeline(
"test_merge_nested_records_inserted_deleted", dev_mode=True
)

skip_if_not_supported(merge_strategy, p.destination)

@dlt.resource(
table_name="parent",
write_disposition={"disposition": "merge", "strategy": merge_strategy},
primary_key="id",
merge_key="foo",
)
def r(data):
yield data

# initial load
run_1 = [
{"id": 1, "foo": 1, "child": [{"bar": 1, "grandchild": [{"baz": 1}]}]},
{"id": 2, "foo": 1, "child": [{"bar": 1, "grandchild": [{"baz": 1}]}]},
{"id": 3, "foo": 1, "child": [{"bar": 3, "grandchild": [{"baz": 1}]}]},
]
info = p.run(r(run_1), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "parent", "parent__child", "parent__child__grandchild") == {
"parent": 3,
"parent__child": 3,
"parent__child__grandchild": 3,
}
tables = load_tables_to_dicts(p, "parent", exclude_system_cols=True)
assert_records_as_set(
tables["parent"],
[
{"id": 1, "foo": 1},
{"id": 2, "foo": 1},
{"id": 3, "foo": 1},
],
)

# delete records: delete parent (id 3), child (id 2) and grandchild (id 1)
# foo is the merge key, so id = 3 should be deleted
run_3 = [
{"id": 1, "foo": 1, "child": [{"bar": 2}]},
{"id": 2, "foo": 1},
]
info = p.run(r(run_3), **destination_config.run_kwargs)
assert_load_info(info)

table_counts = load_table_counts(p, "parent", "parent__child", "parent__child__grandchild")
table_data = load_tables_to_dicts(p, "parent", "parent__child", exclude_system_cols=True)
if merge_strategy == "upsert":
# merge keys will not apply and parent will not be deleted
if destination_config.table_format == "delta":
# delta merges cannot delete from nested tables
assert table_counts == {
"parent": 3, # id == 3 not deleted (not present in the data)
"parent__child": 3, # child not deleted
"parent__child__grandchild": 3, # grand child not deleted,
}
else:
assert table_counts == {
"parent": 3, # id == 3 not deleted (not present in the data)
"parent__child": 2,
"parent__child__grandchild": 1,
}
assert_records_as_set(
table_data["parent__child"],
[
{"bar": 2}, # id 1 updated to bar
{"bar": 3}, # id 3 not deleted
],
)
else:
assert table_counts == {
"parent": 2,
"parent__child": 1,
"parent__child__grandchild": 0,
}
assert_records_as_set(
table_data["parent__child"],
[
{"bar": 2},
],
)

# insert records: id 3 is inserted back, id 2 gets a new child, id 1 gets a new grandchild
run_3 = [
{"id": 1, "foo": 1, "child": [{"bar": 1, "grandchild": [{"baz": 1}, {"baz": 4}]}]},
{"id": 2, "foo": 1, "child": [{"bar": 2, "grandchild": [{"baz": 2}]}, {"bar": 4}]},
{"id": 3, "foo": 1, "child": [{"bar": 3, "grandchild": [{"baz": 3}]}]},
]
info = p.run(r(run_3), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "parent", "parent__child", "parent__child__grandchild") == {
"parent": 3,
"parent__child": 4,
"parent__child__grandchild": 4,
}
tables = load_tables_to_dicts(
p, "parent__child", "parent__child__grandchild", exclude_system_cols=True
)
assert_records_as_set(
tables["parent__child__grandchild"],
[
{"baz": 2},
{"baz": 1},
{"baz": 3},
{"baz": 4},
],
)
assert_records_as_set(
tables["parent__child"],
[
{"bar": 2},
{"bar": 1},
{"bar": 3},
{"bar": 4},
],
)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(
2 changes: 1 addition & 1 deletion tests/load/postgres/postgres/Dockerfile
@@ -1,2 +1,2 @@
FROM postgres:14
FROM postgres:15
COPY 01_init.sql /docker-entrypoint-initdb.d/
