Skip to content

Commit

Permalink
apacheGH-41899: [C++] IPC: Minor enhance the code of writer (apache#4…
Browse files Browse the repository at this point in the history
…1900)

### Rationale for this change

Enhance the code of IPC writer

### What changes are included in this PR?

1. memcpy rather than reinterpret_cast
2. move the buffer rather than copying

### Are these changes tested?

Covered by existing

### Are there any user-facing changes?

No

* GitHub Issue: apache#41899

Authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
mapleFU authored Jun 11, 2024
1 parent 03a960d commit 64b1109
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class RecordBatchSerializer {
std::shared_ptr<Buffer> bitmap;
RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(),
options_.memory_pool, &bitmap));
out_->body_buffers.emplace_back(bitmap);
out_->body_buffers.emplace_back(std::move(bitmap));
} else {
// Push a dummy zero-length buffer, not to be copied
out_->body_buffers.emplace_back(kNullBuffer);
Expand Down Expand Up @@ -222,8 +222,9 @@ class RecordBatchSerializer {
RETURN_NOT_OK(
result->Resize(actual_length + sizeof(int64_t), /* shrink_to_fit= */ true));
}
*reinterpret_cast<int64_t*>(result->mutable_data()) =
bit_util::ToLittleEndian(prefixed_length);
int64_t prefixed_length_little_endian = bit_util::ToLittleEndian(prefixed_length);
util::SafeStore(result->mutable_data(), prefixed_length_little_endian);

*out = SliceBuffer(std::move(result), /*offset=*/0, actual_length + sizeof(int64_t));

return Status::OK();
Expand Down Expand Up @@ -415,7 +416,7 @@ class RecordBatchSerializer {
std::shared_ptr<Buffer> data;
RETURN_NOT_OK(GetTruncatedBitmap(array.offset(), array.length(), array.values(),
options_.memory_pool, &data));
out_->body_buffers.emplace_back(data);
out_->body_buffers.emplace_back(std::move(data));
return Status::OK();
}

Expand All @@ -442,7 +443,7 @@ class RecordBatchSerializer {
data->size() - byte_offset);
data = SliceBuffer(data, byte_offset, buffer_length);
}
out_->body_buffers.emplace_back(data);
out_->body_buffers.emplace_back(std::move(data));
return Status::OK();
}

Expand All @@ -464,8 +465,8 @@ class RecordBatchSerializer {
data = SliceBuffer(data, start_offset, slice_length);
}

out_->body_buffers.emplace_back(value_offsets);
out_->body_buffers.emplace_back(data);
out_->body_buffers.emplace_back(std::move(value_offsets));
out_->body_buffers.emplace_back(std::move(data));
return Status::OK();
}

Expand Down Expand Up @@ -566,7 +567,7 @@ class RecordBatchSerializer {
RETURN_NOT_OK(GetTruncatedBuffer(
offset, length, static_cast<int32_t>(sizeof(UnionArray::type_code_t)),
array.type_codes(), options_.memory_pool, &type_codes));
out_->body_buffers.emplace_back(type_codes);
out_->body_buffers.emplace_back(std::move(type_codes));

--max_recursion_depth_;
for (int i = 0; i < array.num_fields(); ++i) {
Expand All @@ -585,7 +586,7 @@ class RecordBatchSerializer {
RETURN_NOT_OK(GetTruncatedBuffer(
offset, length, static_cast<int32_t>(sizeof(UnionArray::type_code_t)),
array.type_codes(), options_.memory_pool, &type_codes));
out_->body_buffers.emplace_back(type_codes);
out_->body_buffers.emplace_back(std::move(type_codes));

--max_recursion_depth_;
const auto& type = checked_cast<const UnionType&>(*array.type());
Expand Down Expand Up @@ -640,7 +641,7 @@ class RecordBatchSerializer {

value_offsets = std::move(shifted_offsets_buffer);
}
out_->body_buffers.emplace_back(value_offsets);
out_->body_buffers.emplace_back(std::move(value_offsets));

// Visit children and slice accordingly
for (int i = 0; i < type.num_fields(); ++i) {
Expand Down

0 comments on commit 64b1109

Please sign in to comment.