docs: document the merge insert operation in python (#1892)
Co-authored-by: Will Jones <[email protected]>
westonpace and wjones127 authored Jan 31, 2024
1 parent 1a67a17 commit eccb929
Showing 3 changed files with 230 additions and 8 deletions.
124 changes: 121 additions & 3 deletions docs/read_and_write.rst
@@ -135,10 +135,128 @@ of Alice and Bob in the same example, we could write:

    dataset = lance.dataset("./alice_and_bob.lance")
    dataset.update({"age": "age + 2"})
If you are trying to update a set of individual rows with new values then it is often
more efficient to use the merge insert operation described below.

.. code-block:: python

    import lance
    import pyarrow as pa

    # Change the ages of both Alice and Bob
    new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                      {"name": "Bob", "age": 20}])

    # This works, but is inefficient; see below for a better approach
    dataset = lance.dataset("./alice_and_bob.lance")
    for idx in range(new_table.num_rows):
        name = new_table[0][idx].as_py()
        new_age = new_table[1][idx].as_py()
        dataset.update({"age": new_age}, where=f"name='{name}'")

Merge Insert
~~~~~~~~~~~~

Lance supports a merge insert operation. This can be used to add new data in bulk
while also (potentially) matching against existing data. This operation can be used
for a number of different use cases.

Bulk Update
^^^^^^^^^^^

The :py:meth:`lance.LanceDataset.update` method is useful for updating rows based on
a filter. However, if we want to replace existing rows with new rows then a merge
insert operation would be more efficient:

.. code-block:: python

    import lance
    import pyarrow as pa

    # Change the ages of both Alice and Bob
    new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                      {"name": "Bob", "age": 20}])
    dataset = lance.dataset("./alice_and_bob.lance")

    # This will use `name` as the key for matching rows.  Merge insert
    # uses a JOIN internally and so you typically want this column to
    # be a unique key or id of some kind.
    dataset.merge_insert("name") \
        .when_matched_update_all() \
        .execute(new_table)

Note that, similar to the update operation, rows that are modified will
be removed and inserted back into the table, changing their position to
the end. Also, the relative order of these rows could change because we
are using a hash-join operation internally.

Insert if not Exists
^^^^^^^^^^^^^^^^^^^^

Sometimes we only want to insert data if we haven't already inserted it
before. This can happen, for example, when we have a batch of data but
we don't know which rows we've added previously and we don't want to
create duplicate rows. We can use the merge insert operation to achieve
this:

.. code-block:: python

    import lance
    import pyarrow as pa

    # Bob is already in the table, but Carla is new
    new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},
                                      {"name": "Carla", "age": 37}])
    dataset = lance.dataset("./alice_and_bob.lance")

    # This will insert Carla but leave Bob unchanged
    dataset.merge_insert("name") \
        .when_not_matched_insert_all() \
        .execute(new_table)

Update or Insert (Upsert)
^^^^^^^^^^^^^^^^^^^^^^^^^

Sometimes we want to combine both of the above behaviors. If a row
already exists we want to update it. If the row does not exist we want
to add it. This operation is sometimes called "upsert". We can use
the merge insert operation to do this as well:

.. code-block:: python

    import lance
    import pyarrow as pa

    # Change Carla's age and insert David
    new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},
                                      {"name": "David", "age": 42}])
    dataset = lance.dataset("./alice_and_bob.lance")

    # This will update Carla and insert David
    dataset.merge_insert("name") \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute(new_table)

Replace a Portion of Data
^^^^^^^^^^^^^^^^^^^^^^^^^

A less common, but still useful, behavior can be to replace some region
of existing rows (defined by a filter) with new data. This is similar
to performing both a delete and an insert in a single transaction. For
example:

.. code-block:: python

    import lance
    import pyarrow as pa

    new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},
                                      {"name": "Francene", "age": 44}])
    dataset = lance.dataset("./alice_and_bob.lance")

    # This will remove anyone 40 or older and insert our new data
    dataset.merge_insert("name") \
        .when_not_matched_insert_all() \
        .when_not_matched_by_source_delete("age >= 40") \
        .execute(new_table)

104 changes: 104 additions & 0 deletions python/python/lance/dataset.py
@@ -89,9 +89,59 @@

class MergeInsertBuilder(_MergeInsertBuilder):
    def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
        """Executes the merge insert operation

        There is no return value but the original dataset will be updated.

        Parameters
        ----------
        data_obj: ReaderLike
            The new data to use as the source table for the operation.  This
            parameter can be any source of data (e.g. table / dataset) that
            :func:`~lance.write_dataset` accepts.
        schema: Optional[pa.Schema]
            The schema of the data.  This only needs to be supplied whenever
            the data source is some kind of generator.
        """
        reader = _coerce_reader(data_obj, schema)
        super(MergeInsertBuilder, self).execute(reader)

    # These next three overrides exist only to document the methods

    def when_matched_update_all(self):
        """
        Configure the operation to update matched rows

        After this method is called, when the merge insert operation executes,
        any rows that match both the source table and the target table will be
        updated.  The rows from the target table will be removed and the rows
        from the source table will be added.
        """
        return super(MergeInsertBuilder, self).when_matched_update_all()

    def when_not_matched_insert_all(self):
        """
        Configure the operation to insert not matched rows

        After this method is called, when the merge insert operation executes,
        any rows that exist only in the source table will be inserted into
        the target table.
        """
        return super(MergeInsertBuilder, self).when_not_matched_insert_all()

    def when_not_matched_by_source_delete(self, expr: Optional[str] = None):
        """
        Configure the operation to delete target rows that have no match in the source

        After this method is called, when the merge insert operation executes,
        any rows that exist only in the target table will be deleted.  An
        optional filter can be specified to limit the scope of the delete
        operation.  If given (as an SQL filter) then only rows which match
        the filter will be deleted.
        """
        return super(MergeInsertBuilder, self).when_not_matched_by_source_delete(expr)


class LanceDataset(pa.dataset.Dataset):
    """A dataset in Lance format where the data is stored at the given uri."""
@@ -647,6 +697,60 @@ def merge_insert(
        self,
        on: Union[str, Iterable[str]],
    ):
        """
        Returns a builder that can be used to create a "merge insert" operation

        This operation can add rows, update rows, and remove rows in a single
        transaction. It is a very generic tool that can be used to create
        behaviors like "insert if not exists", "update or insert (i.e. upsert)",
        or even replace a portion of existing data with new data (e.g. replace
        all data where month="january").

        The merge insert operation works by combining new data from a
        **source table** with existing data in a **target table** by using a
        join.  There are three categories of records.

        "Matched" records are records that exist in both the source table and
        the target table. "Not matched" records exist only in the source table
        (e.g. these are new data). "Not matched by source" records exist only
        in the target table (this is old data).

        The builder returned by this method can be used to customize what
        should happen for each category of data.

        Please note that the data will be reordered as part of this
        operation.  This is because updated rows will be deleted from the
        dataset and then reinserted at the end with the new values.  The
        order of the newly inserted rows may fluctuate randomly because a
        hash-join operation is used internally.

        Parameters
        ----------
        on: Union[str, Iterable[str]]
            A column (or columns) to join on.  This is how records from the
            source table and target table are matched.  Typically this is some
            kind of key or id column.

        Examples
        --------
        >>> import lance
        >>> import pyarrow as pa
        >>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
        >>> dataset = lance.write_dataset(table, "example")
        >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
        >>> # Perform an "upsert" operation
        >>> dataset.merge_insert("a") \\
        ...     .when_matched_update_all() \\
        ...     .when_not_matched_insert_all() \\
        ...     .execute(new_table)
        >>> dataset.to_table().sort_by("a").to_pandas()
           a  b
        0  1  b
        1  2  x
        2  3  y
        3  4  z
        """
        return MergeInsertBuilder(self._ds, on)

    def update(
10 changes: 5 additions & 5 deletions rust/lance/src/dataset/write/merge_insert.rs
@@ -522,18 +522,18 @@ impl Merger {
             // borrow checker (the stream needs to be `sync` since it crosses an await point)
             let mut deleted_row_ids = self.deleted_rows.lock().unwrap();

-            if self.params.insert_not_matched {
-                let not_matched = arrow::compute::filter_record_batch(&batch, &right_only)?;
-                let not_matched = not_matched.project(&right_cols)?;
-                batches.push(Ok(not_matched));
-            }
             if self.params.update_matched {
                 let matched = arrow::compute::filter_record_batch(&batch, &in_both)?;
                 let row_ids = matched.column(row_id_col).as_primitive::<UInt64Type>();
                 deleted_row_ids.extend(row_ids.values());
                 let matched = matched.project(&right_cols)?;
                 batches.push(Ok(matched));
             }
+            if self.params.insert_not_matched {
+                let not_matched = arrow::compute::filter_record_batch(&batch, &right_only)?;
+                let not_matched = not_matched.project(&right_cols)?;
+                batches.push(Ok(not_matched));
+            }
             match self.params.delete_not_matched_by_source {
                 WhenNotMatchedBySource::Delete => {
                     let unmatched = arrow::compute::filter(batch.column(row_id_col), &left_only)?;
