From c0cd7e09cdc08e2cb56cfb172e1a1b8d2dca0db3 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 1 Aug 2023 20:23:18 +0800 Subject: [PATCH 01/11] Parquet: fix and test inconsistent Boolean encoding --- cpp/src/parquet/encoding.cc | 5 ++++- cpp/src/parquet/encoding_test.cc | 35 +++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dda0e7701b1e4..429d446898e31 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -340,6 +340,8 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } + // Put arrow array cannot mix with PlainEncoder::PutImpl. + DCHECK_EQ(0, bit_writer_.bytes_written()); const auto& data = checked_cast(values); if (data.null_count() == 0) { @@ -354,6 +356,7 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco sink_.length(), n_valid); for (int64_t i = 0; i < data.length(); i++) { + // Only valid boolean data will call `writer.Next`. if (data.IsValid(i)) { if (data.Value(i)) { writer.Set(); @@ -365,7 +368,7 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco } writer.Finish(); } - sink_.UnsafeAdvance(data.length()); + sink_.UnsafeAdvance(data.length() - data.null_count()); } private: diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 7a910e4220831..006f0a1594150 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -580,7 +580,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::StringBuilder); + acc.builder = std::make_unique<::arrow::StringBuilder>(); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), @@ -665,6 +665,33 @@ class EncodingAdHocTyped : public ::testing::Test { ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true); } + void PlainTwice(int seed) { + auto values_single = GetValues(seed); + auto encoder = MakeTypedEncoder( + Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); + auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); + + ASSERT_NO_THROW(encoder->Put(*values_single)); + ASSERT_NO_THROW(encoder->Put(*values_single)); + auto buf = encoder->FlushValues(); + + EXPECT_OK_AND_ASSIGN(auto values, + ::arrow::Concatenate({values_single, values_single})); + decoder->SetData(static_cast(values->length()), buf->data(), + static_cast(buf->size())); + + BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); + ASSERT_EQ(values->length() - values->null_count(), + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.Finish(&result)); + ASSERT_EQ(100, result->length()); + ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true); + } + void ByteStreamSplit(int seed) { if (!std::is_same::value && !std::is_same::value) { @@ -882,6 +909,12 @@ TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) { } } +TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut2) { + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + this->PlainTwice(seed); + } +} + TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->ByteStreamSplit(seed); From 961e25568f81344604464b9b48ee9b8f833074b4 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 1 Aug 2023 21:37:50 +0800 Subject: [PATCH 02/11] fix compiling --- cpp/src/parquet/encoding_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 006f0a1594150..b1c00f4ba5348 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -26,6 +26,7 @@ #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" +#include "arrow/array/concatenate.h" #include "arrow/compute/cast.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" From 1765718a7f33521b50db63b5f5d92364abae6bca Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 2 Aug 2023 11:11:48 +0800 Subject: [PATCH 03/11] [Update] Fix comment --- cpp/src/parquet/encoding.cc | 3 +- cpp/src/parquet/encoding_test.cc | 52 +++++++++++--------------------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 429d446898e31..d19ee5452990b 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -340,7 +340,8 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } - // Put arrow array cannot mix with PlainEncoder::PutImpl. + // Cannot Put arrow array when PlainEncoder::PutImpl has remaining writes + // in `bit_writer_`. DCHECK_EQ(0, bit_writer_.bytes_written()); const auto& data = checked_cast(values); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b1c00f4ba5348..b8354964be8a7 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -642,54 +642,38 @@ class EncodingAdHocTyped : public ::testing::Test { static std::shared_ptr<::arrow::DataType> arrow_type(); - void Plain(int seed) { - auto values = GetValues(seed); + void Plain(int seed, int round = 1) { + auto random_array = GetValues(seed); auto encoder = MakeTypedEncoder( Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); - ASSERT_NO_THROW(encoder->Put(*values)); - auto buf = encoder->FlushValues(); - - int num_values = static_cast(values->length() - values->null_count()); - decoder->SetData(num_values, buf->data(), static_cast(buf->size())); - - BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); - ASSERT_EQ(num_values, - decoder->DecodeArrow(static_cast(values->length()), - static_cast(values->null_count()), - values->null_bitmap_data(), values->offset(), &acc)); - - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.Finish(&result)); - ASSERT_EQ(50, result->length()); - ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true); - } - - void PlainTwice(int seed) { - auto values_single = GetValues(seed); - auto encoder = MakeTypedEncoder( - Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); - auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); - - ASSERT_NO_THROW(encoder->Put(*values_single)); - ASSERT_NO_THROW(encoder->Put(*values_single)); + for (int i = 0; i < round; ++i) { + ASSERT_NO_THROW(encoder->Put(*random_array)); + } + std::shared_ptr<::arrow::Array> values; + if (round == 1) { + values = random_array; + } else { + ::arrow::ArrayVector arrays; + arrays.resize(round, random_array); + EXPECT_OK_AND_ASSIGN(values, + ::arrow::Concatenate(arrays, ::arrow::default_memory_pool())); + } auto buf = encoder->FlushValues(); - EXPECT_OK_AND_ASSIGN(auto values, - ::arrow::Concatenate({values_single, values_single})); decoder->SetData(static_cast(values->length()), buf->data(), static_cast(buf->size())); BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); - ASSERT_EQ(values->length() - values->null_count(), + ASSERT_EQ(static_cast(values->length() - values->null_count()), decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; ASSERT_OK(acc.Finish(&result)); - ASSERT_EQ(100, result->length()); + ASSERT_EQ(values->length(), result->length()); ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true); } @@ -910,9 +894,9 @@ TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) { } } -TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut2) { +TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) { for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - this->PlainTwice(seed); + this->Plain(seed, /*round=*/5); } } From 45bbef1eb1f489b043b4990f5a58e184e4578eef Mon Sep 17 00:00:00 2001 From: mwish Date: Sun, 6 Aug 2023 15:05:10 +0800 Subject: [PATCH 04/11] [Update] resolve comment --- cpp/src/parquet/encoding.cc | 8 +++++--- cpp/src/parquet/encoding_test.cc | 33 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index d19ee5452990b..de1a8117110d2 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -340,9 +340,11 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } - // Cannot Put arrow array when PlainEncoder::PutImpl has remaining writes - // in `bit_writer_`. - DCHECK_EQ(0, bit_writer_.bytes_written()); + if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() != 0)) { + throw ParquetException( + "Cannot Put arrow array when PlainEncoder " + "has remaining writes in `bit_writer_`"); + } const auto& data = checked_cast(values); if (data.null_count() == 0) { diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b8354964be8a7..0afde5bb65a6f 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1645,6 +1645,39 @@ TEST_F(TestRleBooleanEncoding, AllNull) { /*null_probability*/ 1)); } +class TestPlainBooleanEncoding : public TestEncodingBase {}; + +TEST(TestPlainBooleanArrayEncoding, AdHocRoundTrip) { + std::vector> arrays{ + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), + }; + + auto encoder = MakeTypedEncoder(Encoding::PLAIN, + /*use_dictionary=*/false); + for (const auto& array : arrays) { + encoder->Put(*array); + } + auto buffer = encoder->FlushValues(); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); + decoder->SetData(static_cast(expected->length()), buffer->data(), + static_cast(buffer->size())); + + ::arrow::BooleanBuilder builder; + ASSERT_EQ(static_cast(expected->length() - expected->null_count()), + decoder->DecodeArrow(static_cast(expected->length()), + static_cast(expected->null_count()), + expected->null_bitmap_data(), 0, &builder)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(builder.Finish(&result)); + ASSERT_EQ(expected->length(), result->length()); + ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); +} + // ---------------------------------------------------------------------- // DELTA_LENGTH_BYTE_ARRAY encode/decode tests. From 809a7187ee96d44cdfc557f8c65833f6e5975101 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 8 Aug 2023 21:29:21 +0800 Subject: [PATCH 05/11] Rewrite implement using PutImpl --- cpp/src/parquet/encoding.cc | 40 ++++++++++++++----------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index de1a8117110d2..ad3b0459f357f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -340,38 +340,28 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } - if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() != 0)) { - throw ParquetException( - "Cannot Put arrow array when PlainEncoder " - "has remaining writes in `bit_writer_`"); - } - const auto& data = checked_cast(values); + if (data.null_count() == 0) { - PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length()))); - // no nulls, just dump the data - ::arrow::internal::CopyBitmap(data.data()->GetValues(1), data.offset(), - data.length(), sink_.mutable_data(), sink_.length()); + ArrowPoolVector boolean_data(data.length(), this->memory_pool()); + for (int i = 0; i < data.length(); ++i) { + boolean_data[i] = data.Value(i); + } + PutImpl(boolean_data, static_cast(data.length())); } else { - auto n_valid = bit_util::BytesForBits(data.length() - data.null_count()); - PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid)); - ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), - sink_.length(), n_valid); - - for (int64_t i = 0; i < data.length(); i++) { - // Only valid boolean data will call `writer.Next`. + ArrowPoolVector boolean_data(data.length() - data.null_count(), + this->memory_pool()); + int boolean_data_index = 0; + for (int i = 0; i < data.length(); ++i) { if (data.IsValid(i)) { - if (data.Value(i)) { - writer.Set(); - } else { - writer.Clear(); - } - writer.Next(); + DCHECK(boolean_data_index < + static_cast(data.length() - data.null_count())); + boolean_data[boolean_data_index] = data.Value(i); + ++boolean_data_index; } } - writer.Finish(); + PutImpl(boolean_data, static_cast(data.length() - data.null_count())); } - sink_.UnsafeAdvance(data.length() - data.null_count()); } private: From 5c3a23e736015066910991d79933a759589c0b5f Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 10 Aug 2023 11:24:46 +0800 Subject: [PATCH 06/11] tiny reflect logic --- cpp/src/parquet/encoding.cc | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index ad3b0459f357f..f3087e9b017df 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -343,11 +343,24 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco const auto& data = checked_cast(values); if (data.null_count() == 0) { - ArrowPoolVector boolean_data(data.length(), this->memory_pool()); - for (int i = 0; i < data.length(); ++i) { - boolean_data[i] = data.Value(i); + int written_bytes = 0; + while (written_bytes < static_cast(data.length())) { + auto directly_copy_bits = + std::min(static_cast(data.length() - written_bytes), bits_available_); + int i = 0; + for (; i < directly_copy_bits; i++) { + bit_writer_.PutValue(data.Value(i + written_bytes), 1); + } + bits_available_ -= directly_copy_bits; + written_bytes += directly_copy_bits; + if (bits_available_ == 0) { + bit_writer_.Flush(); + PARQUET_THROW_NOT_OK( + sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + bit_writer_.Clear(); + bits_available_ = static_cast(bits_buffer_->size()) * 8; + } } - PutImpl(boolean_data, static_cast(data.length())); } else { ArrowPoolVector boolean_data(data.length() - data.null_count(), this->memory_pool()); @@ -390,6 +403,7 @@ void PlainEncoder::PutImpl(const SequenceType& src, int num_values) PARQUET_THROW_NOT_OK( sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); + bits_available_ = static_cast(bits_buffer_->size()) * 8; } } From 5e7d84191d232e8c3b60d0f792d2b7abd657e43e Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 10 Aug 2023 14:07:48 +0800 Subject: [PATCH 07/11] add a valid_bit_length_ for bug fixing --- cpp/src/parquet/encoding.cc | 75 ++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f3087e9b017df..559a7063d0457 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -311,8 +311,8 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco bits_available_(kInMemoryDefaultCapacity * 8), bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), sink_(pool), - bit_writer_(bits_buffer_->mutable_data(), - static_cast(bits_buffer_->size())) {} + bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())), + valid_bit_length_(0) {} int64_t EstimatedDataEncodedSize() override; std::shared_ptr FlushValues() override; @@ -340,48 +340,51 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } + if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() != 0)) { + throw ParquetException( + "direct put to boolean cannot be interleaved with other " + "operations"); + } const auto& data = checked_cast(values); if (data.null_count() == 0) { - int written_bytes = 0; - while (written_bytes < static_cast(data.length())) { - auto directly_copy_bits = - std::min(static_cast(data.length() - written_bytes), bits_available_); - int i = 0; - for (; i < directly_copy_bits; i++) { - bit_writer_.PutValue(data.Value(i + written_bytes), 1); - } - bits_available_ -= directly_copy_bits; - written_bytes += directly_copy_bits; - if (bits_available_ == 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - bits_available_ = static_cast(bits_buffer_->size()) * 8; - } - } + PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length()))); + // no nulls, just dump the data + ::arrow::internal::CopyBitmap(data.data()->GetValues(1), data.offset(), + data.length(), sink_.mutable_data(), + valid_bit_length_); } else { - ArrowPoolVector boolean_data(data.length() - data.null_count(), - this->memory_pool()); - int boolean_data_index = 0; - for (int i = 0; i < data.length(); ++i) { + auto n_valid = bit_util::BytesForBits(data.length() - data.null_count()); + PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid)); + ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), + valid_bit_length_, n_valid); + + for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { - DCHECK(boolean_data_index < - static_cast(data.length() - data.null_count())); - boolean_data[boolean_data_index] = data.Value(i); - ++boolean_data_index; + if (data.Value(i)) { + writer.Set(); + } else { + writer.Clear(); + } + writer.Next(); } } - PutImpl(boolean_data, static_cast(data.length() - data.null_count())); + writer.Finish(); } + valid_bit_length_ += data.length() - data.null_count(); } private: + // bits_available_, bits_buffer_ and bit_writer_ is and append to + // `sink_` when Put using `Put` and `PutSpace`. + // + // valid_bit_length_ is used when Put using `::arrow::BooleanArray`. + int bits_available_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; + int valid_bit_length_; template void PutImpl(const SequenceType& src, int num_values); @@ -389,6 +392,9 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco template void PlainEncoder::PutImpl(const SequenceType& src, int num_values) { + if (ARROW_PREDICT_FALSE(valid_bit_length_ != 0)) { + throw ParquetException("valid_bit_length_ must be zero"); + } int bit_offset = 0; if (bits_available_ > 0) { int bits_to_write = std::min(bits_available_, num_values); @@ -434,7 +440,16 @@ int64_t PlainEncoder::EstimatedDataEncodedSize() { } std::shared_ptr PlainEncoder::FlushValues() { - if (bits_available_ > 0) { + if (valid_bit_length_ > 0) { + if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() > 0)) { + throw ParquetException( + "direct put to boolean cannot be interleaved with other " + "operations"); + } + sink_.UnsafeAdvance(::arrow::bit_util::BytesForBits(valid_bit_length_)); + valid_bit_length_ = 0; + } + if (bits_available_ > 0 && bit_writer_.bytes_written() != 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); From ccdedb4c18408a074fcdca5ecef07bfeb973a269 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 10 Aug 2023 14:40:11 +0800 Subject: [PATCH 08/11] change valid_bit_length to i64 --- cpp/src/parquet/encoding.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 559a7063d0457..f61d794bc2dcd 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -384,7 +384,7 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; - int valid_bit_length_; + int64_t valid_bit_length_; template void PutImpl(const SequenceType& src, int num_values); From 68f28e8c461c89bbf08f96a050b0e97c6b9531a2 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 10 Aug 2023 18:53:27 +0800 Subject: [PATCH 09/11] refactor bit write using TypedBufferBuilder --- cpp/src/parquet/encoding.cc | 106 ++++-------------------------------- 1 file changed, 10 insertions(+), 96 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f61d794bc2dcd..01e8ba38ea9c7 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -66,7 +66,6 @@ using ArrowPoolVector = std::vector>; namespace parquet { namespace { -constexpr int64_t kInMemoryDefaultCapacity = 1024; // The Parquet spec isn't very clear whether ByteArray lengths are signed or // unsigned, but the Java implementation uses signed ints. constexpr size_t kMaxByteArraySize = std::numeric_limits::max(); @@ -307,12 +306,7 @@ template <> class PlainEncoder : public EncoderImpl, virtual public BooleanEncoder { public: explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool) - : EncoderImpl(descr, Encoding::PLAIN, pool), - bits_available_(kInMemoryDefaultCapacity * 8), - bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), - sink_(pool), - bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())), - valid_bit_length_(0) {} + : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {} int64_t EstimatedDataEncodedSize() override; std::shared_ptr FlushValues() override; @@ -340,51 +334,24 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } - if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() != 0)) { - throw ParquetException( - "direct put to boolean cannot be interleaved with other " - "operations"); - } const auto& data = checked_cast(values); if (data.null_count() == 0) { - PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length()))); // no nulls, just dump the data - ::arrow::internal::CopyBitmap(data.data()->GetValues(1), data.offset(), - data.length(), sink_.mutable_data(), - valid_bit_length_); + PARQUET_THROW_NOT_OK(sink_.Reserve(data.length())); + sink_.UnsafeAppend(data.data()->GetValues(1), /*offset=*/0, data.length()); } else { - auto n_valid = bit_util::BytesForBits(data.length() - data.null_count()); - PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid)); - ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), - valid_bit_length_, n_valid); - + PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count())); for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { - if (data.Value(i)) { - writer.Set(); - } else { - writer.Clear(); - } - writer.Next(); + sink_.UnsafeAppend(data.Value(i)); } } - writer.Finish(); } - valid_bit_length_ += data.length() - data.null_count(); } private: - // bits_available_, bits_buffer_ and bit_writer_ is and append to - // `sink_` when Put using `Put` and `PutSpace`. - // - // valid_bit_length_ is used when Put using `::arrow::BooleanArray`. - - int bits_available_; - std::shared_ptr bits_buffer_; - ::arrow::BufferBuilder sink_; - ::arrow::bit_util::BitWriter bit_writer_; - int64_t valid_bit_length_; + ::arrow::TypedBufferBuilder sink_; template void PutImpl(const SequenceType& src, int num_values); @@ -392,70 +359,17 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco template void PlainEncoder::PutImpl(const SequenceType& src, int num_values) { - if (ARROW_PREDICT_FALSE(valid_bit_length_ != 0)) { - throw ParquetException("valid_bit_length_ must be zero"); - } - int bit_offset = 0; - if (bits_available_ > 0) { - int bits_to_write = std::min(bits_available_, num_values); - for (int i = 0; i < bits_to_write; i++) { - bit_writer_.PutValue(src[i], 1); - } - bits_available_ -= bits_to_write; - bit_offset = bits_to_write; - - if (bits_available_ == 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - bits_available_ = static_cast(bits_buffer_->size()) * 8; - } - } - - int bits_remaining = num_values - bit_offset; - while (bit_offset < num_values) { - bits_available_ = static_cast(bits_buffer_->size()) * 8; - - int bits_to_write = std::min(bits_available_, bits_remaining); - for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { - bit_writer_.PutValue(src[i], 1); - } - bit_offset += bits_to_write; - bits_available_ -= bits_to_write; - bits_remaining -= bits_to_write; - - if (bits_available_ == 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - } + PARQUET_THROW_NOT_OK(sink_.Reserve(num_values)); + for (int i = 0; i < num_values; ++i) { + sink_.UnsafeAppend(src[i]); } } int64_t PlainEncoder::EstimatedDataEncodedSize() { - int64_t position = sink_.length(); - return position + bit_writer_.bytes_written(); + return ::arrow::bit_util::BytesForBits(sink_.length()); } std::shared_ptr PlainEncoder::FlushValues() { - if (valid_bit_length_ > 0) { - if (ARROW_PREDICT_FALSE(bit_writer_.bytes_written() > 0)) { - throw ParquetException( - "direct put to boolean cannot be interleaved with other " - "operations"); - } - sink_.UnsafeAdvance(::arrow::bit_util::BytesForBits(valid_bit_length_)); - valid_bit_length_ = 0; - } - if (bits_available_ > 0 && bit_writer_.bytes_written() != 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - bits_available_ = static_cast(bits_buffer_->size()) * 8; - } - std::shared_ptr buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer)); return buffer; From 06dac34eea34030a60bea906b053991913e287d4 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 11 Aug 2023 00:13:14 +0800 Subject: [PATCH 10/11] resolve comment and bug fix --- cpp/src/parquet/encoding.cc | 3 ++- cpp/src/parquet/encoding_test.cc | 17 +++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 01e8ba38ea9c7..e972a86ccf0ef 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -339,7 +339,8 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco if (data.null_count() == 0) { // no nulls, just dump the data PARQUET_THROW_NOT_OK(sink_.Reserve(data.length())); - sink_.UnsafeAppend(data.data()->GetValues(1), /*offset=*/0, data.length()); + sink_.UnsafeAppend(data.data()->GetValues(1, 0), data.offset(), + data.length()); } else { PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count())); for (int64_t i = 0; i < data.length(); i++) { diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 0afde5bb65a6f..2a67ed159ceec 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -642,21 +642,20 @@ class EncodingAdHocTyped : public ::testing::Test { static std::shared_ptr<::arrow::DataType> arrow_type(); - void Plain(int seed, int round = 1) { + void Plain(int seed, int rounds = 1) { auto random_array = GetValues(seed); auto encoder = MakeTypedEncoder( Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); - for (int i = 0; i < round; ++i) { + for (int i = 0; i < rounds; ++i) { ASSERT_NO_THROW(encoder->Put(*random_array)); } std::shared_ptr<::arrow::Array> values; - if (round == 1) { + if (rounds == 1) { values = random_array; } else { - ::arrow::ArrayVector arrays; - arrays.resize(round, random_array); + ::arrow::ArrayVector arrays(rounds, random_array); EXPECT_OK_AND_ASSIGN(values, ::arrow::Concatenate(arrays, ::arrow::default_memory_pool())); } @@ -895,8 +894,10 @@ TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) { } TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) { + // Check that one can put several Arrow arrays into a given encoder + // and decode to the right values (see GH-36939) for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - this->Plain(seed, /*round=*/5); + this->Plain(seed, /*rounds=*/5); } } @@ -1645,8 +1646,8 @@ TEST_F(TestRleBooleanEncoding, AllNull) { /*null_probability*/ 1)); } -class TestPlainBooleanEncoding : public TestEncodingBase {}; - +// Check that one can put several Arrow arrays into a given encoder +// and decode to the right values (see GH-36939) TEST(TestPlainBooleanArrayEncoding, AdHocRoundTrip) { std::vector> arrays{ ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), From 8c23944a2cebc63d443f6baf4f1e44c96d80f21f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 10 Aug 2023 18:31:45 +0200 Subject: [PATCH 11/11] Improve tests to also exercise slicing, and arrays without nulls --- cpp/src/parquet/encoding_test.cc | 93 +++++++++++++++++++------------- 1 file changed, 55 insertions(+), 38 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 2a67ed159ceec..0ac5fd76e79c9 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -598,6 +598,39 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { } } +// Check that one can put several Arrow arrays into a given encoder +// and decode to the right values (see GH-36939) +TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { + std::vector> arrays{ + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), + }; + + auto encoder = MakeTypedEncoder(Encoding::PLAIN, + /*use_dictionary=*/false); + for (const auto& array : arrays) { + encoder->Put(*array); + } + auto buffer = encoder->FlushValues(); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); + decoder->SetData(static_cast(expected->length()), buffer->data(), + static_cast(buffer->size())); + + ::arrow::BooleanBuilder builder; + ASSERT_EQ(static_cast(expected->length() - expected->null_count()), + decoder->DecodeArrow(static_cast(expected->length()), + static_cast(expected->null_count()), + expected->null_bitmap_data(), 0, &builder)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(builder.Finish(&result)); + ASSERT_EQ(expected->length(), result->length()); + ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); +} + template void GetDictDecoder(DictEncoder* encoder, int64_t num_values, std::shared_ptr* out_values, @@ -642,8 +675,8 @@ class EncodingAdHocTyped : public ::testing::Test { static std::shared_ptr<::arrow::DataType> arrow_type(); - void Plain(int seed, int rounds = 1) { - auto random_array = GetValues(seed); + void Plain(int seed, int rounds = 1, int offset = 0) { + auto random_array = GetValues(seed)->Slice(offset); auto encoder = MakeTypedEncoder( Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); @@ -888,17 +921,34 @@ using EncodingAdHocTypedCases = TYPED_TEST_SUITE(EncodingAdHocTyped, EncodingAdHocTypedCases); TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) { - for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + for (auto seed : {0, 1, 2, 3, 4}) { this->Plain(seed); } + // Same, but without nulls (this could trigger different code paths) + this->null_probability_ = 0.0; + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); + } } TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) { // Check that one can put several Arrow arrays into a given encoder // and decode to the right values (see GH-36939) - for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - this->Plain(seed, /*rounds=*/5); + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); } + // Same, but without nulls + this->null_probability_ = 0.0; + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); + } +} + +TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutSliced) { + this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3); + // Same, but without nulls + this->null_probability_ = 0.0; + this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3); } TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { @@ -1646,39 +1696,6 @@ TEST_F(TestRleBooleanEncoding, AllNull) { /*null_probability*/ 1)); } -// Check that one can put several Arrow arrays into a given encoder -// and decode to the right values (see GH-36939) -TEST(TestPlainBooleanArrayEncoding, AdHocRoundTrip) { - std::vector> arrays{ - ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), - ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), - ::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"), - ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), - }; - - auto encoder = MakeTypedEncoder(Encoding::PLAIN, - /*use_dictionary=*/false); - for (const auto& array : arrays) { - encoder->Put(*array); - } - auto buffer = encoder->FlushValues(); - auto decoder = MakeTypedDecoder(Encoding::PLAIN); - EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); - decoder->SetData(static_cast(expected->length()), buffer->data(), - static_cast(buffer->size())); - - ::arrow::BooleanBuilder builder; - ASSERT_EQ(static_cast(expected->length() - expected->null_count()), - decoder->DecodeArrow(static_cast(expected->length()), - static_cast(expected->null_count()), - expected->null_bitmap_data(), 0, &builder)); - - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(builder.Finish(&result)); - ASSERT_EQ(expected->length(), result->length()); - ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); -} - // ---------------------------------------------------------------------- // DELTA_LENGTH_BYTE_ARRAY encode/decode tests.