diff --git a/rust/src/encodings/binary.rs b/rust/src/encodings/binary.rs
index fe831e37cd..caf49891f4 100644
--- a/rust/src/encodings/binary.rs
+++ b/rust/src/encodings/binary.rs
@@ -74,7 +74,7 @@ impl<'a> BinaryEncoder<'a> {
             let end = offsets[offsets.len() - 1].as_usize();
             let b = unsafe {
                 std::slice::from_raw_parts(
-                    arr.to_data().buffers()[1].as_ptr().offset(start as isize),
+                    arr.to_data().buffers()[1].as_ptr().add(start),
                     end - start,
                 )
             };
@@ -86,7 +86,7 @@ impl<'a> BinaryEncoder<'a> {
                 .skip(1)
                 .map(|b| b.as_usize() - start_offset + last_offset)
                 .for_each(|o| pos_builder.append_value(o as i64));
-            last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
+            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
         }
 
         let positions_offset = self.writer.tell();
@@ -215,7 +215,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
         if self.nullable {
             let mut null_count = 0;
             let mut null_buf = MutableBuffer::new_null(self.length);
-            positions
+            position_slice
                 .values()
                 .windows(2)
                 .enumerate()
@@ -294,6 +294,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> {
         let end = indices.value(indices.len() - 1);
 
         // TODO: make min batch size configurable.
+        // TODO: make reading positions in chunks too.
         const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
         let positions = self
             .get_positions(start as usize..(end + 1) as usize)
@@ -393,8 +394,8 @@ mod tests {
     use arrow_select::concat::concat;
 
     use arrow_array::{
-        new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
-        OffsetSizeTrait, StringArray,
+        cast::AsArray, new_empty_array, types::GenericStringType, BinaryArray, GenericStringArray,
+        LargeStringArray, OffsetSizeTrait, StringArray,
     };
     use object_store::path::Path;
 
@@ -585,4 +586,33 @@ mod tests {
             assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
         }
     }
+
+    #[tokio::test]
+    async fn test_write_binary_with_nulls() {
+        let data = BinaryArray::from_iter((0..60000).map(|v| {
+            if v % 4 != 0 {
+                Some::<&[u8]>(b"abcdefgh")
+            } else {
+                None
+            }
+        }));
+        let store = ObjectStore::memory();
+        let path = Path::from("/slices");
+
+        let mut object_writer = ObjectWriter::new(&store, &path).await.unwrap();
+        // Write some garbage to reset "tell()".
+        object_writer.write_all(b"1234").await.unwrap();
+        let mut encoder = BinaryEncoder::new(&mut object_writer);
+
+        // let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
+        let pos = encoder.encode(&[&data]).await.unwrap();
+        object_writer.shutdown().await.unwrap();
+
+        let reader = store.open(&path).await.unwrap();
+        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
+        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
+        let actual = decoder.take(&idx).await.unwrap();
+        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
+        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
+    }
 }