From 1f26f722212d2b894a73b5d1ff290fd8cc071564 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 13 Sep 2024 21:37:17 +0200
Subject: [PATCH] decomposes dicts of resources so names are split across many sources

---
 dlt/common/utils.py             | 21 +++++++++++++++++++
 dlt/extract/extract.py          | 16 ++++++++------
 tests/common/test_utils.py      | 37 +++++++++++++++++++++++++++++++++
 tests/pipeline/test_pipeline.py | 29 ++++++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/dlt/common/utils.py b/dlt/common/utils.py
index 37b644c0b5..f7f82133f2 100644
--- a/dlt/common/utils.py
+++ b/dlt/common/utils.py
@@ -566,6 +566,27 @@ def get_exception_trace_chain(
     return traces
 
 
+def group_dict_of_lists(input_dict: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
+    """Decomposes a dictionary with list values into a list of dictionaries with unique keys.
+
+    This function takes an input dictionary where each key maps to a list of objects.
+    It returns a list of dictionaries, each containing at most one object per key.
+    The goal is to ensure that no two objects with the same key appear in the same dictionary.
+
+    Parameters:
+        input_dict (Dict[str, List[Any]]): A dictionary with string keys and lists of objects as values.
+
+    Returns:
+        List[Dict[str, Any]]: A list of dictionaries, each with unique keys and single objects.
+    """
+    max_length = max(len(v) for v in input_dict.values())
+    list_of_dicts: List[Dict[str, Any]] = [{} for _ in range(max_length)]
+    for name, value_list in input_dict.items():
+        for idx, obj in enumerate(value_list):
+            list_of_dicts[idx][name] = obj
+    return list_of_dicts
+
+
 def order_deduped(lst: List[Any]) -> List[Any]:
     """Returns deduplicated list preserving order of input elements.
 
diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index 0128dfc93b..e65f6cf0d0 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -35,7 +35,7 @@
     TLoadPackageState,
     commit_load_package_state,
 )
-from dlt.common.utils import get_callable_name, get_full_class_name
+from dlt.common.utils import get_callable_name, get_full_class_name, group_dict_of_lists
 
 from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
 from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
@@ -97,7 +97,7 @@ def choose_schema() -> Schema:
 
     # a list of sources or a list of resources may be passed as data
     sources: List[DltSource] = []
-    resources: List[DltResource] = []
+    resources: Dict[str, List[DltResource]] = {}
     data_resources: List[DltResource] = []
 
     def append_data(data_item: Any) -> None:
@@ -107,7 +107,9 @@ def append_data(data_item: Any) -> None:
             data_item.schema = schema
             sources.append(data_item)
         elif isinstance(data_item, DltResource):
-            resources.append(data_item)
+            # many resources with the same name may be present
+            r_ = resources.setdefault(data_item.name, [])
+            r_.append(data_item)
         else:
             # iterator/iterable/generator
             # create resource first without table template
@@ -127,9 +129,11 @@ def append_data(data_item: Any) -> None:
 
     # add all appended resource instances in one source
     if resources:
-        # do not set section to prevent source that represent a standalone resource
-        # to overwrite other standalone resources (ie. parents) in that source
-        sources.append(DltSource(effective_schema, "", resources))
+        # decompose into groups so that each group contains at most one resource with a given name
+        for r_ in group_dict_of_lists(resources):
+            # do not set section to prevent a source that represents a standalone resource
+            # from overwriting other standalone resources (i.e. parents) in that source
+            sources.append(DltSource(effective_schema, "", list(r_.values())))
 
     # add all the appended data-like items in one source
     if data_resources:
diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py
index e08c1cdf01..864bce5b91 100644
--- a/tests/common/test_utils.py
+++ b/tests/common/test_utils.py
@@ -13,6 +13,7 @@
     flatten_list_of_str_or_dicts,
     digest128,
     graph_edges_to_nodes,
+    group_dict_of_lists,
     map_nested_in_place,
     reveal_pseudo_secret,
     obfuscate_pseudo_secret,
@@ -367,3 +368,39 @@ def test_nested_dict_merge() -> None:
         mappings_update, {"_config": {"_dsn": dsn, "_dict": {"a": 3}}}
     )
     assert mappings_update == deep_clone_dict_1_mappings
+
+
+def test_group_dict_of_lists_one_element_each_list():
+    input_dict = {"Frege": ["obj1"], "Gödel": ["obj2"], "Wittgenstein": ["obj3"]}
+    result = group_dict_of_lists(input_dict)
+    assert len(result) == 1
+    assert result[0] == {"Frege": "obj1", "Gödel": "obj2", "Wittgenstein": "obj3"}
+
+
+def test_group_dict_of_lists_equal_length_lists():
+    input_dict = {
+        "Frege": ["obj1", "obj2"],
+        "Gödel": ["obj3", "obj4"],
+        "Wittgenstein": ["obj5", "obj6"],
+    }
+    result = group_dict_of_lists(input_dict)
+    assert len(result) == 2
+    assert result[0] == {"Frege": "obj1", "Gödel": "obj3", "Wittgenstein": "obj5"}
+    assert result[1] == {"Frege": "obj2", "Gödel": "obj4", "Wittgenstein": "obj6"}
+
+
+def test_group_dict_of_lists_various_length_lists():
+    input_dict = {
+        "Frege": ["obj1", "obj2", "obj3"],
+        "Gödel": ["obj4", "obj5"],
+        "Wittgenstein": ["obj6"],
+    }
+    result = group_dict_of_lists(input_dict)
+    assert len(result) == 3
+    assert result[0] == {"Frege": "obj1", "Gödel": "obj4", "Wittgenstein": "obj6"}
+    assert result[1] == {"Frege": "obj2", "Gödel": "obj5"}
+    assert result[2] == {"Frege": "obj3"}
+
+    # Check if the sizes of the decomposed dicts are decreasing
+    sizes = [len(d) for d in result]
+    assert sizes == sorted(sizes, reverse=True), "Sizes of decomposed dicts are not decreasing"
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 5f0e6f1a03..73125cbd6c 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -2555,6 +2555,7 @@ def test_import_unknown_file_format() -> None:
 
 
 def test_resource_transformer_standalone() -> None:
+    # requires that standalone resources are executed in a single source
    page = 1
 
     @dlt.resource(name="pages")
@@ -2596,6 +2597,34 @@ def get_subpages(page_item):
     assert load_data_table_counts(pipeline) == {"subpages": 10, "pages": 10}
 
 
+def test_resources_same_name_in_single_source() -> None:
+    source_ids: List[int] = []
+
+    @dlt.resource(name="pages")
+    def gen_pages():
+        page = 0
+        # also store id of current source instance
+        source_ids.append(id(dlt.current.source()))
+        while True:
+            yield {"page": page}
+            if page == 10:
+                return
+            page += 1
+
+    pipeline = dlt.pipeline("test_resources_same_name_in_single_source", destination="duckdb")
+    info = pipeline.run([gen_pages(), gen_pages()])
+    assert_load_info(info)
+    # two separate sources
+    assert len(set(source_ids)) == 2
+
+    # check against different names
+    source_ids.clear()
+    info = pipeline.run([gen_pages().with_name("page_1"), gen_pages().with_name("page_2")])
+    assert_load_info(info)
+    # one source
+    assert len(set(source_ids)) == 1
+
+
 def test_static_staging_dataset() -> None:
     # share database and staging dataset
     duckdb_ = dlt.destinations.duckdb(
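
For reference, a minimal sketch (not part of the patch) of how the decomposition behaves once the group_dict_of_lists helper above is available; the string placeholders below stand in for real DltResource instances and are illustrative only.

from dlt.common.utils import group_dict_of_lists

# resources collected by append_data(), keyed by name; three of them share the name "pages"
resources = {"pages": ["pages_0", "pages_1", "pages_2"], "users": ["users_0"]}

# each group holds at most one object per name, so same-named resources end up
# in separate ad-hoc sources built by extract()
groups = group_dict_of_lists(resources)
assert groups == [
    {"pages": "pages_0", "users": "users_0"},
    {"pages": "pages_1"},
    {"pages": "pages_2"},
]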