Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support using general compression for numeric array #3020

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 120 additions & 37 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg;
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::block_compress::CompressionScheme;
use crate::encodings::physical::block_compress::{CompressedBufferEncoder, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::FsstArrayEncoder;
use crate::encodings::physical::packed_struct::PackedStructEncoder;
Expand Down Expand Up @@ -466,8 +466,14 @@ impl CoreArrayEncodingStrategy {
data_size: u64,
version: LanceFileVersion,
) -> Result<Box<dyn ArrayEncoder>> {
let bin_indices_encoder =
Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;
let bin_indices_encoder = Self::choose_array_encoder(
arrays,
&DataType::UInt64,
data_size,
false,
version,
field_meta,
)?;

let compression = field_meta.and_then(Self::get_field_compression);

Expand All @@ -479,6 +485,28 @@ impl CoreArrayEncodingStrategy {
}
}

/// Chooses an encoder for numeric (integer) arrays.
///
/// For file version >= 2.1:
/// * if the array's actual type matches the requested `data_type`, bitpack the
///   values using the minimal bit width computed over all `arrays`;
/// * otherwise, if the field metadata specifies a compression scheme, fall back
///   to a general block compressor (e.g. zstd) for the buffer.
///
/// In every other case (older file versions, or no applicable scheme) a plain
/// `ValueEncoder` wrapped in a `BasicEncoder` is returned.
fn choose_encoder_for_numeric(
    arrays: &[ArrayRef],
    data_type: &DataType,
    version: LanceFileVersion,
    field_meta: Option<&HashMap<String, String>>,
) -> Box<dyn ArrayEncoder> {
    let mut encoder: Option<Box<dyn ArrayEncoder>> = None;

    if version >= LanceFileVersion::V2_1 {
        // NOTE(review): this same-type check blocks bitpacking when the
        // requested type differs from the array's actual type (e.g. string
        // offsets requested as UInt64). A reviewer suggested string offsets
        // should also be bitpacked, which would mean dropping this condition —
        // but the check appears to have been added intentionally, so confirm
        // before removing it.
        if arrays[0].data_type() == data_type {
            let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
            encoder = Some(Box::new(BitpackedForNonNegArrayEncoder::new(
                compressed_bit_width as usize,
                data_type.clone(),
            )));
        } else if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
            // Bitpacking is not applicable here; honor a compression scheme
            // specified on the field itself via field metadata.
            encoder = Some(Box::new(CompressedBufferEncoder::new(compression)));
        }
    }
    // Default: plain value encoding with no compression.
    encoder.unwrap_or_else(|| Box::new(BasicEncoder::new(Box::new(ValueEncoder::default()))))
}

fn choose_array_encoder(
arrays: &[ArrayRef],
data_type: &DataType,
Expand All @@ -496,7 +524,7 @@ impl CoreArrayEncodingStrategy {
data_size,
use_dict_encoding,
version,
None,
field_meta,
)?,
*dimension as u32,
)))))
Expand Down Expand Up @@ -552,7 +580,7 @@ impl CoreArrayEncodingStrategy {
data_size,
false,
version,
None,
field_meta,
)?;

Ok(Box::new(BasicEncoder::new(Box::new(
Expand All @@ -579,43 +607,23 @@ impl CoreArrayEncodingStrategy {
data_size,
use_dict_encoding,
version,
None,
field_meta,
)?;
inner_encoders.push(inner_encoder);
}

Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
}
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
compressed_bit_width as usize,
data_type.clone(),
)))
} else {
Ok(Box::new(BasicEncoder::new(Box::new(
ValueEncoder::default(),
))))
}
}
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => Ok(
Self::choose_encoder_for_numeric(arrays, data_type, version, field_meta),
),

// TODO: for signed integers, I intend to make it a cascaded encoding, a sparse array for the negative values and very wide(bit-width) values,
// then a bitpacked array for the narrow(bit-width) values, I need `BitpackedForNeg` to be merged first, I am
// thinking about putting this sparse array in the metadata so bitpacking remain using one page buffer only.
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
compressed_bit_width as usize,
data_type.clone(),
)))
} else {
Ok(Box::new(BasicEncoder::new(Box::new(
ValueEncoder::default(),
))))
}
}
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok(
Self::choose_encoder_for_numeric(arrays, data_type, version, field_meta),
),
_ => Ok(Box::new(BasicEncoder::new(Box::new(
ValueEncoder::default(),
)))),
Expand Down Expand Up @@ -1327,13 +1335,16 @@ pub async fn encode_batch(

#[cfg(test)]
pub mod tests {
use arrow_array::{ArrayRef, StringArray};
use std::sync::Arc;

use crate::version::LanceFileVersion;
use arrow_array::{Array, ArrayRef, StringArray, UInt8Array};
use arrow_schema::Field;
use std::collections::HashMap;
use std::sync::Arc;

use super::check_dict_encoding;
use super::check_fixed_size_encoding;
use super::{
check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy, COMPRESSION_META_KEY,
};
use super::{check_fixed_size_encoding, get_dict_encoding_threshold};

fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
let arr = StringArray::from(arr);
Expand Down Expand Up @@ -1454,4 +1465,76 @@ pub mod tests {
LanceFileVersion::V2_1
));
}

/// Test helper: builds an array encoder via [`CoreArrayEncodingStrategy`] for
/// `array` and asserts that the encoder's `Debug` representation equals
/// `expected_encoder`.
///
/// `field_meta`, when provided, is attached to the Arrow field as metadata so
/// that metadata-driven encoder selection (e.g. a `COMPRESSION_META_KEY`
/// entry) can be exercised.
fn verify_array_encoder(
    array: ArrayRef,
    field_meta: Option<HashMap<String, String>>,
    version: LanceFileVersion,
    expected_encoder: &str,
) {
    let encoding_strategy = CoreArrayEncodingStrategy { version };
    let mut field = Field::new("test_field", array.data_type().clone(), true);
    if let Some(field_meta) = field_meta {
        // `field_meta` is owned here and `set_metadata` takes it by value —
        // no clone needed (the previous `.clone()` was redundant).
        field.set_metadata(field_meta);
    }
    let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
    let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
    assert!(encoder_result.is_ok());
    let encoder = encoder_result.unwrap();
    // Encoders don't implement equality; compare their Debug rendering.
    assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
}

/// V2_0 falls back to plain value encoding, while V2_1 bitpacks non-negative
/// integers down to their minimal bit width (2 bits for values 1..=3).
#[test]
fn test_choose_encoder_for_numeric_field() {
    let cases = [
        (
            LanceFileVersion::V2_0,
            "BasicEncoder { values_encoder: ValueEncoder }",
        ),
        (
            LanceFileVersion::V2_1,
            "BitpackedForNonNegArrayEncoder { compressed_bit_width: 2, original_data_type: UInt8 }",
        ),
    ];
    for (version, expected) in cases {
        verify_array_encoder(
            Arc::new(UInt8Array::from(vec![1, 2, 3])),
            None,
            version,
            expected,
        );
    }
}

/// Strings that all share the same byte length should be encoded with the
/// fixed-size-binary path (byte_width 1 for single-char values).
#[test]
fn test_choose_encoder_for_fixed_width_string() {
    verify_array_encoder(Arc::new(StringArray::from(vec!["a", "b", "c"])),
    None,
    LanceFileVersion::V2_1,
    "BasicEncoder { values_encoder: FixedSizeBinaryEncoder { bytes_encoder: BasicEncoder { values_encoder: ValueEncoder }, byte_width: 1 } }");
}

/// Strings whose distinct-value count (20) is far below the row count should
/// be dictionary-encoded, with bitpacked dictionary indices.
#[test]
fn test_choose_encoder_for_low_cardinality_string() {
    // Exceed the dictionary-encoding row threshold while keeping only
    // 20 distinct values so the dictionary path is selected.
    let values: Vec<String> = (0..get_dict_encoding_threshold() + 1)
        .map(|i| format!("value{}", i % 20))
        .collect();
    verify_array_encoder(Arc::new(StringArray::from(values)),
    None,
    LanceFileVersion::V2_1,
    "DictionaryEncoder { indices_encoder: BitpackedForNonNegArrayEncoder { compressed_bit_width: 8, original_data_type: UInt8 }, items_encoder: BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_scheme: None, buffer_compressor: None } }");
}

/// Variable-length, high-cardinality strings with no field-level compression
/// take the plain binary path (no dictionary, no buffer compressor).
#[test]
fn test_choose_encoder_for_string() {
    verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
    None,
    LanceFileVersion::V2_1,
    "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_scheme: None, buffer_compressor: None }");
}

/// A field carrying `COMPRESSION_META_KEY = "zstd"` metadata should cause the
/// binary encoder to compress both its offset indices and its data buffer
/// with zstd.
#[test]
fn test_choose_encoder_for_zstd_compressed_string_field() {
    verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
    Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])),
    LanceFileVersion::V2_1,
    "BinaryEncoder { indices_encoder: CompressedBufferEncoder { compression_scheme: Zstd, compressor: ZstdBufferCompressor }, compression_scheme: Some(Zstd), buffer_compressor: Some(ZstdBufferCompressor) }");
}
}
24 changes: 17 additions & 7 deletions rust/lance-encoding/src/encodings/physical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,30 @@ impl ArrayEncoder for BinaryEncoder {
self.indices_encoder
.encode(indices, &DataType::UInt64, buffer_index)?;

let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

assert!(encoded_indices_data.bits_per_value <= 64);

if let Some(buffer_compressor) = &self.buffer_compressor {
let mut compressed_data = Vec::with_capacity(data.data.len());
buffer_compressor.compress(&data.data, &mut compressed_data)?;
data.data = LanceBuffer::Owned(compressed_data);
}

let (bits_per_offset, offsets) = match encoded_indices.data {
DataBlock::FixedWidth(fixed_width) => {
let bits_per_value = fixed_width.bits_per_value as u8;
let offsets = fixed_width.data;
(bits_per_value, offsets)
}
DataBlock::Opaque(mut opaque) => {
let bits_per_offset = (opaque.buffers[0].len() as u64 / opaque.num_values) as u8;
let offsets = opaque.buffers[0].borrow_and_clone();
(bits_per_offset, offsets)
}
_ => panic!("Expected fixed width or opaque data block for indices"),
};
assert!(bits_per_offset <= 64);

let data = DataBlock::VariableWidth(VariableWidthBlock {
bits_per_offset: encoded_indices_data.bits_per_value as u8,
offsets: encoded_indices_data.data,
bits_per_offset,
offsets,
data: data.data,
num_values: data.num_values,
block_info: BlockInfo::new(),
Expand All @@ -527,7 +538,6 @@ impl ArrayEncoder for BinaryEncoder {

#[cfg(test)]
pub mod tests {

use arrow_array::{
builder::{LargeStringBuilder, StringBuilder},
ArrayRef, StringArray,
Expand Down
13 changes: 9 additions & 4 deletions rust/lance-encoding/src/encodings/physical/block_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,26 @@ impl GeneralBufferCompressor {
// An encoder which uses generic compression, such as zstd/lz4 to encode buffers
#[derive(Debug)]
pub struct CompressedBufferEncoder {
    // The scheme is stored alongside the compressor so the encoding
    // description can report the scheme actually used rather than a
    // hard-coded default.
    compression_scheme: CompressionScheme,
    compressor: Box<dyn BufferCompressor>,
}

impl Default for CompressedBufferEncoder {
    // Defaults to zstd compression.
    fn default() -> Self {
        Self {
            compression_scheme: CompressionScheme::Zstd,
            compressor: GeneralBufferCompressor::get_compressor("zstd"),
        }
    }
}

impl CompressedBufferEncoder {
    /// Creates an encoder for `compression_scheme`, resolving the concrete
    /// compressor from the scheme's string form.
    pub fn new(compression_scheme: CompressionScheme) -> Self {
        let compressor = GeneralBufferCompressor::get_compressor(&compression_scheme.to_string());
        Self {
            compression_scheme,
            compressor,
        }
    }
}

Expand Down Expand Up @@ -133,7 +138,7 @@ impl ArrayEncoder for CompressedBufferEncoder {
let encoding = ProtobufUtils::flat_encoding(
uncompressed_data.bits_per_value,
comp_buf_index,
Some(CompressionScheme::Zstd),
Some(self.compression_scheme),
);

Ok(EncodedArray {
Expand Down
Loading