creates a single source in extract for all resource instances passed as list #1535

Merged 3 commits on Sep 14, 2024
21 changes: 21 additions & 0 deletions dlt/common/utils.py
@@ -566,6 +566,27 @@ def get_exception_trace_chain(
return traces


def group_dict_of_lists(input_dict: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
"""Decomposes a dictionary with list values into a list of dictionaries with unique keys.

This function takes an input dictionary where each key maps to a list of objects.
It returns a list of dictionaries, each containing at most one object per key.
The goal is to ensure that no two objects with the same key appear in the same dictionary.

Parameters:
input_dict (Dict[str, List[Any]]): A dictionary with string keys and lists of objects as values.

Returns:
List[Dict[str, Any]]: A list of dictionaries, each with unique keys and single objects.
"""
max_length = max(len(v) for v in input_dict.values())
list_of_dicts: List[Dict[str, Any]] = [{} for _ in range(max_length)]
for name, value_list in input_dict.items():
for idx, obj in enumerate(value_list):
list_of_dicts[idx][name] = obj
return list_of_dicts


def order_deduped(lst: List[Any]) -> List[Any]:
"""Returns deduplicated list preserving order of input elements.

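A quick illustration of the new helper (a sketch, not part of the diff): the longest input list determines how many dictionaries come back, and each output dictionary holds at most one object per key. The string values below are placeholders standing in for resource instances.

from dlt.common.utils import group_dict_of_lists

# "pages" has two instances, "users" has one
groups = group_dict_of_lists({"pages": ["r1", "r2"], "users": ["r3"]})
# the first group takes one object per key, the second gets the leftover "pages" instance
assert groups == [{"pages": "r1", "users": "r3"}, {"pages": "r2"}]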
25 changes: 17 additions & 8 deletions dlt/extract/extract.py
@@ -35,7 +35,7 @@
TLoadPackageState,
commit_load_package_state,
)
- from dlt.common.utils import get_callable_name, get_full_class_name
+ from dlt.common.utils import get_callable_name, get_full_class_name, group_dict_of_lists

from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
@@ -97,7 +97,8 @@ def choose_schema() -> Schema:

# a list of sources or a list of resources may be passed as data
sources: List[DltSource] = []
- resources: List[DltResource] = []
+ resources: Dict[str, List[DltResource]] = {}
+ data_resources: List[DltResource] = []

def append_data(data_item: Any) -> None:
if isinstance(data_item, DltSource):
@@ -106,13 +107,13 @@ def append_data(data_item: Any) -> None:
data_item.schema = schema
sources.append(data_item)
elif isinstance(data_item, DltResource):
- # do not set section to prevent source that represent a standalone resource
- # to overwrite other standalone resources (ie. parents) in that source
- sources.append(DltSource(effective_schema, "", [data_item]))
+ # many resources with the same name may be present
+ r_ = resources.setdefault(data_item.name, [])
+ r_.append(data_item)
else:
# iterator/iterable/generator
# create resource first without table template
- resources.append(
+ data_resources.append(
DltResource.from_data(data_item, name=table_name, section=pipeline.pipeline_name)
)

@@ -126,9 +127,17 @@ def append_data(data_item: Any) -> None:
else:
append_data(data)

- # add all the appended resources in one source
+ # add all appended resource instances in one source
if resources:
- sources.append(DltSource(effective_schema, pipeline.pipeline_name, resources))
+ # decompose into groups so that at most a single resource with a given name belongs to a group
+ for r_ in group_dict_of_lists(resources):
+ # do not set section to prevent a source that represents a standalone resource
+ # from overwriting other standalone resources (i.e. parents) in that source
+ sources.append(DltSource(effective_schema, "", list(r_.values())))
+
+ # add all the appended data-like items in one source
+ if data_resources:
+ sources.append(DltSource(effective_schema, pipeline.pipeline_name, data_resources))

# apply hints and settings
for source in sources:
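For context, this is the end-user pattern the change targets, mirroring the tests added below: a standalone resource and a transformer fed by the same resource, passed together as a list, now land in one source, so the shared parent is evaluated only once; instances that share a name are split into separate sources via group_dict_of_lists. A minimal sketch assuming a local duckdb destination (pipeline name is illustrative):

import dlt

@dlt.resource(name="pages")
def gen_pages():
    for page in range(1, 11):
        yield {"page": page}

@dlt.transformer(name="subpages")
def get_subpages(page_item):
    yield {"page": page_item["page"], "subpage": 1}

pipeline = dlt.pipeline("single_source_demo", destination="duckdb")
# both list entries end up in a single source, so gen_pages runs once and
# feeds both the "pages" table and the "subpages" transformer
pipeline.run([gen_pages, gen_pages | get_subpages])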
37 changes: 37 additions & 0 deletions tests/common/test_utils.py
@@ -13,6 +13,7 @@
flatten_list_of_str_or_dicts,
digest128,
graph_edges_to_nodes,
group_dict_of_lists,
map_nested_in_place,
reveal_pseudo_secret,
obfuscate_pseudo_secret,
@@ -367,3 +368,39 @@ def test_nested_dict_merge() -> None:
mappings_update, {"_config": {"_dsn": dsn, "_dict": {"a": 3}}}
)
assert mappings_update == deep_clone_dict_1_mappings


def test_group_dict_of_lists_one_element_each_list():
input_dict = {"Frege": ["obj1"], "Gödel": ["obj2"], "Wittgenstein": ["obj3"]}
result = group_dict_of_lists(input_dict)
assert len(result) == 1
assert result[0] == {"Frege": "obj1", "Gödel": "obj2", "Wittgenstein": "obj3"}


def test_group_dict_of_lists_equal_length_lists():
input_dict = {
"Frege": ["obj1", "obj2"],
"Gödel": ["obj3", "obj4"],
"Wittgenstein": ["obj5", "obj6"],
}
result = group_dict_of_lists(input_dict)
assert len(result) == 2
assert result[0] == {"Frege": "obj1", "Gödel": "obj3", "Wittgenstein": "obj5"}
assert result[1] == {"Frege": "obj2", "Gödel": "obj4", "Wittgenstein": "obj6"}


def test_group_dict_of_lists_various_length_lists():
input_dict = {
"Frege": ["obj1", "obj2", "obj3"],
"Gödel": ["obj4", "obj5"],
"Wittgenstein": ["obj6"],
}
result = group_dict_of_lists(input_dict)
assert len(result) == 3
assert result[0] == {"Frege": "obj1", "Gödel": "obj4", "Wittgenstein": "obj6"}
assert result[1] == {"Frege": "obj2", "Gödel": "obj5"}
assert result[2] == {"Frege": "obj3"}

# Check if the sizes of the decomposed dicts are decreasing
sizes = [len(d) for d in result]
assert sizes == sorted(sizes, reverse=True), "Sizes of decomposed dicts are not decreasing"
73 changes: 72 additions & 1 deletion tests/pipeline/test_pipeline.py
@@ -1492,7 +1492,7 @@ def generic(start):
assert generic(0).with_name("state2").state["start"] == 20

# NOTE: only one resource will be set in table
- assert pipeline.default_schema.get_table("single_table")["resource"] == "state2"
+ assert pipeline.default_schema.get_table("single_table")["resource"] == "state1"

# now load only state1
load_info = pipeline.run(
@@ -2554,6 +2554,77 @@ def test_import_unknown_file_format() -> None:
assert isinstance(inner_ex.__cause__, ValueError)


def test_resource_transformer_standalone() -> None:
# requires that standalone resources are executed in a single source
page = 1

@dlt.resource(name="pages")
def gen_pages():
nonlocal page
while True:
yield {"page": page}
if page == 10:
return
page += 1

@dlt.transformer(name="subpages")
def get_subpages(page_item):
yield from [
{
"page": page_item["page"],
"subpage": subpage,
}
for subpage in range(1, 11)
]

pipeline = dlt.pipeline("test_resource_transformer_standalone", destination="duckdb")
# here we must combine resources and transformers using the same instance
info = pipeline.run([gen_pages, gen_pages | get_subpages])
assert_load_info(info)
# this works because the transformer and the resource above are extracted in a single source, so dlt
# optimizes the dag and extracts gen_pages only once.
assert load_data_table_counts(pipeline) == {"subpages": 100, "pages": 10}

# for two separate sources we have the following
page = 1
schema = Schema("test")
info = pipeline.run(
[DltSource(schema, "", [gen_pages]), DltSource(schema, "", [gen_pages | get_subpages])],
dataset_name="new_dataset",
)
assert_load_info(info, 2)
# ten subpages because only 1 page is extracted in the second source (see gen_pages exit condition)
assert load_data_table_counts(pipeline) == {"subpages": 10, "pages": 10}


def test_resources_same_name_in_single_source() -> None:
source_ids: List[int] = []

@dlt.resource(name="pages")
def gen_pages():
page = 0
# also store id of current source instance
source_ids.append(id(dlt.current.source()))
while True:
yield {"page": page}
if page == 10:
return
page += 1

pipeline = dlt.pipeline("test_resources_same_name_in_single_source", destination="duckdb")
info = pipeline.run([gen_pages(), gen_pages()])
assert_load_info(info)
# two separate sources
assert len(set(source_ids)) == 2

# check against different names
source_ids.clear()
info = pipeline.run([gen_pages().with_name("page_1"), gen_pages().with_name("page_2")])
assert_load_info(info)
# one source
assert len(set(source_ids)) == 1


def test_static_staging_dataset() -> None:
# share database and staging dataset
duckdb_ = dlt.destinations.duckdb(