Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-17211: [C++] Add hash_64 scalar compute function #39836

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ if(ARROW_COMPUTE)
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
compute/kernels/scalar_compare.cc
compute/kernels/scalar_hash.cc
compute/kernels/scalar_if_else.cc
compute/kernels/scalar_nested.cc
compute/kernels/scalar_random.cc
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/api_scalar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,12 @@ Result<Datum> MapLookup(const Datum& arg, MapLookupOptions options, ExecContext*
return CallFunction("map_lookup", {arg}, &options, ctx);
}

// ----------------------------------------------------------------------
// Hash functions
Result<Datum> Hash64(const Datum& input_array, ExecContext* ctx) {
return CallFunction("hash_64", {input_array}, ctx);
}

// ----------------------------------------------------------------------

} // namespace compute
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/compute/api_scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -1718,5 +1718,21 @@ ARROW_EXPORT Result<Datum> NanosecondsBetween(const Datum& left, const Datum& ri
/// \note API not yet finalized
ARROW_EXPORT Result<Datum> MapLookup(const Datum& map, MapLookupOptions options,
ExecContext* ctx = NULLPTR);

/// \brief Construct a hash value for each row of the input.
///
/// The result is an Array of length equal to the length of the input; however, the output
/// shall be a UInt64Array, with each element being a hash constructed from each row of
/// the input. If the input Array is a NestedArray, this means that each "attribute" or
/// "field" of the input NestedArray corresponding to the same "row" will collectively
/// produce a single uint64_t hash. At the moment, this function does not take options,
/// though these may be added in the future.
///
/// \param[in] input_array input data to hash
/// \param[in] ctx function execution context, optional
/// \return elementwise hash values
ARROW_EXPORT
Result<Datum> Hash64(const Datum& input_array, ExecContext* ctx = NULLPTR);

} // namespace compute
} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ add_arrow_compute_test(scalar_utility_test
scalar_random_test.cc
scalar_set_lookup_test.cc
scalar_validity_test.cc
scalar_hash_test.cc
EXTRA_LINK_LIBS
arrow_compute_kernels_testing)

Expand All @@ -87,6 +88,7 @@ add_arrow_benchmark(scalar_round_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(scalar_set_lookup_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(scalar_string_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(scalar_temporal_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(scalar_hash_benchmark PREFIX "arrow-compute")

# ----------------------------------------------------------------------
# Vector kernels
Expand Down
138 changes: 138 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_hash.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/**
* @file scalar_hash.cc
* @brief Element-wise (scalar) kernels for hashing values.
*/

#include <algorithm>
//#include <iostream>

#include "arrow/array/array_base.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/key_hash.h"
#include "arrow/compute/util.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/compute/light_array.h"
#include "arrow/result.h"

namespace arrow {
namespace compute {
namespace internal {

// Define symbols visible within `arrow::compute::internal` in this file;
// these symbols are not visible outside of this file.
namespace {

// Function documentation
const FunctionDoc hash_64_doc{
"Construct a hash for every element of the input argument",
("An element-wise function that uses an xxHash-like algorithm.\n"
"This function is not suitable for cryptographic purposes.\n"
"Hash results are 64-bit and emitted for each valid row.\n"
"Null (or invalid) rows emit a null in the output."),
{"hash_input"}};

// ------------------------------
// Kernel implementations
// It is expected that HashArrowType is either UInt32Type or UInt64Type (default)
template <typename HashArrowType = UInt64Type>
struct FastHashScalar {
using OutputCType = typename TypeTraits<HashArrowType>::CType;
using KeyColumnArrayVec = std::vector<KeyColumnArray>;

// Internal wrapper functions to resolve Hashing32 vs Hashing64 using parameter types
static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
uint32_t* hashes) {
Hashing32::HashMultiColumn(cols, ctx, hashes);
}

static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
uint64_t* hashes) {
Hashing64::HashMultiColumn(cols, ctx, hashes);
}

static Status Exec(KernelContext* ctx, const ExecSpan& input_arg, ExecResult* out) {
if (input_arg.num_values() != 1 || !input_arg[0].is_array()) {
return Status::Invalid("FastHash currently supports a single array input");
}
ArraySpan hash_input = input_arg[0].array;

auto exec_ctx = default_exec_context();
if (ctx && ctx->exec_context()) {
exec_ctx = ctx->exec_context();
}

// Initialize stack-based memory allocator used by Hashing32 and Hashing64
util::TempVectorStack stack_memallocator;
ARROW_RETURN_NOT_OK(
stack_memallocator.Init(exec_ctx->memory_pool(),
3 * sizeof(int32_t) * util::MiniBatch::kMiniBatchLength));

// Prepare context used by Hashing32 and Hashing64
LightContext hash_ctx;
hash_ctx.hardware_flags = exec_ctx->cpu_info()->hardware_flags();
hash_ctx.stack = &stack_memallocator;

// Construct vector<KeyColumnArray> from input ArraySpan; this essentially
// flattens the input array span, lifting nested Array buffers into a single level
ARROW_ASSIGN_OR_RAISE(KeyColumnArrayVec input_keycols,
ColumnArraysFromArraySpan(hash_input, hash_input.length));

// Call the hashing function, overloaded based on OutputCType
ArraySpan* result_span = out->array_span_mutable();
FastHashMultiColumn(input_keycols, &hash_ctx, result_span->GetValues<OutputCType>(1));

return Status::OK();
}
};

// ------------------------------
// Function construction and kernel registration
std::shared_ptr<ScalarFunction> RegisterKernelsFastHash64() {
// Create function instance
auto fn_hash_64 =
std::make_shared<ScalarFunction>("hash_64", Arity::Unary(), hash_64_doc);

// Associate kernel with function
for (auto& simple_inputtype : PrimitiveTypes()) {
DCHECK_OK(fn_hash_64->AddKernel({InputType(simple_inputtype)}, OutputType(uint64()),
FastHashScalar<UInt64Type>::Exec));
}

for (const auto nested_type :
{Type::STRUCT, Type::DENSE_UNION, Type::SPARSE_UNION, Type::LIST,
Type::FIXED_SIZE_LIST, Type::MAP, Type::DICTIONARY}) {
DCHECK_OK(fn_hash_64->AddKernel({InputType(nested_type)}, OutputType(uint64()),
FastHashScalar<UInt64Type>::Exec));
}

// Return function to be registered
return fn_hash_64;
}

} // namespace

void RegisterScalarHash(FunctionRegistry* registry) {
auto fn_scalarhash64 = RegisterKernelsFastHash64();
DCHECK_OK(registry->AddFunction(std::move(fn_scalarhash64)));
}

} // namespace internal
} // namespace compute
} // namespace arrow
189 changes: 189 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_hash_benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <random>
#include <string>
#include <vector>

#include "benchmark/benchmark.h"

#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/hashing.h"

#include "arrow/array/array_nested.h"
#include "arrow/compute/exec.h"

namespace arrow {
namespace internal {

// ------------------------------
// Anonymous namespace with global params

namespace {
// copied from scalar_string_benchmark
constexpr auto kSeed = 0x94378165;
constexpr double null_prob = 0.2;

static random::RandomArrayGenerator hashing_rng(kSeed);
} // namespace

// ------------------------------
// Convenience functions

static Result<std::shared_ptr<StructArray>> MakeStructArray(int64_t n_values,
int32_t min_strlen,
int32_t max_strlen) {
auto vals_first = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());
auto vals_second = hashing_rng.String(n_values, min_strlen, max_strlen, null_prob);
auto vals_third = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());

return arrow::StructArray::Make(
arrow::ArrayVector{vals_first, vals_second, vals_third},
arrow::FieldVector{arrow::field("first", arrow::int64()),
arrow::field("second", arrow::utf8()),
arrow::field("third", arrow::int64())});
}

// ------------------------------
// Benchmark implementations

static void Hash64Int64(benchmark::State& state) { // NOLINT non-const reference
auto test_vals = hashing_rng.Int64(10000, 0, std::numeric_limits<int64_t>::max());

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {test_vals}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() * test_vals->length() * sizeof(int64_t));
state.SetItemsProcessed(state.iterations() * test_vals->length());
}

static void Hash64StructSmallStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 2, 20));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructMediumStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 20, 120));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructLargeStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 120, 2000));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64Map(benchmark::State& state) { // NOLINT non-const reference
constexpr int64_t test_size = 10000;
auto test_keys = hashing_rng.String(test_size, 2, 20, /*null_probability=*/0);
auto test_vals = hashing_rng.Int64(test_size, 0, std::numeric_limits<int64_t>::max());
auto test_keyvals = hashing_rng.Map(test_keys, test_vals, test_size);

auto key_arr = std::static_pointer_cast<StringArray>(test_keys);
int32_t total_key_size = key_arr->total_values_length();
int32_t total_val_size = test_size * sizeof(int64_t);

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {test_keyvals}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() * (total_key_size + total_val_size));
state.SetItemsProcessed(state.iterations() * 2 * test_size);
}

// ------------------------------
// Benchmark declarations

// Uses "FastHash" compute functions (wraps KeyHash functions)
BENCHMARK(Hash64Int64);

BENCHMARK(Hash64StructSmallStrings);
BENCHMARK(Hash64StructMediumStrings);
BENCHMARK(Hash64StructLargeStrings);

BENCHMARK(Hash64Map);

} // namespace internal
} // namespace arrow
Loading
Loading