feat(java): support creating scalar indexes #2832

Open · wants to merge 7 commits into base: main
104 changes: 98 additions & 6 deletions java/core/lance-jni/src/utils.rs
@@ -20,6 +20,7 @@ use jni::JNIEnv;
use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::{StageParams, VectorIndexParams};
use lance_encoding::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams, ScalarIndexType};
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
@@ -117,7 +118,7 @@ pub fn get_index_params(

let vector_index_params_option_object = env
.call_method(
index_params_obj,
&index_params_obj,
"getVectorIndexParams",
"()Ljava/util/Optional;",
&[],
@@ -249,10 +250,101 @@ pub fn get_index_params(
None
};

match vector_index_params_option {
Some(params) => Ok(Box::new(params) as Box<dyn IndexParams>),
None => Err(Error::input_error(
"VectorIndexParams not present".to_string(),
)),
if let Some(params) = vector_index_params_option {
return Ok(Box::new(params) as Box<dyn IndexParams>);
}

let scalar_index_params_option_object = env
.call_method(
&index_params_obj,
"getScalarIndexParams",
"()Ljava/util/Optional;",
&[],
)?
.l()?;

let scalar_index_params_option = if env
.call_method(&scalar_index_params_option_object, "isPresent", "()Z", &[])?
.z()?
{
let scalar_index_params_obj = env
.call_method(
&scalar_index_params_option_object,
"get",
"()Ljava/lang/Object;",
&[],
)?
.l()?;

let force_index_type: Option<ScalarIndexType> = env.get_optional_from_method(
&scalar_index_params_obj,
"getForceIndexType",
|env, force_index_type_obj| {
let enum_name = env
.call_method(&force_index_type_obj, "name", "()Ljava/lang/String;", &[])?
.l()?;
let enum_str: String = env.get_string(&JString::from(enum_name))?.into();

match enum_str.as_str() {
"BTREE" => Ok(ScalarIndexType::BTree),
"BITMAP" => Ok(ScalarIndexType::Bitmap),
"LABEL_LIST" => Ok(ScalarIndexType::LabelList),
"INVERTED" => Ok(ScalarIndexType::Inverted),
_ => Err(Error::input_error(format!(
"Unknown ScalarIndexType: {}",
enum_str
))),
}
},
)?;
Some(ScalarIndexParams { force_index_type })
} else {
None
};

if let Some(params) = scalar_index_params_option {
return Ok(Box::new(params) as Box<dyn IndexParams>);
}

let inverted_index_params_option_object = env
.call_method(
&index_params_obj,
"getInvertedIndexParams",
"()Ljava/util/Optional;",
&[],
)?
.l()?;

let inverted_index_params_option = if env
.call_method(
&inverted_index_params_option_object,
"isPresent",
"()Z",
&[],
)?
.z()?
{
let inverted_index_params_obj = env
.call_method(
&inverted_index_params_option_object,
"get",
"()Ljava/lang/Object;",
&[],
)?
.l()?;

let with_position =
env.get_boolean_from_method(&inverted_index_params_obj, "isWithPosition")?;
Some(InvertedIndexParams { with_position })
} else {
None
};

if let Some(params) = inverted_index_params_option {
return Ok(Box::new(params) as Box<dyn IndexParams>);
}

Err(Error::input_error(
"No valid index params provided".to_string(),
))
}
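
The Rust changes above teach `get_index_params` to fall back from vector params to scalar and inverted index params: it reflectively calls `getScalarIndexParams`/`getInvertedIndexParams` on the Java `IndexParams` object, maps the `ScalarIndexType` enum by name (`BTREE`, `BITMAP`, `LABEL_LIST`, `INVERTED`), and reads `isWithPosition` for inverted indexes. Below is a minimal Java sketch of the objects this JNI code expects; the `ScalarIndexParams` builder calls are taken from the `Dataset.java` diff that follows, while the `InvertedIndexParams` builder (and its `setWithPosition` method) is an assumption, since that class is not part of this diff.

```java
import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.ScalarIndexParams;

public class ScalarIndexParamsSketch {

  // Build IndexParams that force a BTREE scalar index, matching the "BTREE"
  // branch of the enum-name match in utils.rs above.
  static IndexParams btreeParams() {
    return new IndexParams.Builder()
        .setScalarIndexParams(
            new ScalarIndexParams.Builder()
                .setForceIndexType(ScalarIndexParams.ScalarIndexType.BTREE)
                .build())
        .build();
  }

  // The JNI code also reads inverted-index params via isWithPosition().
  // The corresponding Java builder is not shown in this diff, so the setter
  // name below (setWithPosition) is a hypothetical illustration only.
  //
  // static IndexParams invertedParams() {
  //   return new IndexParams.Builder()
  //       .setInvertedIndexParams(
  //           new InvertedIndexParams.Builder().setWithPosition(true).build())
  //       .build();
  // }
}
```
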
139 changes: 78 additions & 61 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
@@ -1,21 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.lancedb.lance;

import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;
import com.lancedb.lance.index.ScalarIndexParams;
import com.lancedb.lance.ipc.LanceScanner;
import com.lancedb.lance.ipc.ScanOptions;
import java.io.Closeable;
@@ -31,12 +30,9 @@
import org.apache.arrow.vector.types.pojo.Schema;

/**
* Class representing a Lance dataset, interfacing with the native lance
* library. This class
* provides functionality to open and manage datasets with native code. The
* native library is loaded
* statically and utilized through native methods. It implements the
* {@link java.io.Closeable}
* Class representing a Lance dataset, interfacing with the native lance library. This class
* provides functionality to open and manage datasets with native code. The native library is loaded
* statically and utilized through native methods. It implements the {@link java.io.Closeable}
* interface to ensure proper resource management.
*/
public class Dataset implements Closeable {
@@ -51,16 +47,15 @@ public class Dataset implements Closeable {

private final LockManager lockManager = new LockManager();

private Dataset() {
}
private Dataset() {}

/**
* Creates an empty dataset.
*
* @param allocator the buffer allocator
* @param path dataset uri
* @param schema dataset schema
* @param params write params
* @param path dataset uri
* @param schema dataset schema
* @param params write params
* @return Dataset
*/
public static Dataset create(BufferAllocator allocator, String path, Schema schema,
@@ -71,9 +66,9 @@ public static Dataset create(BufferAllocator allocator, String path, Schema sche
Preconditions.checkNotNull(params);
try (ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
Data.exportSchema(allocator, schema, null, arrowSchema);
Dataset dataset = createWithFfiSchema(arrowSchema.memoryAddress(),
path, params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(), params.getMode());
Dataset dataset =
createWithFfiSchema(arrowSchema.memoryAddress(), path, params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(), params.getMode());
dataset.allocator = allocator;
return dataset;
}
@@ -83,20 +78,19 @@ public static Dataset create(BufferAllocator allocator, String path, Schema sche
* Create a dataset with given stream.
*
* @param allocator buffer allocator
* @param stream arrow stream
* @param path dataset uri
* @param params write parameters
* @param stream arrow stream
* @param path dataset uri
* @param params write parameters
* @return Dataset
*/
public static Dataset create(BufferAllocator allocator, ArrowArrayStream stream,
String path, WriteParams params) {
public static Dataset create(BufferAllocator allocator, ArrowArrayStream stream, String path,
WriteParams params) {
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(params);
Dataset dataset = createWithFfiStream(stream.memoryAddress(), path,
params.getMaxRowsPerFile(), params.getMaxRowsPerGroup(),
params.getMaxBytesPerFile(), params.getMode());
Dataset dataset = createWithFfiStream(stream.memoryAddress(), path, params.getMaxRowsPerFile(),
params.getMaxRowsPerGroup(), params.getMaxBytesPerFile(), params.getMode());
dataset.allocator = allocator;
return dataset;
}
@@ -122,7 +116,7 @@ public static Dataset open(String path) {
/**
* Open a dataset from the specified path.
*
* @param path file path
* @param path file path
* @param options the open options
* @return Dataset
*/
@@ -133,7 +127,7 @@ public static Dataset open(String path, ReadOptions options) {
/**
* Open a dataset from the specified path.
*
* @param path file path
* @param path file path
* @param allocator Arrow buffer allocator
* @return Dataset
*/
@@ -145,8 +139,8 @@ public static Dataset open(String path, BufferAllocator allocator) {
* Open a dataset from the specified path with additional options.
*
* @param allocator Arrow buffer allocator
* @param path file path
* @param options the open options
* @param path file path
* @param options the open options
* @return Dataset
*/
public static Dataset open(BufferAllocator allocator, String path, ReadOptions options) {
@@ -156,18 +150,17 @@ public static Dataset open(BufferAllocator allocator, String path, ReadOptions o
/**
* Open a dataset from the specified path with additional options.
*
* @param path file path
* @param path file path
* @param options the open options
* @return Dataset
*/
private static Dataset open(BufferAllocator allocator, boolean selfManagedAllocator,
String path, ReadOptions options) {
private static Dataset open(BufferAllocator allocator, boolean selfManagedAllocator, String path,
ReadOptions options) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(options);
Dataset dataset = openNative(path, options.getVersion(),
options.getBlockSize(), options.getIndexCacheSize(),
options.getMetadataCacheSize());
Dataset dataset = openNative(path, options.getVersion(), options.getBlockSize(),
options.getIndexCacheSize(), options.getMetadataCacheSize());
dataset.allocator = allocator;
dataset.selfManagedAllocator = selfManagedAllocator;
return dataset;
@@ -179,16 +172,15 @@ private static native Dataset openNative(String path, Optional<Integer> version,
/**
* Create a new version of dataset.
*
* @param allocator the buffer allocator
* @param path The file path of the dataset to open.
* @param operation The operation to apply to the dataset.
* @param readVersion The version of the dataset that was used as the base for
* the changes.
* This is not needed for overwrite or restore operations.
* @param allocator the buffer allocator
* @param path The file path of the dataset to open.
* @param operation The operation to apply to the dataset.
* @param readVersion The version of the dataset that was used as the base for the changes. This
* is not needed for overwrite or restore operations.
* @return A new instance of {@link Dataset} linked to the opened dataset.
*/
public static Dataset commit(BufferAllocator allocator, String path,
FragmentOperation operation, Optional<Long> readVersion) {
public static Dataset commit(BufferAllocator allocator, String path, FragmentOperation operation,
Optional<Long> readVersion) {
Preconditions.checkNotNull(allocator);
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(operation);
@@ -213,8 +205,7 @@ public LanceScanner newScan() {
/**
* Create a new Dataset Scanner.
*
* @param batchSize the scan options with batch size, columns filter, and
* substrait
* @param batchSize the scan options with batch size, columns filter, and substrait
* @return a dataset scanner
*/
public LanceScanner newScan(long batchSize) {
@@ -262,23 +253,51 @@ public long latestVersion() {
private native long nativeLatestVersion();

/**
* Creates a new index on the dataset.
* Only vector indexes are supported.
* Creates a new index on the dataset. Vector and scalar indexes are supported.
*
* @param columns the columns to index from
* @param columns the columns to index from
* @param indexType the index type
* @param name the name of the created index
* @param params index params
* @param replace whether to replace the existing index
* @param name the name of the created index
* @param params index params
* @param replace whether to replace the existing index
*/
public void createIndex(List<String> columns, IndexType indexType, Optional<String> name,
IndexParams params, boolean replace) {
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
params = adjustParamsForIndexType(indexType, params);
nativeCreateIndex(columns, indexType.getValue(), name, params, replace);
}
}

/**
* Adjusts the IndexParams based on the given IndexType.
*
* @param indexType The type of index to create.
* @param params The original index parameters.
* @return Adjusted IndexParams suitable for the given IndexType.
*/
private IndexParams adjustParamsForIndexType(IndexType indexType, IndexParams params) {
ScalarIndexParams.ScalarIndexType scalarIndexType;

switch (indexType) {
case BITMAP:
scalarIndexType = ScalarIndexParams.ScalarIndexType.BITMAP;
break;
case BTREE:
scalarIndexType = ScalarIndexParams.ScalarIndexType.BTREE;
break;
case LABEL_LIST:
scalarIndexType = ScalarIndexParams.ScalarIndexType.LABEL_LIST;
break;
default:
return params;
}

return new IndexParams.Builder().setScalarIndexParams(
new ScalarIndexParams.Builder().setForceIndexType(scalarIndexType).build()).build();
}

private native void nativeCreateIndex(List<String> columns, int indexTypeCode,
Optional<String> name, IndexParams params, boolean replace);

@@ -346,8 +365,7 @@ public List<String> listIndexes() {
private native List<String> nativeListIndexes();

/**
* Closes this dataset and releases any system resources associated with it. If
* the dataset is
* Closes this dataset and releases any system resources associated with it. If the dataset is
* already closed, then invoking this method has no effect.
*/
@Override
@@ -364,8 +382,7 @@ public void close() {
}

/**
* Native method to release the Lance dataset resources associated with the
* given handle.
* Native method to release the Lance dataset resources associated with the given handle.
*
* @param handle The native handle to the dataset resource.
*/
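
Taken together, the Java-side change means scalar indexes can be created either by passing explicit `ScalarIndexParams` or simply by choosing a scalar `IndexType`, in which case `adjustParamsForIndexType` forces the matching scalar index type. A hedged end-to-end sketch based only on the signatures visible in this diff; the dataset path, the column name, and the assumption that an empty `IndexParams.Builder().build()` is acceptable (it is replaced wholesale for scalar index types) are illustrative.

```java
import com.lancedb.lance.Dataset;
import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

import java.util.Arrays;
import java.util.Optional;

public class CreateScalarIndexExample {
  public static void main(String[] args) throws Exception {
    // Path and column name are illustrative placeholders.
    try (BufferAllocator allocator = new RootAllocator();
         Dataset dataset = Dataset.open("/tmp/example.lance", allocator)) {
      // For BTREE/BITMAP/LABEL_LIST, createIndex() routes through
      // adjustParamsForIndexType(), which builds ScalarIndexParams with the
      // matching force index type, so the params passed here can be minimal.
      IndexParams params = new IndexParams.Builder().build();
      dataset.createIndex(
          Arrays.asList("category"), // columns to index
          IndexType.BTREE,           // scalar index type handled by this PR
          Optional.empty(),          // default index name
          params,
          /* replace */ true);
    }
  }
}
```
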