Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/lancedb/lance into tokenizer
Browse files Browse the repository at this point in the history
  • Loading branch information
BubbleCal committed Oct 17, 2024
2 parents 0101fea + 707b78d commit 724fc04
Show file tree
Hide file tree
Showing 95 changed files with 6,238 additions and 617 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
"rust/lance-table",
"rust/lance-test-macros",
"rust/lance-testing",
"rust/lance-encoding/compression-algo/fsst",
"rust/lance-encoding/src/compression_algo/fsst",
]
exclude = ["python"]
# Python package needs to be built by maturin.
Expand Down Expand Up @@ -111,7 +111,7 @@ datafusion-physical-expr = { version = "41.0", features = [
] }
deepsize = "0.2.0"
either = "1.0"
fsst = { version = "=0.18.4", path = "./rust/lance-encoding/compression-algo/fsst" }
fsst = { version = "=0.18.4", path = "./rust/lance-encoding/src/compression_algo/fsst" }
futures = "0.3"
http = "0.2.9"
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
Expand Down
1 change: 1 addition & 0 deletions java/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.iml
spark/dependency-reduced-pom.xml
94 changes: 68 additions & 26 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,21 @@ impl BlockingDataset {
Ok(Self { inner })
}

pub fn commit(uri: &str, operation: Operation, read_version: Option<u64>) -> Result<Self> {
pub fn commit(
uri: &str,
operation: Operation,
read_version: Option<u64>,
storage_options: HashMap<String, String>,
) -> Result<Self> {
let object_store_registry = Arc::new(ObjectStoreRegistry::default());
let inner = RT.block_on(Dataset::commit(
uri,
operation,
read_version,
None,
Some(ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
}),
None,
object_store_registry,
false, // TODO: support enable_v2_manifest_paths
Expand Down Expand Up @@ -142,10 +150,11 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
_obj: JObject,
arrow_schema_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -156,19 +165,22 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
)
)
}

#[allow(clippy::too_many_arguments)]
fn inner_create_with_ffi_schema<'local>(
env: &mut JNIEnv<'local>,
arrow_schema_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
Expand All @@ -182,6 +194,7 @@ fn inner_create_with_ffi_schema<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -192,10 +205,11 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
_obj: JObject,
arrow_array_stream_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -206,19 +220,22 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
)
)
}

#[allow(clippy::too_many_arguments)]
fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
arrow_array_stream_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -229,17 +246,20 @@ fn inner_create_with_ffi_stream<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}

#[allow(clippy::too_many_arguments)]
fn create_dataset<'local>(
env: &mut JNIEnv<'local>,
path: JString,
max_rows_per_file: JObject,
max_rows_per_group: JObject,
max_bytes_per_file: JObject,
mode: JObject,
storage_options_obj: JObject,
reader: impl RecordBatchReader + Send + 'static,
) -> Result<JObject<'local>> {
let path_str = path.extract(env)?;
Expand All @@ -250,6 +270,7 @@ fn create_dataset<'local>(
&max_rows_per_group,
&max_bytes_per_file,
&mode,
&storage_options_obj,
)?;

let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?;
Expand Down Expand Up @@ -290,20 +311,28 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_commitAppend<'local>(
mut env: JNIEnv<'local>,
_obj: JObject,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
env,
inner_commit_append(&mut env, path, read_version_obj, fragments_obj)
inner_commit_append(
&mut env,
path,
read_version_obj,
fragments_obj,
storage_options_obj
)
)
}

pub fn inner_commit_append<'local>(
env: &mut JNIEnv<'local>,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let json_fragments = env.get_strings(&fragments_obj)?;
let mut fragments: Vec<Fragment> = Vec::new();
Expand All @@ -314,7 +343,20 @@ pub fn inner_commit_append<'local>(
let op = Operation::Append { fragments };
let path_str = path.extract(env)?;
let read_version = env.get_u64_opt(&read_version_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version)?;
let jmap = JMap::from_env(env, &storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
dataset.into_java(env)
}

Expand Down
64 changes: 37 additions & 27 deletions java/core/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JString<'local> {
ok_or_throw_with_return!(
env,
Expand All @@ -93,7 +94,8 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
),
JString::default()
)
Expand All @@ -105,11 +107,12 @@ fn inner_create_with_ffi_array<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JString<'local>> {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
Expand All @@ -132,6 +135,7 @@ fn inner_create_with_ffi_array<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -142,11 +146,12 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
_obj: JObject,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> JString<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -158,7 +163,8 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>(
max_rows_per_file,
max_rows_per_group,
max_bytes_per_file,
mode
mode,
storage_options_obj
),
JString::default()
)
Expand All @@ -169,11 +175,12 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JString<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -186,6 +193,7 @@ fn inner_create_with_ffi_stream<'local>(
max_rows_per_group,
max_bytes_per_file,
mode,
storage_options_obj,
reader,
)
}
Expand All @@ -194,11 +202,12 @@ fn inner_create_with_ffi_stream<'local>(
fn create_fragment<'a>(
env: &mut JNIEnv<'a>,
dataset_uri: JString,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
reader: impl RecordBatchReader + Send + 'static,
) -> Result<JString<'a>> {
let path_str = dataset_uri.extract(env)?;
Expand All @@ -211,6 +220,7 @@ fn create_fragment<'a>(
&max_rows_per_group,
&max_bytes_per_file,
&mode,
&storage_options_obj,
)?;
let fragment = RT.block_on(FileFragment::create(
&path_str,
Expand Down
Loading

0 comments on commit 724fc04

Please sign in to comment.