Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangHuiGui committed Mar 27, 2024
1 parent 6cff6f2 commit b8db90c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
66 changes: 28 additions & 38 deletions cpp/src/arrow/compute/key_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,38 +379,37 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t
}

Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
LightContext* ctx, uint32_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());

constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
const auto alloc_batch_size = std::min(num_rows, max_batch_size);
LightContext* ctx, uint32_t* hashes) {
auto num_rows = static_cast<uint32_t>(cols[0].length());
const auto alloc_batch_size =
std::min(num_rows, static_cast<uint32_t>(util::MiniBatch::kMiniBatchLength));

// pre calculate alloc size in TempVectorStack for hash_temp_buf, null_hash_temp_buf
// and null_indices_buf
const auto alloc_hash_temp_buf =
util::TempVectorStack::EstimateAllocSize(alloc_batch_size * sizeof(uint32_t));
util::TempVectorStack::EstimateAllocationSize(alloc_batch_size * sizeof(uint32_t));
const auto alloc_for_null_indices_buf =
util::TempVectorStack::EstimateAllocSize(alloc_batch_size * sizeof(uint16_t));
util::TempVectorStack::EstimateAllocationSize(alloc_batch_size * sizeof(uint16_t));
const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;

std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
if (!ctx->stack) {
temp_stack = std::make_shared<util::TempVectorStack>();
std::unique_ptr<util::TempVectorStack> temp_stack(nullptr);
auto stack = ctx->stack;
if (!stack) {
temp_stack = std::make_unique<util::TempVectorStack>();
RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
ctx->stack = temp_stack.get();
stack = temp_stack.get();
} else {
RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));
RETURN_NOT_OK(stack->CheckAllocationOverflow(alloc_size));
}

auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, alloc_batch_size);
auto hash_temp_buf = util::TempVectorHolder<uint32_t>(stack, alloc_batch_size);
uint32_t* hash_temp = hash_temp_buf.mutable_data();

auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, alloc_batch_size);
auto null_indices_buf = util::TempVectorHolder<uint16_t>(stack, alloc_batch_size);
uint16_t* null_indices = null_indices_buf.mutable_data();
int num_null_indices;

auto null_hash_temp_buf =
util::TempVectorHolder<uint32_t>(ctx->stack, alloc_batch_size);
auto null_hash_temp_buf = util::TempVectorHolder<uint32_t>(stack, alloc_batch_size);
uint32_t* null_hash_temp = null_hash_temp_buf.mutable_data();

for (uint32_t first_row = 0; first_row < num_rows;) {
Expand Down Expand Up @@ -479,10 +478,6 @@ Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,

first_row += batch_size_next;
}

if (temp_stack) {
ctx->stack = nullptr;
}
return Status::OK();
}

Expand Down Expand Up @@ -846,33 +841,32 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_keys, uint64_t key_l

Status Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
LightContext* ctx, uint64_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());

constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
const auto alloc_batch_size = std::min(num_rows, max_batch_size);
auto num_rows = static_cast<uint32_t>(cols[0].length());
const auto alloc_batch_size =
std::min(num_rows, static_cast<uint32_t>(util::MiniBatch::kMiniBatchLength));

// pre calculate alloc size in TempVectorStack for null_indices_buf, null_hash_temp_buf
const auto alloc_for_null_hash_temp_buf =
util::TempVectorStack::EstimateAllocSize(alloc_batch_size * sizeof(uint64_t));
util::TempVectorStack::EstimateAllocationSize(alloc_batch_size * sizeof(uint64_t));
const auto alloc_for_null_indices_buf =
util::TempVectorStack::EstimateAllocSize(alloc_batch_size * sizeof(uint16_t));
util::TempVectorStack::EstimateAllocationSize(alloc_batch_size * sizeof(uint16_t));
const auto alloc_size = alloc_for_null_hash_temp_buf + alloc_for_null_indices_buf;

std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
if (!ctx->stack) {
temp_stack = std::make_shared<util::TempVectorStack>();
std::unique_ptr<util::TempVectorStack> temp_stack(nullptr);
auto stack = ctx->stack;
if (!stack) {
temp_stack = std::make_unique<util::TempVectorStack>();
RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
ctx->stack = temp_stack.get();
stack = temp_stack.get();
} else {
RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));
RETURN_NOT_OK(stack->CheckAllocationOverflow(alloc_size));
}

auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, alloc_batch_size);
auto null_indices_buf = util::TempVectorHolder<uint16_t>(stack, alloc_batch_size);
uint16_t* null_indices = null_indices_buf.mutable_data();
int num_null_indices;

auto null_hash_temp_buf =
util::TempVectorHolder<uint64_t>(ctx->stack, alloc_batch_size);
auto null_hash_temp_buf = util::TempVectorHolder<uint64_t>(stack, alloc_batch_size);
uint64_t* null_hash_temp = null_hash_temp_buf.mutable_data();

for (uint32_t first_row = 0; first_row < num_rows;) {
Expand Down Expand Up @@ -938,10 +932,6 @@ Status Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,

first_row += batch_size_next;
}

if (temp_stack) {
ctx->stack = nullptr;
}
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/compute/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ using internal::CpuInfo;
namespace util {

void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) {
const auto estimate_size = EstimateAllocSize(num_bytes);
const auto estimate_size = EstimateAllocationSize(num_bytes);
// XXX cannot return a regular Status because most consumers do not either.
ARROW_DCHECK_OK(CheckAllocOverflow(estimate_size));
ARROW_CHECK_OK(CheckAllocationOverflow(estimate_size));
int64_t new_top = top_ + estimate_size;
*data = buffer_->mutable_data() + top_ + sizeof(uint64_t);
// We set 8 bytes before the beginning of the allocated range and
Expand All @@ -58,7 +58,7 @@ void TempVectorStack::release(int id, uint32_t num_bytes) {
--num_vectors_;
}

Status TempVectorStack::CheckAllocOverflow(int64_t alloc_size) {
Status TempVectorStack::CheckAllocationOverflow(int64_t alloc_size) {
// Stack overflow check (see GH-39582).
if ((alloc_size + top_) > buffer_size_) {
return Status::Invalid("TempVectorStack alloc overflow. (Actual ", buffer_size_,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/compute/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,19 @@ class ARROW_EXPORT TempVectorStack {
Status Init(MemoryPool* pool, int64_t size) {
num_vectors_ = 0;
top_ = 0;
buffer_size_ = EstimateAllocSize(size);
buffer_size_ = EstimateAllocationSize(size);
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
// Ensure later operations don't accidentally read uninitialized memory.
std::memset(buffer->mutable_data(), 0xFF, size);
buffer_ = std::move(buffer);
return Status::OK();
}

static int64_t EstimateAllocSize(int64_t size) {
static int64_t EstimateAllocationSize(int64_t size) {
return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
}

Status CheckAllocOverflow(int64_t alloc_size);
Status CheckAllocationOverflow(int64_t alloc_size);

private:
static int64_t PaddedAllocationSize(int64_t num_bytes) {
Expand Down

0 comments on commit b8db90c

Please sign in to comment.