Skip to content

Commit

Permalink
apacheGH-41813: [C++] Fix avx2 gather offset larger than 2GB in `Comp…
Browse files Browse the repository at this point in the history
…areColumnsToRows` (apache#42188)

### Rationale for this change

AVX2 intrinsics `_mm256_i32gather_epi32`/`_mm256_i32gather_epi64` are used in `CompareColumnsToRows` API, and treat the `vindex` as signed integer. In our row table implementation, we use `uint32_t` to represent the offset within the row table. When a offset is larger than (`0x80000000`, or `2GB`), the aforementioned intrinsics will treat it as negative offset and gather the data from undesired address. More details please see apache#41813 (comment).

Considering there is no unsigned-32bit-offset or 64bit-offset counterparts of those intrinsics in AVX2, this issue can be simply mitigated by translating the base address and the offset:
```
new_base = base + 0x80000000;
new_offset = offset - 0x80000000;
```

### What changes are included in this PR?

Fix and UT that reproduces the issue.

### Are these changes tested?

UT included.

### Are there any user-facing changes?

None.

* GitHub Issue: apache#41813

Authored-by: Ruoxi Sun <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
zanmato1984 authored Jun 25, 2024
1 parent 3e7ae53 commit e635cc2
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 5 deletions.
46 changes: 41 additions & 5 deletions cpp/src/arrow/compute/row/compare_internal_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
irow_right =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
}
// TODO: Need to test if this gather is OK when irow_right is larger than
// 0x80000000u.
__m256i offset_right =
_mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
Expand All @@ -251,6 +253,40 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
}
}

namespace {

// Intrinsics `_mm256_i32gather_epi32/64` treat the `vindex` as signed integer, and we
// are using `uint32_t` to represent the offset, in range of [0, 4G), within the row
// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will treat
// it as negative offset and gather the data from undesired address. To avoid this issue,
// we normalize the addresses by translating `base` `0x80000000` higher, and `offset`
// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those
// intrinsics are safe.

constexpr uint64_t kTwoGB = 0x80000000ull;

template <uint32_t kScale>
inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) {
int const* normalized_base = base + kTwoGB / sizeof(int);
__m256i normalized_offset =
_mm256_sub_epi32(offset, _mm256_set1_epi32(static_cast<int>(kTwoGB / kScale)));
return _mm256_i32gather_epi32(normalized_base, normalized_offset,
static_cast<int>(kScale));
}

template <uint32_t kScale>
inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base,
__m128i offset) {
arrow::util::int64_for_gather_t const* normalized_base =
base + kTwoGB / sizeof(arrow::util::int64_for_gather_t);
__m128i normalized_offset =
_mm_sub_epi32(offset, _mm_set1_epi32(static_cast<int>(kTwoGB / kScale)));
return _mm256_i32gather_epi64(normalized_base, normalized_offset,
static_cast<int>(kScale));
}

} // namespace

template <int column_width>
inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* right_base,
__m256i irow_left, __m256i offset_right,
Expand Down Expand Up @@ -281,7 +317,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r
ARROW_DCHECK(false);
}

__m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
__m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
Expand Down Expand Up @@ -330,7 +366,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas
ARROW_DCHECK(false);
}

__m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
__m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
Expand Down Expand Up @@ -367,9 +403,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig
auto right_base_i64 =
reinterpret_cast<const arrow::util::int64_for_gather_t*>(right_base);
__m256i right_lo =
_mm256_i32gather_epi64(right_base_i64, _mm256_castsi256_si128(offset_right), 1);
__m256i right_hi = _mm256_i32gather_epi64(right_base_i64,
_mm256_extracti128_si256(offset_right, 1), 1);
UnsignedOffsetSafeGather64<1>(right_base_i64, _mm256_castsi256_si128(offset_right));
__m256i right_hi = UnsignedOffsetSafeGather64<1>(
right_base_i64, _mm256_extracti128_si256(offset_right, 1));
uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo));
uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi));
return result_lo | (static_cast<uint64_t>(result_hi) << 32);
Expand Down
138 changes: 138 additions & 0 deletions cpp/src/arrow/compute/row/compare_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
#include <numeric>

#include "arrow/compute/row/compare_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/bitmap_ops.h"

namespace arrow {
namespace compute {
Expand Down Expand Up @@ -164,5 +166,141 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) {
}
}

#ifndef ARROW_VALGRIND
// Compare columns to rows at offsets over 2GB within a row table.
// Certain AVX2 instructions may behave unexpectedly causing troubles like GH-41813.
TEST(KeyCompare, CompareColumnsToRowsLarge) {
if constexpr (sizeof(void*) == 4) {
GTEST_SKIP() << "Test only works on 64-bit platforms";
}

// The idea of this case is to create a row table using several fixed length columns and
// one var length column (so the row is hence var length and has offset buffer), with
// the overall data size exceeding 2GB. Then compare each row with itself.
constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll;
// The compare function requires the row id of the left column to be uint16_t, hence the
// number of rows.
constexpr int64_t num_rows = std::numeric_limits<uint16_t>::max() + 1;
const std::vector<std::shared_ptr<DataType>> fixed_length_types{uint64(), uint32()};
// The var length column should be a little smaller than 2GB to workaround the capacity
// limitation in the var length builder.
constexpr int32_t var_length = two_gb / num_rows - 1;
auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(),
static_cast<int64_t>(var_length),
[](int64_t acc, const std::shared_ptr<DataType>& type) {
return acc + type->byte_width();
});
// The overall size should be larger than 2GB.
ASSERT_GT(row_size * num_rows, two_gb);

MemoryPool* pool = default_memory_pool();

// The left side columns.
std::vector<KeyColumnArray> columns_left;
ExecBatch batch_left;
{
std::vector<Datum> values;

// Several fixed length arrays containing random content.
for (const auto& type : fixed_length_types) {
ASSERT_OK_AND_ASSIGN(auto value, ::arrow::gen::Random(type)->Generate(num_rows));
values.push_back(std::move(value));
}
// A var length array containing 'X' repeated var_length times.
ASSERT_OK_AND_ASSIGN(auto value_var_length,
::arrow::gen::Constant(
std::make_shared<BinaryScalar>(std::string(var_length, 'X')))
->Generate(num_rows));
values.push_back(std::move(value_var_length));

batch_left = ExecBatch(std::move(values), num_rows);
ASSERT_OK(ColumnArraysFromExecBatch(batch_left, &columns_left));
}

// The right side row table.
RowTableImpl row_table_right;
{
// Encode the row table with the left columns.
std::vector<KeyColumnMetadata> column_metadatas;
ASSERT_OK(ColumnMetadatasFromExecBatch(batch_left, &column_metadatas));
RowTableMetadata table_metadata;
table_metadata.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t),
sizeof(uint64_t));
ASSERT_OK(row_table_right.Init(pool, table_metadata));
std::vector<uint16_t> row_ids(num_rows);
std::iota(row_ids.begin(), row_ids.end(), 0);
RowTableEncoder row_encoder;
row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t));
row_encoder.PrepareEncodeSelected(0, num_rows, columns_left);
ASSERT_OK(row_encoder.EncodeSelected(
&row_table_right, static_cast<uint32_t>(num_rows), row_ids.data()));

// The row table must contain an offset buffer.
ASSERT_NE(row_table_right.offsets(), NULLPTR);
// The whole point of this test.
ASSERT_GT(row_table_right.offsets()[num_rows - 1], two_gb);
}

// The rows to compare.
std::vector<uint32_t> row_ids_to_compare(num_rows);
std::iota(row_ids_to_compare.begin(), row_ids_to_compare.end(), 0);

TempVectorStack stack;
ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows)));
LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack};

{
// No selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR,
row_ids_to_compare.data(), &ctx, &num_rows_no_match,
row_ids_out.data(), columns_left, row_table_right,
/*are_cols_in_encoding_order=*/true,
/*out_match_bitvector_maybe_null=*/NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// No selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
KeyCompare::CompareColumnsToRows(
num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_to_compare.data(), &ctx,
/*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left,
row_table_right,
/*are_cols_in_encoding_order=*/true, match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}

std::vector<uint16_t> selection_left(num_rows);
std::iota(selection_left.begin(), selection_left.end(), 0);

{
// With selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(),
row_ids_to_compare.data(), &ctx, &num_rows_no_match,
row_ids_out.data(), columns_left, row_table_right,
/*are_cols_in_encoding_order=*/true,
/*out_match_bitvector_maybe_null=*/NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// With selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
KeyCompare::CompareColumnsToRows(
num_rows, selection_left.data(), row_ids_to_compare.data(), &ctx,
/*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left,
row_table_right,
/*are_cols_in_encoding_order=*/true, match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}
}
#endif // ARROW_VALGRIND

} // namespace compute
} // namespace arrow

0 comments on commit e635cc2

Please sign in to comment.