diff --git a/protos/encodings.proto b/protos/encodings.proto
index a91efe10e1..c14c5d2926 100644
--- a/protos/encodings.proto
+++ b/protos/encodings.proto
@@ -158,6 +158,7 @@ message FixedSizeList {
 
 message Compression {
   string scheme = 1;
+  optional int32 level = 2;
 }
 
 // Fixed width items placed contiguously in a buffer
diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs
index 219f263f22..b32d4b9084 100644
--- a/rust/lance-core/src/datatypes.rs
+++ b/rust/lance-core/src/datatypes.rs
@@ -24,6 +24,7 @@ pub use field::SchemaCompareOptions;
 pub use schema::Schema;
 
 pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";
+pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level";
 pub const BLOB_META_KEY: &str = "lance-encoding:blob";
 pub const PACKED_STRUCT_LEGACY_META_KEY: &str = "packed";
 pub const PACKED_STRUCT_META_KEY: &str = "lance-encoding:packed";
diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs
index 4d72407d02..9bf1a229cd 100644
--- a/rust/lance-encoding/src/encoder.rs
+++ b/rust/lance-encoding/src/encoder.rs
@@ -9,8 +9,8 @@ use bytes::{Bytes, BytesMut};
 use futures::future::BoxFuture;
 use lance_arrow::DataTypeExt;
 use lance_core::datatypes::{
-    Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_META_KEY,
-    PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
+    Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
+    COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
 };
 use lance_core::{Error, Result};
 use snafu::{location, Location};
@@ -24,7 +24,7 @@ use crate::encodings::logical::r#struct::StructFieldEncoder;
 use crate::encodings::logical::r#struct::StructStructuralEncoder;
 use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg;
 use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
-use crate::encodings::physical::block_compress::CompressionScheme;
+use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
 use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
 use crate::encodings::physical::fsst::FsstArrayEncoder;
 use crate::encodings::physical::packed_struct::PackedStructEncoder;
@@ -454,9 +454,18 @@
         && data_size > 4 * 1024 * 1024
     }
 
-    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionScheme> {
+    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
         let compression = field_meta.get(COMPRESSION_META_KEY)?;
-        Some(compression.parse::<CompressionScheme>().unwrap())
+        let compression_scheme = compression.parse::<CompressionScheme>();
+        match compression_scheme {
+            Ok(compression_scheme) => Some(CompressionConfig::new(
+                compression_scheme,
+                field_meta
+                    .get(COMPRESSION_LEVEL_META_KEY)
+                    .and_then(|level| level.parse().ok()),
+            )),
+            Err(_) => None,
+        }
     }
 
     fn default_binary_encoder(
@@ -1327,13 +1336,15 @@ pub async fn encode_batch(
 
 #[cfg(test)]
 pub mod tests {
+    use crate::version::LanceFileVersion;
     use arrow_array::{ArrayRef, StringArray};
+    use arrow_schema::Field;
+    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
+    use std::collections::HashMap;
     use std::sync::Arc;
 
-    use crate::version::LanceFileVersion;
-
-    use super::check_dict_encoding;
     use super::check_fixed_size_encoding;
+    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};
 
     fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
         let arr = StringArray::from(arr);
@@ -1454,4 +1465,41 @@ pub mod tests {
             LanceFileVersion::V2_1
         ));
     }
+
+    fn verify_array_encoder(
+        array: ArrayRef,
+        field_meta: Option<HashMap<String, String>>,
+        version: LanceFileVersion,
+        expected_encoder: &str,
+    ) {
+        let encoding_strategy = CoreArrayEncodingStrategy { version };
+        let mut field = Field::new("test_field", array.data_type().clone(), true);
+        if let Some(field_meta) = field_meta {
+            field.set_metadata(field_meta.clone());
+        }
+        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
+        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
+        assert!(encoder_result.is_ok());
+        let encoder = encoder_result.unwrap();
+        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
+    }
+
+    #[test]
+    fn test_choose_encoder_for_zstd_compressed_string_field() {
+        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
+            Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])),
+            LanceFileVersion::V2_1,
+            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }");
+    }
+
+    #[test]
+    fn test_choose_encoder_for_zstd_compression_level() {
+        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
+            Some(HashMap::from([
+                (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
+                (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string())
+            ])),
+            LanceFileVersion::V2_1,
+            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }");
+    }
 }
diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs
index 4dddae15f8..cbedb5dd17 100644
--- a/rust/lance-encoding/src/encodings/physical.rs
+++ b/rust/lance-encoding/src/encodings/physical.rs
@@ -2,21 +2,21 @@
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
 use arrow_schema::DataType;
-use block_compress::CompressionScheme;
+use block_compress::CompressionConfig;
 use fsst::FsstPageScheduler;
 use lance_arrow::DataTypeExt;
 use packed_struct::PackedStructPageScheduler;
 
-use crate::{
-    decoder::PageScheduler,
-    format::pb::{self, PackedStruct},
-};
-
 use self::{
     basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
     dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
     value::ValuePageScheduler,
 };
+use crate::encodings::physical::block_compress::CompressionScheme;
+use crate::{
+    decoder::PageScheduler,
+    format::pb::{self, PackedStruct},
+};
 
 pub mod basic;
 pub mod binary;
@@ -68,17 +68,14 @@ fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
 /// Convert a protobuf buffer encoding into a physical page scheduler
 fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
     let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
-    let compression_scheme: CompressionScheme = if encoding.compression.is_none() {
-        CompressionScheme::None
+    let compression_config: CompressionConfig = if encoding.compression.is_none() {
+        CompressionConfig::new(CompressionScheme::None, None)
     } else {
-        encoding
-            .compression
-            .as_ref()
-            .unwrap()
-            .scheme
-            .as_str()
-            .parse()
-            .unwrap()
+        let compression = encoding.compression.as_ref().unwrap();
+        CompressionConfig::new(
+            compression.scheme.as_str().parse().unwrap(),
+            compression.level,
+        )
     };
     match encoding.bits_per_value {
         1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
@@ -93,7 +90,7 @@ fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
             buffer_offset,
             buffer_size,
-            compression_scheme,
+            compression_config,
         )),
         _ => unreachable!(),
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::encodings::physical::{get_buffer_decoder, ColumnBuffers, FileBuffers, PageBuffers};
+    use crate::format::pb;
+
+    #[test]
+    fn test_get_buffer_decoder_for_compressed_buffer() {
+        let page_scheduler = get_buffer_decoder(
+            &pb::Flat {
+                buffer: Some(pb::Buffer {
+                    buffer_index: 0,
+                    buffer_type: pb::buffer::BufferType::File as i32,
+                }),
+                bits_per_value: 8,
+                compression: Some(pb::Compression {
+                    scheme: "zstd".to_string(),
+                    level: Some(0),
+                }),
+            },
+            &PageBuffers {
+                column_buffers: ColumnBuffers {
+                    file_buffers: FileBuffers {
+                        positions_and_sizes: &[(0, 100)],
+                    },
+                    positions_and_sizes: &[],
+                },
+                positions_and_sizes: &[],
+            },
+        );
+        assert_eq!(format!("{:?}", page_scheduler).as_str(), "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }");
+    }
+}
diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs
index 91dc5f133d..de8c50cb49 100644
--- a/rust/lance-encoding/src/encodings/physical/binary.rs
+++ b/rust/lance-encoding/src/encodings/physical/binary.rs
@@ -29,7 +29,7 @@ use arrow_array::{PrimitiveArray, UInt64Array};
 use arrow_schema::DataType;
 use lance_core::Result;
 
-use super::block_compress::{BufferCompressor, CompressionScheme, GeneralBufferCompressor};
+use super::block_compress::{BufferCompressor, CompressionConfig, GeneralBufferCompressor};
 
 struct IndicesNormalizer {
     indices: Vec<u64>,
@@ -345,20 +345,19 @@ impl PrimitivePageDecoder for BinaryPageDecoder {
 #[derive(Debug)]
 pub struct BinaryEncoder {
     indices_encoder: Box<dyn ArrayEncoder>,
-    compression_scheme: Option<CompressionScheme>,
+    compression_config: Option<CompressionConfig>,
     buffer_compressor: Option<Box<dyn BufferCompressor>>,
 }
 
 impl BinaryEncoder {
     pub fn new(
         indices_encoder: Box<dyn ArrayEncoder>,
-        compression_scheme: Option<CompressionScheme>,
+        compression_config: Option<CompressionConfig>,
     ) -> Self {
-        let buffer_compressor = compression_scheme
-            .map(|scheme| GeneralBufferCompressor::get_compressor(&scheme.to_string()));
+        let buffer_compressor = compression_config.map(GeneralBufferCompressor::get_compressor);
         Self {
             indices_encoder,
-            compression_scheme,
+            compression_config,
             buffer_compressor,
         }
     }
@@ -515,7 +514,7 @@ impl ArrayEncoder for BinaryEncoder {
         let bytes_encoding = ProtobufUtils::flat_encoding(
             /*bits_per_value=*/ 8,
             bytes_buffer_index,
-            self.compression_scheme,
+            self.compression_config,
         );
 
         let encoding =
@@ -527,7 +526,6 @@ impl ArrayEncoder for BinaryEncoder {
 
 #[cfg(test)]
 pub mod tests {
-
     use arrow_array::{
         builder::{LargeStringBuilder, StringBuilder},
         ArrayRef, StringArray,
diff --git a/rust/lance-encoding/src/encodings/physical/block_compress.rs b/rust/lance-encoding/src/encodings/physical/block_compress.rs
index b1a00825a5..fce9988c6f 100644
--- a/rust/lance-encoding/src/encodings/physical/block_compress.rs
+++ b/rust/lance-encoding/src/encodings/physical/block_compress.rs
@@ -16,7 +16,26 @@ use crate::{
     format::ProtobufUtils,
 };
 
-pub const COMPRESSION_META_KEY: &str = "lance:compression";
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct CompressionConfig {
+    pub(crate) scheme: CompressionScheme,
+    pub(crate) level: Option<i32>,
+}
+
+impl CompressionConfig {
+    pub(crate) fn new(scheme: CompressionScheme, level: Option<i32>) -> CompressionConfig {
+        CompressionConfig { scheme, level }
+    }
+}
+
+impl Default for CompressionConfig {
+    fn default() -> Self {
+        Self {
+            scheme: CompressionScheme::Zstd,
+            level: Some(0),
+        }
+    }
+}
 
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum CompressionScheme {
@@ -55,11 +74,19 @@ pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
 }
 
 #[derive(Debug, Default)]
-pub struct ZstdBufferCompressor {}
+pub struct ZstdBufferCompressor {
+    compression_level: i32,
+}
+
+impl ZstdBufferCompressor {
+    pub fn new(compression_level: i32) -> Self {
+        Self { compression_level }
+    }
+}
 
 impl BufferCompressor for ZstdBufferCompressor {
     fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
-        let mut encoder = zstd::Encoder::new(output_buf, 0)?;
+        let mut encoder = zstd::Encoder::new(output_buf, self.compression_level)?;
         encoder.write_all(input_buf)?;
         match encoder.finish() {
             Ok(_) => Ok(()),
@@ -74,14 +101,30 @@ impl BufferCompressor for ZstdBufferCompressor {
     }
 }
 
+#[derive(Debug, Default)]
+pub struct NoopBufferCompressor {}
+
+impl BufferCompressor for NoopBufferCompressor {
+    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+        output_buf.extend_from_slice(input_buf);
+        Ok(())
+    }
+
+    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+        output_buf.extend_from_slice(input_buf);
+        Ok(())
+    }
+}
+
 pub struct GeneralBufferCompressor {}
 
 impl GeneralBufferCompressor {
-    pub fn get_compressor(compression_type: &str) -> Box<dyn BufferCompressor> {
-        match compression_type {
-            "" => Box::<ZstdBufferCompressor>::default(),
-            "zstd" => Box::<ZstdBufferCompressor>::default(),
-            _ => panic!("Unsupported compression type: {}", compression_type),
+    pub fn get_compressor(compression_config: CompressionConfig) -> Box<dyn BufferCompressor> {
+        match compression_config.scheme {
+            CompressionScheme::Zstd => Box::new(ZstdBufferCompressor::new(
+                compression_config.level.unwrap_or(0),
+            )),
+            CompressionScheme::None => Box::new(NoopBufferCompressor {}),
         }
     }
 }
@@ -95,14 +138,17 @@ pub struct CompressedBufferEncoder {
 impl Default for CompressedBufferEncoder {
     fn default() -> Self {
         Self {
-            compressor: GeneralBufferCompressor::get_compressor("zstd"),
+            compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
+                scheme: CompressionScheme::Zstd,
+                level: Some(0),
+            }),
         }
     }
 }
 
 impl CompressedBufferEncoder {
-    pub fn new(compression_type: &str) -> Self {
-        let compressor = GeneralBufferCompressor::get_compressor(compression_type);
+    pub fn new(compression_config: CompressionConfig) -> Self {
+        let compressor = GeneralBufferCompressor::get_compressor(compression_config);
         Self { compressor }
     }
 }
@@ -133,7 +179,7 @@ impl ArrayEncoder for CompressedBufferEncoder {
         let encoding = ProtobufUtils::flat_encoding(
             uncompressed_data.bits_per_value,
             comp_buf_index,
-            Some(CompressionScheme::Zstd),
+            Some(CompressionConfig::new(CompressionScheme::Zstd, None)),
         );
 
         Ok(EncodedArray {
@@ -142,3 +188,50 @@ impl ArrayEncoder for CompressedBufferEncoder {
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_schema::DataType;
+    use std::str::FromStr;
+    use crate::buffer::LanceBuffer;
+    use crate::data::FixedWidthDataBlock;
+
+    #[test]
+    fn test_compression_scheme_from_str() {
+        assert_eq!(
+            CompressionScheme::from_str("none").unwrap(),
+            CompressionScheme::None
+        );
+        assert_eq!(
+            CompressionScheme::from_str("zstd").unwrap(),
+            CompressionScheme::Zstd
+        );
+    }
+
+    #[test]
+    fn test_compression_scheme_from_str_invalid() {
+        assert!(CompressionScheme::from_str("invalid").is_err());
+    }
+
+    #[test]
+    fn test_compressed_buffer_encoder() {
+        let encoder = CompressedBufferEncoder::default();
+        let data = DataBlock::FixedWidth(FixedWidthDataBlock {
+            bits_per_value: 64,
+            data: LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4, 5, 6, 7]),
+            num_values: 8,
+            block_info: BlockInfo::new(),
+            used_encoding: UsedEncoding::new(),
+        });
+
+        let mut buffer_index = 0;
+        let encoded_array_result = encoder.encode(data, &DataType::Int64, &mut buffer_index);
+        assert!(encoded_array_result.is_ok(), "{:?}", encoded_array_result);
+        let encoded_array = encoded_array_result.unwrap();
+        assert_eq!(encoded_array.data.num_values(), 8);
+        let buffers = encoded_array.data.into_buffers();
+        assert_eq!(buffers.len(), 1);
+        assert!(buffers[0].len() < 64 * 8);
+    }
+}
diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs
index d549a0b031..d436eb4414 100644
--- a/rust/lance-encoding/src/encodings/physical/value.rs
+++ b/rust/lance-encoding/src/encodings/physical/value.rs
@@ -26,7 +26,7 @@ use crate::{
 
 use lance_core::{Error, Result};
 
-use super::block_compress::{CompressionScheme, GeneralBufferCompressor};
+use super::block_compress::{CompressionConfig, CompressionScheme, GeneralBufferCompressor};
 
 /// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
 #[derive(Debug, Clone, Copy)]
@@ -36,7 +36,7 @@ pub struct ValuePageScheduler {
     bytes_per_value: u64,
     buffer_offset: u64,
     buffer_size: u64,
-    compression_scheme: CompressionScheme,
+    compression_config: CompressionConfig,
 }
 
 impl ValuePageScheduler {
@@ -44,13 +44,13 @@ impl ValuePageScheduler {
         bytes_per_value: u64,
         buffer_offset: u64,
         buffer_size: u64,
-        compression_scheme: CompressionScheme,
+        compression_config: CompressionConfig,
     ) -> Self {
         Self {
             bytes_per_value,
             buffer_offset,
             buffer_size,
-            compression_scheme,
+            compression_config,
         }
     }
 }
@@ -63,7 +63,7 @@ impl PageScheduler for ValuePageScheduler {
         top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
         let (mut min, mut max) = (u64::MAX, 0);
-        let byte_ranges = if self.compression_scheme == CompressionScheme::None {
+        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
             ranges
                 .iter()
                 .map(|range| {
@@ -94,7 +94,7 @@ impl PageScheduler for ValuePageScheduler {
         let bytes = scheduler.submit_request(byte_ranges, top_level_row);
         let bytes_per_value = self.bytes_per_value;
 
-        let range_offsets = if self.compression_scheme != CompressionScheme::None {
+        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
             ranges
                 .iter()
                 .map(|range| {
@@ -107,6 +107,7 @@ impl PageScheduler for ValuePageScheduler {
             vec![]
         };
 
+        let compression_config = self.compression_config;
         async move {
             let bytes = bytes.await?;
 
@@ -115,6 +116,7 @@ impl PageScheduler for ValuePageScheduler {
                 data: bytes,
                 uncompressed_data: Arc::new(Mutex::new(None)),
                 uncompressed_range_offsets: range_offsets,
+                compression_config,
             }) as Box<dyn PrimitivePageDecoder>)
         }
         .boxed()
@@ -126,13 +128,14 @@ struct ValuePageDecoder {
     data: Vec<Bytes>,
     uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    uncompressed_range_offsets: Vec<std::ops::Range<u64>>,
+    compression_config: CompressionConfig,
 }
 
 impl ValuePageDecoder {
     fn decompress(&self) -> Result<Vec<Bytes>> {
         // for compressed page, it is guaranteed that only one range is passed
         let bytes_u8: Vec<u8> = self.data[0].to_vec();
-        let buffer_compressor = GeneralBufferCompressor::get_compressor("");
+        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
 
         let mut uncompressed_bytes: Vec<u8> = Vec::new();
         buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
@@ -371,7 +374,6 @@ impl FixedPerValueCompressor for ValueEncoder {
 // public tests module because we share the PRIMITIVE_TYPES constant with fixed_size_list
 #[cfg(test)]
 pub(crate) mod tests {
-
     use arrow_schema::{DataType, Field, TimeUnit};
 
     use crate::testing::check_round_trip_encoding_random;
diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs
index a34bc1d931..ba2cc1dc10 100644
--- a/rust/lance-encoding/src/format.rs
+++ b/rust/lance-encoding/src/format.rs
@@ -23,7 +23,7 @@ use pb::{
     FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable, PackedStruct, PageLayout,
 };
 
-use crate::encodings::physical::block_compress::CompressionScheme;
+use crate::encodings::physical::block_compress::CompressionConfig;
 
 use self::pb::Constant;
 
@@ -72,7 +72,7 @@ impl ProtobufUtils {
     pub fn flat_encoding(
         bits_per_value: u64,
         buffer_index: u32,
-        compression: Option<CompressionScheme>,
+        compression: Option<CompressionConfig>,
     ) -> ArrayEncoding {
         ArrayEncoding {
             array_encoding: Some(ArrayEncodingEnum::Flat(Flat {
@@ -81,8 +81,9 @@ impl ProtobufUtils {
                     buffer_index,
                     buffer_type: BufferType::Page as i32,
                 }),
-                compression: compression.map(|compression_scheme| pb::Compression {
-                    scheme: compression_scheme.to_string(),
+                compression: compression.map(|compression_config| pb::Compression {
+                    scheme: compression_config.scheme.to_string(),
+                    level: compression_config.level,
                 }),
             })),
         }
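Usage note (not part of the diff): a minimal sketch of how a caller could opt a column into the new compression options through Arrow field metadata. The two key strings and the "zstd" scheme come from the constants and tests above; the field construction itself is ordinary arrow-rs, and the column name and helper function are made up for illustration.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};

    // Attach the Lance encoding hints to a string column. The keys mirror
    // COMPRESSION_META_KEY and COMPRESSION_LEVEL_META_KEY added in this change.
    fn compressed_string_field() -> Field {
        Field::new("body", DataType::Utf8, true).with_metadata(HashMap::from([
            // scheme: currently "zstd" or "none"
            ("lance-encoding:compression".to_string(), "zstd".to_string()),
            // optional zstd level; when absent or unparsable the encoder falls
            // back to level 0 (the zstd default), per get_field_compression above
            ("lance-encoding:compression-level".to_string(), "22".to_string()),
        ]))
    }

Per the diff, the level is only consulted for the zstd scheme, and a level that fails to parse is treated as unset rather than surfacing an error.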