Skip to content

Commit

Permalink
apacheGH-41334: [C++][Acero] Use per-node basis temp vector stack to …
Browse files Browse the repository at this point in the history
…mitigate overflow (apache#41335)

### Rationale for this change

The risk of temp vector stack overflow still exists as described in apache#41334 . Many people have agreed on a per-node basis approach:
> 1) it doesn't introduce more performance penalty than shared stack; 2) it can mitigate the overflow in a natural way, i.e., expanding the stack size linear to the number of nodes; 3) it requires no more complexity to the existing stack implementation.

The full (but long) story is also revealed in the subsequent discussion of this PR. Feel free to scroll down.

### What changes are included in this PR?

1. Change the current shared (per-thread) temp vector stack usage to per-node basis.
2. Make the stack size required by each stack user more explicit.

### Are these changes tested?

UT included.

### Are there any user-facing changes?

None.

* GitHub Issue: apache#41334

Authored-by: Ruoxi Sun <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
zanmato1984 authored May 14, 2024
1 parent 8f27e26 commit 6c386da
Show file tree
Hide file tree
Showing 19 changed files with 371 additions and 150 deletions.
3 changes: 2 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ set(ARROW_COMPUTE_SRCS
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc
compute/util.cc)
compute/util.cc
compute/util_internal.cc)

append_runtime_avx2_src(ARROW_COMPUTE_SRCS compute/key_hash_internal_avx2.cc)
append_runtime_avx2_bmi2_src(ARROW_COMPUTE_SRCS compute/key_map_internal_avx2.cc)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ struct ExecPlanImpl : public ExecPlan {
Future<> scheduler_finished = arrow::util::AsyncTaskScheduler::Make(
[this](arrow::util::AsyncTaskScheduler* async_scheduler) {
QueryContext* ctx = query_context();
RETURN_NOT_OK(ctx->Init(ctx->max_concurrency(), async_scheduler));
RETURN_NOT_OK(ctx->Init(async_scheduler));

#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
Expand Down
38 changes: 26 additions & 12 deletions cpp/src/arrow/acero/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,11 @@ struct BloomFilterPushdownContext {
using BuildFinishedCallback = std::function<Status(size_t, AccumulationQueue)>;
using FiltersReceivedCallback = std::function<Status(size_t)>;
using FilterFinishedCallback = std::function<Status(size_t, AccumulationQueue)>;
void Init(HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter,
bool use_sync_execution);
Status Init(HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
FiltersReceivedCallback on_bloom_filters_received,
bool disable_bloom_filter, bool use_sync_execution);

Status StartProducing(size_t thread_index);

Expand Down Expand Up @@ -559,8 +559,7 @@ struct BloomFilterPushdownContext {
std::vector<uint32_t> hashes(batch.length);
std::vector<uint8_t> bv(bit_vector_bytes);

ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* stack = &tld_[thread_index].stack;

// Start with full selection for the current batch
memset(selected.data(), 0xff, bit_vector_bytes);
Expand Down Expand Up @@ -654,7 +653,17 @@ struct BloomFilterPushdownContext {
FiltersReceivedCallback all_received_callback_;
FilterFinishedCallback on_finished_;
} eval_;

static constexpr auto kTempStackUsage =
Hashing32::kHashBatchTempStackUsage +
(sizeof(uint32_t) + /*extra=*/1) * arrow::util::MiniBatch::kMiniBatchLength;

struct ThreadLocalData {
arrow::util::TempVectorStack stack;
};
std::vector<ThreadLocalData> tld_;
};

bool HashJoinSchema::HasDictionaries() const {
for (int side = 0; side <= 1; ++side) {
for (int icol = 0; icol < proj_maps[side].num_cols(HashJoinProjection::INPUT);
Expand Down Expand Up @@ -930,7 +939,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
// we will change it back to just the CPU's thread pool capacity.
size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

pushdown_context_.Init(
RETURN_NOT_OK(pushdown_context_.Init(
this, num_threads,
[ctx](std::function<Status(size_t, int64_t)> fn,
std::function<Status(size_t)> on_finished) {
Expand All @@ -940,7 +949,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
return ctx->StartTaskGroup(task_group_id, num_tasks);
},
[this](size_t thread_index) { return OnFiltersReceived(thread_index); },
disable_bloom_filter_, use_sync_execution);
disable_bloom_filter_, use_sync_execution));

RETURN_NOT_OK(impl_->Init(
ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]),
Expand Down Expand Up @@ -1037,7 +1046,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
BloomFilterPushdownContext pushdown_context_;
};

void BloomFilterPushdownContext::Init(
Status BloomFilterPushdownContext::Init(
HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
Expand Down Expand Up @@ -1074,6 +1083,12 @@ void BloomFilterPushdownContext::Init(
return eval_.on_finished_(thread_index, std::move(eval_.batches_));
});
start_task_group_callback_ = std::move(start_task_group_callback);
tld_.resize(num_threads);
for (auto& local_data : tld_) {
RETURN_NOT_OK(local_data.stack.Init(ctx_->memory_pool(), kTempStackUsage));
}

return Status::OK();
}

Status BloomFilterPushdownContext::StartProducing(size_t thread_index) {
Expand Down Expand Up @@ -1124,8 +1139,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_inde
}
ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));

ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* stack = &tld_[thread_index].stack;
arrow::util::TempVectorHolder<uint32_t> hash_holder(
stack, arrow::util::MiniBatch::kMiniBatchLength);
uint32_t* hashes = hash_holder.mutable_data();
Expand Down
52 changes: 52 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/api.h"
#include "arrow/compute/kernels/row_encoder_internal.h"
#include "arrow/compute/kernels/test_util.h"
#include "arrow/compute/light_array_internal.h"
#include "arrow/testing/extension_type.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
Expand All @@ -41,6 +42,7 @@ namespace arrow {

using compute::call;
using compute::default_exec_context;
using compute::ExecBatchBuilder;
using compute::ExecSpan;
using compute::field_ref;
using compute::SortIndices;
Expand Down Expand Up @@ -3201,5 +3203,55 @@ TEST(HashJoin, ChainedIntegerHashJoins) {
}
}

// Test that a large number of joins don't overflow the temp vector stack, like GH-39582
// and GH-39951.
TEST(HashJoin, ManyJoins) {
// The idea of this case is to create many nested join nodes that may possibly cause
// recursive usage of temp vector stack. To make sure that the recursion happens:
// 1. A left-deep join tree is created so that the left-most (the final probe side)
// table will go through all the hash tables from the right side.
// 2. Left-outer join is used so that every join will increase the cardinality.
// 3. The left-most table contains rows of unique integers from 0 to N.
// 4. Each right table at level i contains two rows of integer i, so that the probing of
// each level will increase the result by one row.
// 5. The left-most table is a single batch of enough rows, so that at each level, the
// probing will accumulate enough result rows to have to output to the subsequent level
// before finishing the current batch (releasing the buffer allocated on the temp vector
// stack), which is essentially the recursive usage of the temp vector stack.

// A fair number of joins to guarantee temp vector stack overflow before GH-41335.
const int num_joins = 64;

// `ExecBatchBuilder::num_rows_max()` is the number of rows for swiss join to accumulate
// before outputting.
const int num_left_rows = ExecBatchBuilder::num_rows_max();
ASSERT_OK_AND_ASSIGN(
auto left_batches,
MakeIntegerBatches({[](int row_id) -> int64_t { return row_id; }},
schema({field("l_key", int32())}),
/*num_batches=*/1, /*batch_size=*/num_left_rows));
Declaration root{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(left_batches.schema),
std::move(left_batches.batches))};

HashJoinNodeOptions join_opts(JoinType::LEFT_OUTER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});

for (int i = 0; i < num_joins; ++i) {
ASSERT_OK_AND_ASSIGN(auto right_batches,
MakeIntegerBatches({[i](int) -> int64_t { return i; }},
schema({field("r_key", int32())}),
/*num_batches=*/1, /*batch_size=*/2));
Declaration table{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(right_batches.schema),
std::move(right_batches.batches))};

Declaration new_root{"hashjoin", {std::move(root), std::move(table)}, join_opts};
root = std::move(new_root);
}

ASSERT_OK_AND_ASSIGN(std::ignore, DeclarationToTable(std::move(root)));
}

} // namespace acero
} // namespace arrow
12 changes: 1 addition & 11 deletions cpp/src/arrow/acero/query_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); }
int64_t QueryContext::hardware_flags() const { return cpu_info()->hardware_flags(); }

Status QueryContext::Init(size_t max_num_threads, util::AsyncTaskScheduler* scheduler) {
tld_.resize(max_num_threads);
Status QueryContext::Init(util::AsyncTaskScheduler* scheduler) {
async_scheduler_ = scheduler;
return Status::OK();
}
Expand All @@ -50,15 +49,6 @@ size_t QueryContext::GetThreadIndex() { return thread_indexer_(); }

size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); }

Result<util::TempVectorStack*> QueryContext::GetTempStack(size_t thread_index) {
if (!tld_[thread_index].is_init) {
RETURN_NOT_OK(tld_[thread_index].stack.Init(
memory_pool(), 32 * util::MiniBatch::kMiniBatchLength * sizeof(uint64_t)));
tld_[thread_index].is_init = true;
}
return &tld_[thread_index].stack;
}

Result<Future<>> QueryContext::BeginExternalTask(std::string_view name) {
Future<> completion_future = Future<>::Make();
if (async_scheduler_->AddSimpleTask([completion_future] { return completion_future; },
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/arrow/acero/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ARROW_ACERO_EXPORT QueryContext {
QueryContext(QueryOptions opts = {},
ExecContext exec_context = *default_exec_context());

Status Init(size_t max_num_threads, arrow::util::AsyncTaskScheduler* scheduler);
Status Init(arrow::util::AsyncTaskScheduler* scheduler);

const ::arrow::internal::CpuInfo* cpu_info() const;
int64_t hardware_flags() const;
Expand All @@ -52,7 +52,6 @@ class ARROW_ACERO_EXPORT QueryContext {

size_t GetThreadIndex();
size_t max_concurrency() const;
Result<arrow::util::TempVectorStack*> GetTempStack(size_t thread_index);

/// \brief Start an external task
///
Expand Down Expand Up @@ -145,11 +144,6 @@ class ARROW_ACERO_EXPORT QueryContext {
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();

ThreadIndexer thread_indexer_;
struct ThreadLocalData {
bool is_init = false;
arrow::util::TempVectorStack stack;
};
std::vector<ThreadLocalData> tld_;

std::atomic<size_t> in_flight_bytes_to_disk_{0};
};
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,8 @@ Status JoinProbeProcessor::OnFinished() {

class SwissJoin : public HashJoinImpl {
public:
static constexpr auto kTempStackUsage = 64 * arrow::util::MiniBatch::kMiniBatchLength;

Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads,
const HashJoinProjectionMaps* proj_map_left,
const HashJoinProjectionMaps* proj_map_right,
Expand Down Expand Up @@ -2513,6 +2515,7 @@ class SwissJoin : public HashJoinImpl {

local_states_.resize(num_threads_);
for (int i = 0; i < num_threads_; ++i) {
RETURN_NOT_OK(local_states_[i].stack.Init(pool_, kTempStackUsage));
local_states_[i].hash_table_ready = false;
local_states_[i].num_output_batches = 0;
local_states_[i].materialize.Init(pool_, proj_map_left, proj_map_right);
Expand Down Expand Up @@ -2566,8 +2569,7 @@ class SwissJoin : public HashJoinImpl {

ExecBatch keypayload_batch;
ARROW_ASSIGN_OR_RAISE(keypayload_batch, KeyPayloadFromInput(/*side=*/0, &batch));
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_index].stack;

return CancelIfNotOK(
probe_processor_.OnNextBatch(thread_index, keypayload_batch, temp_stack,
Expand Down Expand Up @@ -2679,8 +2681,7 @@ class SwissJoin : public HashJoinImpl {
input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol];
}
}
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;
RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.PushNextBatch(
static_cast<int64_t>(thread_id), key_batch, no_payload ? nullptr : &payload_batch,
temp_stack)));
Expand Down Expand Up @@ -2715,8 +2716,7 @@ class SwissJoin : public HashJoinImpl {

Status MergeFinished(size_t thread_id) {
RETURN_NOT_OK(status());
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;
hash_table_build_.FinishPrtnMerge(temp_stack);
return CancelIfNotOK(OnBuildHashTableFinished(static_cast<int64_t>(thread_id)));
}
Expand Down Expand Up @@ -2771,8 +2771,7 @@ class SwissJoin : public HashJoinImpl {
std::min((task_id + 1) * kNumRowsPerScanTask, hash_table_.num_rows());
// Get thread index and related temp vector stack
//
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;

// Split into mini-batches
//
Expand Down Expand Up @@ -2949,6 +2948,7 @@ class SwissJoin : public HashJoinImpl {
FinishedCallback finished_callback_;

struct ThreadLocalState {
arrow::util::TempVectorStack stack;
JoinResultMaterialize materialize;
std::vector<KeyColumnArray> temp_column_arrays;
int64_t num_output_batches;
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/compute/key_hash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ class ARROW_EXPORT Hashing32 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint32_t* out_hash);

// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint32 temp vector as a buffer for hash, one uint16
// temp vector as a buffer for null indices and one uint32 temp vector as a buffer for
// null hash, all are of size kMiniBatchLength. Plus extra kMiniBatchLength to cope with
// stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) *
util::MiniBatch::kMiniBatchLength;

static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
Expand Down Expand Up @@ -161,6 +171,15 @@ class ARROW_EXPORT Hashing64 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint64_t* hashes);

// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint16 temp vector as a buffer for null indices and
// one uint64 temp vector as a buffer for null hash, all are of size kMiniBatchLength.
// Plus extra kMiniBatchLength to cope with stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) *
util::MiniBatch::kMiniBatchLength;

static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
Expand Down
Loading

0 comments on commit 6c386da

Please sign in to comment.