Skip to content

Commit

Permalink
Merge branch 'main' into FilterVecIndex
Browse files — browse the repository at this point in the history
  • Loading branch information
SaintBacchus authored Oct 12, 2024
2 parents b5091fe + cdac5de commit c67edfd
Show file tree
Hide file tree
Showing 22 changed files with 1,163 additions and 111 deletions.
1 change: 1 addition & 0 deletions java/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.iml
spark/dependency-reduced-pom.xml
94 changes: 68 additions & 26 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,21 @@ impl BlockingDataset {
Ok(Self { inner })
}

pub fn commit(uri: &str, operation: Operation, read_version: Option<u64>) -> Result<Self> {
pub fn commit(
uri: &str,
operation: Operation,
read_version: Option<u64>,
storage_options: HashMap<String, String>,
) -> Result<Self> {
let object_store_registry = Arc::new(ObjectStoreRegistry::default());
let inner = RT.block_on(Dataset::commit(
uri,
operation,
read_version,
None,
Some(ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
}),
None,
object_store_registry,
false, // TODO: support enable_v2_manifest_paths
Expand Down Expand Up @@ -142,10 +150,11 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
_obj: JObject,
arrow_schema_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -156,19 +165,22 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
)
)
}

#[allow(clippy::too_many_arguments)]
fn inner_create_with_ffi_schema<'local>(
env: &mut JNIEnv<'local>,
arrow_schema_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
Expand All @@ -182,6 +194,7 @@ fn inner_create_with_ffi_schema<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -192,10 +205,11 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
_obj: JObject,
arrow_array_stream_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -206,19 +220,22 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
)
)
}

#[allow(clippy::too_many_arguments)]
fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
arrow_array_stream_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -229,17 +246,20 @@ fn inner_create_with_ffi_stream<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}

#[allow(clippy::too_many_arguments)]
fn create_dataset<'local>(
env: &mut JNIEnv<'local>,
path: JString,
max_rows_per_file: JObject,
max_rows_per_group: JObject,
max_bytes_per_file: JObject,
mode: JObject,
storage_options_obj: JObject,
reader: impl RecordBatchReader + Send + 'static,
) -> Result<JObject<'local>> {
let path_str = path.extract(env)?;
Expand All @@ -250,6 +270,7 @@ fn create_dataset<'local>(
&max_rows_per_group,
&max_bytes_per_file,
&mode,
&storage_options_obj,
)?;

let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?;
Expand Down Expand Up @@ -290,20 +311,28 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_commitAppend<'local>(
mut env: JNIEnv<'local>,
_obj: JObject,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
inner_commit_append(&mut env, path, read_version_obj, fragments_obj)
inner_commit_append(
&mut env,
path,
read_version_obj,
fragments_obj,
storage_options_obj
)
)
}

pub fn inner_commit_append<'local>(
env: &mut JNIEnv<'local>,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let json_fragments = env.get_strings(&fragments_obj)?;
let mut fragments: Vec<Fragment> = Vec::new();
Expand All @@ -314,7 +343,20 @@ pub fn inner_commit_append<'local>(
let op = Operation::Append { fragments };
let path_str = path.extract(env)?;
let read_version = env.get_u64_opt(&read_version_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version)?;
let jmap = JMap::from_env(env, &storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
dataset.into_java(env)
}

Expand Down
64 changes: 37 additions & 27 deletions java/core/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JString<'local> {
ok_or_throw_with_return!(
env,
Expand All @@ -93,7 +94,8 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
),
JString::default()
)
Expand All @@ -105,11 +107,12 @@ fn inner_create_with_ffi_array<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JString<'local>> {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
Expand All @@ -132,6 +135,7 @@ fn inner_create_with_ffi_array<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -142,11 +146,12 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
_obj: JObject,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JString<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -158,7 +163,8 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
),
JString::default()
)
Expand All @@ -169,11 +175,12 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JString<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -186,6 +193,7 @@ fn inner_create_with_ffi_stream<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -194,11 +202,12 @@ fn inner_create_with_ffi_stream<'local>(
fn create_fragment<'a>(
env: &mut JNIEnv<'a>,
dataset_uri: JString,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
reader: impl RecordBatchReader + Send + 'static,
) -> Result<JString<'a>> {
let path_str = dataset_uri.extract(env)?;
Expand All @@ -211,6 +220,7 @@ fn create_fragment<'a>(
&max_rows_per_group,
&max_bytes_per_file,
&mode,
&storage_options_obj,
)?;
let fragment = RT.block_on(FileFragment::create(
&path_str,
Expand Down
23 changes: 22 additions & 1 deletion java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
use std::sync::Arc;

use arrow::array::Float32Array;
use jni::objects::{JObject, JString};
use jni::objects::{JMap, JObject, JString};
use jni::JNIEnv;
use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::{StageParams, VectorIndexParams};
use lance::io::ObjectStoreParams;
use lance_encoding::version::LanceFileVersion;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
Expand All @@ -31,13 +32,15 @@ use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;

use lance_index::vector::Query;
use std::collections::HashMap;

pub fn extract_write_params(
env: &mut JNIEnv,
max_rows_per_file: &JObject,
max_rows_per_group: &JObject,
max_bytes_per_file: &JObject,
mode: &JObject,
storage_options_obj: &JObject,
) -> Result<WriteParams> {
let mut write_params = WriteParams::default();

Expand All @@ -55,6 +58,24 @@ pub fn extract_write_params(
}
// Java code always sets the data storage version to Legacy for now
write_params.data_storage_version = Some(LanceFileVersion::Legacy);
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;

write_params.store_params = Some(ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
});
Ok(write_params)
}

Expand Down
Loading

0 comments on commit c67edfd

Please sign in to comment.