From 844aa5ea060c912fd90240d534e18e3de5c617cc Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 3 May 2024 20:44:21 -0300
Subject: [PATCH 01/20] Take: Add VectorKernel::ChunkedExec to
 SelectionKernelData

---
 .../arrow/compute/kernels/vector_selection_internal.cc |  1 +
 .../arrow/compute/kernels/vector_selection_internal.h  | 10 ++++++++++
 2 files changed, 11 insertions(+)

diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
index 7189d42850e79..f5685ffa4139e 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
@@ -60,6 +60,7 @@ void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
         {std::move(kernel_data.value_type), std::move(kernel_data.selection_type)},
         OutputType(FirstType));
     base_kernel.exec = kernel_data.exec;
+    base_kernel.exec_chunked = kernel_data.chunked_exec;
     DCHECK_OK(func->AddKernel(base_kernel));
   }
   kernels.clear();
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.h b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
index 887bf08354120..558423733ca2b 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
@@ -19,6 +19,7 @@
 #include
 #include
+#include
 #include

 #include "arrow/array/data.h"
@@ -34,9 +35,18 @@ using FilterState = OptionsWrapper<FilterOptions>;
 using TakeState = OptionsWrapper<TakeOptions>;

 struct SelectionKernelData {
+  SelectionKernelData(InputType value_type, InputType selection_type,
+                      ArrayKernelExec exec,
+                      VectorKernel::ChunkedExec chunked_exec = NULLPTR)
+      : value_type(std::move(value_type)),
+        selection_type(std::move(selection_type)),
+        exec(exec),
+        chunked_exec(chunked_exec) {}
+
   InputType value_type;
   InputType selection_type;
   ArrayKernelExec exec;
+  VectorKernel::ChunkedExec chunked_exec;
 };

 void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
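
The shape of the new chunked_exec slot can be read off the functors registered
later in this series: a VectorKernel::ChunkedExec receives the whole ExecBatch,
whose values may be Datum::CHUNKED_ARRAY, and produces a Datum, instead of
operating span by span like an ArrayKernelExec. A sketch of the expected
signature (illustrative only; the authoritative typedef lives in
arrow/compute/kernel.h):

    // Sketch: a chunked exec sees the full batch and writes a Datum, so it
    // can return a ChunkedArray directly instead of filling a preallocated
    // array span.
    Status ExampleChunkedExec(KernelContext* ctx, const ExecBatch& batch,
                              Datum* out);
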
From 9797c9f30e89272f84bd55c10990510953ab4d2b Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 3 May 2024 22:02:11 -0300
Subject: [PATCH 02/20] Take: VectorKernel::output_chunked should be false for
 "array_take"

We will ensure "array_take" returns a ChunkedArray if at least one input is
chunked, just like "take" does. Even when the output fits in a single chunk.
---
 cpp/src/arrow/compute/kernels/vector_selection.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc
index b265673e23c86..b047763098c0d 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -293,6 +293,8 @@ std::shared_ptr<VectorFunction> MakeIndicesNonZeroFunction(std::string name,
   VectorKernel kernel;
   kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
   kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+  // "array_take" ensures that the output will be chunked when at least one
+  // input is chunked, so we need to set this to false.
   kernel.output_chunked = false;
   kernel.exec = IndicesNonZeroExec;
   kernel.exec_chunked = IndicesNonZeroExecChunked;
@@ -338,6 +340,7 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
   VectorKernel take_base;
   take_base.init = TakeState::Init;
   take_base.can_execute_chunkwise = false;
+  take_base.output_chunked = false;
   RegisterSelectionFunction("array_take", array_take_doc, take_base,
                             std::move(take_kernels), GetDefaultTakeOptions(),
                             registry);
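
From the caller's perspective, the guarantee described in this commit message
looks like the following. A minimal sketch (the helper name is hypothetical,
the include set approximate, and it assumes the behavior introduced by this
series):

    #include "arrow/chunked_array.h"
    #include "arrow/compute/api.h"

    // "array_take" with chunked values yields a ChunkedArray, even when the
    // result would fit in a single chunk.
    arrow::Result<std::shared_ptr<arrow::ChunkedArray>> TakeChunked(
        const std::shared_ptr<arrow::ChunkedArray>& values,
        const std::shared_ptr<arrow::Array>& indices) {
      ARROW_ASSIGN_OR_RAISE(
          arrow::Datum out,
          arrow::compute::CallFunction("array_take", {values, indices}));
      return out.chunked_array();  // CA->C: chunked in, chunked out
    }
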
From a135471d25680b4a09a2290ba73ee54081d0a12c Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 3 May 2024 22:17:20 -0300
Subject: [PATCH 03/20] Take: Make "array_take" handle CA->C cases by
 populating VectorKernel::exec_chunked

Before this commit, only the "take" meta function could handle CA
parameters.
---
 .../kernels/vector_selection_take_internal.cc | 175 +++++++++++++++---
 1 file changed, 148 insertions(+), 27 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index c45cc552a2cc5..a770401fd34c2 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -579,20 +579,13 @@ class TakeMetaFunction : public MetaFunction {
     return result.array();
   }

-  static Result<std::shared_ptr<ArrayData>> TakeCAA(
-      const std::shared_ptr<ChunkedArray>& values, const Array& indices,
-      const TakeOptions& options, ExecContext* ctx) {
-    ARROW_ASSIGN_OR_RAISE(auto values_array,
-                          ChunkedArrayAsArray(values, ctx->memory_pool()));
-    std::vector<Datum> args = {std::move(values_array), indices};
-    return TakeAAA(args, options, ctx);
-  }
-
   static Result<std::shared_ptr<ChunkedArray>> TakeCAC(
       const std::shared_ptr<ChunkedArray>& values, const Array& indices,
       const TakeOptions& options, ExecContext* ctx) {
-    ARROW_ASSIGN_OR_RAISE(auto new_chunk, TakeCAA(values, indices, options, ctx));
-    return std::make_shared<ChunkedArray>(MakeArray(std::move(new_chunk)));
+    // "array_take" can handle CA->C cases directly
+    // (via their VectorKernel::exec_chunked)
+    ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake({values, indices}, options, ctx));
+    return result.chunked_array();
   }

   static Result<std::shared_ptr<ChunkedArray>> TakeCCC(
@@ -721,6 +714,115 @@ class TakeMetaFunction : public MetaFunction {

 // ----------------------------------------------------------------------

+/// \brief Prepare the output array like ExecuteArrayKernel::PrepareOutput()
+std::shared_ptr<ArrayData> PrepareOutput(const ExecBatch& batch, int64_t length) {
+  DCHECK_EQ(batch.length, length);
+  auto out = std::make_shared<ArrayData>(batch.values[0].type(), length);
+  out->buffers.resize(batch.values[0].type()->layout().buffers.size());
+  return out;
+}
+
+Status CallAAAKernel(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+                     std::shared_ptr<ArrayData> values,
+                     std::shared_ptr<ArrayData> indices, Datum* out) {
+  int64_t batch_length = values->length;
+  std::vector<Datum> args = {std::move(values), std::move(indices)};
+  ExecBatch array_array_batch(std::move(args), batch_length);
+  DCHECK_EQ(out->kind(), Datum::ARRAY);
+  ExecSpan exec_span{array_array_batch};
+  ExecResult result;
+  result.value = out->array();
+  return take_aaa_exec(ctx, exec_span, &result);
+}
+
+/// \brief Generic VectorKernel::exec_chunked for CA->C cases.
+///
+/// This function concatenates the chunks of values and then calls the
+/// AA->A take kernel.
+///
+/// \param take_aaa_exec The AA->A take kernel to use.
+Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
+                              const ExecBatch& batch, Datum* out) {
+  auto& args = batch.values;
+  if (args[0].kind() == Datum::CHUNKED_ARRAY && args[1].kind() == Datum::ARRAY) {
+    auto& values = args[0].chunked_array();
+    auto& indices = args[1].array();
+    ARROW_ASSIGN_OR_RAISE(auto values_array, TakeMetaFunction::ChunkedArrayAsArray(
+                                                 values, ctx->memory_pool()));
+    DCHECK_EQ(values_array->length(), batch.length);
+    Datum result = PrepareOutput(batch, batch.length);
+    RETURN_NOT_OK(
+        CallAAAKernel(take_aaa_exec, ctx, values_array->data(), indices, &result));
+    out->value = std::make_shared<ChunkedArray>(MakeArray(result.array()));
+    return Status::OK();
+  }
+  return Status::NotImplemented(
+      "Unsupported kinds for 'array_take', try using 'take': "
+      "values=",
+      args[0].ToString(), ", indices=", args[1].ToString());
+}
+
+template <ArrayKernelExec kTakeAAAExec>
+struct GenericTakeChunkedExecFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return GenericTakeChunkedExec(kTakeAAAExec, ctx, batch, out);
+  }
+
+  // XXX: to be removed
+  static Status ExecNonChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    Datum result =
+        std::make_shared<ChunkedArray>(MakeArray(PrepareOutput(batch, batch.length)));
+    RETURN_NOT_OK(Exec(ctx, batch, &result));
+    DCHECK_EQ(result.chunked_array()->num_chunks(), 1);
+    out->value = result.chunked_array()->chunk(0)->data();
+    return Status::OK();
+  }
+};
+
+Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec,
+                              VectorKernel::ChunkedExec take_caa_exec,
+                              KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  Datum result = PrepareOutput(batch, batch.length);
+  auto* pool = ctx->memory_pool();
+  auto& args = batch.values;
+  if (args[0].kind() == Datum::CHUNKED_ARRAY && args[1].kind() == Datum::ARRAY) {
+    auto& values = args[0].chunked_array();
+    auto& indices = args[1].array();
+    std::shared_ptr<Array> single_chunk = nullptr;
+    if (values->num_chunks() == 0 || values->length() == 0) {
+      ARROW_ASSIGN_OR_RAISE(single_chunk,
+                            MakeArrayOfNull(values->type(), /*length=*/0, pool));
+    } else if (values->num_chunks() == 1) {
+      single_chunk = values->chunk(0);
+    }
+    if (single_chunk) {
+      DCHECK_EQ(single_chunk->length(), batch.length);
+      // If the ChunkedArray was cheaply converted to a single chunk,
+      // we can use the AA->A take kernel directly.
+      RETURN_NOT_OK(
+          CallAAAKernel(take_aaa_exec, ctx, single_chunk->data(), indices, &result));
+    } else {
+      // Instead of concatenating the chunks, we call the CA->A take kernel
+      // which has a more efficient implementation for this case. At this point,
+      // that implementation doesn't have to care about empty or single-chunk
+      // ChunkedArrays.
+      RETURN_NOT_OK(take_caa_exec(ctx, batch, &result));
+    }
+    out->value = std::make_shared<ChunkedArray>(MakeArray(result.array()));
+    return Status::OK();
+  }
+  return Status::NotImplemented(
+      "Unsupported kinds for 'array_take', try using 'take': "
+      "values=",
+      args[0].ToString(), ", indices=", args[1].ToString());
+}
+
+template <ArrayKernelExec kTakeAAAExec, VectorKernel::ChunkedExec kTakeCAAExec>
+struct SpecialTakeChunkedExecFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return SpecialTakeChunkedExec(kTakeAAAExec, kTakeCAAExec, ctx, batch, out);
+  }
+};
+
 }  // namespace

 const TakeOptions* GetDefaultTakeOptions() {
@@ -736,22 +838,41 @@ void PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
   auto take_indices = match::Integer();

   *out = {
-      {InputType(match::Primitive()), take_indices, FixedWidthTakeExec},
-      {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec},
-      {InputType(match::LargeBinaryLike()), take_indices, LargeVarBinaryTakeExec},
-      {InputType(match::FixedSizeBinaryLike()), take_indices, FixedWidthTakeExec},
-      {InputType(null()), take_indices, NullTakeExec},
-      {InputType(Type::DICTIONARY), take_indices, DictionaryTake},
-      {InputType(Type::EXTENSION), take_indices, ExtensionTake},
-      {InputType(Type::LIST), take_indices, ListTakeExec},
-      {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
-      {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec},
-      {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec},
-      {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
-      {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
-      {InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec},
-      {InputType(Type::STRUCT), take_indices, StructTakeExec},
-      {InputType(Type::MAP), take_indices, MapTakeExec},
+      {InputType(match::Primitive()), take_indices, FixedWidthTakeExec,
+       // XXX: doing this for testing SpecialTakeChunkedExec
+       SpecialTakeChunkedExecFunctor<
+           FixedWidthTakeExec,
+           GenericTakeChunkedExecFunctor<FixedWidthTakeExec>::ExecNonChunked>::Exec},
+      {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec,
+       GenericTakeChunkedExecFunctor<VarBinaryTakeExec>::Exec},
+      {InputType(match::LargeBinaryLike()), take_indices, LargeVarBinaryTakeExec,
+       GenericTakeChunkedExecFunctor<LargeVarBinaryTakeExec>::Exec},
+      {InputType(match::FixedSizeBinaryLike()), take_indices, FixedWidthTakeExec,
+       GenericTakeChunkedExecFunctor<FixedWidthTakeExec>::Exec},
+      {InputType(null()), take_indices, NullTakeExec,
+       GenericTakeChunkedExecFunctor<NullTakeExec>::Exec},
+      {InputType(Type::DICTIONARY), take_indices, DictionaryTake,
+       GenericTakeChunkedExecFunctor<DictionaryTake>::Exec},
+      {InputType(Type::EXTENSION), take_indices, ExtensionTake,
+       GenericTakeChunkedExecFunctor<ExtensionTake>::Exec},
+      {InputType(Type::LIST), take_indices, ListTakeExec,
+       GenericTakeChunkedExecFunctor<ListTakeExec>::Exec},
+      {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec,
+       GenericTakeChunkedExecFunctor<LargeListTakeExec>::Exec},
+      {InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec,
+       GenericTakeChunkedExecFunctor<ListViewTakeExec>::Exec},
+      {InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec,
+       GenericTakeChunkedExecFunctor<LargeListViewTakeExec>::Exec},
+      {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec,
+       GenericTakeChunkedExecFunctor<FSLTakeExec>::Exec},
+      {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec,
+       GenericTakeChunkedExecFunctor<DenseUnionTakeExec>::Exec},
+      {InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec,
+       GenericTakeChunkedExecFunctor<SparseUnionTakeExec>::Exec},
+      {InputType(Type::STRUCT), take_indices, StructTakeExec,
+       GenericTakeChunkedExecFunctor<StructTakeExec>::Exec},
+      {InputType(Type::MAP), take_indices, MapTakeExec,
+       GenericTakeChunkedExecFunctor<MapTakeExec>::Exec},
   };
 }
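
What GenericTakeChunkedExec amounts to, sketched with public APIs only
(Concatenate and compute::Take are real public functions; the wrapper itself
is illustrative, not part of the patch):

    #include "arrow/array/concatenate.h"
    #include "arrow/chunked_array.h"
    #include "arrow/compute/api_vector.h"

    arrow::Result<std::shared_ptr<arrow::ChunkedArray>> TakeByConcatenating(
        const arrow::ChunkedArray& values,
        const std::shared_ptr<arrow::Array>& indices) {
      // Concatenate all value chunks into one array, then do a plain AA->A take.
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> flat,
                            arrow::Concatenate(values.chunks()));
      ARROW_ASSIGN_OR_RAISE(arrow::Datum taken,
                            arrow::compute::Take(flat, indices));
      // Wrap the single result chunk, preserving the CA->C contract.
      return std::make_shared<arrow::ChunkedArray>(taken.make_array());
    }
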
From 36b69af8c672e9569849593f42e3ef70679d24dc Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 26 Apr 2024 22:49:45 -0300
Subject: [PATCH 04/20] gather_internal.h: Introduce GatherFromChunks

---
 .../arrow/compute/kernels/gather_internal.h | 165 +++++++++++++++++-
 1 file changed, 158 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/gather_internal.h b/cpp/src/arrow/compute/kernels/gather_internal.h
index 4c161533a7277..102e3db34d581 100644
--- a/cpp/src/arrow/compute/kernels/gather_internal.h
+++ b/cpp/src/arrow/compute/kernels/gather_internal.h
@@ -20,8 +20,14 @@
 #include
 #include
 #include
+#include

+#include "arrow/array/array_base.h"
 #include "arrow/array/data.h"
+#include "arrow/chunk_resolver.h"
+#include "arrow/chunked_array.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
@@ -52,6 +58,13 @@ class GatherBaseCRTP {
   ARROW_DEFAULT_MOVE_AND_ASSIGN(GatherBaseCRTP);

  protected:
+  template <typename IndexCType>
+  bool IsSrcValid(const ArraySpan& src_validity, const IndexCType* idx,
+                  int64_t position) const {
+    ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
+    return src_validity.IsValid(idx[position]);
+  }
+
   ARROW_FORCE_INLINE int64_t ExecuteNoNulls(int64_t idx_length) {
     auto* self = static_cast(this);
     for (int64_t position = 0; position < idx_length; position++) {
@@ -76,8 +89,9 @@ class GatherBaseCRTP {
   // doesn't have to be called for resulting null positions. A position is
   // considered null if either the index or the source value is null at that
   // position.
-  template <bool kOutputIsZeroInitialized, typename IndexCType>
-  ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ArraySpan& src_validity,
+  template <bool kOutputIsZeroInitialized, typename ValiditySpan, typename IndexCType>
+  ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ValiditySpan& src_validity,
                                               int64_t idx_length, const IndexCType* idx,
                                               const ArraySpan& idx_validity,
                                               uint8_t* out_is_valid) {
@@ -116,12 +130,11 @@ class GatherBaseCRTP {
         position += block.length;
       }
     } else {
-      // Source values may be null, so we must do random access into src_validity
+      // Source values may be null, so we must do random access with IsSrcValid()
       if (block.popcount == block.length) {
         // Faster path: indices are not null but source values may be
         for (int64_t i = 0; i < block.length; ++i) {
-          ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
-          if (src_validity.IsValid(idx[position])) {
+          if (self->IsSrcValid(src_validity, idx, position)) {
             // value is not null
             self->WriteValue(position);
             bit_util::SetBit(out_is_valid, position);
@@ -136,9 +149,9 @@ class GatherBaseCRTP {
         // random access in general we have to check the value nullness one by
         // one.
         for (int64_t i = 0; i < block.length; ++i) {
-          ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
           ARROW_COMPILER_ASSUME(idx_validity.buffers[0].data != nullptr);
-          if (idx_validity.IsValid(position) && src_validity.IsValid(idx[position])) {
+          if (idx_validity.IsValid(position) &&
+              self->IsSrcValid(src_validity, idx, position)) {
             // index is not null && value is not null
             self->WriteValue(position);
             bit_util::SetBit(out_is_valid, position);
@@ -303,4 +316,142 @@ class Gather
   }
 };

+template <typename IndexCType>
+struct ChunkedValiditySpan {
+  const ChunkedArray& chunks_validity;
+  const IndexCType* chunk_index_vec;
+  const IndexCType* index_in_chunk_vec;
+  const bool may_have_nulls;
+
+  ChunkedValiditySpan(const ChunkedArray& chunks_validity,
+                      const IndexCType* chunk_index_vec,
+                      const IndexCType* index_in_chunk_vec)
+      : chunks_validity(chunks_validity),
+        chunk_index_vec(chunk_index_vec),
+        index_in_chunk_vec(index_in_chunk_vec),
+        may_have_nulls(chunks_validity.null_count() > 0) {}
+
+  bool MayHaveNulls() const { return may_have_nulls; }
+
+  bool IsValid(int64_t position) const {
+    auto chunk_index = chunk_index_vec[position];
+    auto index_in_chunk = index_in_chunk_vec[position];
+    return chunks_validity.chunk(static_cast<int>(chunk_index))
+        ->IsValid(index_in_chunk);
+  }
+};
+
+template <int kValueWidthInBits, typename IndexCType, bool kWithFactor>
+class GatherFromChunks
+    : public GatherBaseCRTP<
+          GatherFromChunks<kValueWidthInBits, IndexCType, kWithFactor>> {
+ private:
+  static_assert(!kWithFactor || kValueWidthInBits == 8,
+                "kWithFactor is only supported for kValueWidthInBits == 8");
+  static_assert(kValueWidthInBits == 1 || kValueWidthInBits % 8 == 0);
+  // kValueWidth should not be used if kValueWidthInBits == 1.
+  static constexpr int kValueWidth = kValueWidthInBits / 8;
+
+  // src_residual_bit_offsets_[i] is used to store the bit offset of the first
+  // byte (0-7) in src_chunks_[i] iff kValueWidthInBits == 1.
+  const int* src_residual_bit_offsets_ = NULLPTR;
+  // Pre-computed pointers to the start of the values in each chunk.
+  const uint8_t* const* src_chunks_;
+  // Number of indices resolved in chunk_index_vec_/index_in_chunk_vec_.
+  const int64_t idx_length_;
+  const IndexCType* chunk_index_vec_;
+  const IndexCType* index_in_chunk_vec_;
+
+  uint8_t* out_;
+  int64_t factor_;
+
+ public:
+  void WriteValue(int64_t position) {
+    auto chunk_index = chunk_index_vec_[position];
+    auto index_in_chunk = index_in_chunk_vec_[position];
+    auto* chunk = src_chunks_[chunk_index];
+    if constexpr (kValueWidthInBits == 1) {
+      auto src_offset = src_residual_bit_offsets_[chunk_index];
+      bit_util::SetBitTo(out_, position,
+                         bit_util::GetBit(chunk, src_offset + index_in_chunk));
+    } else if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memcpy(out_ + position * scaled_factor, chunk + index_in_chunk * scaled_factor,
+             scaled_factor);
+    } else {
+      memcpy(out_ + position * kValueWidth, chunk + index_in_chunk * kValueWidth,
+             kValueWidth);
+    }
+  }
+
+  void WriteZero(int64_t position) {
+    if constexpr (kValueWidthInBits == 1) {
+      bit_util::ClearBit(out_, position);
+    } else if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memset(out_ + position * scaled_factor, 0, scaled_factor);
+    } else {
+      memset(out_ + position * kValueWidth, 0, kValueWidth);
+    }
+  }
+
+  void WriteZeroSegment(int64_t position, int64_t block_length) {
+    if constexpr (kValueWidthInBits == 1) {
+      bit_util::SetBitsTo(out_, position, block_length, false);
+    } else if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memset(out_ + position * scaled_factor, 0, block_length * scaled_factor);
+    } else {
+      memset(out_ + position * kValueWidth, 0, block_length * kValueWidth);
+    }
+  }
+
+  bool IsSrcValid(const ChunkedValiditySpan<IndexCType>& src_validity,
+                  const IndexCType* idx, int64_t position) const {
+    return src_validity.IsValid(position);
+  }
+
+ public:
+  GatherFromChunks(const int* src_residual_bit_offsets, const uint8_t* const* src_chunks,
+                   const int64_t idx_length, const IndexCType* chunk_index_vec,
+                   const IndexCType* index_in_chunk_vec, uint8_t* out, int64_t factor = 1)
+      : src_residual_bit_offsets_(src_residual_bit_offsets),
+        src_chunks_(src_chunks),
+        idx_length_(idx_length),
+        chunk_index_vec_(chunk_index_vec),
+        index_in_chunk_vec_(index_in_chunk_vec),
+        out_(out),
+        factor_(factor) {
+    assert(src_chunks && chunk_index_vec && index_in_chunk_vec && out);
+    if constexpr (kValueWidthInBits == 1) {
+      assert(src_residual_bit_offsets);
+    }
+    assert((kWithFactor || factor == 1) &&
+           "When kWithFactor is false, the factor is assumed to be 1 at compile time");
+  }
+
+  ARROW_FORCE_INLINE int64_t Execute() { return this->ExecuteNoNulls(idx_length_); }
+
+  /// \pre If kOutputIsZeroInitialized, then this->out_ has to be zero initialized.
+  /// \pre Bits in out_is_valid have to always be zero initialized.
+  /// \post The bits for the valid elements (and only those) are set in out_is_valid.
+  /// \post If !kOutputIsZeroInitialized, then positions in this->out_ containing null
+  ///       elements have 0s written to them. This might be less efficient than
+  ///       zero-initializing first and calling this->Execute() afterwards.
+  /// \return The number of valid elements in out.
+  template <bool kOutputIsZeroInitialized>
+  ARROW_FORCE_INLINE int64_t Execute(const ChunkedArray& src_validity,
+                                     const ArraySpan& idx_validity,
+                                     uint8_t* out_is_valid) {
+    assert(idx_length_ == idx_validity.length);
+    assert(out_is_valid);
+    assert(idx_validity.type->byte_width() == sizeof(IndexCType));
+    ChunkedValiditySpan src_validity_span{src_validity, chunk_index_vec_,
+                                          index_in_chunk_vec_};
+    assert(src_validity_span.MayHaveNulls() || idx_validity.MayHaveNulls());
+    // idx=NULLPTR because when it's passed to IsSrcValid() defined above, it's not used.
+    return this->template ExecuteWithNulls<kOutputIsZeroInitialized>(
+        src_validity_span, idx_length_, /*idx=*/NULLPTR, idx_validity, out_is_valid);
+  }
+};
+
 }  // namespace arrow::internal
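
The chunk_index_vec_/index_in_chunk_vec_ inputs consumed by GatherFromChunks
are expected to be precomputed by the take kernels from the logical take
indices. A sketch of that resolution step, assuming the arrow::ChunkResolver
API from arrow/chunk_resolver.h (which the patch includes; in some versions
the class lives in arrow::internal, and the helper name here is illustrative):

    #include <cstdint>
    #include "arrow/chunk_resolver.h"
    #include "arrow/chunked_array.h"

    // Resolve logical take indices into (chunk index, index-in-chunk) pairs,
    // the representation GatherFromChunks reads from.
    template <typename IndexCType>
    void ResolveIndices(const arrow::ChunkedArray& values, const IndexCType* idx,
                        int64_t idx_length, IndexCType* chunk_index_vec,
                        IndexCType* index_in_chunk_vec) {
      arrow::ChunkResolver resolver(values.chunks());
      for (int64_t i = 0; i < idx_length; ++i) {
        const auto loc = resolver.Resolve(static_cast<int64_t>(idx[i]));
        chunk_index_vec[i] = static_cast<IndexCType>(loc.chunk_index);
        index_in_chunk_vec[i] = static_cast<IndexCType>(loc.index_in_chunk);
      }
    }
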
From d4b48a38359d2ad8052bcff10b268035e56c7174 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Sun, 5 May 2024 15:54:59 -0300
Subject: [PATCH 05/20] Take: Introduce ValuesSpan to delay dispatching on
 chunked-ness

---
 .../kernels/vector_selection_take_internal.cc | 87 ++++++++++++++-----
 1 file changed, 67 insertions(+), 20 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index a770401fd34c2..7f9ce299044c9 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -326,6 +326,43 @@ namespace {

 using TakeState = OptionsWrapper<TakeOptions>;

+class ValuesSpan {
+ private:
+  const std::shared_ptr<ChunkedArray> chunked_ = nullptr;
+  const ArraySpan chunk0_;  // first chunk or the whole array
+
+ public:
+  explicit ValuesSpan(const std::shared_ptr<ChunkedArray> values)
+      : chunked_(std::move(values)), chunk0_{*values->chunk(0)->data()} {
+    DCHECK(chunked_);
+    DCHECK_GT(chunked_->num_chunks(), 0);
+  }
+
+  explicit ValuesSpan(const ArraySpan& values) : chunk0_(values) {}
+
+  bool is_chunked() const { return chunked_ != nullptr; }
+
+  const ChunkedArray& chunked_array() const {
+    DCHECK(is_chunked());
+    return *chunked_;
+  }
+
+  const ArraySpan& chunk0() const { return chunk0_; }
+
+  const ArraySpan& array() const {
+    DCHECK(!is_chunked());
+    return chunk0_;
+  }
+
+  const DataType* type() const { return chunk0_.type; }
+
+  int64_t length() const { return is_chunked() ? chunked_->length() : array().length; }
+
+  bool MayHaveNulls() const {
+    return is_chunked() ? chunked_->null_count() != 0 : array().MayHaveNulls();
+  }
+};
+
 // ----------------------------------------------------------------------
 // Implement optimized take for primitive types from boolean to
 // 1/2/4/8/16/32-byte C-type based types and fixed-size binary (0 or more
@@ -357,15 +394,22 @@ template
            (factor > 0 && kValueWidthInBits == 8 &&  // factors are used with bytes
             static_cast(factor * kValueWidthInBits) == bit_width));
 #endif
+    // XXX: support values.is_chunked() case
+    assert(!values.is_chunked());
+    return Exec(ctx, values.array(), indices, out_arr, factor);
+  }
+
+  static Status Exec(KernelContext* ctx, const ArraySpan& values,
+                     const ArraySpan& indices, ArrayData* out_arr, int64_t factor) {
     const bool out_has_validity = values.MayHaveNulls() || indices.MayHaveNulls();

     const uint8_t* src;
@@ -398,7 +442,7 @@ struct FixedWidthTakeImpl {
   };

 template