LanceDB - Remove Orphaned Chunks #1620

Open · wants to merge 145 commits into base: devel

Conversation

@Pipboyguy Pipboyguy (Collaborator) commented Jul 21, 2024

Description

This PR lays the groundwork for managing chunked documents and their embeddings efficiently in LanceDB, focusing on merge writes with referential integrity. This PR does not implement document chunking/splitting, which will be addressed in #1615.

  • Automatically remove orphaned chunks when the parent document is updated or deleted (a rough sketch of the idea is shown below).
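A minimal sketch of that idea, assuming the standard dlt child-table columns (_dlt_id, _dlt_parent_id) and illustrative table names and paths; this is not the exact code added by this PR:

# Conceptual sketch only: after the parent table has been merged, child rows
# whose parent `_dlt_id` no longer exists are treated as orphans and deleted.
# The database path and table names here are illustrative assumptions.
import lancedb
import pyarrow.compute as pc

db_client = lancedb.connect("/path/to/.lancedb")
parent_tbl = db_client.open_table("documents")
child_tbl = db_client.open_table("documents__chunks")

parent_ids = set(pc.unique(parent_tbl.to_arrow()["_dlt_id"]).to_pylist())
child_parent_ids = set(pc.unique(child_tbl.to_arrow()["_dlt_parent_id"]).to_pylist())

orphaned = child_parent_ids - parent_ids
if orphaned:
    id_list = ", ".join(f"'{i}'" for i in orphaned)
    child_tbl.delete(f"_dlt_parent_id IN ({id_list})")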

Related Issues

Signed-off-by: Marcel Coetzee <[email protected]>
@Pipboyguy Pipboyguy linked an issue Jul 21, 2024 that may be closed by this pull request

netlify bot commented Jul 21, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: a5a1657
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/66f3129615f22e00080eaf5c

@Pipboyguy Pipboyguy self-assigned this Jul 21, 2024
@Pipboyguy Pipboyguy added bug Something isn't working destination Issue related to new destinations labels Jul 21, 2024
Signed-off-by: Marcel Coetzee <[email protected]>
@Pipboyguy Pipboyguy requested review from sh-rp and rudolfix July 21, 2024 21:21
# Remove orphaned parent IDs.
if parent_table_name:
    try:
        parent_tbl = db_client.open_table(parent_table_name)
Collaborator

this will not work, there is no guarantee that the parent table is fully loaded (with all possible load jobs) at the point in time of the execution of the child table job. If we do it this way, we will need table chain followup jobs, similar to the way merge jobs are done. You can have a look at the default sql client for that.

@Pipboyguy Pipboyguy (Author) Jul 31, 2024

Scheduled orphan removals separately in LanceDBRemoveOrphansJob

"Couldn't open lancedb database. Batch WILL BE RETRIED"
) from e

parent_ids = set(pc.unique(parent_tbl.to_arrow()["_dlt_id"]).to_pylist())
Collaborator

I wonder if this approach will work well for large datasets...

@Pipboyguy Pipboyguy (Author) Aug 5, 2024

Subqueries don't work, so we can't use the DataFusion SQL engine here. It will have to be batched, and some processing will have to be done client-side.
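A sketch of what that client-side batching could look like (an assumption, not this PR's implementation); child_tbl, parent_ids and child_parent_ids are assumed to be computed as in the surrounding job:

# Illustrative batching sketch: delete orphaned child rows in fixed-size
# chunks so no single IN clause has to carry the whole id set at once.
BATCH_SIZE = 10_000

orphaned_ids = sorted(child_parent_ids - parent_ids)
for i in range(0, len(orphaned_ids), BATCH_SIZE):
    chunk = orphaned_ids[i : i + BATCH_SIZE]
    id_list = ", ".join(f"'{x}'" for x in chunk)
    child_tbl.delete(f"_dlt_parent_id IN ({id_list})")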

@Pipboyguy Pipboyguy (Author) Aug 6, 2024

@sh-rp Made this easier on the client by utilizing Lance projection push-down, only pulling the columns we need.
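Roughly, the projection push-down looks like this (a sketch, not the exact code in this PR; it assumes to_lance() exposes the underlying Lance dataset and to_table(columns=...) performs the projection):

# Only the `_dlt_id` column is materialised instead of the whole parent table.
import pyarrow.compute as pc

parent_tbl = db_client.open_table(parent_table_name)
parent_dlt_ids = parent_tbl.to_lance().to_table(columns=["_dlt_id"])["_dlt_id"]
parent_ids = set(pc.unique(parent_dlt_ids).to_pylist())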

@sh-rp sh-rp (Collaborator) commented Jul 23, 2024

Thanks for starting work on this. I am wondering if it would be possible to get this to work without relying on the parent tables so that adding a parent table is optional in this scenario. Theoretically it should be possible to use the dlt_load_id to discover outdated embeddings, but I have not thought it through 100%. Do you think the following is somehow possible assuming we are only using 1 table:

  • Every "original" document has a unique ID which will be inherited as a column by all embeddings (it will just be sent as a column from the resource, but we need a hint for the schema to identify this column)
  • We have this compound key created from the original document id and the row hash for doing the merge update the way you already do it (I think)
  • After the full table is loaded, we run a follow-up job, which will identify each "original" document id that has new data loaded during this run (we can find those with the dlt_load_id) and then delete all records that match these original document IDs but do NOT match the current load id (a rough sketch of this follow-up delete is shown below).
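A rough sketch of that follow-up delete, under the assumptions above (the doc_id column name, current_load_id, and db_client are illustrative; only _dlt_load_id is a standard dlt column):

import pyarrow.compute as pc

tbl = db_client.open_table("documents")

# 1. Which "original" document ids received new data in this load?
loaded = tbl.to_lance().to_table(
    columns=["doc_id"], filter=f"_dlt_load_id = '{current_load_id}'"
)
affected = pc.unique(loaded["doc_id"]).to_pylist()

# 2. Delete every row for those documents that does not carry the current load id.
if affected:
    id_list = ", ".join(f"'{d}'" for d in affected)
    tbl.delete(f"doc_id IN ({id_list}) AND _dlt_load_id != '{current_load_id}'")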

@Pipboyguy Pipboyguy (Author) commented Jul 27, 2024

@sh-rp I've added this hint to be used with root table orphan removal. It works and is absolutely a valid strategy!

If the user wants to remove root table orphans, they need to explicitly define the hint as mentioned. Surprisingly, the primary and merge keys do work for nested tables, though.

Regardless, in order to avoid confusion, I made the document id field hint raise an exception on merge disposition if a primary key is also defined, as this leads to confusing behaviour.
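For illustration, usage could look roughly like this; the hint name is an assumption (shown here as a document_id argument on lancedb_adapter) and may not match the final API:

import dlt
from dlt.destinations.adapters import lancedb_adapter

@dlt.resource(write_disposition="merge", table_name="documents")
def documents():
    yield [
        {"doc_id": "a", "text": "first chunk of document a"},
        {"doc_id": "a", "text": "second chunk of document a"},
    ]

# `document_id` is the hypothetical hint name; note that combining it with a
# primary key on a merge disposition is expected to raise, per the comment above.
lancedb_adapter(documents, embed="text", document_id="doc_id")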

…-efficient-update-strategy-for-chunked-documents
Signed-off-by: Marcel Coetzee <[email protected]>
…efficient-update-strategy-for-chunked-documents

# Conflicts:
#	dlt/destinations/impl/lancedb/lancedb_adapter.py
#	tests/load/lancedb/test_merge.py
Signed-off-by: Marcel Coetzee <[email protected]>
@rudolfix rudolfix (Collaborator) left a comment

yeah we are getting there!

@@ -192,11 +206,14 @@ def upload_batch(
elif write_disposition == "replace":
    tbl.add(records, mode="overwrite")
Collaborator

You cannot overwrite tables here. What if you have many jobs for a single table? They will overwrite each other. Please rewrite your tests for replace to generate many rows and use

os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "2"
os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "2"

which will make sure that you get many jobs. (make sure in your test that you have many jobs per table)

dlt already takes care of truncating the right tables in initialize_storage. Other destinations simply do append or merge here.
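A rough sketch of the kind of test being asked for (illustrative names, not the PR's actual test):

import os
import dlt

# Tiny buffer/file limits force many load jobs per table.
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "2"
os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "2"

@dlt.resource(write_disposition="replace", table_name="docs")
def docs():
    yield from ({"id": i, "text": f"row {i}"} for i in range(50))

pipeline = dlt.pipeline(pipeline_name="lancedb_replace_test", destination="lancedb")
pipeline.run(docs())
pipeline.run(docs())  # the second run must replace, not append to, the 50 rows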

Collaborator Author

Fixed

with FileStorage.open_zipsafe_ro(file_path, mode="rb") as f:
    payload_arrow_table: pa.Table = pq.read_table(f)

if target_is_root_table:
Collaborator

OK this is really good but IMO there's a mistake when deleting nested tables:

  • to delete orphan from root table we use merge key of the root table (good)
  • to delete from the child table we should use all dlt_ids that you can find in the root table, not in the child table. Why? Because (1) if you remove all nested elements of a row in the root table you will not delete all orphans, and (2) if you remove an entire row from the root table you also won't find the proper root key

please write tests for that.

Also, could you take the taxi dataset and try to merge all rows (where the IN clause will be insanely long)? I wonder what happens: will that be an O(n^2) operation, or will they build a hash table for the IN clause?

Collaborator Author

#%%
import time

import lancedb
import matplotlib.pyplot as plt
import numpy as np
import pyarrow as pa
#%%
dim = 1536
num_rows = 1_000_000
batch_size = 10_000
lancedb_path = "/tmp/.lancedb"
table_name = "vectors"
#%%
def next_batch(batch_size, offset):
    values = pa.array(np.random.rand(dim * batch_size).astype('float32'))
    return pa.table({
        'id': pa.array([offset + j for j in range(batch_size)]),
        'vector': pa.FixedSizeListArray.from_arrays(values, dim),
        'metric': pa.array(np.random.rand(batch_size)),
    }).to_batches()[0]


def batch_iter(num_rows):
    i = 0
    while i < num_rows:
        current_batch_size = min(batch_size, num_rows - i)
        yield next_batch(current_batch_size, i)
        i += current_batch_size


def create_filter_condition(field_name: str, values: np.ndarray) -> str:
    return f"{field_name} IN ({', '.join(map(str, values))})"
#%%
db = lancedb.connect(lancedb_path)
schema = next_batch(1, 0).schema
table = db.create_table(table_name, data=batch_iter(num_rows), schema=schema, mode="overwrite")

in_clause_sizes = [1000, 5000, 10000, 50000, 100000]
execution_times_no_index = []
execution_times_with_index = []

# Measure execution times without an index.
for size in in_clause_sizes:
    unique_ids = np.random.choice(num_rows, size, replace=False)
    filter_condition = create_filter_condition('id', unique_ids)

    start_time = time.time()
    _ = table.search().where(filter_condition).limit(10).to_pandas()
    end_time = time.time()

    execution_time_no_index = end_time - start_time
    execution_times_no_index.append(execution_time_no_index)

    print(f"Without Index - IN clause size: {size}, Execution time: {execution_time_no_index:.2f} seconds")

# Measure execution times with index.
table.create_scalar_index("id", index_type="BTREE")
for size in in_clause_sizes:
    unique_ids = np.random.choice(num_rows, size, replace=False)
    filter_condition = create_filter_condition('id', unique_ids)

    start_time = time.time()
    _ = table.search().where(filter_condition).limit(10).to_pandas()
    end_time = time.time()

    execution_time_with_index = end_time - start_time
    execution_times_with_index.append(execution_time_with_index)

    print(f"With Index - IN clause size: {size}, Execution time: {execution_time_with_index:.2f} seconds")
#%%
plt.figure(figsize=(10, 6))
plt.plot(in_clause_sizes, execution_times_no_index, marker='o', label='Without Index', color='blue')
plt.plot(in_clause_sizes, execution_times_with_index, marker='*', label='With Index', color='red')
plt.title('Execution Time vs IN Clause Size')
plt.xlabel('Number of IDs in IN Clause')
plt.ylabel('Execution Time (seconds)')
plt.grid(True)
plt.legend()
plt.show()
#%%

[Plot: Execution Time vs IN Clause Size, with and without a scalar index]

Collaborator Author

Setting a scalar index does seem to offer constant time complexity. Clearly this is worth investigating in future PRs.

dlt/destinations/impl/lancedb/lancedb_client.py (outdated review comment, resolved)
…egy-for-chunked-documents

# Conflicts:
#	dlt/destinations/impl/lancedb/lancedb_client.py
#	docs/website/docs/dlt-ecosystem/destinations/lancedb.md
#	poetry.lock
#	tests/load/lancedb/test_pipeline.py
#	tests/load/lancedb/utils.py
… conditions with replace disposition

Signed-off-by: Marcel Coetzee <[email protected]>
…egy-for-chunked-documents

# Conflicts:
#	poetry.lock
Signed-off-by: Marcel Coetzee <[email protected]>
…egy-for-chunked-documents

# Conflicts:
#	dlt/destinations/impl/lancedb/factory.py
#	dlt/destinations/impl/lancedb/lancedb_client.py
#	dlt/destinations/impl/lancedb/schema.py
#	poetry.lock
#	tests/load/lancedb/test_pipeline.py
Signed-off-by: Marcel Coetzee <[email protected]>
Labels: bug (Something isn't working), destination (Issue related to new destinations)
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

Chunk and Embedding Management in LanceDB
3 participants