Skip to content

Commit

Permalink
Add compression-level configuration for general compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
niyue committed Oct 23, 2024
1 parent 536e73d commit 2a43118
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 57 deletions.
1 change: 1 addition & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ message FixedSizeList {

// Block-compression settings for a buffer.
message Compression {
// Name of the compression scheme, e.g. "zstd" (parsed by the reader;
// unknown schemes are rejected at decode time).
string scheme = 1;
// Optional scheme-specific compression level. When unset, the codec's
// default level is used (for zstd this appears to map to level 0 —
// NOTE(review): confirm against the buffer compressor implementation).
optional int32 level = 2;
}

// Fixed width items placed contiguously in a buffer
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub use field::SchemaCompareOptions;
pub use schema::Schema;

/// Field-metadata key selecting a block-compression scheme (e.g. "zstd") for a field.
pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";
/// Field-metadata key giving the integer compression level for the scheme
/// chosen via `COMPRESSION_META_KEY`; unparsable values are ignored.
pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level";
/// Field-metadata key marking a field for blob handling — semantics defined elsewhere in the crate.
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Legacy field-metadata key requesting packed-struct encoding.
pub const PACKED_STRUCT_LEGACY_META_KEY: &str = "packed";
/// Current field-metadata key requesting packed-struct encoding.
pub const PACKED_STRUCT_META_KEY: &str = "lance-encoding:packed";
Expand Down
64 changes: 56 additions & 8 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{
Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_META_KEY,
PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
};
use lance_core::{Error, Result};
use snafu::{location, Location};
Expand All @@ -24,7 +24,7 @@ use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg;
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::block_compress::CompressionScheme;
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::FsstArrayEncoder;
use crate::encodings::physical::packed_struct::PackedStructEncoder;
Expand Down Expand Up @@ -454,9 +454,18 @@ impl CoreArrayEncodingStrategy {
&& data_size > 4 * 1024 * 1024
}

fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionScheme> {
fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
let compression = field_meta.get(COMPRESSION_META_KEY)?;
Some(compression.parse::<CompressionScheme>().unwrap())
let compression_scheme = compression.parse::<CompressionScheme>();
match compression_scheme {
Ok(compression_scheme) => Some(CompressionConfig::new(
compression_scheme,
field_meta
.get(COMPRESSION_LEVEL_META_KEY)
.and_then(|level| level.parse().ok()),
)),
Err(_) => None,
}
}

fn default_binary_encoder(
Expand Down Expand Up @@ -1327,13 +1336,15 @@ pub async fn encode_batch(

#[cfg(test)]
pub mod tests {
use crate::version::LanceFileVersion;
use arrow_array::{ArrayRef, StringArray};
use arrow_schema::Field;
use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
use std::collections::HashMap;
use std::sync::Arc;

use crate::version::LanceFileVersion;

use super::check_dict_encoding;
use super::check_fixed_size_encoding;
use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
let arr = StringArray::from(arr);
Expand Down Expand Up @@ -1454,4 +1465,41 @@ pub mod tests {
LanceFileVersion::V2_1
));
}

/// Asserts that the encoding strategy for `version` picks the expected
/// encoder for `array` (compared via the encoder's `Debug` rendering),
/// optionally attaching `field_meta` as Arrow field metadata first.
fn verify_array_encoder(
    array: ArrayRef,
    field_meta: Option<HashMap<String, String>>,
    version: LanceFileVersion,
    expected_encoder: &str,
) {
    let encoding_strategy = CoreArrayEncodingStrategy { version };
    let mut field = Field::new("test_field", array.data_type().clone(), true);
    if let Some(field_meta) = field_meta {
        // `field_meta` is owned here; cloning it would be redundant.
        field.set_metadata(field_meta);
    }
    let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
    // `expect` keeps the underlying error in the panic message, unlike
    // asserting `is_ok()` and then unwrapping.
    let encoder = encoding_strategy
        .create_array_encoder(&[array], &lance_field)
        .expect("creating the array encoder should succeed");
    assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
}

#[test]
fn test_choose_encoder_for_zstd_compressed_string_field() {
    // A string field tagged with zstd compression (no explicit level) should
    // get a BinaryEncoder whose buffer compressor uses the default level.
    let metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())]);
    let array = Arc::new(StringArray::from(vec!["a", "bb", "ccc"]));
    verify_array_encoder(
        array,
        Some(metadata),
        LanceFileVersion::V2_1,
        "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }",
    );
}

#[test]
fn test_choose_encoder_for_zstd_compression_level() {
    // An explicit level in the field metadata must flow into both the
    // compression config and the constructed buffer compressor.
    let metadata = HashMap::from([
        (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
        (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string()),
    ]);
    verify_array_encoder(
        Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
        Some(metadata),
        LanceFileVersion::V2_1,
        "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }",
    );
}
}
64 changes: 47 additions & 17 deletions rust/lance-encoding/src/encodings/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_schema::DataType;
use block_compress::CompressionScheme;
use block_compress::CompressionConfig;
use fsst::FsstPageScheduler;
use lance_arrow::DataTypeExt;
use packed_struct::PackedStructPageScheduler;

use crate::{
decoder::PageScheduler,
format::pb::{self, PackedStruct},
};

use self::{
basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
value::ValuePageScheduler,
};
use crate::encodings::physical::block_compress::CompressionScheme;
use crate::{
decoder::PageScheduler,
format::pb::{self, PackedStruct},
};

pub mod basic;
pub mod binary;
Expand Down Expand Up @@ -68,17 +68,14 @@ fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
/// Convert a protobuf buffer encoding into a physical page scheduler
fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
let compression_scheme: CompressionScheme = if encoding.compression.is_none() {
CompressionScheme::None
let compression_config: CompressionConfig = if encoding.compression.is_none() {
CompressionConfig::new(CompressionScheme::None, None)
} else {
encoding
.compression
.as_ref()
.unwrap()
.scheme
.as_str()
.parse()
.unwrap()
let compression = encoding.compression.as_ref().unwrap();
CompressionConfig::new(
compression.scheme.as_str().parse().unwrap(),
compression.level,
)
};
match encoding.bits_per_value {
1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
Expand All @@ -93,7 +90,7 @@ fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn Pag
bits_per_value / 8,
buffer_offset,
buffer_size,
compression_scheme,
compression_config,
))
}
}
Expand Down Expand Up @@ -288,3 +285,36 @@ pub fn decoder_from_array_encoding(
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
}
}

#[cfg(test)]
mod tests {
use crate::encodings::physical::{get_buffer_decoder, ColumnBuffers, FileBuffers, PageBuffers};
use crate::format::pb;

#[test]
fn test_get_buffer_decoder_for_compressed_buffer() {
    // A flat encoding backed by a zstd-compressed file buffer should decode
    // through a ValuePageScheduler that carries the compression config.
    let encoding = pb::Flat {
        buffer: Some(pb::Buffer {
            buffer_index: 0,
            buffer_type: pb::buffer::BufferType::File as i32,
        }),
        bits_per_value: 8,
        compression: Some(pb::Compression {
            scheme: "zstd".to_string(),
            level: Some(0),
        }),
    };
    let buffers = PageBuffers {
        column_buffers: ColumnBuffers {
            file_buffers: FileBuffers {
                positions_and_sizes: &[(0, 100)],
            },
            positions_and_sizes: &[],
        },
        positions_and_sizes: &[],
    };
    let scheduler = get_buffer_decoder(&encoding, &buffers);
    assert_eq!(
        format!("{:?}", scheduler),
        "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }"
    );
}
}
14 changes: 6 additions & 8 deletions rust/lance-encoding/src/encodings/physical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::Result;

use super::block_compress::{BufferCompressor, CompressionScheme, GeneralBufferCompressor};
use super::block_compress::{BufferCompressor, CompressionConfig, GeneralBufferCompressor};

struct IndicesNormalizer {
indices: Vec<u64>,
Expand Down Expand Up @@ -345,20 +345,19 @@ impl PrimitivePageDecoder for BinaryPageDecoder {
/// Encoder for variable-width binary/string data: one encoder for the
/// indices buffer plus the raw bytes, optionally block-compressed.
#[derive(Debug)]
pub struct BinaryEncoder {
    indices_encoder: Box<dyn ArrayEncoder>,
    // Compression settings recorded in the page encoding, if any.
    compression_config: Option<CompressionConfig>,
    // Compressor derived from `compression_config`; `None` means bytes are stored raw.
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
    /// Creates a `BinaryEncoder`. When `compression_config` is provided, a
    /// matching buffer compressor is instantiated for the bytes buffer;
    /// otherwise the bytes are left uncompressed.
    pub fn new(
        indices_encoder: Box<dyn ArrayEncoder>,
        compression_config: Option<CompressionConfig>,
    ) -> Self {
        let buffer_compressor = compression_config.map(GeneralBufferCompressor::get_compressor);
        Self {
            indices_encoder,
            compression_config,
            buffer_compressor,
        }
    }
Expand Down Expand Up @@ -515,7 +514,7 @@ impl ArrayEncoder for BinaryEncoder {
let bytes_encoding = ProtobufUtils::flat_encoding(
/*bits_per_value=*/ 8,
bytes_buffer_index,
self.compression_scheme,
self.compression_config,
);

let encoding =
Expand All @@ -527,7 +526,6 @@ impl ArrayEncoder for BinaryEncoder {

#[cfg(test)]
pub mod tests {

use arrow_array::{
builder::{LargeStringBuilder, StringBuilder},
ArrayRef, StringArray,
Expand Down
Loading

0 comments on commit 2a43118

Please sign in to comment.