Skip to content

Commit

Permalink
fix: make sure dfly_bench reliably connects (#3802)
Browse files Browse the repository at this point in the history
1. Issue ping upon connect, add a comment why.
2. log error if dfly_bench disconnects before all the requests were processed.
3. Refactor memcache parsing code into ParseMC function.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Sep 26, 2024
1 parent 3945b7e commit 6a13329
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 47 deletions.
116 changes: 70 additions & 46 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class Driver {
private:
void PopRequest();
void ReceiveFb();
void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf);
void ParseRESP();
void ParseMC();

struct Req {
uint64_t start;
Expand All @@ -219,6 +220,9 @@ class Driver {
fb2::Fiber receive_fb_;
queue<Req> reqs_;
fb2::CondVarAny cnd_;

facade::RedisParser parser_{1 << 16, false};
io::IoBuf io_buf_{512};
};

// Per thread client.
Expand Down Expand Up @@ -333,6 +337,19 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
int yes = 1;
CHECK_EQ(0, setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
}

// TCP Connect does not ensure that the connection was indeed accepted by the server.
// if server backlog is too short the connection will get stuck in the accept queue.
// Therefore, we send a ping command to ensure that every connection got connected.
ec = socket_->Write(io::Buffer("ping\r\n"));
CHECK(!ec);

uint8_t buf[128];
auto res_sz = socket_->Recv(io::MutableBytes(buf));
CHECK(res_sz) << res_sz.error().message();
string_view resp = io::View(io::Bytes(buf, *res_sz));
CHECK(absl::EndsWith(resp, "\r\n")) << resp;

receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
}

Expand Down Expand Up @@ -385,7 +402,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
CHECK(!ec) << ec.message();
}

const int finish = absl::GetCurrentTimeNanos();
int64_t finish = absl::GetCurrentTimeNanos();
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
<< StrFormat("%.1fs", double(finish - start) / 1000000000)
<< ". Waiting for server processing";
Expand Down Expand Up @@ -426,71 +443,40 @@ void Driver::PopRequest() {
}

void Driver::ReceiveFb() {
facade::RedisParser parser{1 << 16, false};
io::IoBuf io_buf{512};

unsigned blob_len = 0;

while (true) {
io_buf.EnsureCapacity(256);
auto buf = io_buf.AppendBuffer();
io_buf_.EnsureCapacity(256);
auto buf = io_buf_.AppendBuffer();
VLOG(2) << "Socket read: " << reqs_.size();

::io::Result<size_t> recv_sz = socket_->Recv(buf);
if (!recv_sz && FiberSocketBase::IsConnClosed(recv_sz.error())) {
LOG_IF(DFATAL, !reqs_.empty())
<< "Broke with " << reqs_.size() << " requests, received: " << received_;
// clear reqs - to prevent Driver::Run block on them indefinitely.
decltype(reqs_)().swap(reqs_);
break;
}

CHECK(recv_sz) << recv_sz.error().message();
io_buf.CommitWrite(*recv_sz);
io_buf_.CommitWrite(*recv_sz);

if (protocol == RESP) {
ParseRESP(&parser, &io_buf);
ParseRESP();
} else {
// MC_TEXT
while (true) {
string_view line = FindLine(io_buf.InputBuffer());
if (line.empty())
break;
CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
PopRequest();
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
auto it = line.rbegin();
while (it != line.rend() && *it != ' ')
++it;
size_t len = it - line.rbegin() - 2;
const char* start = &(*it) + 1;
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
LOG(ERROR) << "Invalid blob len " << line;
return;
}
++stats_.hit_count;
} else if (absl::StartsWith(line, "SERVER_ERROR")) {
++stats_.num_errors;
PopRequest();
blob_len = 0;
} else {
auto handle = socket_->native_handle();
CHECK_EQ(blob_len + 2, line.size()) << line;
blob_len = 0;
VLOG(2) << "Got line " << handle << ": " << line;
}
io_buf.ConsumeInput(line.size());
}
ParseMC();
}
}
VLOG(1) << "ReceiveFb done";
}

void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) {
void Driver::ParseRESP() {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser->Parse(io_buf->InputBuffer(), &consumed, &parse_args);
result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (parse_args[0].type == facade::RespExpr::ERROR) {
++stats_.num_errors;
Expand All @@ -500,10 +486,48 @@ void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) {
parse_args.clear();
PopRequest();
}
io_buf->ConsumeInput(consumed);
io_buf_.ConsumeInput(consumed);
} while (result == RedisParser::OK);
}

void Driver::ParseMC() {
unsigned blob_len = 0;

while (true) {
string_view line = FindLine(io_buf_.InputBuffer());
if (line.empty())
break;

CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
PopRequest();
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
auto it = line.rbegin();
while (it != line.rend() && *it != ' ')
++it;
size_t len = it - line.rbegin() - 2;
const char* start = &(*it) + 1;
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
LOG(ERROR) << "Invalid blob len " << line;
return;
}
++stats_.hit_count;
} else if (absl::StartsWith(line, "SERVER_ERROR")) {
++stats_.num_errors;
PopRequest();
blob_len = 0;
} else {
auto handle = socket_->native_handle();
CHECK_EQ(blob_len + 2, line.size()) << line;
blob_len = 0;
VLOG(2) << "Got line " << handle << ": " << line;
}
io_buf_.ConsumeInput(line.size());
}
}

void TLocalClient::Connect(tcp::endpoint ep) {
VLOG(2) << "Connecting client...";
vector<fb2::Fiber> fbs(drivers_.size());
Expand Down
2 changes: 1 addition & 1 deletion src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ ABSL_FLAG(bool, force_epoll, false,
ABSL_FLAG(bool, version_check, true,
"If true, Will monitor for new releases on Dragonfly servers once a day.");

ABSL_FLAG(uint16_t, tcp_backlog, 128, "TCP listen(2) backlog parameter.");
ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter.");

using namespace util;
using namespace facade;
Expand Down

0 comments on commit 6a13329

Please sign in to comment.