From bb88df3f4d1d1133b405831f5ae1b57ef32cd2b3 Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Fri, 18 Oct 2024 11:38:19 +0800 Subject: [PATCH] Support using general compression for numeric array. --- rust/lance-encoding/src/encoder.rs | 157 +++++++++++++----- .../src/encodings/physical/binary.rs | 24 ++- .../src/encodings/physical/block_compress.rs | 13 +- 3 files changed, 146 insertions(+), 48 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 4351c47a98..265753aebc 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -24,7 +24,7 @@ use crate::encodings::logical::r#struct::StructFieldEncoder; use crate::encodings::logical::r#struct::StructStructuralEncoder; use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg; use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder; -use crate::encodings::physical::block_compress::CompressionScheme; +use crate::encodings::physical::block_compress::{CompressedBufferEncoder, CompressionScheme}; use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; use crate::encodings::physical::fsst::FsstArrayEncoder; use crate::encodings::physical::packed_struct::PackedStructEncoder; @@ -466,8 +466,14 @@ impl CoreArrayEncodingStrategy { data_size: u64, version: LanceFileVersion, ) -> Result> { - let bin_indices_encoder = - Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?; + let bin_indices_encoder = Self::choose_array_encoder( + arrays, + &DataType::UInt64, + data_size, + false, + version, + field_meta, + )?; let compression = field_meta.and_then(Self::get_field_compression); @@ -479,6 +485,28 @@ impl CoreArrayEncodingStrategy { } } + fn choose_encoder_for_numeric( + arrays: &[ArrayRef], + data_type: &DataType, + version: LanceFileVersion, + field_meta: Option<&HashMap>, + ) -> Box { + let mut encoder: Option> = None; + + if version >= LanceFileVersion::V2_1 { + if arrays[0].data_type() == data_type { + let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays); + encoder = Some(Box::new(BitpackedForNonNegArrayEncoder::new( + compressed_bit_width as usize, + data_type.clone(), + ))); + } else if let Some(compression) = field_meta.and_then(Self::get_field_compression) { + encoder = Some(Box::new(CompressedBufferEncoder::new(compression))); + } + } + encoder.unwrap_or_else(|| Box::new(BasicEncoder::new(Box::new(ValueEncoder::default())))) + } + fn choose_array_encoder( arrays: &[ArrayRef], data_type: &DataType, @@ -496,7 +524,7 @@ impl CoreArrayEncodingStrategy { data_size, use_dict_encoding, version, - None, + field_meta, )?, *dimension as u32, ))))) @@ -552,7 +580,7 @@ impl CoreArrayEncodingStrategy { data_size, false, version, - None, + field_meta, )?; Ok(Box::new(BasicEncoder::new(Box::new( @@ -579,43 +607,23 @@ impl CoreArrayEncodingStrategy { data_size, use_dict_encoding, version, - None, + field_meta, )?; inner_encoders.push(inner_encoder); } Ok(Box::new(PackedStructEncoder::new(inner_encoders))) } - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { - if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type { - let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays); - Ok(Box::new(BitpackedForNonNegArrayEncoder::new( - compressed_bit_width as usize, - data_type.clone(), - ))) - } else { - Ok(Box::new(BasicEncoder::new(Box::new( - ValueEncoder::default(), - )))) - } - } + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => Ok( + Self::choose_encoder_for_numeric(arrays, data_type, version, field_meta), + ), // TODO: for signed integers, I intend to make it a cascaded encoding, a sparse array for the negative values and very wide(bit-width) values, // then a bitpacked array for the narrow(bit-width) values, I need `BitpackedForNeg` to be merged first, I am // thinking about putting this sparse array in the metadata so bitpacking remain using one page buffer only. - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { - if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type { - let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays); - Ok(Box::new(BitpackedForNonNegArrayEncoder::new( - compressed_bit_width as usize, - data_type.clone(), - ))) - } else { - Ok(Box::new(BasicEncoder::new(Box::new( - ValueEncoder::default(), - )))) - } - } + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok( + Self::choose_encoder_for_numeric(arrays, data_type, version, field_meta), + ), _ => Ok(Box::new(BasicEncoder::new(Box::new( ValueEncoder::default(), )))), @@ -1327,13 +1335,16 @@ pub async fn encode_batch( #[cfg(test)] pub mod tests { - use arrow_array::{ArrayRef, StringArray}; - use std::sync::Arc; - use crate::version::LanceFileVersion; + use arrow_array::{Array, ArrayRef, StringArray, UInt8Array}; + use arrow_schema::Field; + use std::collections::HashMap; + use std::sync::Arc; - use super::check_dict_encoding; - use super::check_fixed_size_encoding; + use super::{ + check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy, COMPRESSION_META_KEY, + }; + use super::{check_fixed_size_encoding, get_dict_encoding_threshold}; fn is_dict_encoding_applicable(arr: Vec>, threshold: u64) -> bool { let arr = StringArray::from(arr); @@ -1454,4 +1465,76 @@ pub mod tests { LanceFileVersion::V2_1 )); } + + fn verify_array_encoder( + array: ArrayRef, + field_meta: Option>, + version: LanceFileVersion, + expected_encoder: &str, + ) { + let encoding_strategy = CoreArrayEncodingStrategy { version }; + let mut field = Field::new("test_field", array.data_type().clone(), true); + if let Some(field_meta) = field_meta { + field.set_metadata(field_meta.clone()); + } + let lance_field = lance_core::datatypes::Field::try_from(field).unwrap(); + let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field); + assert!(encoder_result.is_ok()); + let encoder = encoder_result.unwrap(); + assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder); + } + + #[test] + fn test_choose_encoder_for_numeric_field() { + verify_array_encoder( + Arc::new(UInt8Array::from(vec![1, 2, 3])), + None, + LanceFileVersion::V2_0, + "BasicEncoder { values_encoder: ValueEncoder }", + ); + + verify_array_encoder( + Arc::new(UInt8Array::from(vec![1, 2, 3])), + None, + LanceFileVersion::V2_1, + "BitpackedForNonNegArrayEncoder { compressed_bit_width: 2, original_data_type: UInt8 }", + ); + } + + #[test] + fn test_choose_encoder_for_fixed_width_string() { + verify_array_encoder(Arc::new(StringArray::from(vec!["a", "b", "c"])), + None, + LanceFileVersion::V2_1, + "BasicEncoder { values_encoder: FixedSizeBinaryEncoder { bytes_encoder: BasicEncoder { values_encoder: ValueEncoder }, byte_width: 1 } }"); + } + + #[test] + fn test_choose_encoder_for_low_cardinality_string() { + // create a low cardinality string array + let mut values = Vec::new(); + for i in 0..get_dict_encoding_threshold() + 1 { + values.push(format!("value{}", i % 20)); + } + verify_array_encoder(Arc::new(StringArray::from(values)), + None, + LanceFileVersion::V2_1, + "DictionaryEncoder { indices_encoder: BitpackedForNonNegArrayEncoder { compressed_bit_width: 8, original_data_type: UInt8 }, items_encoder: BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_scheme: None, buffer_compressor: None } }"); + } + + #[test] + fn test_choose_encoder_for_string() { + verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])), + None, + LanceFileVersion::V2_1, + "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_scheme: None, buffer_compressor: None }"); + } + + #[test] + fn test_choose_encoder_for_zstd_compressed_string_field() { + verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])), + Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])), + LanceFileVersion::V2_1, + "BinaryEncoder { indices_encoder: CompressedBufferEncoder { compression_scheme: Zstd, compressor: ZstdBufferCompressor }, compression_scheme: Some(Zstd), buffer_compressor: Some(ZstdBufferCompressor) }"); + } } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 91dc5f133d..dd8be88a40 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -490,19 +490,30 @@ impl ArrayEncoder for BinaryEncoder { self.indices_encoder .encode(indices, &DataType::UInt64, buffer_index)?; - let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap(); - - assert!(encoded_indices_data.bits_per_value <= 64); - if let Some(buffer_compressor) = &self.buffer_compressor { let mut compressed_data = Vec::with_capacity(data.data.len()); buffer_compressor.compress(&data.data, &mut compressed_data)?; data.data = LanceBuffer::Owned(compressed_data); } + let (bits_per_offset, offsets) = match encoded_indices.data { + DataBlock::FixedWidth(fixed_width) => { + let bits_per_value = fixed_width.bits_per_value as u8; + let offsets = fixed_width.data; + (bits_per_value, offsets) + } + DataBlock::Opaque(mut opaque) => { + let bits_per_offset = (opaque.buffers[0].len() as u64 / opaque.num_values) as u8; + let offsets = opaque.buffers[0].borrow_and_clone(); + (bits_per_offset, offsets) + } + _ => panic!("Expected fixed width or opaque data block for indices"), + }; + assert!(bits_per_offset <= 64); + let data = DataBlock::VariableWidth(VariableWidthBlock { - bits_per_offset: encoded_indices_data.bits_per_value as u8, - offsets: encoded_indices_data.data, + bits_per_offset, + offsets, data: data.data, num_values: data.num_values, block_info: BlockInfo::new(), @@ -527,7 +538,6 @@ impl ArrayEncoder for BinaryEncoder { #[cfg(test)] pub mod tests { - use arrow_array::{ builder::{LargeStringBuilder, StringBuilder}, ArrayRef, StringArray, diff --git a/rust/lance-encoding/src/encodings/physical/block_compress.rs b/rust/lance-encoding/src/encodings/physical/block_compress.rs index b1a00825a5..95db7af684 100644 --- a/rust/lance-encoding/src/encodings/physical/block_compress.rs +++ b/rust/lance-encoding/src/encodings/physical/block_compress.rs @@ -89,21 +89,26 @@ impl GeneralBufferCompressor { // An encoder which uses generic compression, such as zstd/lz4 to encode buffers #[derive(Debug)] pub struct CompressedBufferEncoder { + compression_scheme: CompressionScheme, compressor: Box, } impl Default for CompressedBufferEncoder { fn default() -> Self { Self { + compression_scheme: CompressionScheme::Zstd, compressor: GeneralBufferCompressor::get_compressor("zstd"), } } } impl CompressedBufferEncoder { - pub fn new(compression_type: &str) -> Self { - let compressor = GeneralBufferCompressor::get_compressor(compression_type); - Self { compressor } + pub fn new(compression_scheme: CompressionScheme) -> Self { + let compressor = GeneralBufferCompressor::get_compressor(&compression_scheme.to_string()); + Self { + compression_scheme, + compressor, + } } } @@ -133,7 +138,7 @@ impl ArrayEncoder for CompressedBufferEncoder { let encoding = ProtobufUtils::flat_encoding( uncompressed_data.bits_per_value, comp_buf_index, - Some(CompressionScheme::Zstd), + Some(self.compression_scheme), ); Ok(EncodedArray {