Skip to content

Commit

Permalink
GH-17211: refresh history for scalar_hash kernel
Browse files Browse the repository at this point in the history
This commit ports the latest state of scalar_hash kernels without
pulling a long development history.

This kernel is an element-wise function that uses an xxHash-like
algorithm, prioritizing speed and not suitable for cryptographic
purposes. The function is implemented by the `FastHashScalar` struct
which is templated by the output type (which is assumed to be either
UInt32 or UInt64, but there is no validation of that at the moment).

The benchmarks in scalar_hash_benchmark.cc uses the hashing_benchmark.cc
file as a reference (in cpp/src/arrow/util/), but only covers various
input types and the key hashing functions (from key_hash.h).

The tests in scalar_hash_test.cc use a simplified version of hashing
based on what is implemented in the key_hash.cc. The idea being that the
high-level entry points for high-level types should eventually reach an
expected application of the low-level hash functions on simple data
types; the tests do this exact comparison. At the moment, the tests pass
for simple cases, but they do not work for nested types with non-trivial
row layouts (e.g. ListTypes).

Issue: ARROW-8991
Issue: GH-17211
  • Loading branch information
drin committed Jun 28, 2024
1 parent 62ee676 commit d54bc43
Show file tree
Hide file tree
Showing 3 changed files with 590 additions and 0 deletions.
138 changes: 138 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_hash.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/**
* @file scalar_hash.cc
* @brief Element-wise (scalar) kernels for hashing values.
*/

#include <algorithm>
//#include <iostream>

#include "arrow/array/array_base.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/key_hash.h"
#include "arrow/compute/util.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/compute/light_array.h"
#include "arrow/result.h"

namespace arrow {
namespace compute {
namespace internal {

// Define symbols visible within `arrow::compute::internal` in this file;
// these symbols are not visible outside of this file.
namespace {

// Function documentation
const FunctionDoc hash_64_doc{
"Construct a hash for every element of the input argument",
("An element-wise function that uses an xxHash-like algorithm.\n"
"This function is not suitable for cryptographic purposes.\n"
"Hash results are 64-bit and emitted for each valid row.\n"
"Null (or invalid) rows emit a null in the output."),
{"hash_input"}};

// ------------------------------
// Kernel implementations
// It is expected that HashArrowType is either UInt32Type or UInt64Type (default)
template <typename HashArrowType = UInt64Type>
struct FastHashScalar {
using OutputCType = typename TypeTraits<HashArrowType>::CType;
using KeyColumnArrayVec = std::vector<KeyColumnArray>;

// Internal wrapper functions to resolve Hashing32 vs Hashing64 using parameter types
static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
uint32_t* hashes) {
Hashing32::HashMultiColumn(cols, ctx, hashes);
}

static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
uint64_t* hashes) {
Hashing64::HashMultiColumn(cols, ctx, hashes);
}

static Status Exec(KernelContext* ctx, const ExecSpan& input_arg, ExecResult* out) {
if (input_arg.num_values() != 1 || !input_arg[0].is_array()) {
return Status::Invalid("FastHash currently supports a single array input");
}
ArraySpan hash_input = input_arg[0].array;

auto exec_ctx = default_exec_context();
if (ctx && ctx->exec_context()) {
exec_ctx = ctx->exec_context();
}

// Initialize stack-based memory allocator used by Hashing32 and Hashing64
util::TempVectorStack stack_memallocator;
ARROW_RETURN_NOT_OK(
stack_memallocator.Init(exec_ctx->memory_pool(),
3 * sizeof(int32_t) * util::MiniBatch::kMiniBatchLength));

// Prepare context used by Hashing32 and Hashing64
LightContext hash_ctx;
hash_ctx.hardware_flags = exec_ctx->cpu_info()->hardware_flags();
hash_ctx.stack = &stack_memallocator;

// Construct vector<KeyColumnArray> from input ArraySpan; this essentially
// flattens the input array span, lifting nested Array buffers into a single level
ARROW_ASSIGN_OR_RAISE(KeyColumnArrayVec input_keycols,
ColumnArraysFromArraySpan(hash_input, hash_input.length));

// Call the hashing function, overloaded based on OutputCType
ArraySpan* result_span = out->array_span_mutable();
FastHashMultiColumn(input_keycols, &hash_ctx, result_span->GetValues<OutputCType>(1));

return Status::OK();
}
};

// ------------------------------
// Function construction and kernel registration
std::shared_ptr<ScalarFunction> RegisterKernelsFastHash64() {
// Create function instance
auto fn_hash_64 =
std::make_shared<ScalarFunction>("hash_64", Arity::Unary(), hash_64_doc);

// Associate kernel with function
for (auto& simple_inputtype : PrimitiveTypes()) {
DCHECK_OK(fn_hash_64->AddKernel({InputType(simple_inputtype)}, OutputType(uint64()),
FastHashScalar<UInt64Type>::Exec));
}

for (const auto nested_type :
{Type::STRUCT, Type::DENSE_UNION, Type::SPARSE_UNION, Type::LIST,
Type::FIXED_SIZE_LIST, Type::MAP, Type::DICTIONARY}) {
DCHECK_OK(fn_hash_64->AddKernel({InputType(nested_type)}, OutputType(uint64()),
FastHashScalar<UInt64Type>::Exec));
}

// Return function to be registered
return fn_hash_64;
}

} // namespace

void RegisterScalarHash(FunctionRegistry* registry) {
auto fn_scalarhash64 = RegisterKernelsFastHash64();
DCHECK_OK(registry->AddFunction(std::move(fn_scalarhash64)));
}

} // namespace internal
} // namespace compute
} // namespace arrow
189 changes: 189 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_hash_benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <random>
#include <string>
#include <vector>

#include "benchmark/benchmark.h"

#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/hashing.h"

#include "arrow/array/array_nested.h"
#include "arrow/compute/exec.h"

namespace arrow {
namespace internal {

// ------------------------------
// Anonymous namespace with global params

namespace {
// copied from scalar_string_benchmark
constexpr auto kSeed = 0x94378165;
constexpr double null_prob = 0.2;

static random::RandomArrayGenerator hashing_rng(kSeed);
} // namespace

// ------------------------------
// Convenience functions

static Result<std::shared_ptr<StructArray>> MakeStructArray(int64_t n_values,
int32_t min_strlen,
int32_t max_strlen) {
auto vals_first = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());
auto vals_second = hashing_rng.String(n_values, min_strlen, max_strlen, null_prob);
auto vals_third = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());

return arrow::StructArray::Make(
arrow::ArrayVector{vals_first, vals_second, vals_third},
arrow::FieldVector{arrow::field("first", arrow::int64()),
arrow::field("second", arrow::utf8()),
arrow::field("third", arrow::int64())});
}

// ------------------------------
// Benchmark implementations

static void Hash64Int64(benchmark::State& state) { // NOLINT non-const reference
auto test_vals = hashing_rng.Int64(10000, 0, std::numeric_limits<int64_t>::max());

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {test_vals}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() * test_vals->length() * sizeof(int64_t));
state.SetItemsProcessed(state.iterations() * test_vals->length());
}

static void Hash64StructSmallStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 2, 20));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructMediumStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 20, 120));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64StructLargeStrings(
benchmark::State& state) { // NOLINT non-const reference
ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
MakeStructArray(10000, 120, 2000));

// 2nd column (index 1) is a string column, which has offset type of int32_t
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
values_array->GetFlattenedField(1));
std::shared_ptr<StringArray> str_vals =
std::static_pointer_cast<StringArray>(values_second);
int32_t total_string_size = str_vals->total_values_length();

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {values_array}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() *
((values_array->length() * sizeof(int64_t)) +
(total_string_size) +
(values_array->length() * sizeof(int64_t))));
state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
}

static void Hash64Map(benchmark::State& state) { // NOLINT non-const reference
constexpr int64_t test_size = 10000;
auto test_keys = hashing_rng.String(test_size, 2, 20, /*null_probability=*/0);
auto test_vals = hashing_rng.Int64(test_size, 0, std::numeric_limits<int64_t>::max());
auto test_keyvals = hashing_rng.Map(test_keys, test_vals, test_size);

auto key_arr = std::static_pointer_cast<StringArray>(test_keys);
int32_t total_key_size = key_arr->total_values_length();
int32_t total_val_size = test_size * sizeof(int64_t);

while (state.KeepRunning()) {
ASSERT_OK_AND_ASSIGN(Datum hash_result,
compute::CallFunction("hash_64", {test_keyvals}));
benchmark::DoNotOptimize(hash_result);
}

state.SetBytesProcessed(state.iterations() * (total_key_size + total_val_size));
state.SetItemsProcessed(state.iterations() * 2 * test_size);
}

// ------------------------------
// Benchmark declarations

// Uses "FastHash" compute functions (wraps KeyHash functions)
BENCHMARK(Hash64Int64);

BENCHMARK(Hash64StructSmallStrings);
BENCHMARK(Hash64StructMediumStrings);
BENCHMARK(Hash64StructLargeStrings);

BENCHMARK(Hash64Map);

} // namespace internal
} // namespace arrow
Loading

0 comments on commit d54bc43

Please sign in to comment.