diff --git a/cpp/src/arrow/compute/kernels/scalar_hash.cc b/cpp/src/arrow/compute/kernels/scalar_hash.cc new file mode 100644 index 0000000000000..b76349cd7681b --- /dev/null +++ b/cpp/src/arrow/compute/kernels/scalar_hash.cc @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * @file scalar_hash.cc + * @brief Element-wise (scalar) kernels for hashing values. + */ + +#include +//#include + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/compute/key_hash.h" +#include "arrow/compute/util.h" +#include "arrow/compute/kernels/common_internal.h" +#include "arrow/compute/light_array.h" +#include "arrow/result.h" + +namespace arrow { +namespace compute { +namespace internal { + +// Define symbols visible within `arrow::compute::internal` in this file; +// these symbols are not visible outside of this file. 
+namespace {
+
+// Function documentation
+const FunctionDoc hash_64_doc{
+    "Construct a hash for every element of the input argument",
+    ("An element-wise function that uses an xxHash-like algorithm.\n"
+     "This function is not suitable for cryptographic purposes.\n"
+     "Hash results are 64-bit and emitted for each valid row.\n"
+     "Null (or invalid) rows emit a null in the output."),
+    {"hash_input"}};
+
+// ------------------------------
+// Kernel implementations
+
+// Scalar kernel that hashes a single input array by delegating to the
+// Hashing32 / Hashing64 multi-column routines.
+//
+// It is expected that HashArrowType is either UInt32Type or UInt64Type (default);
+// its C type selects which of the two hashing implementations is used.
+template <typename HashArrowType = UInt64Type>
+struct FastHashScalar {
+  using OutputCType = typename TypeTraits<HashArrowType>::CType;
+  using KeyColumnArrayVec = std::vector<KeyColumnArray>;
+
+  // Internal wrapper functions to resolve Hashing32 vs Hashing64 using parameter types
+  static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
+                                  uint32_t* hashes) {
+    Hashing32::HashMultiColumn(cols, ctx, hashes);
+  }
+
+  static void FastHashMultiColumn(KeyColumnArrayVec& cols, LightContext* ctx,
+                                  uint64_t* hashes) {
+    Hashing64::HashMultiColumn(cols, ctx, hashes);
+  }
+
+  // Kernel entry point.
+  //
+  // Hashes the single array in `input_arg` and writes the hashes, as OutputCType
+  // values, into buffer 1 of the array span held by `out`.
+  //
+  // Returns Status::Invalid unless `input_arg` holds exactly one array value, and
+  // propagates any error from initializing the temp stack or from converting the
+  // input into key columns.
+  static Status Exec(KernelContext* ctx, const ExecSpan& input_arg, ExecResult* out) {
+    if (input_arg.num_values() != 1 || !input_arg[0].is_array()) {
+      return Status::Invalid("FastHash currently supports a single array input");
+    }
+    // Bind by reference: the span is only read, so copying it buys nothing.
+    const ArraySpan& hash_input = input_arg[0].array;
+
+    // Prefer the caller's execution context; fall back to the default one
+    auto exec_ctx = default_exec_context();
+    if (ctx && ctx->exec_context()) {
+      exec_ctx = ctx->exec_context();
+    }
+
+    // Initialize stack-based memory allocator used by Hashing32 and Hashing64
+    // NOTE(review): the 3 * sizeof(int32_t) per-row budget presumably covers the
+    // temporaries HashMultiColumn allocates per mini-batch -- confirm that it is
+    // also sufficient for the 64-bit path.
+    util::TempVectorStack stack_memallocator;
+    ARROW_RETURN_NOT_OK(
+        stack_memallocator.Init(exec_ctx->memory_pool(),
+                                3 * sizeof(int32_t) * util::MiniBatch::kMiniBatchLength));
+
+    // Prepare context used by Hashing32 and Hashing64
+    LightContext hash_ctx;
+    hash_ctx.hardware_flags = exec_ctx->cpu_info()->hardware_flags();
+    hash_ctx.stack = &stack_memallocator;
+
+    // Construct vector from input ArraySpan; this essentially
+    // flattens the input array span, lifting nested Array buffers into a single level
+    ARROW_ASSIGN_OR_RAISE(KeyColumnArrayVec input_keycols,
+                          ColumnArraysFromArraySpan(hash_input, hash_input.length));
+
+    // Call the hashing function, overloaded based on OutputCType
+    ArraySpan* result_span = out->array_span_mutable();
+    FastHashMultiColumn(input_keycols, &hash_ctx,
+                        result_span->GetValues<OutputCType>(1));
+
+    return Status::OK();
+  }
+};
+
+// ------------------------------
+// Function construction and kernel registration
+
+// Builds the unary "hash_64" scalar function and attaches one 64-bit hashing
+// kernel per supported input type. The caller adds the returned function to a
+// registry.
+std::shared_ptr<ScalarFunction> RegisterKernelsFastHash64() {
+  // Create function instance
+  auto fn_hash_64 =
+      std::make_shared<ScalarFunction>("hash_64", Arity::Unary(), hash_64_doc);
+
+  // Associate kernel with function: one kernel per primitive type
+  for (const auto& simple_inputtype : PrimitiveTypes()) {
+    DCHECK_OK(fn_hash_64->AddKernel({InputType(simple_inputtype)}, OutputType(uint64()),
+                                    FastHashScalar<UInt64Type>::Exec));
+  }
+
+  // Nested types are matched by type id rather than by a concrete DataType
+  for (const auto nested_type :
+       {Type::STRUCT, Type::DENSE_UNION, Type::SPARSE_UNION, Type::LIST,
+        Type::FIXED_SIZE_LIST, Type::MAP, Type::DICTIONARY}) {
+    DCHECK_OK(fn_hash_64->AddKernel({InputType(nested_type)}, OutputType(uint64()),
+                                    FastHashScalar<UInt64Type>::Exec));
+  }
+
+  // Return function to be registered
+  return fn_hash_64;
+}
+
+}  // namespace
+
+// Adds the scalar hash functions defined in this file to `registry`.
+void RegisterScalarHash(FunctionRegistry* registry) {
+  auto fn_scalarhash64 = RegisterKernelsFastHash64();
+  DCHECK_OK(registry->AddFunction(std::move(fn_scalarhash64)));
+}
+
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/scalar_hash_benchmark.cc b/cpp/src/arrow/compute/kernels/scalar_hash_benchmark.cc
new file mode 100644
index 0000000000000..2a8a7f1f518f7
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/scalar_hash_benchmark.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// NOTE(review): the names of the system headers were lost from this patch;
+// the ones listed here are those this file visibly uses -- confirm the list.
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/hashing.h"
+
+#include "arrow/array/array_nested.h"
+#include "arrow/compute/exec.h"
+
+namespace arrow {
+namespace internal {
+
+// ------------------------------
+// Anonymous namespace with global params
+
+namespace {
+// copied from scalar_string_benchmark
+constexpr auto kSeed = 0x94378165;
+constexpr double null_prob = 0.2;
+
+static random::RandomArrayGenerator hashing_rng(kSeed);
+}  // namespace
+
+// ------------------------------
+// Convenience functions
+
+// Builds a struct array with fields {first: int64, second: utf8, third: int64}
+// holding `n_values` rows. The string column is generated with lengths bounded
+// by `min_strlen` / `max_strlen` and a null probability of `null_prob`.
+static Result<std::shared_ptr<StructArray>> MakeStructArray(int64_t n_values,
+                                                            int32_t min_strlen,
+                                                            int32_t max_strlen) {
+  auto vals_first = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());
+  auto vals_second = hashing_rng.String(n_values, min_strlen, max_strlen, null_prob);
+  auto vals_third = hashing_rng.Int64(n_values, 0, std::numeric_limits<int64_t>::max());
+
+  return arrow::StructArray::Make(
+      arrow::ArrayVector{vals_first, vals_second, vals_third},
+      arrow::FieldVector{arrow::field("first", arrow::int64()),
+                         arrow::field("second", arrow::utf8()),
+                         arrow::field("third", arrow::int64())});
+}
+
+// Shared driver for the struct benchmarks, which differ only in the bounds of
+// the generated string lengths. Hashes a 10000-row struct array and reports
+// the bytes of the two int64 columns plus the string data, and three items
+// (one per column) for every row.
+static void BenchmarkHash64Struct(benchmark::State& state,  // NOLINT non-const reference
+                                  int32_t min_strlen, int32_t max_strlen) {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<StructArray> values_array,
+                       MakeStructArray(10000, min_strlen, max_strlen));
+
+  // 2nd column (index 1) is a string column, which has offset type of int32_t
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> values_second,
+                       values_array->GetFlattenedField(1));
+  auto str_vals = std::static_pointer_cast<StringArray>(values_second);
+
+  // Keep the byte totals in 64 bits so the accounting below never narrows
+  const int64_t total_string_size = str_vals->total_values_length();
+  const int64_t int_column_size =
+      values_array->length() * static_cast<int64_t>(sizeof(int64_t));
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(Datum hash_result,
+                         compute::CallFunction("hash_64", {values_array}));
+    benchmark::DoNotOptimize(hash_result);
+  }
+
+  state.SetBytesProcessed(state.iterations() *
+                          (int_column_size + total_string_size + int_column_size));
+  state.SetItemsProcessed(state.iterations() * 3 * values_array->length());
+}
+
+// ------------------------------
+// Benchmark implementations
+
+// Hashes a 10000-element int64 array.
+static void Hash64Int64(benchmark::State& state) {  // NOLINT non-const reference
+  auto test_vals = hashing_rng.Int64(10000, 0, std::numeric_limits<int64_t>::max());
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(Datum hash_result,
+                         compute::CallFunction("hash_64", {test_vals}));
+    benchmark::DoNotOptimize(hash_result);
+  }
+
+  state.SetBytesProcessed(state.iterations() * test_vals->length() * sizeof(int64_t));
+  state.SetItemsProcessed(state.iterations() * test_vals->length());
+}
+
+// Struct benchmark with string length bounds 2 and 20.
+static void Hash64StructSmallStrings(
+    benchmark::State& state) {  // NOLINT non-const reference
+  BenchmarkHash64Struct(state, 2, 20);
+}
+
+// Struct benchmark with string length bounds 20 and 120.
+static void Hash64StructMediumStrings(
+    benchmark::State& state) {  // NOLINT non-const reference
+  BenchmarkHash64Struct(state, 20, 120);
+}
+
+// Struct benchmark with string length bounds 120 and 2000.
+static void Hash64StructLargeStrings(
+    benchmark::State& state) {  // NOLINT non-const reference
+  BenchmarkHash64Struct(state, 120, 2000);
+}
+
+// Hashes a map array generated from string keys and int64 values.
+static void Hash64Map(benchmark::State& state) {  // NOLINT non-const reference
+  constexpr int64_t test_size = 10000;
+  auto test_keys = hashing_rng.String(test_size, 2, 20, /*null_probability=*/0);
+  auto test_vals = hashing_rng.Int64(test_size, 0, std::numeric_limits<int64_t>::max());
+  auto test_keyvals = hashing_rng.Map(test_keys, test_vals, test_size);
+
+  auto key_arr = std::static_pointer_cast<StringArray>(test_keys);
+  const int64_t total_key_size = key_arr->total_values_length();
+  const int64_t total_val_size = test_size * static_cast<int64_t>(sizeof(int64_t));
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(Datum hash_result,
+                         compute::CallFunction("hash_64", {test_keyvals}));
+    benchmark::DoNotOptimize(hash_result);
+  }
+
+  state.SetBytesProcessed(state.iterations() * (total_key_size + total_val_size));
+  state.SetItemsProcessed(state.iterations() * 2 * test_size);
+}
+
+// ------------------------------
+// Benchmark declarations
+
+// Uses "FastHash" compute functions (wraps KeyHash functions)
+BENCHMARK(Hash64Int64);
+
+BENCHMARK(Hash64StructSmallStrings);
+BENCHMARK(Hash64StructMediumStrings);
+BENCHMARK(Hash64StructLargeStrings);
+
+BENCHMARK(Hash64Map);
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/scalar_hash_test.cc b/cpp/src/arrow/compute/kernels/scalar_hash_test.cc
new file mode 100644
index 0000000000000..d86110e532372
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/scalar_hash_test.cc
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/util.h"
+#include "arrow/compute/key_hash.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/util/key_value_metadata.h"
+
+namespace arrow {
+namespace compute {
+
+/** A test helper that is a friend function for Hashing64 **/
+// Forwards directly to Hashing64::HashVarLen. The parameter names follow the
+// argument comments used at the call sites below.
+void TestHashVarLen(bool combine_hashes, uint32_t num_rows,
+                    const uint32_t* var_offsets, const uint8_t* var_data,
+                    uint64_t* hash_results) {
+  Hashing64::HashVarLen(combine_hashes, num_rows, var_offsets, var_data, hash_results);
+}
+
+namespace {
+
+// combining based on key_hash.h:CombineHashesImp (96a3af4)
+constexpr uint64_t combiner_const = 0x9e3779b9UL;
+inline uint64_t hash_combine(uint64_t h1, uint64_t h2) {
+  uint64_t combiner_result = combiner_const + h2 + (h1 << 6) + (h1 >> 2);
+  return h1 ^ combiner_result;
+}
+
+// hash_int based on key_hash.cc:HashIntImp (672431b)
+// T controls how `val` is widened to 64 bits before it is multiplied.
+template <typename T>
+uint64_t hash_int(T val) {
+  constexpr uint64_t int_const = 11400714785074694791ULL;
+  uint64_t cast_val = static_cast<uint64_t>(val);
+
+  return static_cast<uint64_t>(BYTESWAP(cast_val * int_const));
+}
+
+// Hashes `val` and combines the result into an existing hash.
+template <typename T>
+uint64_t hash_int_add(T val, uint64_t first_hash) {
+  return hash_combine(first_hash, hash_int(val));
+}
+
+}  // namespace
+
+// The same non-negative values must hash identically for every integer input type.
+TEST(TestScalarHash, Hash64Primitive) {
+  constexpr int data_bufndx{1};
+  std::vector<uint8_t> test_values{3, 1, 2, 0, 127, 64};
+  std::string test_inputs_str{"[3, 1, 2, 0, 127, 64]"};
+
+  for (const auto& input_dtype : {int32(), uint32(), int8(), uint8()}) {
+    auto test_inputs = ArrayFromJSON(input_dtype, test_inputs_str);
+
+    ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_inputs}));
+    const ArrayData& result_data = *hash_result.array();
+
+    // validate each value
+    for (int64_t val_ndx = 0; val_ndx < test_inputs->length(); ++val_ndx) {
+      uint64_t expected_hash = hash_int<uint8_t>(test_values[val_ndx]);
+      uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+      ASSERT_EQ(expected_hash, actual_hash);
+    }
+  }
+}
+
+// NOTE: oddly, if int32_t or uint64_t is used for hash_int<>, this fails
+TEST(TestScalarHash, Hash64Negative) {
+  constexpr int data_bufndx{1};
+  std::vector<int32_t> test_values{-3, 1, -2, 0, -127, 64};
+
+  Int32Builder input_builder;
+  ASSERT_OK(input_builder.Reserve(test_values.size()));
+  ASSERT_OK(input_builder.AppendValues(test_values));
+  ASSERT_OK_AND_ASSIGN(auto test_inputs, input_builder.Finish());
+
+  ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_inputs}));
+  const ArrayData& result_data = *hash_result.array();
+
+  // validate each value
+  for (int64_t val_ndx = 0; val_ndx < test_inputs->length(); ++val_ndx) {
+    uint64_t expected_hash = hash_int<uint32_t>(test_values[val_ndx]);
+    uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+    ASSERT_EQ(expected_hash, actual_hash);
+  }
+}
+
+// A map row hashes to the combination of its key hash and its item hash.
+TEST(TestScalarHash, Hash64IntMap) {
+  constexpr int data_bufndx{1};
+  std::vector<uint16_t> test_vals_first{7, 67, 3, 31, 17, 29};
+  std::vector<int16_t> test_vals_second{67, 7, 31, 3, 29, 17};
+
+  auto test_map = ArrayFromJSON(map(uint16(), int16()),
+                                R"([[[ 7, 67]], [[67,  7]], [[ 3, 31]],
+                                    [[31,  3]], [[17, 29]], [[29, 17]]])");
+
+  ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_map}));
+  const ArrayData& result_data = *hash_result.array();
+
+  // validate each value
+  for (size_t val_ndx = 0; val_ndx < test_vals_first.size(); ++val_ndx) {
+    uint64_t expected_hash = hash_combine(hash_int<uint16_t>(test_vals_first[val_ndx]),
+                                          hash_int<int16_t>(test_vals_second[val_ndx]));
+    uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+    ASSERT_EQ(expected_hash, actual_hash);
+  }
+}
+
+TEST(TestScalarHash, Hash64StringMap) {
+  constexpr int data_bufndx{1};
+  constexpr int varoffs_bufndx{1};
+  constexpr int vardata_bufndx{2};
+
+  // for expected values
+  auto test_vals_first = ArrayFromJSON(utf8(), R"(["first-A", "second-A", "third-A"])");
+  auto test_vals_second = ArrayFromJSON(utf8(), R"(["first-B", "second-B", "third-B"])");
+  // std::vector rather than a variable-length array, which is not standard C++
+  std::vector<uint64_t> expected_hashes(test_vals_first->length());
+
+  // hash the keys first, then fold the item hashes into the same buffer
+  TestHashVarLen(/*combine_hashes=*/false, /*num_rows=*/3,
+                 test_vals_first->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals_first->data()->GetValues<uint8_t>(vardata_bufndx),
+                 expected_hashes.data());
+
+  TestHashVarLen(/*combine_hashes=*/true, /*num_rows=*/3,
+                 test_vals_second->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals_second->data()->GetValues<uint8_t>(vardata_bufndx),
+                 expected_hashes.data());
+
+  // for actual values
+  auto test_map = ArrayFromJSON(map(utf8(), utf8()),
+                                R"([[["first-A", "first-B"]],
+                                    [["second-A", "second-B"]],
+                                    [["third-A", "third-B"]]])");
+
+  ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_map}));
+  const ArrayData& result_data = *hash_result.array();
+
+  // compare actual and expected
+  for (int64_t val_ndx = 0; val_ndx < test_vals_first->length(); ++val_ndx) {
+    uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+
+    ASSERT_EQ(expected_hashes[val_ndx], actual_hash);
+  }
+}
+
+TEST(TestScalarHash, Hash64Map) {
+  constexpr int data_bufndx{1};
+  constexpr int varoffs_bufndx{1};
+  constexpr int vardata_bufndx{2};
+
+  // For expected values
+  auto test_vals_first = ArrayFromJSON(utf8(),
+                                       R"(["first-A", "second-A", "first-B",
+                                           "second-B", "first-C", "second-C"])");
+
+  std::vector<uint8_t> test_vals_second{1, 3, 11, 23, 111, 223};
+  // std::vector rather than a variable-length array, which is not standard C++
+  std::vector<uint64_t> expected_hashes(test_vals_first->length());
+
+  // compute initial hashes from the string column (array)
+  TestHashVarLen(/*combine_hashes=*/false, /*num_rows=*/6,
+                 test_vals_first->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals_first->data()->GetValues<uint8_t>(vardata_bufndx),
+                 expected_hashes.data());
+
+  // For actual values
+  auto test_map = ArrayFromJSON(map(utf8(), uint8()),
+                                R"([[["first-A", 1]], [["second-A", 3]],
+                                    [["first-B", 11]], [["second-B", 23]],
+                                    [["first-C", 111]], [["second-C", 223]]])");
+
+  ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_map}));
+  const ArrayData& result_data = *hash_result.array();
+
+  // compare actual and expected
+  for (int64_t val_ndx = 0; val_ndx < test_vals_first->length(); ++val_ndx) {
+    // compute final hashes by combining int hashes with initial string hashes
+    expected_hashes[val_ndx] = hash_combine(
+        expected_hashes[val_ndx], hash_int<uint8_t>(test_vals_second[val_ndx]));
+
+    uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+
+    ASSERT_EQ(expected_hashes[val_ndx], actual_hash);
+  }
+}
+
+// TODO: this test only checks that hashing a list array succeeds; the comparison
+// against expected_hashes is disabled below and the hashes are merely logged.
+// NOTE(review): presumably list inputs do not yet produce the per-row combined
+// hash computed here -- confirm the intended semantics and re-enable the assert.
+TEST(TestScalarHash, Hash64List) {
+  constexpr int data_bufndx{1};
+  constexpr int varoffs_bufndx{1};
+  constexpr int vardata_bufndx{2};
+
+  // for expected values
+  auto test_vals1 = ArrayFromJSON(utf8(), R"(["first-A", "second-A", "third-A"])");
+  auto test_vals2 = ArrayFromJSON(utf8(), R"(["first-B", "second-B", "third-B"])");
+  auto test_vals3 = ArrayFromJSON(utf8(), R"(["first-A", "first-B",
+                                              "second-A", "second-B",
+                                              "third-A", "third-B"])");
+  // std::vector rather than variable-length arrays, which are not standard C++
+  std::vector<uint64_t> expected_hashes(test_vals1->length());
+  std::vector<uint64_t> test_hashes(test_vals3->length());
+
+  TestHashVarLen(/*combine_hashes=*/false, /*num_rows=*/3,
+                 test_vals1->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals1->data()->GetValues<uint8_t>(vardata_bufndx),
+                 expected_hashes.data());
+
+  TestHashVarLen(/*combine_hashes=*/true, /*num_rows=*/3,
+                 test_vals2->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals2->data()->GetValues<uint8_t>(vardata_bufndx),
+                 expected_hashes.data());
+
+  TestHashVarLen(/*combine_hashes=*/false, /*num_rows=*/6,
+                 test_vals3->data()->GetValues<uint32_t>(varoffs_bufndx),
+                 test_vals3->data()->GetValues<uint8_t>(vardata_bufndx),
+                 test_hashes.data());
+
+  // for actual values
+  auto test_list = ArrayFromJSON(list(utf8()),
+                                 R"([["first-A", "first-B"],
+                                     ["second-A", "second-B"],
+                                     ["third-A", "third-B"]])");
+
+  ARROW_LOG(INFO) << "size: " << test_list->length();
+  ARROW_LOG(INFO) << test_list->ToString();
+
+  ARROW_LOG(INFO) << "Test Hashes:";
+  for (uint64_t hash_val : test_hashes) {
+    ARROW_LOG(INFO) << "\t" << hash_val;
+  }
+
+  ASSERT_OK_AND_ASSIGN(Datum hash_result, CallFunction("hash_64", {test_list}));
+  const ArrayData& result_data = *hash_result.array();
+
+  // compare actual and expected
+  // for (int64_t val_ndx = 0; val_ndx < test_list->length(); ++val_ndx) {
+  for (int64_t val_ndx = 0; val_ndx < result_data.length; ++val_ndx) {
+    uint64_t actual_hash = result_data.GetValues<uint64_t>(data_bufndx)[val_ndx];
+    ARROW_LOG(INFO) << "actual hash: " << actual_hash;
+
+    // ASSERT_EQ(expected_hashes[val_ndx], actual_hash);
+    // ARROW_LOG(INFO) << "expected hash: " << expected_hashes[val_ndx] << "\tactual hash: " << actual_hash;
+  }
+}
+
+}  // namespace compute
+}  // namespace arrow