feat: add a blob encoding for large binary values (#2868)
Compared to the regular large binary encoding, this will have
considerably less metadata (since there will be fewer pages). In the
future it will also be possible to read just the descriptions (allowing
for different ways of reading the data, such as file objects).

The scheduling algorithm is also more conservative. We don't fetch
descriptions until we need them, and we only read in one batch of blob
data at a time (instead of an entire page). This means the reader will
probably be slightly less performant (though, when reading values this
large, it shouldn't be too noticeable) but will use considerably less
RAM.
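
Opting a column into the new encoding is driven entirely by field metadata. As a rough
sketch (using the arrow-rs arrow_schema crate; writer wiring omitted), a large-binary
field is tagged with the "lance-encoding:blob" key that this commit introduces as
BLOB_META_KEY; the Python test further down does the same thing through pyarrow field
metadata:

use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};

// Sketch: mark a large-binary column so the blob encoding path is taken.
fn blob_schema() -> Schema {
    let mut metadata = HashMap::new();
    metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
    let val = Field::new("val", DataType::LargeBinary, true).with_metadata(metadata);
    Schema::new(vec![val])
}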
westonpace authored Oct 4, 2024
1 parent add0f34 commit 73ab2b5
Showing 23 changed files with 865 additions and 145 deletions.
7 changes: 7 additions & 0 deletions protos/encodings.proto
@@ -264,11 +264,18 @@ message ZoneIndex {
ColumnEncoding inner = 3;
}

// Marks a column as blob data. It will contain a packed struct
// with fields position and size (u64)
message Blob {
ColumnEncoding inner = 1;
}

// Encodings that describe a column of values
message ColumnEncoding {
oneof column_encoding {
// No special encoding, just column values
google.protobuf.Empty values = 1;
ZoneIndex zone_index = 2;
Blob blob = 3;
}
}
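
For reference, the packed descriptor struct described above can be sketched with arrow-rs
types as follows. The field names follow the proto comment; the exact DESC_FIELD definition
lives in the new blob module, which is not shown in this excerpt, so treat the nullability
here as an assumption:

use arrow_schema::{DataType, Field, Fields};

// Sketch: the per-value descriptor the blob column stores instead of the raw bytes,
// giving the out-of-line position and size of each value (both u64).
fn blob_descriptor_type() -> DataType {
    DataType::Struct(Fields::from(vec![
        Field::new("position", DataType::UInt64, false),
        Field::new("size", DataType::UInt64, false),
    ]))
}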
27 changes: 27 additions & 0 deletions python/python/tests/test_file.py
@@ -407,3 +407,30 @@ def test_compression(tmp_path):
size_compress = os.path.getsize(tmp_path / "compress.lance")

assert size_compress < size_default


def test_blob(tmp_path):
# 100 1MiB values. If we store as regular large_binary we end up
# with several pages of values. If we store as a blob we get a
# single page
vals = pa.array([b"0" * (1024 * 1024) for _ in range(100)], pa.large_binary())
schema_no_blob = pa.schema([pa.field("val", pa.large_binary())])
schema_blob = pa.schema(
[pa.field("val", pa.large_binary(), metadata={"lance-encoding:blob": "true"})]
)

path = tmp_path / "no_blob.lance"
with LanceFileWriter(str(path), schema_no_blob) as writer:
writer.write_batch(pa.table({"val": vals}))

reader = LanceFileReader(str(path))
assert len(reader.metadata().columns[0].pages) > 1
assert reader.read_all().to_table() == pa.table({"val": vals})

path = tmp_path / "blob.lance"
with LanceFileWriter(str(path), schema_blob) as writer:
writer.write_batch(pa.table({"val": vals}))

reader = LanceFileReader(str(path))
assert len(reader.metadata().columns[0].pages) == 1
assert reader.read_all().to_table() == pa.table({"val": vals})
46 changes: 28 additions & 18 deletions rust/lance-encoding-datafusion/src/zone.rs
@@ -31,7 +31,7 @@ use lance_encoding::{
},
encoder::{
encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodedColumn, EncodingOptions,
FieldEncoder,
FieldEncoder, OutOfLineBuffers,
},
format::pb,
EncodingsIo,
@@ -560,8 +560,7 @@ impl ZoneMapsFieldEncoder {
Ok(())
}

async fn maps_to_metadata(&mut self) -> Result<LanceBuffer> {
let maps = std::mem::take(&mut self.maps);
async fn maps_to_metadata(maps: Vec<CreatedZoneMap>) -> Result<LanceBuffer> {
let (mins, (maxes, null_counts)): (Vec<_>, (Vec<_>, Vec<_>)) = maps
.into_iter()
.map(|mp| (mp.min, (mp.max, mp.null_count)))
@@ -599,42 +598,53 @@ impl FieldEncoder for ZoneMapsFieldEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
// TODO: If we do the zone map calculation as part of the encoding task then we can
// parallelize statistics gathering. Could be faster too since the encoding task is
// going to need to access the same data (although the input to an encoding task is
// probably too big for the CPU cache anyways). We can worry about this if we need
// to improve write speed.
self.update(&array)?;
self.items_encoder.maybe_encode(array)
self.items_encoder.maybe_encode(array, external_buffers)
}

fn flush(&mut self) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
self.items_encoder.flush()
fn flush(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
self.items_encoder.flush(external_buffers)
}

fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
async move {
if self.cur_offset > 0 {
// Create final map
self.new_map()?;
fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
if self.cur_offset > 0 {
// Create final map
if let Err(err) = self.new_map() {
return async move { Err(err) }.boxed();
}
let items_columns = self.items_encoder.finish().await?;
if items_columns.len() != 1 {
return Err(Error::InvalidInput {
source: format!("attempt to apply zone maps to a field encoder that generated {} columns of data (expected 1)", items_columns.len()).into(),
location: location!()})
}
let maps = std::mem::take(&mut self.maps);
let rows_per_zone = self.rows_per_map;
let items_columns = self.items_encoder.finish(external_buffers);

async move {
let items_columns = items_columns.await?;
if items_columns.is_empty() {
return Err(Error::invalid_input("attempt to apply zone maps to a field encoder that generated zero columns of data".to_string(), location!()))
}
let items_column = items_columns.into_iter().next().unwrap();
let final_pages = items_column.final_pages;
let mut column_buffers = items_column.column_buffers;
let zone_buffer_index = column_buffers.len();
column_buffers.push(self.maps_to_metadata().await?);
column_buffers.push(Self::maps_to_metadata(maps).await?);
let column_encoding = pb::ColumnEncoding {
column_encoding: Some(pb::column_encoding::ColumnEncoding::ZoneIndex(Box::new(
pb::ZoneIndex {
inner: Some(Box::new(items_column.encoding)),
rows_per_zone: self.rows_per_map,
rows_per_zone,
zone_map_buffer: Some(pb::Buffer {
buffer_index: zone_buffer_index as u32,
buffer_type: i32::from(pb::buffer::BufferType::Column),
1 change: 1 addition & 0 deletions rust/lance-encoding/Cargo.toml
@@ -27,6 +27,7 @@ futures.workspace = true
fsst.workspace = true
hex = "0.4.3"
itertools.workspace = true
lazy_static.workspace = true
log.workspace = true
num-traits.workspace = true
prost.workspace = true
25 changes: 24 additions & 1 deletion rust/lance-encoding/src/decoder.rs
@@ -237,11 +237,12 @@ use tracing::instrument;
use crate::data::DataBlock;
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::{BlobFieldScheduler, DESC_FIELD};
use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo};
use crate::encodings::logical::primitive::PrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb;
use crate::format::pb::{self, column_encoding};
use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches over 10MiB large then it's time to reduce the batch size
@@ -754,6 +755,18 @@ impl CoreFieldDecoderStrategy {
)) as Arc<dyn FieldScheduler>),
))
}

fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
if let column_encoding::ColumnEncoding::Blob(blob) =
column_info.encoding.column_encoding.as_ref().unwrap()
{
let mut column_info = column_info.clone();
column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
Some(column_info)
} else {
None
}
}
}

impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
@@ -776,6 +789,16 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
return Ok((chain, Ok(scheduler)));
} else if data_type.is_binary_like() {
let column_info = column_infos.next().unwrap().clone();
if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
let desc_scheduler = self.create_primitive_scheduler(
DESC_FIELD.data_type(),
chain.current_path(),
&blob_col,
buffers,
)?;
let blob_scheduler = Arc::new(BlobFieldScheduler::new(desc_scheduler));
return Ok((chain, Ok(blob_scheduler)));
}
if let Some(page_info) = column_info.page_infos.first() {
if matches!(
page_info.encoding,
105 changes: 91 additions & 14 deletions rust/lance-encoding/src/encoder.rs
@@ -13,6 +13,7 @@ use snafu::{location, Location};

use crate::buffer::LanceBuffer;
use crate::data::DataBlock;
use crate::encodings::logical::blob::{BlobFieldEncoder, DESC_FIELD};
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg;
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
@@ -38,6 +39,9 @@ use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;

pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
pub const PACKED_STRUCT_LEGACY_META_KEY: &str = "packed";
pub const PACKED_STRUCT_META_KEY: &str = "lance-encoding:packed";

/// An encoded array
///
@@ -137,6 +141,48 @@ impl Default for EncodedColumn {
}
}

/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page. This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (becuase storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
pub struct OutOfLineBuffers {
position: u64,
buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
pub fn new(base_position: u64) -> Self {
Self {
position: base_position,
buffers: Vec::new(),
}
}

pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
let position = self.position;
self.position += buffer.len() as u64;
self.buffers.push(buffer);
position
}

pub fn take_buffers(self) -> Vec<LanceBuffer> {
self.buffers
}

pub fn reset_position(&mut self, position: u64) {
self.position = position;
}
}

/// A task to create a page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

@@ -162,7 +208,11 @@ pub trait FieldEncoder: Send {
/// than a single disk page.
///
/// It could also return an empty Vec if there is not enough data yet to encode any pages.
fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>>;
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
) -> Result<Vec<EncodeTask>>;
/// Flush any remaining data from the buffers into encoding tasks
///
/// Each encode task produces a single page. The order of these pages will be maintained
@@ -171,13 +221,16 @@ pub trait FieldEncoder: Send {
///
/// This may be called intermittently throughout encoding but will always be called
/// once at the end of encoding just before calling finish
fn flush(&mut self) -> Result<Vec<EncodeTask>>;
fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
/// Finish encoding and return column metadata
///
/// This is called only once, after all encode tasks have completed
///
/// This returns a Vec because a single field may have created multiple columns
fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;
fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

/// The number of output columns this encoding will create
fn num_columns(&self) -> u32;
@@ -651,12 +704,27 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
) -> Result<Box<dyn FieldEncoder>> {
let data_type = field.data_type();
if Self::is_primitive_type(&data_type) {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id as u32),
field.clone(),
)?))
let column_index = column_index.next_column_index(field.id as u32);
if field.metadata.contains_key(BLOB_META_KEY) {
let mut packed_meta = HashMap::new();
packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
let desc_field =
Field::try_from(DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index,
desc_field,
)?);
Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
} else {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index,
field.clone(),
)?))
}
} else {
match data_type {
DataType::List(_child) | DataType::LargeList(_child) => {
@@ -680,9 +748,9 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
DataType::Struct(_) => {
let field_metadata = &field.metadata;
if field_metadata
.get("packed")
.get(PACKED_STRUCT_LEGACY_META_KEY)
.map(|v| v == "true")
.unwrap_or(false)
.unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
{
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
@@ -825,17 +893,26 @@ pub async fn encode_batch(
let mut page_table = Vec::new();
let mut col_idx_offset = 0;
for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
let mut tasks = encoder.maybe_encode(arr.clone())?;
tasks.extend(encoder.flush()?);
let mut external_buffers = OutOfLineBuffers::new(data_buffer.len() as u64);
let mut tasks = encoder.maybe_encode(arr.clone(), &mut external_buffers)?;
tasks.extend(encoder.flush(&mut external_buffers)?);
for buffer in external_buffers.take_buffers() {
data_buffer.extend_from_slice(&buffer);
}
let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
for task in tasks {
let encoded_page = task.await?;
// Write external buffers first
pages
.entry(encoded_page.column_idx)
.or_default()
.push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
}
let encoded_columns = encoder.finish().await?;
let mut external_buffers = OutOfLineBuffers::new(data_buffer.len() as u64);
let encoded_columns = encoder.finish(&mut external_buffers).await?;
for buffer in external_buffers.take_buffers() {
data_buffer.extend_from_slice(&buffer);
}
let num_columns = encoded_columns.len();
for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
let col_idx = col_idx + col_idx_offset;
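
To make the OutOfLineBuffers contract introduced in this file concrete, here is a small
sketch of how a field encoder could reserve out-of-line space for blob values. It assumes
LanceBuffer::Owned is constructible by the caller; add_buffer returns the absolute file
position each buffer will occupy, and take_buffers hands the buffers back in the same order
so the writer can append them to the data buffer:

use lance_encoding::buffer::LanceBuffer;
use lance_encoding::encoder::OutOfLineBuffers;

// Sketch: reserve space for each blob and record (position, size) descriptors.
fn reserve_blobs(base_position: u64, blobs: Vec<Vec<u8>>) -> (Vec<(u64, u64)>, Vec<LanceBuffer>) {
    let mut external = OutOfLineBuffers::new(base_position);
    let mut descriptors = Vec::with_capacity(blobs.len());
    for bytes in blobs {
        let size = bytes.len() as u64;
        let position = external.add_buffer(LanceBuffer::Owned(bytes));
        descriptors.push((position, size));
    }
    (descriptors, external.take_buffers())
}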
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/logical.rs
@@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod binary;
pub mod blob;
pub mod list;
pub mod primitive;
pub mod r#struct;