diff --git a/docs/read_and_write.rst b/docs/read_and_write.rst
index e396b3a032..12f4505052 100644
--- a/docs/read_and_write.rst
+++ b/docs/read_and_write.rst
@@ -135,10 +135,129 @@ of Alice and Bob in the same example, we could write:
 
     dataset = lance.dataset("./alice_and_bob.lance")
     dataset.update({"age": "age + 2"})
 
-.. TODO: Once we implement MERGE, we should make a note that this method shouldn't be used
-.. for updating single rows in a loop, and users should instead do bulk updates
-.. using MERGE.
+If you are trying to update a set of individual rows with new values then it is often
+more efficient to use the merge insert operation described below.
+
+.. code-block:: python
+
+    import lance
+
+    # Change the ages of both Alice and Bob
+    new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
+                                      {"name": "Bob", "age": 20}])
+
+    # This works, but is inefficient, see below for a better approach
+    dataset = lance.dataset("./alice_and_bob.lance")
+    for idx in range(new_table.num_rows):
+        name = new_table[0][idx].as_py()
+        new_age = new_table[1][idx].as_py()
+        dataset.update({"age": new_age}, where=f"name='{name}'")
+
+Merge Insert
+~~~~~~~~~~~~
+
+Lance supports a merge insert operation. This can be used to add new data in bulk
+while also (potentially) matching against existing data. This operation can be used
+for a number of different use cases.
+
+Bulk Update
+^^^^^^^^^^^
+
+The :py:meth:`lance.LanceDataset.update` method is useful for updating rows based on
+a filter. However, if we want to replace existing rows with new rows then a merge
+insert operation would be more efficient:
+
+.. code-block:: python
+
+    import lance
+
+    # Change the ages of both Alice and Bob
+    new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
+                                      {"name": "Bob", "age": 20}])
+    dataset = lance.dataset("./alice_and_bob.lance")
+    # This will use `name` as the key for matching rows.  Merge insert
+    # uses a JOIN internally and so you typically want this column to
+    # be a unique key or id of some kind.
+    dataset.merge_insert("name") \
+        .when_matched_update_all() \
+        .execute(new_table)
+
+Note that, similar to the update operation, rows that are modified will
+be removed and inserted back into the table, changing their position to
+the end. Also, the relative order of these rows could change because we
+are using a hash-join operation internally.
+
+Insert if not Exists
+^^^^^^^^^^^^^^^^^^^^
+
+Sometimes we only want to insert data if we haven't already inserted it
+before. This can happen, for example, when we have a batch of data but
+we don't know which rows we've added previously and we don't want to
+create duplicate rows. We can use the merge insert operation to achieve
+this:
+
+.. code-block:: python
+
+    import lance
+
+    # Bob is already in the table, but Carla is new
+    new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},
+                                      {"name": "Carla", "age": 37}])
+
+    dataset = lance.dataset("./alice_and_bob.lance")
+
+    # This will insert Carla but leave Bob unchanged
+    dataset.merge_insert("name") \
+        .when_not_matched_insert_all() \
+        .execute(new_table)
+
+Update or Insert (Upsert)
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sometimes we want to combine both of the above behaviors. If a row
+already exists we want to update it. If the row does not exist we want
+to add it. This operation is sometimes called "upsert". We can use
+the merge insert operation to do this as well:
+
+.. code-block:: python
+
+    import lance
+
+    # Change Carla's age and insert David
+    new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},
+                                      {"name": "David", "age": 42}])
+
+    dataset = lance.dataset("./alice_and_bob.lance")
+
+    # This will update Carla and insert David
+    dataset.merge_insert("name") \
+        .when_matched_update_all() \
+        .when_not_matched_insert_all() \
+        .execute(new_table)
+
+Replace a Portion of Data
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+A less common, but still useful, behavior can be to replace some region
+of existing rows (defined by a filter) with new data. This is similar
+to performing both a delete and an insert in a single transaction. For
+example:
+
+.. code-block:: python
+
+    import lance
+
+    new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},
+                                      {"name": "Francene", "age": 44}])
+
+    dataset = lance.dataset("./alice_and_bob.lance")
+
+    # This will remove anyone 40 or older and insert our new data
+    dataset.merge_insert("name") \
+        .when_not_matched_insert_all() \
+        .when_not_matched_by_source_delete("age >= 40") \
+        .execute(new_table)
diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 79d5012f94..ab28dea72c 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -89,9 +89,59 @@ class MergeInsertBuilder(_MergeInsertBuilder):
 
     def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
+        """Executes the merge insert operation
+
+        There is no return value but the original dataset will be updated.
+
+        Parameters
+        ----------
+
+        data_obj: ReaderLike
+            The new data to use as the source table for the operation. This parameter
+            can be any source of data (e.g. table / dataset) that
+            :func:`~lance.write_dataset` accepts.
+        schema: Optional[pa.Schema]
+            The schema of the data. This only needs to be supplied whenever the data
+            source is some kind of generator.
+ """ reader = _coerce_reader(data_obj, schema) super(MergeInsertBuilder, self).execute(reader) + # These next three overrides exist only to document the methods + + def when_matched_update_all(self): + """ + Configure the operation to update matched rows + + After this method is called, when the merge insert operation executes, + any rows that match both the source table and the target table will be + updated. The rows from the target table will be removed and the rows + from the source table will be added. + """ + return super(MergeInsertBuilder, self).when_matched_update_all() + + def when_not_matched_insert_all(self): + """ + Configure the operation to insert not matched rows + + After this method is called, when the merge insert operation executes, + any rows that exist only in the source table will be inserted into + the target table. + """ + return super(MergeInsertBuilder, self).when_not_matched_insert_all() + + def when_not_matched_by_source_delete(self, expr: Optional[str] = None): + """ + Configure the operation to delete source rows that do not match + + After this method is called, when the merge insert operation executes, + any rows that exist only in the target table will be deleted. An + optional filter can be specified to limit the scope of the delete + operation. If given (as an SQL filter) then only rows which match + the filter will be deleted. + """ + return super(MergeInsertBuilder, self).when_not_matched_by_source_delete(expr) + class LanceDataset(pa.dataset.Dataset): """A dataset in Lance format where the data is stored at the given uri.""" @@ -647,6 +697,60 @@ def merge_insert( self, on: Union[str, Iterable[str]], ): + """ + Returns a builder that can be used to create a "merge insert" operation + + This operation can add rows, update rows, and remove rows in a single + transaction. It is a very generic tool that can be used to create + behaviors like "insert if not exists", "update or insert (i.e. 
upsert)", + or even replace a portion of existing data with new data (e.g. replace + all data where month="january") + + The merge insert operation works by combining new data from a + **source table** with existing data in a **target table** by using a + join. There are three categories of records. + + "Matched" records are records that exist in both the source table and + the target table. "Not matched" records exist only in the source table + (e.g. these are new data). "Not matched by source" records exist only + in the target table (this is old data). + + The builder returned by this method can be used to customize what + should happen for each category of data. + + Please note that the data will be reordered as part of this + operation. This is because updated rows will be deleted from the + dataset and then reinserted at the end with the new values. The + order of the newly inserted rows may fluctuate randomly because a + hash-join operation is used internally. + + Parameters + ---------- + + on: Union[str, Iterable[str]] + A column (or columns) to join on. This is how records from the + source table and target table are matched. Typically this is some + kind of key or id column. + + Examples + -------- + >>> import lance + >>> import pyarrow as pa + >>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]}) + >>> dataset = lance.write_dataset(table, "example") + >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) + >>> # Perform a "upsert" operation + >>> dataset.merge_insert("a") \\ + ... .when_matched_update_all() \\ + ... .when_not_matched_insert_all() \\ + ... 
.execute(new_table) + >>> dataset.to_table().sort_by("a").to_pandas() + a b + 0 1 b + 1 2 x + 2 3 y + 3 4 z + """ return MergeInsertBuilder(self._ds, on) def update( diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 0796c05f2e..db9f62d85a 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -522,11 +522,6 @@ impl Merger { // borrow checker (the stream needs to be `sync` since it crosses an await point) let mut deleted_row_ids = self.deleted_rows.lock().unwrap(); - if self.params.insert_not_matched { - let not_matched = arrow::compute::filter_record_batch(&batch, &right_only)?; - let not_matched = not_matched.project(&right_cols)?; - batches.push(Ok(not_matched)); - } if self.params.update_matched { let matched = arrow::compute::filter_record_batch(&batch, &in_both)?; let row_ids = matched.column(row_id_col).as_primitive::(); @@ -534,6 +529,11 @@ impl Merger { let matched = matched.project(&right_cols)?; batches.push(Ok(matched)); } + if self.params.insert_not_matched { + let not_matched = arrow::compute::filter_record_batch(&batch, &right_only)?; + let not_matched = not_matched.project(&right_cols)?; + batches.push(Ok(not_matched)); + } match self.params.delete_not_matched_by_source { WhenNotMatchedBySource::Delete => { let unmatched = arrow::compute::filter(batch.column(row_id_col), &left_only)?;