From d207aa840f6fe565f8e39e9be034bea003f5a01f Mon Sep 17 00:00:00 2001 From: jacketsj Date: Sun, 13 Oct 2024 18:41:52 -0700 Subject: [PATCH] feat: one-pass IVF_PQ accelerated builds (#3001) This feature improves disk IO dependence, but it is quite limited. This only works if the index type is IVF_PQ, and it will not work efficiently for local PQ in the future (unless we store _all_ the PQ models in VRAM). Importantly, this allows us to bypass local temp storage for storing residuals. However, this still stores PQ codes locally temporarily due to how we've implemented accelerator support, but these are much smaller (exact ratio depends on params). I tested on my local machine, which is sufficiently fast that the accelerated builds are mostly IO limited (but IO is also fast). I used wikipedia-40M New feature disabled: ![results_static_20241011_224535_plot_dataset_wikipedia-few-queries_k_10](https://github.com/user-attachments/assets/9a9285e1-1814-4215-a4c9-2a3f3a16c874) ivf training time: 52s ivf transform time: 89s pq training time: 18s pq assignment time: 143s create_index rust time: 8.9s New feature enabled: ![results_static_20241011_203303_plot_dataset_wikipedia-few-queries_k_10](https://github.com/user-attachments/assets/9d94f50b-e3b6-42f8-8357-3cb477e6279b) combined training time: 63.7s (not actually sure why this is faster, but it's not the big part anyway) combined transform time: 158.8s create_index rust time: 8.6s Improvement should be more noticeable for bigger datasets, as usual. --- python/python/benchmarks/test_index.py | 52 +++++++ python/python/lance/dataset.py | 71 ++++++++- python/python/lance/vector.py | 207 ++++++++++++++++++++++++- python/python/tests/test_dataset.py | 141 +++++++++++++++++ 4 files changed, 455 insertions(+), 16 deletions(-) diff --git a/python/python/benchmarks/test_index.py b/python/python/benchmarks/test_index.py index a8a8fa764e..1c96e14fa5 100644 --- a/python/python/benchmarks/test_index.py +++ b/python/python/benchmarks/test_index.py @@ -54,6 +54,41 @@ def test_create_ivf_pq(test_dataset, benchmark): ) +@pytest.mark.benchmark(group="create_index") +def test_create_ivf_pq_torch_cpu(test_dataset, benchmark): + from lance.dependencies import torch + + benchmark( + test_dataset.create_index, + column="vector", + index_type="IVF_PQ", + metric_type="L2", + num_partitions=8, + num_sub_vectors=2, + num_bits=8, + replace=True, + accelerator=torch.device("cpu"), + ) + + +@pytest.mark.benchmark(group="create_index") +def test_create_ivf_pq_torch_cpu_one_pass(test_dataset, benchmark): + from lance.dependencies import torch + + benchmark( + test_dataset.create_index, + column="vector", + index_type="IVF_PQ", + metric_type="L2", + num_partitions=8, + num_sub_vectors=2, + num_bits=8, + replace=True, + accelerator=torch.device("cpu"), + one_pass_ivfpq=True, + ) + + @pytest.mark.benchmark(group="create_index") @pytest.mark.cuda def test_create_ivf_pq_cuda(test_dataset, benchmark): @@ -70,6 +105,23 @@ def test_create_ivf_pq_cuda(test_dataset, benchmark): ) +@pytest.mark.benchmark(group="create_index") +@pytest.mark.cuda +def test_create_ivf_pq_cuda_one_pass(test_dataset, benchmark): + benchmark( + test_dataset.create_index, + column="vector", + index_type="IVF_PQ", + metric_type="L2", + num_partitions=8, + num_sub_vectors=2, + num_bits=8, + accelerator="cuda", + replace=True, + one_pass_ivfpq=True, + ) + + @pytest.mark.benchmark(group="optimize_index") @pytest.mark.parametrize("num_partitions", [256, 512]) @pytest.mark.parametrize("num_small_indexes", [5]) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index f246a03572..f450992b0c 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1448,6 +1448,7 @@ def create_index( precomputed_partition_dataset: Optional[str] = None, storage_options: Optional[Dict[str, str]] = None, filter_nan: bool = True, + one_pass_ivfpq: bool = False, **kwargs, ) -> LanceDataset: """Create index on column. @@ -1508,6 +1509,8 @@ def create_index( Defaults to True. False is UNSAFE, and will cause a crash if any null/nan values are present (and otherwise will not). Disables the null filter used for nullable columns. Obtains a small speed boost. + one_pass_ivfpq: bool + Defaults to False. If enabled, index type must be "IVF_PQ". Reduces disk IO. kwargs : Parameters passed to the index building process. @@ -1631,6 +1634,58 @@ def create_index( raise NotImplementedError( f"Only {valid_index_types} index types supported. " f"Got {index_type}" ) + if index_type != "IVF_PQ" and one_pass_ivfpq: + raise ValueError( + f'one_pass_ivfpq requires index_type="IVF_PQ", got {index_type}' + ) + + # Handle timing for various parts of accelerated builds + timers = {} + if one_pass_ivfpq and accelerator is not None: + from .vector import ( + one_pass_assign_ivf_pq_on_accelerator, + one_pass_train_ivf_pq_on_accelerator, + ) + + logging.info("Doing one-pass ivfpq accelerated computations") + + timers["ivf+pq_train:start"] = time.time() + ivf_centroids, ivf_kmeans, pq_codebook, pq_kmeans_list = ( + one_pass_train_ivf_pq_on_accelerator( + self, + column[0], + num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + batch_size=20480, + filter_nan=filter_nan, + ) + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + logging.info("ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator( + self, + column[0], + metric, + accelerator, + ivf_kmeans, + pq_kmeans_list, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + logging.info("ivf+pq transform time: %ss", ivfpq_assign_time) + + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = os.path.join( + shuffle_output_dir, "data" + ) if index_type.startswith("IVF"): if (ivf_centroids is not None) and (ivf_centroids_file is not None): raise ValueError( @@ -1659,9 +1714,6 @@ def create_index( ) kwargs["num_partitions"] = num_partitions - # Handle timing for various parts of accelerated builds - timers = {} - if (precomputed_partition_dataset is not None) and (ivf_centroids is None): raise ValueError( "ivf_centroids must be provided when" @@ -1692,7 +1744,7 @@ def create_index( ) kwargs["precomputed_partitions_file"] = precomputed_partition_dataset - if accelerator is not None and ivf_centroids is None: + if accelerator is not None and ivf_centroids is None and not one_pass_ivfpq: logging.info("Computing new precomputed partition dataset") # Use accelerator to train ivf centroids from .vector import ( @@ -1773,6 +1825,7 @@ def create_index( pq_codebook is None and accelerator is not None and "precomputed_partitions_file" in kwargs + and not one_pass_ivfpq ): logging.info("Computing new precomputed shuffle buffers for PQ.") partitions_file = kwargs["precomputed_partitions_file"] @@ -1852,13 +1905,15 @@ def create_index( if shuffle_partition_concurrency is not None: kwargs["shuffle_partition_concurrency"] = shuffle_partition_concurrency - times = [] - times.append(time.time()) + timers["final_create_index:start"] = time.time() self._ds.create_index( column, index_type, name, replace, storage_options, kwargs ) - times.append(time.time()) - logging.info("Final create_index time: %ss", times[1] - times[0]) + timers["final_create_index:end"] = time.time() + final_create_index_time = ( + timers["final_create_index:end"] - timers["final_create_index:start"] + ) + logging.info("Final create_index rust time: %ss", final_create_index_time) # Save disk space if "precomputed_shuffle_buffers_path" in kwargs.keys() and os.path.exists( kwargs["precomputed_shuffle_buffers_path"] diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py index 79506adda0..7e3e38151b 100644 --- a/python/python/lance/vector.py +++ b/python/python/lance/vector.py @@ -174,14 +174,22 @@ def train_pq_codebook_on_accelerator( for sub_vector in range(num_sub_vectors): logging.info("Training IVF partitions using GPU(%s)", accelerator) + if num_sub_vectors == 1: + # sampler has different behaviour with one column + init_centroids_slice = init_centroids + else: + init_centroids_slice = init_centroids[field_names[sub_vector]] kmeans_local = KMeans( 256, max_iters=50, metric=metric_type, device=accelerator, - centroids=init_centroids[field_names[sub_vector]], + centroids=init_centroids_slice, ) - kmeans_local.fit(ds_fit, column=field_names[sub_vector]) + if num_sub_vectors == 1: + kmeans_local.fit(ds_fit) + else: + kmeans_local.fit(ds_fit, column=field_names[sub_vector]) ivf_centroids_local = kmeans_local.centroids.cpu().numpy() centroids_list.append(ivf_centroids_local) @@ -418,6 +426,7 @@ def compute_partitions( allow_cuda_tf32: bool = True, num_sub_vectors: Optional[int] = None, filter_nan: bool = True, + sample_size: Optional[int] = None, ) -> str: """Compute partitions for each row using GPU kmeans and spill to disk. @@ -456,6 +465,7 @@ def compute_partitions( batch_size=batch_size, with_row_id=True, columns=[column], + samples=sample_size, filter=filt, ) loader = torch.utils.data.DataLoader( @@ -492,16 +502,21 @@ def compute_partitions( progress.set_description("Assigning partitions") def _partition_assignment() -> Iterable[pa.RecordBatch]: + id_offset = 0 with torch.no_grad(): for batch in loader: - vecs = ( - batch[column] - .to(kmeans.device) - .reshape(-1, kmeans.centroids.shape[1]) - ) + if sample_size is None: + vecs = batch[column] + ids = batch["_rowid"].reshape(-1) + else: + # No row ids with sampling + vecs = batch + ids = torch.arange(id_offset, id_offset + vecs.size(0)) + id_offset += vecs.size(0) + + vecs = vecs.to(kmeans.device).reshape(-1, kmeans.centroids.shape[1]) partitions = kmeans.transform(vecs) - ids = batch["_rowid"].reshape(-1) # this is expected to be true, so just assert assert vecs.shape[0] == ids.shape[0] @@ -561,3 +576,179 @@ def _partition_assignment() -> Iterable[pa.RecordBatch]: logging.info("Saved precomputed partitions to %s", dst_dataset_uri) return str(dst_dataset_uri) + + +def one_pass_train_ivf_pq_on_accelerator( + dataset: LanceDataset, + column: str, + k: int, + metric_type: Literal["l2", "cosine", "dot"], + accelerator: Union[str, "torch.Device"], + num_sub_vectors: int, + batch_size: int = 1024 * 10 * 4, + *, + sample_rate: int = 256, + max_iters: int = 50, + filter_nan: bool = True, +): + centroids, kmeans = train_ivf_centroids_on_accelerator( + dataset, + column, + k, + metric_type, + accelerator, + batch_size, + sample_rate=sample_rate, + max_iters=max_iters, + filter_nan=filter_nan, + ) + dataset_residuals = compute_partitions( + dataset, + column, + kmeans, + batch_size, + num_sub_vectors=num_sub_vectors, + filter_nan=filter_nan, + sample_size=256 * 256, + ) + pq_codebook, kmeans_list = train_pq_codebook_on_accelerator( + dataset_residuals, metric_type, accelerator, num_sub_vectors, batch_size + ) + return centroids, kmeans, pq_codebook, kmeans_list + + +def one_pass_assign_ivf_pq_on_accelerator( + dataset: LanceDataset, + column: str, + metric_type: Literal["l2", "cosine", "dot"], + accelerator: Union[str, "torch.Device"], + ivf_kmeans: Any, # KMeans + pq_kmeans_list: List[Any], # List[KMeans] + dst_dataset_uri: Optional[Union[str, Path]] = None, + batch_size: int = 1024 * 10 * 4, + *, + filter_nan: bool = True, + allow_cuda_tf32: bool = True, +): + """Compute partitions for each row using GPU kmeans and spill to disk. + + Parameters + ---------- + + Returns + ------- + str + The absolute path of the ivfpq codes dataset, as precomputed partition buffers. + """ + torch.backends.cuda.matmul.allow_tf32 = allow_cuda_tf32 + + num_rows = dataset.count_rows() + + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + torch_ds = TorchDataset( + dataset, + batch_size=batch_size, + with_row_id=True, + columns=[column], + filter=filt, + ) + loader = torch.utils.data.DataLoader( + torch_ds, + batch_size=1, + pin_memory=True, + collate_fn=_collate_fn, + ) + + num_sub_vectors = len(pq_kmeans_list) + dim = ivf_kmeans.centroids.shape[1] + subvector_size = dim // num_sub_vectors + + output_schema = pa.schema( + [ + pa.field("_rowid", pa.uint64()), + pa.field("__ivf_part_id", pa.uint32()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ) + + progress = tqdm(total=num_rows) + + progress.set_description("Assigning partitions and computing pq codes") + + def _partition_and_pq_codes_assignment() -> Iterable[pa.RecordBatch]: + with torch.no_grad(): + for batch in loader: + vecs = ( + batch[column] + .to(ivf_kmeans.device) + .reshape(-1, ivf_kmeans.centroids.shape[1]) + ) + + partitions = ivf_kmeans.transform(vecs) + ids = batch["_rowid"].reshape(-1) + + # this is expected to be true, so just assert + assert vecs.shape[0] == ids.shape[0] + + # Ignore any invalid vectors. + mask_gpu = partitions.isfinite() + ids = ids.to(ivf_kmeans.device)[mask_gpu].cpu().reshape(-1) + partitions = partitions[mask_gpu].cpu() + vecs = vecs[mask_gpu] + + residual_vecs = vecs - ivf_kmeans.centroids[partitions] + pq_codes = torch.stack( + [ + pq_kmeans_list[i].transform( + residual_vecs[ + :, i * subvector_size : (i + 1) * subvector_size + ] + ) + for i in range(num_sub_vectors) + ], + dim=1, + ) + pq_codes = pq_codes.to(torch.uint8) + + pq_values = pa.array(pq_codes.cpu().numpy().reshape(-1)) + pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors) + part_batch = pa.RecordBatch.from_arrays( + [ids, partitions, pq_codes], + schema=output_schema, + ) + + if len(part_batch) < len(ids): + logging.warning( + "%s vectors are ignored during partition assignment", + len(part_batch) - len(ids), + ) + + progress.update(part_batch.num_rows) + yield part_batch + + rbr = pa.RecordBatchReader.from_batches( + output_schema, _partition_and_pq_codes_assignment() + ) + if dst_dataset_uri is None: + dst_dataset_uri = tempfile.mkdtemp() + ds = write_dataset( + rbr, + dst_dataset_uri, + schema=output_schema, + data_storage_version="legacy", + ) + + progress.close() + + logging.info("Saved precomputed pq_codes to %s", dst_dataset_uri) + + shuffle_buffers = [ + data_file.path() + for frag in ds.get_fragments() + for data_file in frag.data_files() + ] + return dst_dataset_uri, shuffle_buffers diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 8bb4665c9c..ac49b6f90f 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -29,6 +29,7 @@ from lance._dataset.sharded_batch_iterator import ShardedBatchIterator from lance.commit import CommitConflictError from lance.debug import format_fragment +from lance.util import validate_vector_index # Various valid inputs for write_dataset input_schema = pa.schema([pa.field("a", pa.float64()), pa.field("b", pa.int64())]) @@ -2181,6 +2182,146 @@ def test_scan_with_row_ids(tmp_path: Path): assert tbl2["a"] == tbl["a"] +def test_random_dataset_recall_accelerated(tmp_path: Path): + dims = 32 + schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)]) + values = pc.random(512 * dims).cast("float32") + table = pa.Table.from_pydict( + {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema + ) + + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + from lance.dependencies import torch + + # create index and assert no rows are uncounted + dataset.create_index( + "a", + "IVF_PQ", + num_partitions=2, + num_sub_vectors=32, + accelerator=torch.device("cpu"), + ) + validate_vector_index(dataset, "a", pass_threshold=0.5) + + +def test_random_dataset_recall_accelerated_one_pass(tmp_path: Path): + dims = 32 + schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)]) + values = pc.random(512 * dims).cast("float32") + table = pa.Table.from_pydict( + {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema + ) + + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + from lance.dependencies import torch + + # create index and assert no rows are uncounted + dataset.create_index( + "a", + "IVF_PQ", + num_partitions=2, + num_sub_vectors=32, + accelerator=torch.device("cpu"), + one_pass_ivfpq=True, + ) + validate_vector_index(dataset, "a", pass_threshold=0.5) + + +def test_count_index_rows_accelerated(tmp_path: Path): + dims = 32 + schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)]) + values = pc.random(512 * dims).cast("float32") + table = pa.Table.from_pydict( + {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema + ) + + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + # assert we return None for index name that doesn't exist + index_name = "a_idx" + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_unindexed_rows"] + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_indexed_rows"] + + from lance.dependencies import torch + + # create index and assert no rows are uncounted + dataset.create_index( + "a", + "IVF_PQ", + name=index_name, + num_partitions=2, + num_sub_vectors=1, + accelerator=torch.device("cpu"), + ) + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + # append some data + new_table = pa.Table.from_pydict( + {"a": [[float(i) for i in range(32)] for _ in range(512)]}, schema=schema + ) + dataset = lance.write_dataset(new_table, base_dir, mode="append") + + # assert rows added since index was created are uncounted + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + +def test_count_index_rows_accelerated_one_pass(tmp_path: Path): + dims = 32 + schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)]) + values = pc.random(512 * dims).cast("float32") + table = pa.Table.from_pydict( + {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema + ) + + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + # assert we return None for index name that doesn't exist + index_name = "a_idx" + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_unindexed_rows"] + with pytest.raises(KeyError): + dataset.stats.index_stats(index_name)["num_indexed_rows"] + + from lance.dependencies import torch + + # create index and assert no rows are uncounted + dataset.create_index( + "a", + "IVF_PQ", + name=index_name, + num_partitions=2, + num_sub_vectors=1, + accelerator=torch.device("cpu"), + one_pass_ivfpq=True, + ) + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + # append some data + new_table = pa.Table.from_pydict( + {"a": [[float(i) for i in range(32)] for _ in range(512)]}, schema=schema + ) + dataset = lance.write_dataset(new_table, base_dir, mode="append") + + # assert rows added since index was created are uncounted + assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512 + assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512 + + def test_count_index_rows(tmp_path: Path): dims = 32 schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])