diff --git a/python/python/benchmarks/test_index.py b/python/python/benchmarks/test_index.py
index a8a8fa764e..1c96e14fa5 100644
--- a/python/python/benchmarks/test_index.py
+++ b/python/python/benchmarks/test_index.py
@@ -54,6 +54,41 @@ def test_create_ivf_pq(test_dataset, benchmark):
     )
 
 
+@pytest.mark.benchmark(group="create_index")
+def test_create_ivf_pq_torch_cpu(test_dataset, benchmark):
+    from lance.dependencies import torch
+
+    benchmark(
+        test_dataset.create_index,
+        column="vector",
+        index_type="IVF_PQ",
+        metric_type="L2",
+        num_partitions=8,
+        num_sub_vectors=2,
+        num_bits=8,
+        replace=True,
+        accelerator=torch.device("cpu"),
+    )
+
+
+@pytest.mark.benchmark(group="create_index")
+def test_create_ivf_pq_torch_cpu_one_pass(test_dataset, benchmark):
+    from lance.dependencies import torch
+
+    benchmark(
+        test_dataset.create_index,
+        column="vector",
+        index_type="IVF_PQ",
+        metric_type="L2",
+        num_partitions=8,
+        num_sub_vectors=2,
+        num_bits=8,
+        replace=True,
+        accelerator=torch.device("cpu"),
+        one_pass_ivfpq=True,
+    )
+
+
 @pytest.mark.benchmark(group="create_index")
 @pytest.mark.cuda
 def test_create_ivf_pq_cuda(test_dataset, benchmark):
@@ -70,6 +105,23 @@ def test_create_ivf_pq_cuda(test_dataset, benchmark):
     )
 
 
+@pytest.mark.benchmark(group="create_index")
+@pytest.mark.cuda
+def test_create_ivf_pq_cuda_one_pass(test_dataset, benchmark):
+    benchmark(
+        test_dataset.create_index,
+        column="vector",
+        index_type="IVF_PQ",
+        metric_type="L2",
+        num_partitions=8,
+        num_sub_vectors=2,
+        num_bits=8,
+        accelerator="cuda",
+        replace=True,
+        one_pass_ivfpq=True,
+    )
+
+
 @pytest.mark.benchmark(group="optimize_index")
 @pytest.mark.parametrize("num_partitions", [256, 512])
 @pytest.mark.parametrize("num_small_indexes", [5])
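These benchmarks use the pytest-benchmark fixture. Assuming a standard pytest-benchmark setup (not part of this patch), the new cases can be run selectively with, for example: pytest python/python/benchmarks/test_index.py -k one_pass --benchmark-only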
" f"Got {index_type}" ) + if index_type != "IVF_PQ" and one_pass_ivfpq: + raise ValueError( + f'one_pass_ivfpq requires index_type="IVF_PQ", got {index_type}' + ) + + # Handle timing for various parts of accelerated builds + timers = {} + if one_pass_ivfpq and accelerator is not None: + from .vector import ( + one_pass_assign_ivf_pq_on_accelerator, + one_pass_train_ivf_pq_on_accelerator, + ) + + logging.info("Doing one-pass ivfpq accelerated computations") + + timers["ivf+pq_train:start"] = time.time() + ivf_centroids, ivf_kmeans, pq_codebook, pq_kmeans_list = ( + one_pass_train_ivf_pq_on_accelerator( + self, + column[0], + num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + batch_size=20480, + filter_nan=filter_nan, + ) + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + logging.info("ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator( + self, + column[0], + metric, + accelerator, + ivf_kmeans, + pq_kmeans_list, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + logging.info("ivf+pq transform time: %ss", ivfpq_assign_time) + + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = os.path.join( + shuffle_output_dir, "data" + ) if index_type.startswith("IVF"): if (ivf_centroids is not None) and (ivf_centroids_file is not None): raise ValueError( @@ -1659,9 +1714,6 @@ def create_index( ) kwargs["num_partitions"] = num_partitions - # Handle timing for various parts of accelerated builds - timers = {} - if (precomputed_partition_dataset is not None) and (ivf_centroids is None): raise ValueError( "ivf_centroids must be provided when" @@ -1692,7 +1744,7 @@ def create_index( ) kwargs["precomputed_partitions_file"] = precomputed_partition_dataset - if accelerator is not None and ivf_centroids is None: + if accelerator is not None and ivf_centroids is None and not one_pass_ivfpq: logging.info("Computing new precomputed partition dataset") # Use accelerator to train ivf centroids from .vector import ( @@ -1773,6 +1825,7 @@ def create_index( pq_codebook is None and accelerator is not None and "precomputed_partitions_file" in kwargs + and not one_pass_ivfpq ): logging.info("Computing new precomputed shuffle buffers for PQ.") partitions_file = kwargs["precomputed_partitions_file"] @@ -1852,13 +1905,15 @@ def create_index( if shuffle_partition_concurrency is not None: kwargs["shuffle_partition_concurrency"] = shuffle_partition_concurrency - times = [] - times.append(time.time()) + timers["final_create_index:start"] = time.time() self._ds.create_index( column, index_type, name, replace, storage_options, kwargs ) - times.append(time.time()) - logging.info("Final create_index time: %ss", times[1] - times[0]) + timers["final_create_index:end"] = time.time() + final_create_index_time = ( + timers["final_create_index:end"] - timers["final_create_index:start"] + ) + logging.info("Final create_index rust time: %ss", final_create_index_time) # Save disk space if "precomputed_shuffle_buffers_path" in kwargs.keys() and os.path.exists( kwargs["precomputed_shuffle_buffers_path"] diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py index 79506adda0..7e3e38151b 100644 --- 
diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py
index 79506adda0..7e3e38151b 100644
--- a/python/python/lance/vector.py
+++ b/python/python/lance/vector.py
@@ -174,14 +174,22 @@ def train_pq_codebook_on_accelerator(
     for sub_vector in range(num_sub_vectors):
         logging.info("Training IVF partitions using GPU(%s)", accelerator)
+        if num_sub_vectors == 1:
+            # sampler has different behaviour with one column
+            init_centroids_slice = init_centroids
+        else:
+            init_centroids_slice = init_centroids[field_names[sub_vector]]
         kmeans_local = KMeans(
             256,
             max_iters=50,
             metric=metric_type,
             device=accelerator,
-            centroids=init_centroids[field_names[sub_vector]],
+            centroids=init_centroids_slice,
         )
-        kmeans_local.fit(ds_fit, column=field_names[sub_vector])
+        if num_sub_vectors == 1:
+            kmeans_local.fit(ds_fit)
+        else:
+            kmeans_local.fit(ds_fit, column=field_names[sub_vector])
         ivf_centroids_local = kmeans_local.centroids.cpu().numpy()
         centroids_list.append(ivf_centroids_local)
 
@@ -418,6 +426,7 @@ def compute_partitions(
     allow_cuda_tf32: bool = True,
     num_sub_vectors: Optional[int] = None,
     filter_nan: bool = True,
+    sample_size: Optional[int] = None,
 ) -> str:
     """Compute partitions for each row using GPU kmeans and spill to disk.
 
@@ -456,6 +465,7 @@ def compute_partitions(
         batch_size=batch_size,
         with_row_id=True,
         columns=[column],
+        samples=sample_size,
         filter=filt,
     )
     loader = torch.utils.data.DataLoader(
@@ -492,16 +502,21 @@ def compute_partitions(
     progress.set_description("Assigning partitions")
 
     def _partition_assignment() -> Iterable[pa.RecordBatch]:
+        id_offset = 0
         with torch.no_grad():
             for batch in loader:
-                vecs = (
-                    batch[column]
-                    .to(kmeans.device)
-                    .reshape(-1, kmeans.centroids.shape[1])
-                )
+                if sample_size is None:
+                    vecs = batch[column]
+                    ids = batch["_rowid"].reshape(-1)
+                else:
+                    # No row ids are returned when sampling
+                    vecs = batch
+                    ids = torch.arange(id_offset, id_offset + vecs.size(0))
+                    id_offset += vecs.size(0)
+
+                vecs = vecs.to(kmeans.device).reshape(-1, kmeans.centroids.shape[1])
                 partitions = kmeans.transform(vecs)
-                ids = batch["_rowid"].reshape(-1)
 
                 # this is expected to be true, so just assert
                 assert vecs.shape[0] == ids.shape[0]
@@ -561,3 +576,179 @@ def _partition_assignment() -> Iterable[pa.RecordBatch]:
     logging.info("Saved precomputed partitions to %s", dst_dataset_uri)
 
     return str(dst_dataset_uri)
+
+
+def one_pass_train_ivf_pq_on_accelerator(
+    dataset: LanceDataset,
+    column: str,
+    k: int,
+    metric_type: Literal["l2", "cosine", "dot"],
+    accelerator: Union[str, "torch.Device"],
+    num_sub_vectors: int,
+    batch_size: int = 1024 * 10 * 4,
+    *,
+    sample_rate: int = 256,
+    max_iters: int = 50,
+    filter_nan: bool = True,
+):
+    centroids, kmeans = train_ivf_centroids_on_accelerator(
+        dataset,
+        column,
+        k,
+        metric_type,
+        accelerator,
+        batch_size,
+        sample_rate=sample_rate,
+        max_iters=max_iters,
+        filter_nan=filter_nan,
+    )
+    dataset_residuals = compute_partitions(
+        dataset,
+        column,
+        kmeans,
+        batch_size,
+        num_sub_vectors=num_sub_vectors,
+        filter_nan=filter_nan,
+        sample_size=256 * 256,
+    )
+    pq_codebook, kmeans_list = train_pq_codebook_on_accelerator(
+        dataset_residuals, metric_type, accelerator, num_sub_vectors, batch_size
+    )
+    return centroids, kmeans, pq_codebook, kmeans_list
+
+
+def one_pass_assign_ivf_pq_on_accelerator(
+    dataset: LanceDataset,
+    column: str,
+    metric_type: Literal["l2", "cosine", "dot"],
+    accelerator: Union[str, "torch.Device"],
+    ivf_kmeans: Any,  # KMeans
+    pq_kmeans_list: List[Any],  # List[KMeans]
+    dst_dataset_uri: Optional[Union[str, Path]] = None,
+    batch_size: int = 1024 * 10 * 4,
+    *,
+    filter_nan: bool = True,
+    allow_cuda_tf32: bool = True,
+):
+    """Compute IVF partitions and PQ codes for each row in one pass, spilling to disk.
+
+    Parameters
+    ----------
+    dataset: LanceDataset
+        Dataset to index.
+    column: str
+        Vector column to index.
+    metric_type: str
+        Distance metric: "l2", "cosine" or "dot".
+    accelerator: str or torch.Device
+        Device to run the computation on.
+    ivf_kmeans: KMeans
+        Trained IVF kmeans model.
+    pq_kmeans_list: List[KMeans]
+        One trained kmeans model per PQ sub-vector.
+    dst_dataset_uri: str or Path, optional
+        Output location; a temporary directory is used if not provided.
+    batch_size: int
+        Number of rows per read batch.
+    filter_nan: bool
+        Skip null vectors if the column is nullable.
+    allow_cuda_tf32: bool
+        Allow TF32 matmul on CUDA devices.
+
+    Returns
+    -------
+    Tuple[str, List[str]]
+        The absolute path of the ivfpq codes dataset and the paths of its data
+        files, for use as precomputed shuffle buffers.
+    """
+    torch.backends.cuda.matmul.allow_tf32 = allow_cuda_tf32
+
+    num_rows = dataset.count_rows()
+
+    if dataset.schema.field(column).nullable and filter_nan:
+        filt = f"{column} is not null"
+    else:
+        filt = None
+
+    torch_ds = TorchDataset(
+        dataset,
+        batch_size=batch_size,
+        with_row_id=True,
+        columns=[column],
+        filter=filt,
+    )
+    loader = torch.utils.data.DataLoader(
+        torch_ds,
+        batch_size=1,
+        pin_memory=True,
+        collate_fn=_collate_fn,
+    )
+
+    num_sub_vectors = len(pq_kmeans_list)
+    dim = ivf_kmeans.centroids.shape[1]
+    subvector_size = dim // num_sub_vectors
+
+    output_schema = pa.schema(
+        [
+            pa.field("_rowid", pa.uint64()),
+            pa.field("__ivf_part_id", pa.uint32()),
+            pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)),
+        ]
+    )
+
+    progress = tqdm(total=num_rows)
+
+    progress.set_description("Assigning partitions and computing pq codes")
+
+    def _partition_and_pq_codes_assignment() -> Iterable[pa.RecordBatch]:
+        with torch.no_grad():
+            for batch in loader:
+                vecs = (
+                    batch[column]
+                    .to(ivf_kmeans.device)
+                    .reshape(-1, ivf_kmeans.centroids.shape[1])
+                )
+
+                partitions = ivf_kmeans.transform(vecs)
+                ids = batch["_rowid"].reshape(-1)
+
+                # this is expected to be true, so just assert
+                assert vecs.shape[0] == ids.shape[0]
+
+                # Ignore any invalid vectors.
+                mask_gpu = partitions.isfinite()
+                ids = ids.to(ivf_kmeans.device)[mask_gpu].cpu().reshape(-1)
+                partitions = partitions[mask_gpu].cpu()
+                vecs = vecs[mask_gpu]
+
+                residual_vecs = vecs - ivf_kmeans.centroids[partitions]
+                pq_codes = torch.stack(
+                    [
+                        pq_kmeans_list[i].transform(
+                            residual_vecs[
+                                :, i * subvector_size : (i + 1) * subvector_size
+                            ]
+                        )
+                        for i in range(num_sub_vectors)
+                    ],
+                    dim=1,
+                )
+                pq_codes = pq_codes.to(torch.uint8)
+
+                pq_values = pa.array(pq_codes.cpu().numpy().reshape(-1))
+                pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors)
+                part_batch = pa.RecordBatch.from_arrays(
+                    [ids, partitions, pq_codes],
+                    schema=output_schema,
+                )
+
+                if len(part_batch) < len(ids):
+                    logging.warning(
+                        "%s vectors are ignored during partition assignment",
+                        len(ids) - len(part_batch),
+                    )
+
+                progress.update(part_batch.num_rows)
+                yield part_batch
+
+    rbr = pa.RecordBatchReader.from_batches(
+        output_schema, _partition_and_pq_codes_assignment()
+    )
+    if dst_dataset_uri is None:
+        dst_dataset_uri = tempfile.mkdtemp()
+    ds = write_dataset(
+        rbr,
+        dst_dataset_uri,
+        schema=output_schema,
+        data_storage_version="legacy",
+    )
+
+    progress.close()
+
+    logging.info("Saved precomputed pq_codes to %s", dst_dataset_uri)
+
+    shuffle_buffers = [
+        data_file.path()
+        for frag in ds.get_fragments()
+        for data_file in frag.data_files()
+    ]
+    return dst_dataset_uri, shuffle_buffers
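The heart of the one-pass path is that _partition_and_pq_codes_assignment emits both the IVF partition id and the PQ code for every row from a single read of the data. A standalone sketch of that residual-quantization step, using random stand-in data and brute-force nearest-centroid search in place of the trained KMeans models (all names and shapes here are illustrative):

import torch

num_sub_vectors, dim = 4, 32
subvector_size = dim // num_sub_vectors

vecs = torch.randn(100, dim)          # stand-in input vectors
ivf_centroids = torch.randn(8, dim)   # stand-in trained IVF centroids

# assign each vector to its nearest IVF centroid (L2)
partitions = torch.cdist(vecs, ivf_centroids).argmin(dim=1)
# residual relative to the assigned centroid
residuals = vecs - ivf_centroids[partitions]

# one 256-entry codebook per sub-vector, as train_pq_codebook_on_accelerator builds
pq_codebooks = [torch.randn(256, subvector_size) for _ in range(num_sub_vectors)]
# encode each sub-vector of the residual as the index of its nearest codebook entry
pq_codes = torch.stack(
    [
        torch.cdist(
            residuals[:, i * subvector_size : (i + 1) * subvector_size],
            pq_codebooks[i],
        ).argmin(dim=1)
        for i in range(num_sub_vectors)
    ],
    dim=1,
).to(torch.uint8)  # shape (100, num_sub_vectors): one byte per sub-vector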
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index 8bb4665c9c..ac49b6f90f 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -29,6 +29,7 @@
 from lance._dataset.sharded_batch_iterator import ShardedBatchIterator
 from lance.commit import CommitConflictError
 from lance.debug import format_fragment
+from lance.util import validate_vector_index
 
 # Various valid inputs for write_dataset
 input_schema = pa.schema([pa.field("a", pa.float64()), pa.field("b", pa.int64())])
@@ -2181,6 +2182,146 @@ def test_scan_with_row_ids(tmp_path: Path):
     assert tbl2["a"] == tbl["a"]
 
 
+def test_random_dataset_recall_accelerated(tmp_path: Path):
+    dims = 32
+    schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
+    values = pc.random(512 * dims).cast("float32")
+    table = pa.Table.from_pydict(
+        {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema
+    )
+
+    base_dir = tmp_path / "test"
+
+    dataset = lance.write_dataset(table, base_dir)
+
+    from lance.dependencies import torch
+
+    # create the index and validate its recall
+    dataset.create_index(
+        "a",
+        "IVF_PQ",
+        num_partitions=2,
+        num_sub_vectors=32,
+        accelerator=torch.device("cpu"),
+    )
+    validate_vector_index(dataset, "a", pass_threshold=0.5)
+
+
+def test_random_dataset_recall_accelerated_one_pass(tmp_path: Path):
+    dims = 32
+    schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
+    values = pc.random(512 * dims).cast("float32")
+    table = pa.Table.from_pydict(
+        {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema
+    )
+
+    base_dir = tmp_path / "test"
+
+    dataset = lance.write_dataset(table, base_dir)
+
+    from lance.dependencies import torch
+
+    # create the index and validate its recall
+    dataset.create_index(
+        "a",
+        "IVF_PQ",
+        num_partitions=2,
+        num_sub_vectors=32,
+        accelerator=torch.device("cpu"),
+        one_pass_ivfpq=True,
+    )
+    validate_vector_index(dataset, "a", pass_threshold=0.5)
+
+
+def test_count_index_rows_accelerated(tmp_path: Path):
+    dims = 32
+    schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
+    values = pc.random(512 * dims).cast("float32")
+    table = pa.Table.from_pydict(
+        {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema
+    )
+
+    base_dir = tmp_path / "test"
+
+    dataset = lance.write_dataset(table, base_dir)
+
+    # assert index stats raise KeyError for an index name that doesn't exist
+    index_name = "a_idx"
+    with pytest.raises(KeyError):
+        dataset.stats.index_stats(index_name)["num_unindexed_rows"]
+    with pytest.raises(KeyError):
+        dataset.stats.index_stats(index_name)["num_indexed_rows"]
+
+    from lance.dependencies import torch
+
+    # create index and assert no rows are uncounted
+    dataset.create_index(
+        "a",
+        "IVF_PQ",
+        name=index_name,
+        num_partitions=2,
+        num_sub_vectors=1,
+        accelerator=torch.device("cpu"),
+    )
+    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0
+    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512
+
+    # append some data
+    new_table = pa.Table.from_pydict(
+        {"a": [[float(i) for i in range(32)] for _ in range(512)]}, schema=schema
+    )
+    dataset = lance.write_dataset(new_table, base_dir, mode="append")
+
+    # assert rows added after index creation are counted as unindexed
+    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512
+    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512
+
+
+def test_count_index_rows_accelerated_one_pass(tmp_path: Path):
+    dims = 32
+    schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
+    values = pc.random(512 * dims).cast("float32")
+    table = pa.Table.from_pydict(
+        {"a": pa.FixedSizeListArray.from_arrays(values, dims)}, schema=schema
+    )
+
+    base_dir = tmp_path / "test"
+
+    dataset = lance.write_dataset(table, base_dir)
+
+    # assert index stats raise KeyError for an index name that doesn't exist
+    index_name = "a_idx"
+    with pytest.raises(KeyError):
+        dataset.stats.index_stats(index_name)["num_unindexed_rows"]
+    with pytest.raises(KeyError):
+        dataset.stats.index_stats(index_name)["num_indexed_rows"]
+
+    from lance.dependencies import torch
+
+    # create index and assert no rows are uncounted
+    dataset.create_index(
+        "a",
+        "IVF_PQ",
+        name=index_name,
+        num_partitions=2,
+        num_sub_vectors=1,
+        accelerator=torch.device("cpu"),
+        one_pass_ivfpq=True,
+    )
+    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 0
+    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512
+
+    # append some data
+    new_table = pa.Table.from_pydict(
+        {"a": [[float(i) for i in range(32)] for _ in range(512)]}, schema=schema
+    )
+    dataset = lance.write_dataset(new_table, base_dir, mode="append")
+
+    # assert rows added after index creation are counted as unindexed
+    assert dataset.stats.index_stats(index_name)["num_unindexed_rows"] == 512
+    assert dataset.stats.index_stats(index_name)["num_indexed_rows"] == 512
+
+
 def test_count_index_rows(tmp_path: Path):
     dims = 32
     schema = pa.schema([pa.field("a", pa.list_(pa.float32(), dims), False)])
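For reference, a condensed version of what the new recall tests exercise, runnable outside pytest; the output path is illustrative:

import lance
import pyarrow as pa
import pyarrow.compute as pc
from lance.dependencies import torch
from lance.util import validate_vector_index

dims = 32
values = pc.random(512 * dims).cast("float32")
table = pa.table({"a": pa.FixedSizeListArray.from_arrays(values, dims)})

ds = lance.write_dataset(table, "/tmp/one_pass_demo.lance")  # illustrative path
ds = ds.create_index(
    "a",
    "IVF_PQ",
    num_partitions=2,
    num_sub_vectors=32,
    accelerator=torch.device("cpu"),
    one_pass_ivfpq=True,
)
# recall of the ANN index against brute-force search must clear the threshold
validate_vector_index(ds, "a", pass_threshold=0.5)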