creates a single source in extract for all resource instances passed as list #1535

Merged · 3 commits · Sep 14, 2024
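This PR groups all resource instances passed as a list into one ad-hoc source during extract, so a parent resource that several items share (for example a resource and a transformer fed by it) is evaluated only once. Below is a minimal sketch of that behavior, mirroring the test added in this PR; the pipeline name and the duckdb destination are illustrative, not part of the change.

```python
import dlt


@dlt.resource(name="pages")
def gen_pages():
    # a parent resource shared by both items passed to run()
    for page in range(1, 11):
        yield {"page": page}


@dlt.transformer(name="subpages")
def get_subpages(page_item):
    # fan each page out into 10 subpages
    yield from ({"page": page_item["page"], "subpage": s} for s in range(1, 11))


pipeline = dlt.pipeline("single_source_demo", destination="duckdb")
# both items land in a single source, so gen_pages is extracted once:
# "pages" gets 10 rows and "subpages" gets 100
pipeline.run([gen_pages, gen_pages | get_subpages])
```

When the same two items are wrapped in two separate DltSource objects instead, the parent is evaluated once per source, as the second half of the new test below demonstrates.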
17 changes: 11 additions & 6 deletions dlt/extract/extract.py
@@ -95,6 +95,7 @@ def choose_schema() -> Schema:
     # a list of sources or a list of resources may be passed as data
     sources: List[DltSource] = []
     resources: List[DltResource] = []
+    data_resources: List[DltResource] = []
 
     def append_data(data_item: Any) -> None:
         if isinstance(data_item, DltSource):
@@ -103,13 +104,11 @@ def append_data(data_item: Any) -> None:
             data_item.schema = schema
             sources.append(data_item)
         elif isinstance(data_item, DltResource):
-            # do not set section to prevent source that represent a standalone resource
-            # to overwrite other standalone resources (ie. parents) in that source
-            sources.append(DltSource(effective_schema, "", [data_item]))
+            resources.append(data_item)
         else:
             # iterator/iterable/generator
             # create resource first without table template
-            resources.append(
+            data_resources.append(
                 DltResource.from_data(data_item, name=table_name, section=pipeline.pipeline_name)
             )
 
@@ -123,9 +122,15 @@ def append_data(data_item: Any) -> None:
     else:
         append_data(data)
 
-    # add all the appended resources in one source
+    # add all appended resource instances in one source
     if resources:
-        sources.append(DltSource(effective_schema, pipeline.pipeline_name, resources))
+        # do not set section to prevent source that represent a standalone resource
+        # to overwrite other standalone resources (ie. parents) in that source
+        sources.append(DltSource(effective_schema, "", resources))
+
+    # add all the appended data-like items in one source
+    if data_resources:
+        sources.append(DltSource(effective_schema, pipeline.pipeline_name, data_resources))
 
     # apply hints and settings
     for source in sources:
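After this change `append_data` keeps three buckets: ready-made sources pass through, resource instances are collected into `resources`, and raw iterables or generators are wrapped via `DltResource.from_data` into `data_resources`. All resource instances then share one source with an empty section (so standalone resources and their parents do not overwrite each other), while data-like items stay in a source named after the pipeline. A small sketch of that user-visible split, with illustrative pipeline, resource, and table names:

```python
import dlt


@dlt.resource(name="users")
def users():
    yield {"id": 1}


pipeline = dlt.pipeline("bucket_demo", destination="duckdb")

# a resource instance: collected into the single source with an empty section
pipeline.run(users)

# a plain list of dicts: wrapped via DltResource.from_data(...) and placed in a
# source named after the pipeline ("bucket_demo"), under the given table name
pipeline.run([{"value": 1}, {"value": 2}], table_name="raw_items")
```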
44 changes: 43 additions & 1 deletion tests/pipeline/test_pipeline.py
@@ -1428,7 +1428,7 @@ def generic(start):
     assert generic(0).with_name("state2").state["start"] == 20
 
     # NOTE: only one resource will be set in table
-    assert pipeline.default_schema.get_table("single_table")["resource"] == "state2"
+    assert pipeline.default_schema.get_table("single_table")["resource"] == "state1"
 
     # now load only state1
     load_info = pipeline.run(
@@ -2438,6 +2438,48 @@ def test_import_unknown_file_format() -> None:
     assert isinstance(inner_ex.__cause__, ValueError)
 
 
+def test_resource_transformer_standalone() -> None:
+    page = 1
+
+    @dlt.resource(name="pages")
+    def gen_pages():
+        nonlocal page
+        while True:
+            yield {"page": page}
+            if page == 10:
+                return
+            page += 1
+
+    @dlt.transformer(name="subpages")
+    def get_subpages(page_item):
+        yield from [
+            {
+                "page": page_item["page"],
+                "subpage": subpage,
+            }
+            for subpage in range(1, 11)
+        ]
+
+    pipeline = dlt.pipeline("test_resource_transformer_standalone", destination="duckdb")
+    # here we must combine resources and transformers using the same instance
+    info = pipeline.run([gen_pages, gen_pages | get_subpages])
+    assert_load_info(info)
+    # this works because we extract transformer and resource above in a single source so dlt optimizes
+    # dag and extracts gen_pages only once.
+    assert load_data_table_counts(pipeline) == {"subpages": 100, "pages": 10}
+
+    # for two separate sources we have the following
+    page = 1
+    schema = Schema("test")
+    info = pipeline.run(
+        [DltSource(schema, "", [gen_pages]), DltSource(schema, "", [gen_pages | get_subpages])],
+        dataset_name="new_dataset",
+    )
+    assert_load_info(info, 2)
+    # ten subpages because only 1 page is extracted in the second source (see gen_pages exit condition)
+    assert load_data_table_counts(pipeline) == {"subpages": 10, "pages": 10}
+
+
 def assert_imported_file(
     pipeline: Pipeline,
     table_name: str,