decomposes dicts of resources so names are split across many sources
rudolfix committed Sep 13, 2024
1 parent 435f06d commit 1f26f72
Showing 4 changed files with 97 additions and 6 deletions.
21 changes: 21 additions & 0 deletions dlt/common/utils.py
@@ -566,6 +566,27 @@ def get_exception_trace_chain(
    return traces


def group_dict_of_lists(input_dict: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Decomposes a dictionary with list values into a list of dictionaries with unique keys.

    This function takes an input dictionary where each key maps to a list of objects.
    It returns a list of dictionaries, each containing at most one object per key,
    so that no two objects with the same key appear in the same dictionary.

    Parameters:
        input_dict (Dict[str, List[Any]]): A dictionary with string keys and lists of objects as values.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each with unique keys and single objects.
    """
    max_length = max(len(v) for v in input_dict.values())
    list_of_dicts: List[Dict[str, Any]] = [{} for _ in range(max_length)]
    for name, value_list in input_dict.items():
        for idx, obj in enumerate(value_list):
            list_of_dicts[idx][name] = obj
    return list_of_dicts


def order_deduped(lst: List[Any]) -> List[Any]:
"""Returns deduplicated list preserving order of input elements.
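For reference, a minimal usage sketch of the new helper; the keys and string values below are illustrative placeholders, not part of the commit:

from dlt.common.utils import group_dict_of_lists

# duplicate values under one key are spread across successive groups
groups = group_dict_of_lists({"pages": ["p1", "p2"], "users": ["u1"]})
assert groups == [{"pages": "p1", "users": "u1"}, {"pages": "p2"}]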
16 changes: 10 additions & 6 deletions dlt/extract/extract.py
@@ -35,7 +35,7 @@
    TLoadPackageState,
    commit_load_package_state,
)
from dlt.common.utils import get_callable_name, get_full_class_name
from dlt.common.utils import get_callable_name, get_full_class_name, group_dict_of_lists

from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
@@ -97,7 +97,7 @@ def choose_schema() -> Schema:

    # a list of sources or a list of resources may be passed as data
    sources: List[DltSource] = []
    resources: List[DltResource] = []
    resources: Dict[str, List[DltResource]] = {}
    data_resources: List[DltResource] = []

    def append_data(data_item: Any) -> None:
@@ -107,7 +107,9 @@ def append_data(data_item: Any) -> None:
                data_item.schema = schema
            sources.append(data_item)
        elif isinstance(data_item, DltResource):
            resources.append(data_item)
            # many resources with the same name may be present
            r_ = resources.setdefault(data_item.name, [])
            r_.append(data_item)
        else:
            # iterator/iterable/generator
            # create resource first without table template
@@ -127,9 +129,11 @@ def append_data(data_item: Any) -> None:

    # add all appended resource instances in one source
    if resources:
        # do not set section to prevent a source that represents a standalone resource
        # from overwriting other standalone resources (i.e. parents) in that source
        sources.append(DltSource(effective_schema, "", resources))
        # decompose into groups so that at most a single resource with a given name belongs to a group
        for r_ in group_dict_of_lists(resources):
            # do not set section to prevent a source that represents a standalone resource
            # from overwriting other standalone resources (i.e. parents) in that source
            sources.append(DltSource(effective_schema, "", list(r_.values())))

    # add all the appended data-like items in one source
    if data_resources:
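To make the new grouping step concrete, here is a rough sketch of the loop's effect when duplicate resource names are present; r1, r2, r3 and the surrounding variables are hypothetical simplifications of the code above:

# two resources named "pages" no longer share a single source
resources = {"pages": [r1, r2], "users": [r3]}
for group in group_dict_of_lists(resources):
    # 1st pass: {"pages": r1, "users": r3}; 2nd pass: {"pages": r2}
    sources.append(DltSource(effective_schema, "", list(group.values())))
# result: two DltSource instances instead of a name clash in one source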
37 changes: 37 additions & 0 deletions tests/common/test_utils.py
@@ -13,6 +13,7 @@
    flatten_list_of_str_or_dicts,
    digest128,
    graph_edges_to_nodes,
    group_dict_of_lists,
    map_nested_in_place,
    reveal_pseudo_secret,
    obfuscate_pseudo_secret,
@@ -367,3 +368,39 @@ def test_nested_dict_merge() -> None:
        mappings_update, {"_config": {"_dsn": dsn, "_dict": {"a": 3}}}
    )
    assert mappings_update == deep_clone_dict_1_mappings


def test_group_dict_of_lists_one_element_each_list():
    input_dict = {"Frege": ["obj1"], "Gödel": ["obj2"], "Wittgenstein": ["obj3"]}
    result = group_dict_of_lists(input_dict)
    assert len(result) == 1
    assert result[0] == {"Frege": "obj1", "Gödel": "obj2", "Wittgenstein": "obj3"}


def test_group_dict_of_lists_equal_length_lists():
    input_dict = {
        "Frege": ["obj1", "obj2"],
        "Gödel": ["obj3", "obj4"],
        "Wittgenstein": ["obj5", "obj6"],
    }
    result = group_dict_of_lists(input_dict)
    assert len(result) == 2
    assert result[0] == {"Frege": "obj1", "Gödel": "obj3", "Wittgenstein": "obj5"}
    assert result[1] == {"Frege": "obj2", "Gödel": "obj4", "Wittgenstein": "obj6"}


def test_group_dict_of_lists_various_length_lists():
    input_dict = {
        "Frege": ["obj1", "obj2", "obj3"],
        "Gödel": ["obj4", "obj5"],
        "Wittgenstein": ["obj6"],
    }
    result = group_dict_of_lists(input_dict)
    assert len(result) == 3
    assert result[0] == {"Frege": "obj1", "Gödel": "obj4", "Wittgenstein": "obj6"}
    assert result[1] == {"Frege": "obj2", "Gödel": "obj5"}
    assert result[2] == {"Frege": "obj3"}

    # check that the sizes of the decomposed dicts are decreasing
    sizes = [len(d) for d in result]
    assert sizes == sorted(sizes, reverse=True), "Sizes of decomposed dicts are not decreasing"
29 changes: 29 additions & 0 deletions tests/pipeline/test_pipeline.py
@@ -2555,6 +2555,7 @@ def test_import_unknown_file_format() -> None:


def test_resource_transformer_standalone() -> None:
    # requires that standalone resources are executed in a single source
    page = 1

    @dlt.resource(name="pages")
@@ -2596,6 +2597,34 @@ def get_subpages(page_item):
    assert load_data_table_counts(pipeline) == {"subpages": 10, "pages": 10}


def test_resources_same_name_in_single_source() -> None:
    source_ids: List[int] = []

    @dlt.resource(name="pages")
    def gen_pages():
        page = 0
        # also store id of current source instance
        source_ids.append(id(dlt.current.source()))
        while True:
            yield {"page": page}
            if page == 10:
                return
            page += 1

    pipeline = dlt.pipeline("test_resources_same_name_in_single_source", destination="duckdb")
    info = pipeline.run([gen_pages(), gen_pages()])
    assert_load_info(info)
    # two separate sources
    assert len(set(source_ids)) == 2

    # check against different names
    source_ids.clear()
    info = pipeline.run([gen_pages().with_name("page_1"), gen_pages().with_name("page_2")])
    assert_load_info(info)
    # one source
    assert len(set(source_ids)) == 1


def test_static_staging_dataset() -> None:
    # share database and staging dataset
    duckdb_ = dlt.destinations.duckdb(
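A property the tests above depend on: a transformer and its parent never share a name, so the grouping keeps such pairs in a single source, while same-named resources are split apart. A minimal sketch of that invariant, with hypothetical resource objects pages_a, pages_b and subpages:

# different names -> one group -> both resources run in one DltSource
groups = group_dict_of_lists({"pages": [pages_a], "subpages": [subpages]})
assert len(groups) == 1

# same name -> two groups -> two DltSource instances
groups = group_dict_of_lists({"pages": [pages_a, pages_b]})
assert len(groups) == 2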
