-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rest_api: filter, exclude, transform API responses #495
Changes from 11 commits
f64bd42
5f9835e
65471f5
85dd1d7
abe9337
c59e40c
08c29f2
d9ba2ae
d2dcd3c
b7fba2a
784106b
0ae9523
c65d26f
6ed8f11
ad01964
99ff64a
d376292
ae894ce
83b7373
57293eb
b548656
95838a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
"""Generic API Source""" | ||
|
||
from copy import deepcopy | ||
from typing import Type, Any, Dict, List, Optional, Generator, Callable, cast, Union | ||
import graphlib # type: ignore[import,unused-ignore] | ||
|
@@ -33,6 +34,7 @@ | |
IncrementalParamConfig, | ||
RESTAPIConfig, | ||
ParamBindType, | ||
ProcessingSteps, | ||
) | ||
from .config_setup import ( | ||
IncrementalParam, | ||
|
@@ -222,6 +224,7 @@ def create_resources( | |
request_params = endpoint_config.get("params", {}) | ||
request_json = endpoint_config.get("json", None) | ||
paginator = create_paginator(endpoint_config.get("paginator")) | ||
processing_steps = endpoint_resource.pop("processing_steps", []) | ||
|
||
resolved_param: ResolvedParam = resolved_param_map[resource_name] | ||
|
||
|
@@ -253,6 +256,14 @@ def create_resources( | |
endpoint_resource, {"endpoint", "include_from_parent"} | ||
) | ||
|
||
def process(resource, processing_steps) -> Any: | ||
for step in processing_steps: | ||
if "filter" in step: | ||
resource.add_filter(step["filter"]) | ||
if "map" in step: | ||
resource.add_map(step["map"]) | ||
return resource | ||
|
||
if resolved_param is None: | ||
|
||
def paginate_resource( | ||
|
@@ -278,15 +289,15 @@ def paginate_resource( | |
incremental_cursor_transform, | ||
) | ||
|
||
yield from client.paginate( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to keep the |
||
for page in client.paginate( | ||
francescomucio marked this conversation as resolved.
Show resolved
Hide resolved
burnash marked this conversation as resolved.
Show resolved
Hide resolved
|
||
method=method, | ||
path=path, | ||
params=params, | ||
json=json, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it a typo or there's a reason to remove |
||
paginator=paginator, | ||
data_selector=data_selector, | ||
hooks=hooks, | ||
) | ||
): | ||
yield page | ||
|
||
resources[resource_name] = dlt.resource( | ||
paginate_resource, | ||
|
@@ -300,6 +311,9 @@ def paginate_resource( | |
data_selector=endpoint_config.get("data_selector"), | ||
hooks=hooks, | ||
) | ||
resources[resource_name] = process( | ||
resources[resource_name], processing_steps | ||
) | ||
|
||
else: | ||
predecessor = resources[resolved_param.resolve_config["resource"]] | ||
|
@@ -362,6 +376,10 @@ def paginate_dependent_resource( | |
hooks=hooks, | ||
) | ||
|
||
burnash marked this conversation as resolved.
Show resolved
Hide resolved
|
||
resources[resource_name] = process( | ||
resources[resource_name], processing_steps | ||
) | ||
|
||
return resources | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -240,6 +240,11 @@ class Endpoint(TypedDict, total=False): | |
incremental: Optional[IncrementalConfig] | ||
|
||
|
||
class ProcessingSteps(TypedDict): | ||
filter: Optional[Callable[[Any], bool]] | ||
map: Optional[Callable[[Any], Any]] | ||
|
||
|
||
class ResourceBase(TypedDict, total=False): | ||
"""Defines hints that may be passed to `dlt.resource` decorator""" | ||
|
||
|
@@ -254,6 +259,10 @@ class ResourceBase(TypedDict, total=False): | |
table_format: Optional[TTableHintTemplate[TTableFormat]] | ||
selected: Optional[bool] | ||
parallelized: Optional[bool] | ||
processing_steps: Optional[List[ProcessingSteps]] | ||
# row_filter: Optional[Callable[[Any], bool]] | ||
# transform: Optional[Callable[[Any], Any]] | ||
# exclude_columns: Optional[List[jsonpath.TJsonPath]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove the commented code and implement these ready-to-use operation as a follow-up PR |
||
|
||
|
||
class EndpointResourceBase(ResourceBase, total=False): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import dlt | ||
import pytest | ||
from dlt.sources.helpers.rest_client.paginators import SinglePagePaginator | ||
|
||
from sources.rest_api import rest_api_source, RESTAPIConfig | ||
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts | ||
|
||
|
||
def _make_pipeline(destination_name: str): | ||
return dlt.pipeline( | ||
pipeline_name="rest_api", | ||
destination=destination_name, | ||
dataset_name="rest_api_data", | ||
full_refresh=True, | ||
) | ||
|
||
|
||
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) | ||
def test_rest_api_source(destination_name: str) -> None: | ||
config: RESTAPIConfig = { | ||
"client": { | ||
"base_url": "https://pokeapi.co/api/v2/", | ||
}, | ||
"resource_defaults": { | ||
"endpoint": { | ||
"params": { | ||
"limit": 1000, | ||
}, | ||
} | ||
}, | ||
"resources": [ | ||
{ | ||
"name": "pokemon_list", | ||
"endpoint": "pokemon", | ||
"processing_steps": [ | ||
{"filter": lambda x: x["name"] == "bulbasaur"}, | ||
], | ||
}, | ||
], | ||
} | ||
data = rest_api_source(config) | ||
pipeline = _make_pipeline(destination_name) | ||
load_info = pipeline.run(data) | ||
print(load_info) | ||
assert_load_info(load_info) | ||
table_names = [t["name"] for t in pipeline.default_schema.data_tables()] | ||
table_counts = load_table_counts(pipeline, *table_names) | ||
|
||
assert table_counts.keys() == {"pokemon_list"} | ||
|
||
assert table_counts["pokemon_list"] == 1 | ||
|
||
|
||
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) | ||
def test_dependent_resource(destination_name: str) -> None: | ||
config = { | ||
"client": { | ||
"base_url": "https://pokeapi.co/api/v2/", | ||
}, | ||
"resource_defaults": { | ||
"endpoint": { | ||
"params": { | ||
"limit": 1000, | ||
}, | ||
} | ||
}, | ||
"resources": [ | ||
{ | ||
"name": "pokemon_list", | ||
"endpoint": { | ||
"path": "pokemon", | ||
"paginator": SinglePagePaginator(), | ||
"data_selector": "results", | ||
"params": { | ||
"limit": 2, | ||
}, | ||
}, | ||
"selected": False, | ||
"processing_steps": [ | ||
{"filter": lambda x: x["name"] == "bulbasaur"}, | ||
], | ||
}, | ||
{ | ||
"name": "pokemon", | ||
"endpoint": { | ||
"path": "pokemon/{name}", | ||
"params": { | ||
"name": { | ||
"type": "resolve", | ||
"resource": "pokemon_list", | ||
"field": "name", | ||
}, | ||
}, | ||
}, | ||
}, | ||
], | ||
} | ||
|
||
data = rest_api_source(config) | ||
pipeline = _make_pipeline(destination_name) | ||
load_info = pipeline.run(data) | ||
assert_load_info(load_info) | ||
table_names = [t["name"] for t in pipeline.default_schema.data_tables()] | ||
table_counts = load_table_counts(pipeline, *table_names) | ||
|
||
assert set(table_counts.keys()) == { | ||
"pokemon", | ||
"pokemon__types", | ||
"pokemon__stats", | ||
"pokemon__moves__version_group_details", | ||
"pokemon__moves", | ||
"pokemon__game_indices", | ||
"pokemon__forms", | ||
"pokemon__abilities", | ||
} | ||
|
||
assert table_counts["pokemon"] == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awesome!