Fix/1571 Incremental: Optionally raise, load, or ignore records with cursor_path missing or None value (#1576)

* Allows specifying what happens when the cursor_path is missing from a record or has the value None: raise differentiated exceptions, exclude the row, or include the row (see the sketch after this list)

* Documents handling None values at the incremental cursor

* Fixes incremental extract crashing if one record has cursor_path = None

* Tests that add_map can be used to transform items before the incremental function is called

* Unifies treatment of None values for Python objects (including Pydantic), pandas, and Arrow
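
A minimal sketch of the new option in use (resource and field names are illustrative, assuming the `on_cursor_value_missing` argument added in this PR):

```py
import dlt

@dlt.resource
def events(
    # "exclude" silently drops rows whose cursor value is unusable;
    # "include" keeps them; "raise" (the default) raises a differentiated exception
    updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")
):
    yield [
        {"id": 1, "updated_at": 1},
        {"id": 2},                      # cursor path missing
        {"id": 3, "updated_at": None},  # cursor value is None
    ]

# only the row with a usable cursor value survives extraction
assert list(events()) == [{"id": 1, "updated_at": 1}]
```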

---------

Co-authored-by: Marcin Rudolf <[email protected]>
willi-mueller and rudolfix committed Sep 2, 2024
1 parent 7c803f0 commit c4c9195
Showing 7 changed files with 624 additions and 35 deletions.
18 changes: 16 additions & 2 deletions dlt/extract/incremental/__init__.py
@@ -35,7 +35,12 @@
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import (
IncrementalColumnState,
TCursorValue,
LastValueFunc,
OnCursorValueMissing,
)
from dlt.extract.pipe import Pipe
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
from dlt.extract.incremental.transform import (
@@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
>>> info = p.run(r, destination="duckdb")
Args:
cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run.
last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max`
primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks
@@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed to the Incremental class.
The values passed explicitly to Incremental will be ignored.
Note that if a logical "end date" is present then also "end_value" will be set, which means that resource state is not used and exactly this range of dates will be loaded
on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude
"""

# this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
end_value: Optional[Any] = None
row_order: Optional[TSortOrder] = None
allow_external_schedulers: bool = False
on_cursor_value_missing: OnCursorValueMissing = "raise"

# incremental acting as empty
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -118,6 +125,7 @@ def __init__(
end_value: Optional[TCursorValue] = None,
row_order: Optional[TSortOrder] = None,
allow_external_schedulers: bool = False,
on_cursor_value_missing: OnCursorValueMissing = "raise",
) -> None:
# make sure that path is valid
if cursor_path:
@@ -133,6 +141,11 @@ def __init__(
self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
self.row_order = row_order
self.allow_external_schedulers = allow_external_schedulers
if on_cursor_value_missing not in ["raise", "include", "exclude"]:
raise ValueError(
f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}"
)
self.on_cursor_value_missing = on_cursor_value_missing

self._cached_state: IncrementalColumnState = None
"""State dictionary cached on first access"""
@@ -171,6 +184,7 @@ def _make_transforms(self) -> None:
self.last_value_func,
self._primary_key,
set(self._cached_state["unique_hashes"]),
self.on_cursor_value_missing,
)

@classmethod
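
As a usage note, the constructor above validates the new argument eagerly; a sketch of what that looks like from calling code (the option value is deliberately wrong):

```py
import dlt

# a typo in the option fails fast at construction time with ValueError,
# instead of surfacing later during extraction
try:
    dlt.sources.incremental("updated_at", on_cursor_value_missing="skip")
except ValueError as e:
    print(e)  # Unexpected argument for on_cursor_value_missing. Got skip
```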
19 changes: 17 additions & 2 deletions dlt/extract/incremental/exceptions.py
@@ -5,12 +5,27 @@


class IncrementalCursorPathMissing(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
msg
or f"Cursor element with JSON path `{json_path}` was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database."
)
super().__init__(pipe_name, msg)


class IncrementalCursorPathHasValueNone(PipeException):
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
msg
or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
or f"Cursor element with JSON path `{json_path}` has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows"
)
super().__init__(pipe_name, msg)

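
Both exception classes keep `json_path` and `item` on the instance, which is useful when logging why extraction stopped. An illustrative sketch (not part of the commit):

```py
from dlt.extract.incremental.exceptions import IncrementalCursorPathHasValueNone

exc = IncrementalCursorPathHasValueNone(
    "my_resource", "updated_at", item={"id": 3, "updated_at": None}
)
assert exc.json_path == "updated_at"
assert exc.item == {"id": 3, "updated_at": None}
print(exc)  # the pipe name plus the explanatory message shown above
```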
89 changes: 69 additions & 20 deletions dlt/extract/incremental/transform.py
@@ -1,5 +1,5 @@
from datetime import datetime, date # noqa: I251
from typing import Any, Optional, Set, Tuple, List
from datetime import datetime # noqa: I251
from typing import Any, Optional, Set, Tuple, List, Type

from dlt.common.exceptions import MissingDependencyException
from dlt.common.utils import digest128
@@ -11,8 +11,9 @@
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
IncrementalCursorPathHasValueNone,
)
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing
from dlt.extract.utils import resolve_column_value
from dlt.extract.items import TTableHintTemplate
from dlt.common.schema.typing import TColumnNames
@@ -55,6 +56,7 @@ def __init__(
last_value_func: LastValueFunc[TCursorValue],
primary_key: Optional[TTableHintTemplate[TColumnNames]],
unique_hashes: Set[str],
on_cursor_value_missing: OnCursorValueMissing = "raise",
) -> None:
self.resource_name = resource_name
self.cursor_path = cursor_path
Expand All @@ -67,6 +69,7 @@ def __init__(
self.primary_key = primary_key
self.unique_hashes = unique_hashes
self.start_unique_hashes = set(unique_hashes)
self.on_cursor_value_missing = on_cursor_value_missing

# compile jsonpath
self._compiled_cursor_path = compile_path(cursor_path)
@@ -116,21 +119,39 @@ class JsonIncremental(IncrementalTransform):
def find_cursor_value(self, row: TDataItem) -> Any:
"""Finds value in row at cursor defined by self.cursor_path.
Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict
Will use compiled JSONPath if present.
Otherwise, reverts to field access if row is dict, Pydantic model, or of other class.
"""
row_value: Any = None
key_exc: Type[Exception] = IncrementalCursorPathHasValueNone
if self._compiled_cursor_path:
row_values = find_values(self._compiled_cursor_path, row)
if row_values:
row_value = row_values[0]
# ignores the other found values, e.g. when the path is $data.items[*].created_at
try:
row_value = find_values(self._compiled_cursor_path, row)[0]
except IndexError:
# empty list so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing
else:
try:
row_value = row[self.cursor_path]
except Exception:
pass
if row_value is None:
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
return row_value
try:
row_value = row[self.cursor_path]
except TypeError:
# supports Pydantic models and other classes
row_value = getattr(row, self.cursor_path)
except (KeyError, AttributeError):
# attr not found so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing

# if we have a value - return it
if row_value is not None:
return row_value

if self.on_cursor_value_missing == "raise":
# raise missing path or None value exception
raise key_exc(self.resource_name, self.cursor_path, row)
elif self.on_cursor_value_missing == "exclude":
return None

def __call__(
self,
Expand All @@ -144,6 +165,12 @@ def __call__(
return row, False, False

row_value = self.find_cursor_value(row)
if row_value is None:
if self.on_cursor_value_missing == "exclude":
return None, False, False
else:
return row, False, False

last_value = self.last_value
last_value_func = self.last_value_func

@@ -299,6 +326,7 @@ def __call__(

# TODO: Json path support. For now assume the cursor_path is a column name
cursor_path = self.cursor_path

# The new max/min value
try:
# NOTE: datetimes are always pendulum in UTC
@@ -310,11 +338,16 @@
self.resource_name,
cursor_path,
tbl,
f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths"
f"Column name `{cursor_path}` was not found in the arrow table. Nested JSON paths"
" are not supported for arrow tables and dataframes, the incremental cursor_path"
" must be a column name.",
) from e

if tbl.schema.field(cursor_path).nullable:
tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl)

tbl = tbl_without_null

# If end_value is provided, filter to include table rows that are "less" than end_value
if self.end_value is not None:
try:
@@ -396,12 +429,28 @@ def __call__(
)
)

# drop the temp unique index before concat and returning
if "_dlt_index" in tbl.schema.names:
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])

if self.on_cursor_value_missing == "include":
if isinstance(tbl, pa.RecordBatch):
assert isinstance(tbl_with_null, pa.RecordBatch)
tbl = pa.Table.from_batches([tbl, tbl_with_null])
else:
tbl = pa.concat_tables([tbl, tbl_with_null])

if len(tbl) == 0:
return None, start_out_of_range, end_out_of_range
try:
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])
except KeyError:
pass
if is_pandas:
return tbl.to_pandas(), start_out_of_range, end_out_of_range
tbl = tbl.to_pandas()
return tbl, start_out_of_range, end_out_of_range

def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]:
mask = pa.compute.is_valid(tbl[self.cursor_path])
rows_without_null = tbl.filter(mask)
rows_with_null = tbl.filter(pa.compute.invert(mask))
if self.on_cursor_value_missing == "raise":
if rows_with_null.num_rows > 0:
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path)
return rows_without_null, rows_with_null
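
The Arrow branch above partitions each table once by cursor validity and reuses both halves. A standalone sketch of the same `pyarrow.compute` idea (table and column names are illustrative):

```py
import pyarrow as pa
import pyarrow.compute as pc

tbl = pa.table({"id": [1, 2, 3], "updated_at": [1, None, 4]})

# is_valid is False for nulls, so the mask selects rows with a usable cursor value
mask = pc.is_valid(tbl["updated_at"])
rows_without_null = tbl.filter(mask)
rows_with_null = tbl.filter(pc.invert(mask))

assert rows_without_null.num_rows == 2  # ids 1 and 3
assert rows_with_null.num_rows == 1     # id 2: kept or dropped per on_cursor_value_missing
```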
3 changes: 2 additions & 1 deletion dlt/extract/incremental/typing.py
@@ -1,8 +1,9 @@
from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence
from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence


TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
OnCursorValueMissing = Literal["raise", "include", "exclude"]


class IncrementalColumnState(TypedDict):
69 changes: 61 additions & 8 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -689,7 +689,7 @@ than `end_value`.

:::caution
In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close
generator associated with a row that is out of range. You can still use still call `can_close()` method on
generator associated with a row that is out of range. You can still call the `can_close()` method on
incremental and exit yield loop when true.
:::
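
A sketch of the `can_close()` pattern mentioned in the caution, for a transformer paging through an API (`fetch_event_pages` is a hypothetical helper):

```py
import dlt

@dlt.transformer
def user_events(user, updated_at=dlt.sources.incremental("updated_at", row_order="desc")):
    for page in fetch_event_pages(user["id"]):  # hypothetical paging helper
        yield page
        if updated_at.can_close():
            # remaining pages are out of range - stop requesting them
            return
```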

@@ -907,22 +907,75 @@ Consider the example below for reading incremental loading parameters from "conf
```
`id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.

### Loading NULL values in the incremental cursor field
### Loading when incremental cursor path is missing or value is None/NULL

When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`.
For example, the following source data will raise an error:
You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`.

When loading incrementally with the default settings, there are two assumptions:
1. Each row contains the cursor path.
2. Each row has a value at the cursor path that is not `None`.

For example, both of the following sources will raise an error:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2, "updated_at": 2},
{"id": 2, "created_at": 2}, # cursor field is missing
]

list(some_data_without_cursor_path())

@dlt.resource
def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None
]

list(some_data_without_cursor_value())
```
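
Depending on how the resource is evaluated, `dlt` may raise these exceptions directly or wrap them in an extraction error; a hedged sketch that handles both cases:

```py
from dlt.extract.incremental.exceptions import (
    IncrementalCursorPathMissing,
    IncrementalCursorPathHasValueNone,
)

try:
    list(some_data_without_cursor_value())
except IncrementalCursorPathHasValueNone as e:
    print(f"None at cursor path {e.json_path}")
except Exception as e:
    # if dlt wrapped the error, the original exception is on the cause chain
    assert isinstance(e.__cause__, IncrementalCursorPathHasValueNone)
```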


To process a data set where some records do not include the incremental cursor path or where the values at the cursor path are `None`, there are the following four options:

1. Configure the incremental load to raise an exception in case there is a row where the cursor path is missing or has the value `None` using `incremental(..., on_cursor_value_missing="raise")`. This is the default behavior.
2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`.
3. Configure the incremental load to exclude the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="exclude")`.
4. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None`. [See docs below](#transform-records-before-incremental-processing)

Here is an example of including rows where the incremental cursor value is missing or `None`:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
]

result = list(some_data())
assert len(result) == 3
assert result[1] == {"id": 2, "created_at": 2}
assert result[2] == {"id": 3, "created_at": 4, "updated_at": None}
```

If you do not want to import records without the cursor path, or where the value at the cursor path is `None`, use the following incremental configuration:

```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
]

list(some_data())
result = list(some_data())
assert len(result) == 1
```
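
Because this change unifies `None` handling across Python objects, pandas, and Arrow, the same setting applies when a resource yields a DataFrame. A sketch assuming pandas and pyarrow are installed (column names are illustrative):

```py
import dlt
import pandas as pd

@dlt.resource
def frame_data(
    updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")
):
    yield pd.DataFrame({"id": [1, 2], "updated_at": [pd.Timestamp("2024-01-01"), pd.NaT]})

frames = list(frame_data())
assert len(frames[0]) == 1  # the row with a NaT cursor value was dropped
```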

### Transform records before incremental processing
If you want to load data that includes `None` values, you can transform the records before the incremental processing.
You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
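
A sketch of that approach with `add_map`, substituting `created_at` when `updated_at` is `None` so every record has a usable cursor value before the incremental step runs (field names are illustrative):

```py
import dlt

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield [
        {"id": 1, "created_at": 1, "updated_at": 1},
        {"id": 2, "created_at": 2, "updated_at": None},
    ]

def set_default_updated_at(record):
    if record.get("updated_at") is None:
        record["updated_at"] = record["created_at"]
    return record

# insert_at=1 places the mapping before the incremental transform step
result = list(some_data().add_map(set_default_updated_at, insert_at=1))
assert result[1]["updated_at"] == 2
```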

@@ -1162,4 +1215,4 @@ sources:
}
```

Verify that the `last_value` is updated between pipeline runs.