Skip to content

Commit

Permalink
[Bug] Calculating the nulls from position slices in chunks. (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored May 10, 2023
1 parent 335cbb7 commit 170ccc8
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions rust/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> BinaryEncoder<'a> {
let end = offsets[offsets.len() - 1].as_usize();
let b = unsafe {
std::slice::from_raw_parts(
arr.to_data().buffers()[1].as_ptr().offset(start as isize),
arr.to_data().buffers()[1].as_ptr().add(start),
end - start,
)
};
Expand All @@ -86,7 +86,7 @@ impl<'a> BinaryEncoder<'a> {
.skip(1)
.map(|b| b.as_usize() - start_offset + last_offset)
.for_each(|o| pos_builder.append_value(o as i64));
last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
}

let positions_offset = self.writer.tell();
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
if self.nullable {
let mut null_count = 0;
let mut null_buf = MutableBuffer::new_null(self.length);
positions
position_slice
.values()
.windows(2)
.enumerate()
Expand Down Expand Up @@ -294,6 +294,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> {
let end = indices.value(indices.len() - 1);

// TODO: make min batch size configurable.
// TODO: read positions in chunks too.
const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
let positions = self
.get_positions(start as usize..(end + 1) as usize)
Expand Down Expand Up @@ -393,8 +394,8 @@ mod tests {
use arrow_select::concat::concat;

use arrow_array::{
new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
OffsetSizeTrait, StringArray,
cast::AsArray, new_empty_array, types::GenericStringType, BinaryArray, GenericStringArray,
LargeStringArray, OffsetSizeTrait, StringArray,
};
use object_store::path::Path;

Expand Down Expand Up @@ -585,4 +586,33 @@ mod tests {
assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
}
}

/// Round-trips a nullable binary column through `BinaryEncoder` / `BinaryDecoder`.
///
/// The input holds 60,000 slots where every index divisible by 4 is null and all
/// other slots carry the same 8-byte payload. After encoding, three widely spaced
/// rows (0, 5 and 59996) are fetched with `take` and compared against the
/// expected null / non-null pattern.
#[tokio::test]
async fn test_write_binary_with_nulls() {
    const PAYLOAD: &[u8] = b"abcdefgh";

    // Build the input: null whenever the index is a multiple of 4.
    let data = BinaryArray::from_iter((0..60000).map(|i| {
        if i % 4 == 0 {
            None
        } else {
            Some(PAYLOAD)
        }
    }));

    let store = ObjectStore::memory();
    let path = Path::from("/slices");
    let mut object_writer = ObjectWriter::new(&store, &path).await.unwrap();

    // Prefix the file with a few junk bytes before encoding, so the encoder
    // presumably does not start from a `tell()` of zero.
    object_writer.write_all(b"1234").await.unwrap();

    let mut encoder = BinaryEncoder::new(&mut object_writer);
    let pos = encoder.encode(&[&data]).await.unwrap();
    object_writer.shutdown().await.unwrap();

    // Read back a few rows: index 0 and 59996 are multiples of 4 (null), 5 is not.
    let reader = store.open(&path).await.unwrap();
    let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
    let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
    let actual = decoder.take(&idx).await.unwrap();

    let expected = BinaryArray::from(vec![None, Some(PAYLOAD), None]);
    assert_eq!(actual.as_binary::<i32>(), &expected);
}
}

0 comments on commit 170ccc8

Please sign in to comment.