Skip to content

Commit

Permalink
chore: SinkReplyBuilder2 with vec batching (#3454)
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg authored Aug 7, 2024
1 parent 8587377 commit 1b1a83d
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 11 deletions.
77 changes: 67 additions & 10 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ void SinkReplyBuilder::SendError(OpStatus status) {
}
}

void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
absl::FixedArray<iovec, 16> arr(msg_vec.size());

for (unsigned i = 0; i < msg_vec.size(); ++i) {
arr[i].iov_base = const_cast<char*>(msg_vec[i].data());
arr[i].iov_len = msg_vec[i].size();
}
Send(arr.data(), msg_vec.size());
}

void SinkReplyBuilder::StartAggregate() {
DVLOG(1) << "StartAggregate";
should_aggregate_ = true;
Expand Down Expand Up @@ -203,6 +193,73 @@ size_t SinkReplyBuilder::UsedMemory() const {
return dfly::HeapSize(batch_);
}

void SinkReplyBuilder2::Write(std::string_view str) {
DCHECK(scoped_);
if (str.size() >= 32)
WriteRef(str);
else
WritePiece(str);
}

char* SinkReplyBuilder2::ReservePiece(size_t size) {
if (buffer_.AppendBuffer().size() <= size)
Flush();

char* dest = reinterpret_cast<char*>(buffer_.AppendBuffer().data());
if (vecs_.empty() || !IsInBuf(vecs_.back().iov_base))
NextVec({dest, 0});

return dest;
}

void SinkReplyBuilder2::CommitPiece(size_t size) {
DCHECK(IsInBuf(vecs_.back().iov_base));

buffer_.CommitWrite(size);
vecs_.back().iov_len += size;
total_size_ += size;
}

void SinkReplyBuilder2::WritePiece(std::string_view str) {
char* dest = ReservePiece(str.size());
memcpy(dest, str.data(), str.size());
CommitPiece(str.size());
}

void SinkReplyBuilder2::WriteRef(std::string_view str) {
NextVec(str);
total_size_ += str.size();
}

void SinkReplyBuilder2::Flush() {
auto ec = sink_->Write(vecs_.data(), vecs_.size());
if (ec)
ec_ = ec;

buffer_.Clear();
vecs_.clear();
total_size_ = 0;
}

void SinkReplyBuilder2::FinishScope() {
// If batching or aggregations are not enabled, flush
Flush();

// TODO: otherwise iterate over vec_ and copy items to buffer_
// whilst also updating their pointers
}

bool SinkReplyBuilder2::IsInBuf(const void* ptr) const {
auto ib = buffer_.InputBuffer();
return ptr >= ib.data() && ptr <= ib.data() + ib.size();
}

void SinkReplyBuilder2::NextVec(std::string_view str) {
if (vecs_.size() >= IOV_MAX)
Flush();
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
}

Expand Down
51 changes: 50 additions & 1 deletion src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ class SinkReplyBuilder {

protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void SendRawVec(absl::Span<const std::string_view> msg_vec);

void Send(const iovec* v, uint32_t len);

Expand All @@ -168,6 +167,56 @@ class SinkReplyBuilder {
bool send_active_ : 1;
};

// TMP: New version of reply builder that batches not only to a buffer, but also iovecs.
class SinkReplyBuilder2 {
explicit SinkReplyBuilder2(io::Sink* sink) : sink_(sink) {
}

// Use with care: All send calls within a scope must keep their data alive!
// This allows to fully eliminate copies for batches of data by using vectorized io.
struct ReplyScope {
explicit ReplyScope(SinkReplyBuilder2* rb) : prev_scoped(rb->scoped_), rb(rb) {
rb->scoped_ = true;
}
~ReplyScope() {
if (!prev_scoped) {
rb->scoped_ = false;
rb->FinishScope();
}
}

private:
bool prev_scoped;
SinkReplyBuilder2* rb;
};

public:
void Write(std::string_view str);

protected:
void Flush(); // Send all accumulated data and reset to clear state
void FinishScope(); // Called when scope ends

char* ReservePiece(size_t size); // Reserve size bytes from buffer
void CommitPiece(size_t size); // Mark size bytes from buffer as used
void WritePiece(std::string_view str); // Reserve + memcpy + Commit

void WriteRef(std::string_view str); // Add iovec bypassing buffer

bool IsInBuf(const void* ptr) const; // checks if ptr is part of buffer_
void NextVec(std::string_view str);

private:
io::Sink* sink_;
std::error_code ec_;

bool scoped_;

size_t total_size_ = 0; // sum of vec_ lengths
base::IoBuf buffer_;
std::vector<iovec> vecs_;
};

class MCReplyBuilder : public SinkReplyBuilder {
bool noreply_;

Expand Down

0 comments on commit 1b1a83d

Please sign in to comment.