diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
index 18f656a2e458d..ec511aa03a6d0 100644
--- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc
+++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
@@ -236,6 +236,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
       irow_right =
           _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
     }
+    // TODO: Need to test if this gather is OK when irow_right is larger than
+    // 0x80000000u.
     __m256i offset_right =
         _mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
     offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
@@ -251,6 +253,40 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
   }
 }
+
+namespace {
+
+// Intrinsics `_mm256_i32gather_epi32/64` treat `vindex` as a signed integer, and we
+// are using `uint32_t` to represent the offset, in the range [0, 4G), within the row
+// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will
+// treat it as a negative offset and gather the data from an undesired address. To
+// avoid this issue, we normalize the addresses by translating `base` `0x80000000`
+// higher, and `offset` `0x80000000` lower. This way, the offset is always in the
+// range [-2G, 2G) and those intrinsics are safe.
+
+constexpr uint64_t kTwoGB = 0x80000000ull;
+
+template <uint32_t kScale>
+inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) {
+  int const* normalized_base = base + kTwoGB / sizeof(int);
+  __m256i normalized_offset =
+      _mm256_sub_epi32(offset, _mm256_set1_epi32(static_cast<int>(kTwoGB / kScale)));
+  return _mm256_i32gather_epi32(normalized_base, normalized_offset,
+                                static_cast<int>(kScale));
+}
+
+template <uint32_t kScale>
+inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base,
+                                          __m128i offset) {
+  arrow::util::int64_for_gather_t const* normalized_base =
+      base + kTwoGB / sizeof(arrow::util::int64_for_gather_t);
+  __m128i normalized_offset =
+      _mm_sub_epi32(offset, _mm_set1_epi32(static_cast<int>(kTwoGB / kScale)));
+  return _mm256_i32gather_epi64(normalized_base, normalized_offset,
+                                static_cast<int>(kScale));
+}
+
+}  // namespace
 
 template <int column_width>
 inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* right_base,
                                       __m256i irow_left, __m256i offset_right,
@@ -281,7 +317,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r
     ARROW_DCHECK(false);
   }
 
-  __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+  __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
   if (column_width != sizeof(uint32_t)) {
     constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
     right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -330,7 +366,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas
     ARROW_DCHECK(false);
   }
 
-  __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+  __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
   if (column_width != sizeof(uint32_t)) {
     constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
     right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -367,9 +403,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig
   auto right_base_i64 =
       reinterpret_cast<const arrow::util::int64_for_gather_t*>(right_base);
   __m256i right_lo =
-      _mm256_i32gather_epi64(right_base_i64, _mm256_castsi256_si128(offset_right), 1);
-  __m256i right_hi = _mm256_i32gather_epi64(right_base_i64,
-                                            _mm256_extracti128_si256(offset_right, 1), 1);
+      UnsignedOffsetSafeGather64<1>(right_base_i64, _mm256_castsi256_si128(offset_right));
+  __m256i right_hi = UnsignedOffsetSafeGather64<1>(
+      right_base_i64, _mm256_extracti128_si256(offset_right, 1));
   uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo));
   uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi));
   return result_lo | (static_cast<uint64_t>(result_hi) << 32);
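A note on why the normalization in `UnsignedOffsetSafeGather32/64` is sound: `_mm256_i32gather_epi32/64` sign-extend each 32-bit index before adding it to the base pointer, so a raw `uint32_t` offset of `0x80000000` or more reads 4GB below the intended address. The standalone sketch below (illustrative only, not part of the patch; `base` is a made-up address and the loop covers the boundary offsets) models the pointer arithmetic with plain integers:

```cpp
// Scalar model of the 2GB base/offset normalization; no AVX2 needed.
// Assumes two's-complement narrowing conversions (guaranteed since C++20,
// universal in practice before that). All asserts pass.
#include <cassert>
#include <cstdint>

int main() {
  constexpr uint64_t kTwoGB = 0x80000000ull;
  const uint64_t base = 0x200000000ull;  // hypothetical row-table address
  for (uint64_t offset : {0ull, 0x7fffffffull, 0x80000000ull, 0xffffffffull}) {
    // What the raw gather does: sign-extend the 32-bit index. For offsets
    // >= 2GB this lands exactly 4GB below the intended address (GH-41813).
    int64_t raw = static_cast<int64_t>(base) + static_cast<int32_t>(offset);
    if (offset >= kTwoGB) {
      assert(raw == static_cast<int64_t>(base + offset) - (int64_t{1} << 32));
    }
    // The fix: base goes 2GB up, offset goes 2GB down. The shifted offset
    // always fits in int32_t, so sign extension is harmless and the sum is
    // unchanged: (base + 2GB) + (offset - 2GB) == base + offset.
    uint64_t normalized_base = base + kTwoGB;
    int32_t normalized_offset = static_cast<int32_t>(offset - kTwoGB);
    assert(normalized_base + normalized_offset == base + offset);
  }
  return 0;
}
```

The same identity holds per lane inside the gathers; the helpers divide the 2GB shift by `kScale` because the intrinsic multiplies each index by the scale before adding it to the base.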
diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc
index 4044049b10863..662862075c245 100644
--- a/cpp/src/arrow/compute/row/compare_test.cc
+++ b/cpp/src/arrow/compute/row/compare_test.cc
@@ -18,8 +18,10 @@
 #include <numeric>
 
 #include "arrow/compute/row/compare_internal.h"
+#include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/bitmap_ops.h"
 
 namespace arrow {
 namespace compute {
@@ -164,5 +166,141 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) {
   }
 }
 
+#ifndef ARROW_VALGRIND
+// Compare columns to rows at offsets over 2GB within a row table.
+// Certain AVX2 instructions may behave unexpectedly, causing trouble like GH-41813.
+TEST(KeyCompare, CompareColumnsToRowsLarge) {
+  if constexpr (sizeof(void*) == 4) {
+    GTEST_SKIP() << "Test only works on 64-bit platforms";
+  }
+
+  // The idea of this case is to create a row table using several fixed length columns
+  // and one var length column (so the row is var length and has an offset buffer),
+  // with the overall data size exceeding 2GB. Then compare each row with itself.
+  constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll;
+  // The compare function requires the row id of the left column to be uint16_t, hence
+  // the number of rows.
+  constexpr int64_t num_rows = std::numeric_limits<uint16_t>::max() + 1;
+  const std::vector<std::shared_ptr<DataType>> fixed_length_types{uint64(), uint32()};
+  // The var length column should be a little smaller than 2GB to work around the
+  // capacity limitation in the var length builder.
+  constexpr int32_t var_length = two_gb / num_rows - 1;
+  auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(),
+                                  static_cast<int64_t>(var_length),
+                                  [](int64_t acc, const std::shared_ptr<DataType>& type) {
+                                    return acc + type->byte_width();
+                                  });
+  // The overall size should be larger than 2GB.
+  ASSERT_GT(row_size * num_rows, two_gb);
+
+  MemoryPool* pool = default_memory_pool();
+
+  // The left side columns.
+  std::vector<KeyColumnArray> columns_left;
+  ExecBatch batch_left;
+  {
+    std::vector<Datum> values;
+
+    // Several fixed length arrays containing random content.
+    for (const auto& type : fixed_length_types) {
+      ASSERT_OK_AND_ASSIGN(auto value, ::arrow::gen::Random(type)->Generate(num_rows));
+      values.push_back(std::move(value));
+    }
+    // A var length array containing 'X' repeated var_length times.
+    ASSERT_OK_AND_ASSIGN(auto value_var_length,
+                         ::arrow::gen::Constant(
+                             std::make_shared<BinaryScalar>(std::string(var_length, 'X')))
+                             ->Generate(num_rows));
+    values.push_back(std::move(value_var_length));
+
+    batch_left = ExecBatch(std::move(values), num_rows);
+    ASSERT_OK(ColumnArraysFromExecBatch(batch_left, &columns_left));
+  }
+
+  // The right side row table.
+  RowTableImpl row_table_right;
+  {
+    // Encode the row table with the left columns.
+    std::vector<KeyColumnMetadata> column_metadatas;
+    ASSERT_OK(ColumnMetadatasFromExecBatch(batch_left, &column_metadatas));
+    RowTableMetadata table_metadata;
+    table_metadata.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t),
+                                            sizeof(uint64_t));
+    ASSERT_OK(row_table_right.Init(pool, table_metadata));
+    std::vector<uint16_t> row_ids(num_rows);
+    std::iota(row_ids.begin(), row_ids.end(), 0);
+    RowTableEncoder row_encoder;
+    row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t));
+    row_encoder.PrepareEncodeSelected(0, num_rows, columns_left);
+    ASSERT_OK(row_encoder.EncodeSelected(
+        &row_table_right, static_cast<uint32_t>(num_rows), row_ids.data()));
+
+    // The row table must contain an offset buffer.
+    ASSERT_NE(row_table_right.offsets(), NULLPTR);
+    // The whole point of this test.
+    ASSERT_GT(row_table_right.offsets()[num_rows - 1], two_gb);
+  }
+
+  // The rows to compare.
+  std::vector<uint32_t> row_ids_to_compare(num_rows);
+  std::iota(row_ids_to_compare.begin(), row_ids_to_compare.end(), 0);
+
+  TempVectorStack stack;
+  ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows)));
+  LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack};
+
+  {
+    // No selection, output no match row ids.
+    uint32_t num_rows_no_match;
+    std::vector<uint16_t> row_ids_out(num_rows);
+    KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR,
+                                     row_ids_to_compare.data(), &ctx, &num_rows_no_match,
+                                     row_ids_out.data(), columns_left, row_table_right,
+                                     /*are_cols_in_encoding_order=*/true,
+                                     /*out_match_bitvector_maybe_null=*/NULLPTR);
+    ASSERT_EQ(num_rows_no_match, 0);
+  }
+
+  {
+    // No selection, output match bit vector.
+    std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
+    KeyCompare::CompareColumnsToRows(
+        num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_to_compare.data(), &ctx,
+        /*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left,
+        row_table_right,
+        /*are_cols_in_encoding_order=*/true, match_bitvector.data());
+    ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
+              num_rows);
+  }
+
+  std::vector<uint16_t> selection_left(num_rows);
+  std::iota(selection_left.begin(), selection_left.end(), 0);
+
+  {
+    // With selection, output no match row ids.
+    uint32_t num_rows_no_match;
+    std::vector<uint16_t> row_ids_out(num_rows);
+    KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(),
+                                     row_ids_to_compare.data(), &ctx, &num_rows_no_match,
+                                     row_ids_out.data(), columns_left, row_table_right,
+                                     /*are_cols_in_encoding_order=*/true,
+                                     /*out_match_bitvector_maybe_null=*/NULLPTR);
+    ASSERT_EQ(num_rows_no_match, 0);
+  }
+
+  {
+    // With selection, output match bit vector.
+    std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
+    KeyCompare::CompareColumnsToRows(
+        num_rows, selection_left.data(), row_ids_to_compare.data(), &ctx,
+        /*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left,
+        row_table_right,
+        /*are_cols_in_encoding_order=*/true, match_bitvector.data());
+    ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
+              num_rows);
+  }
+}
+#endif  // ARROW_VALGRIND
+
 }  // namespace compute
 }  // namespace arrow
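As a sanity check on the test's sizing, the constants work out as follows. The sketch below mirrors the test's values (the 8 + 4 byte widths correspond to its uint64()/uint32() columns) and ignores row alignment and null masks, which only make the real encoded table larger:

```cpp
// Standalone check that the chosen row size crosses the 2GB boundary.
#include <cstdint>
#include <iostream>

int main() {
  constexpr int64_t two_gb = 2ll * 1024 * 1024 * 1024;
  constexpr int64_t num_rows = 65536;                    // uint16_t max + 1
  constexpr int64_t var_length = two_gb / num_rows - 1;  // 32767 bytes of 'X'
  constexpr int64_t row_size = var_length + 8 + 4;       // 32779 bytes per row
  static_assert(row_size * num_rows > two_gb,
                "encoded rows must cross the 2GB boundary");
  std::cout << row_size * num_rows << " bytes total\n";  // 2148204544
  return 0;
}
```

So the last row's offset sits just above 2^31, which is exactly the regime where the unpatched sign-extending gathers misbehave.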