diff --git a/cpp/src/arrow/compute/key_hash_internal.h b/cpp/src/arrow/compute/key_hash_internal.h index 1f25beb0e1622..275f1b38dc2c8 100644 --- a/cpp/src/arrow/compute/key_hash_internal.h +++ b/cpp/src/arrow/compute/key_hash_internal.h @@ -166,6 +166,9 @@ class ARROW_EXPORT Hashing64 { friend void TestBloomLargeHashHelper(int64_t, int64_t, const std::vector&, int64_t, int, T*); friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool); + friend void TestHashVarLen(bool should_incr, uint32_t row_count, + const uint32_t* var_offsets, const uint8_t* var_data, + uint64_t* hash_results); public: static void HashMultiColumn(const std::vector& cols, LightContext* ctx, diff --git a/cpp/src/arrow/compute/light_array_internal.cc b/cpp/src/arrow/compute/light_array_internal.cc index 4f235925d0fb6..ff4b3e6305e62 100644 --- a/cpp/src/arrow/compute/light_array_internal.cc +++ b/cpp/src/arrow/compute/light_array_internal.cc @@ -48,16 +48,16 @@ KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* validity_buffer, uint8_t* fixed_length_buffer, uint8_t* var_length_buffer, int bit_offset_validity, - int bit_offset_fixed) { + int bit_offset_fixed, + const util::TempVectorStack* alloc) { metadata_ = metadata; - length_ = length; - buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer; - buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = - fixed_length_buffer; - buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = - var_length_buffer; - bit_offset_[kValidityBuffer] = bit_offset_validity; + length_ = length; + buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer; + buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = fixed_length_buffer; + buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = var_length_buffer; + bit_offset_[kValidityBuffer] = bit_offset_validity; bit_offset_[kFixedLengthBuffer] = bit_offset_fixed; + arena_alloc = alloc; } KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other, @@ -83,6 +83,8 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const { sliced.metadata_ = metadata_; sliced.length_ = length; uint32_t fixed_size = metadata_.fixed_length; + // TODO: see if this is necessary + // uint32_t fixed_size = !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length; sliced.buffers_[0] = buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr; @@ -114,42 +116,229 @@ KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const { return sliced; } -Result ColumnMetadataFromDataType( - const std::shared_ptr& type) { - const bool is_extension = type->id() == Type::EXTENSION; - const std::shared_ptr& typ = - is_extension - ? arrow::internal::checked_pointer_cast(type->GetSharedPtr()) - ->storage_type() - : type; - - if (typ->id() == Type::DICTIONARY) { +Result ColumnMetadataFromDataType(const std::shared_ptr& type) { + // "ptype" is the "physical" type + const DataType* ptype = type->GetSharedPtr().get(); + + // For ExtensionType, use the backing physical type (storage_type() is a shared ptr) + if (ARROW_PREDICT_FALSE(type->id() == Type::EXTENSION)) { + const ExtensionType* ext_type = static_cast(type); + ptype = ext_type->storage_type().get(); + } + + if (ptype->id() == Type::DICTIONARY) { auto bit_width = - arrow::internal::checked_cast(*typ).bit_width(); + arrow::internal::checked_cast(*ptype).bit_width(); ARROW_DCHECK(bit_width % 8 == 0); return KeyColumnMetadata(true, bit_width / 8); } - if (typ->id() == Type::BOOL) { + if (ptype->id() == Type::BOOL) { return KeyColumnMetadata(true, 0); } - if (is_fixed_width(typ->id())) { + if (is_fixed_width(ptype->id())) { return KeyColumnMetadata( - true, arrow::internal::checked_cast(*typ).bit_width() / 8); + true, + arrow::internal::checked_cast(*ptype).bit_width() / 8); } - if (is_binary_like(typ->id())) { + if (is_binary_like(ptype->id())) { return KeyColumnMetadata(false, sizeof(uint32_t)); } - if (is_large_binary_like(typ->id())) { + if (is_large_binary_like(ptype->id())) { return KeyColumnMetadata(false, sizeof(uint64_t)); } - if (typ->id() == Type::NA) { + if (ptype->id() == Type::NA) { return KeyColumnMetadata(true, 0, true); } // Caller attempted to create a KeyColumnArray from an invalid type - return Status::TypeError("Unsupported column data type ", typ->name(), + return Status::TypeError("Unsupported column data type ", ptype->name(), + " used with KeyColumnMetadata"); +} + +Result ColumnMetadataFromDataType( + const std::shared_ptr& type) { + return ColumnMetadataFromDataType(type.get()); +} + +/** + * Constructs metadata that tells hashing functions how to iterate over the + * KeyColumnArray. + * + * This function assumes ColumnMetadataFromDataType has already failed, which makes this + * function distinct because it should only be called when the input Array is flattened in + * a particular way. + */ +Result ColumnMetadataFromListType(const DataType* type) { + if (type->id() == Type::LIST || type->id() == Type::MAP) { + return KeyColumnMetadata(false, sizeof(uint32_t)); + } + else if (type->id() == Type::LARGE_LIST) { + return KeyColumnMetadata(false, sizeof(uint64_t)); + } + // Caller attempted to create a KeyColumnArray from an invalid type + return Status::TypeError("Unsupported column data type ", type->name(), " used with KeyColumnMetadata"); } +Result ColumnMetadataFromListType( + const std::shared_ptr& type) { + return ColumnMetadataFromListType(type.get()); +} + +/** + * Coalesces children of a StructArray into a flattened list of KeyColumnArrays. When + * hashing a StructArray, we want to co-index a list of KeyColumnArrays so that values in + * the same row are combined. + */ +Result ColumnArraysFromStructArray(const ArraySpan& array_span, + int64_t num_rows) { + KeyColumnVector flattened_spans; + flattened_spans.reserve(array_span.child_data.size()); + + // Recurse on each child of the ArraySpan in DFS-order + for (size_t child_ndx = 0; child_ndx < array_span.child_data.size(); ++child_ndx) { + auto child_span = array_span.child_data[child_ndx]; + ARROW_ASSIGN_OR_RAISE(auto child_keycols, + ColumnArraysFromArraySpan(child_span, num_rows)); + + flattened_spans.insert(flattened_spans.end(), child_keycols.begin(), + child_keycols.end()); + } + + return flattened_spans; +} + +/** + * Flattens the data in a ListArray into a list of KeyColumnArrays so that each element in + * the ListArray is properly treated as a row value. Due to semantics of nulls in nested + * arrays and their non-impact on a hash value, nulls in nested arrays are dropped when + * flattened. If a list is null, then that row is considered null and is preserved. + * + * The values buffer of a list type should be propagated to the caller as is, but the + * parent offsets and offsets of the current ArraySpan must be coalesced. Essentially, for + * the purposes of hashing, we don't care about internal structure of a row value so we + * flatten the offsets. + */ + +// TODO: recurse to: (1) flatten offsets, (2) eventually bottom out and grab var data +template +Result ColumnArraysFromListArray(const ArraySpan& array_span, + int64_t num_rows, + const OffsetType* parent_offsets) { + // Construct KeyColumnMetadata + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, + ColumnMetadataFromListType(array_span.type)); + + ARROW_LOG(INFO) << "[ListArray] Child count: " + << std::to_string(array_span.child_data.size()); + + // ListArrays have only 1 child + auto child_span = array_span.child_data[0]; + uint8_t* buffer_validity = nullptr; + /* + * TODO: Currently unsupported. + * figure out how the validity bitmap affects flattened lists + *if (child_span.GetBuffer(0) != nullptr) { + * buffer_validity = (uint8_t*)child_span.GetBuffer(0)->data(); + *} + **/ + + // For simple lists or lists containing only list types, this should point to the child + // buffer with all of the values + uint8_t* buffer_varlength = nullptr; + if (child_span.num_buffers() > 2 && child_span.GetBuffer(2) != NULLPTR) { + ARROW_LOG(INFO) << "found list array data"; + buffer_varlength = (uint8_t*)child_span.GetBuffer(2)->data(); + } + else { + ARROW_LOG(INFO) << "child array does not have list array data"; + } + + // TODO + // Lists get flattened to 1 KeyColumnArray; Maps get flattened to 2 (key and value) + KeyColumnArray column_array = + KeyColumnArray(metadata, child_span.offset + num_rows, buffer_validity, + child_span.GetBuffer(1)->data(), buffer_varlength); + + return column_array; +} + + +Result ColumnArrayFromArraySpan(const ArraySpan& array_span, + int64_t num_rows) { + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, + ColumnMetadataFromDataType(array_span.type)); + + uint8_t* buffer_validity = nullptr; + if (array_span.GetBuffer(0) != nullptr) { + buffer_validity = (uint8_t*)array_span.GetBuffer(0)->data(); + } + + uint8_t* buffer_varlength = nullptr; + if (array_span.num_buffers() > 2 && array_span.GetBuffer(2) != NULLPTR) { + buffer_varlength = (uint8_t*)array_span.GetBuffer(2)->data(); + } + + KeyColumnArray column_array = + KeyColumnArray(metadata, array_span.offset + num_rows, buffer_validity, + array_span.GetBuffer(1)->data(), buffer_varlength); + + return column_array.Slice(array_span.offset, num_rows); +} + +Result ColumnArraysFromArraySpan(const ArraySpan& array_span, + int64_t num_rows) { + KeyColumnVector flattened_spans; + flattened_spans.reserve(1 + array_span.child_data.size()); + + // Construct a KeyColumnArray from the given ArraySpan + auto keycol_result = ColumnArrayFromArraySpan(array_span, num_rows); + if (keycol_result.ok()) { + flattened_spans.push_back(*keycol_result); + } + + // If ArraySpan data type is not supported, check for supported nested types. + else if (is_nested(array_span.type->id())) { + switch (array_span.type->id()) { + case Type::LIST: + case Type::MAP: { + const uint32_t* list_offsets = (const uint32_t*) array_span.GetBuffer(1)->data(); + ARROW_ASSIGN_OR_RAISE(auto list_keycol, + ColumnArraysFromListArray(array_span, num_rows, + list_offsets)); + + flattened_spans.push_back(list_keycol); + break; + } + + case Type::LARGE_LIST: { + const uint64_t* list_offsets = (const uint64_t*) array_span.GetBuffer(1)->data(); + ARROW_ASSIGN_OR_RAISE(auto list_keycol, + ColumnArraysFromListArray(array_span, num_rows, + list_offsets)); + + flattened_spans.push_back(list_keycol); + break; + } + + case Type::STRUCT: { + ARROW_ASSIGN_OR_RAISE(auto struct_keycols, + ColumnArraysFromStructArray(array_span, num_rows)); + + flattened_spans.insert(flattened_spans.end(), struct_keycols.begin(), + struct_keycols.end()); + break; + } + + default: + // unsupported types include: unions, fixed size list + ARROW_WARN_NOT_OK(keycol_result.status(), "Unsupported nested type for hashing"); + break; + } + } + + return flattened_spans; +} + Result ColumnArrayFromArrayData( const std::shared_ptr& array_data, int64_t start_row, int64_t num_rows) { ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, @@ -161,12 +350,14 @@ KeyColumnArray ColumnArrayFromArrayDataAndMetadata( const std::shared_ptr& array_data, const KeyColumnMetadata& metadata, int64_t start_row, int64_t num_rows) { KeyColumnArray column_array = KeyColumnArray( - metadata, array_data->offset + start_row + num_rows, - array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr, - array_data->buffers[1]->data(), - (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) + metadata + ,array_data->offset + start_row + num_rows + ,array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr + ,array_data->buffers[1]->data() + ,(array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) ? array_data->buffers[2]->data() - : nullptr); + : nullptr + ); return column_array.Slice(array_data->offset + start_row, num_rows); } @@ -201,10 +392,12 @@ Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row, Status ColumnArraysFromExecBatch(const ExecBatch& batch, std::vector* column_arrays) { - return ColumnArraysFromExecBatch(batch, 0, static_cast(batch.length), - column_arrays); + return ColumnArraysFromExecBatch( + batch, 0, static_cast(batch.length), column_arrays + ); } + void ResizableArrayData::Init(const std::shared_ptr& data_type, MemoryPool* pool, int log_num_rows_min) { #ifndef NDEBUG diff --git a/cpp/src/arrow/compute/light_array_internal.h b/cpp/src/arrow/compute/light_array_internal.h index 995c4211998e0..832d7543213a8 100644 --- a/cpp/src/arrow/compute/light_array_internal.h +++ b/cpp/src/arrow/compute/light_array_internal.h @@ -85,6 +85,7 @@ class ARROW_EXPORT KeyColumnArray { public: /// \brief Create an uninitialized KeyColumnArray KeyColumnArray() = default; + /// \brief Create a read-only view from buffers /// /// This is a view only and does not take ownership of the buffers. The lifetime @@ -97,10 +98,14 @@ class ARROW_EXPORT KeyColumnArray { /// /// This is a view only and does not take ownership of the buffers. The lifetime /// of the buffers must exceed the lifetime of this view - KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, - uint8_t* validity_buffer, uint8_t* fixed_length_buffer, - uint8_t* var_length_buffer, int bit_offset_validity = 0, - int bit_offset_fixed = 0); + KeyColumnArray( const KeyColumnMetadata& metadata + ,int64_t length + ,uint8_t* validity_buffer + ,uint8_t* fixed_length_buffer + ,uint8_t* var_length_buffer + ,int bit_offset_validity = 0 + ,int bit_offset_fixed = 0 + ,const util::TempVectorStack *alloc = nullptr); /// \brief Create a sliced view of `this` /// /// The number of rows used in offset must be divisible by 8 @@ -184,6 +189,7 @@ class ARROW_EXPORT KeyColumnArray { // Starting bit offset within the first byte (between 0 and 7) // to be used when accessing buffers that store bit vectors. int bit_offset_[kMaxBuffers - 1]; + const util::TempVectorStack* arena_alloc; bool is_bool_type() const { return metadata_.is_fixed_length && metadata_.fixed_length == 0 && @@ -221,6 +227,15 @@ class ARROW_EXPORT KeyColumnArray { ARROW_EXPORT Result ColumnMetadataFromDataType( const std::shared_ptr& type); +ARROW_EXPORT Result ColumnMetadataFromDataType(const DataType* type); + +/// \brief Create KeyColumnArray from ArraySpan +/// +/// The caller should ensure this is only called on "key" columns. +/// \see ColumnMetadataFromDataType for details +ARROW_EXPORT Result ColumnArraysFromArraySpan( + const ArraySpan& array_span, int64_t num_rows); + /// \brief Create KeyColumnArray from ArrayData /// /// If `type` is a dictionary type then this will return the KeyColumnArray for @@ -270,6 +285,7 @@ ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t st ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, std::vector* column_arrays); + /// A lightweight resizable array for "key" columns /// /// Unlike KeyColumnArray this instance owns its buffers diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index cc02d489d138f..53d0762b4af08 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -287,6 +287,40 @@ TEST(KeyColumnArray, SliceBinaryTest) { GenericTestSlice(large_binary(), json_test_strings, testCases); } +TEST(KeyColumnArray, TempAllocForHashing) { + std::unique_ptr pool = MemoryPool::CreateDefault(); + + for (const auto& type : kSampleFixedDataTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + /* TODO + int row_count = 12; + int byte_width = + arrow::internal::checked_pointer_cast(type)->bit_width() / 8; + + { + KeyColumnPseudoSpan column_span {pool.get()}; + ARROW_ASSIGN_OR_RAISE(uint8_t* buf_data, + column_span.CreateBuffer(1, byte_width * row_count)); + + // TODO: create a test that uses a KeyColumnArray constructed from a + // KeyColumnPseudoSpan + KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean()).ValueOrDie(); + + KeyColumnArray column_view { + column_span.CreateView( + + int min_bytes_needed_for_values = byte_width; + int min_bytes_needed_for_validity = 1; + int min_bytes_needed = min_bytes_needed_for_values + min_bytes_needed_for_validity; + ASSERT_LT(min_bytes_needed, pool->bytes_allocated()); + ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated()); + } + */ + // After array is destroyed buffers should be freed + ASSERT_EQ(0, pool->bytes_allocated()); + } +} + TEST(ResizableArrayData, Basic) { std::unique_ptr pool = MemoryPool::CreateDefault(); for (const auto& type : kSampleFixedDataTypes) {