Skip to content

Commit

Permalink
chore: a clean-up of direct_fd code
Browse files Browse the repository at this point in the history
Fixing the registration code - the direct fd table is not resizable.
Add a test reproducing the socket leakage when using direct fds.
Opened axboe/liburing#1192 to follow up.

Finally, disabled direct fds for sockets.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jul 29, 2024
1 parent 288cb31 commit 9967c5a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 43 deletions.
21 changes: 21 additions & 0 deletions util/fibers/fiber_socket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,27 @@ TEST_P(FiberSocketTest, NotEmpty) {

proactor_->Await([&] { std::ignore = sock->Close(); });
}

TEST_P(FiberSocketTest, OpenMany) {
bool use_uring = GetParam() == "uring";
if (!use_uring) {
GTEST_SKIP() << "OpenMany requires iouring";
return;
}

proactor_->Await([&] {
for (unsigned i = 0; i < 10000; ++i) {
UringProactor* up = static_cast<UringProactor*>(proactor_.get());
UringSocket sock(up);
auto ec = sock.Create(AF_INET);
ASSERT_FALSE(ec);
ec = sock.Close();
ASSERT_FALSE(ec);
usleep(100);
}
});
}

#endif

} // namespace fb2
Expand Down
50 changes: 12 additions & 38 deletions util/fibers/uring_proactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
#include "util/fibers/detail/scheduler.h"
#include "util/fibers/uring_socket.h"

// TODO: we need to fix register_fds_ resize flow.
// Also we must ensure that there is no leakage of socket descriptors with enable_direct_fd enabled.
// We must ensure that there is no leakage of socket descriptors with enable_direct_fd enabled.
// See AcceptServerTest.Shutdown to trigger direct fd resize.
ABSL_FLAG(bool, enable_direct_fd, false, "If true tries to register file descriptors");
ABSL_FLAG(uint32_t, uring_direct_table_len, 0, "If positive create direct fd table of this length");

#define URING_CHECK(x) \
do { \
Expand Down Expand Up @@ -91,7 +90,7 @@ UringProactor::~UringProactor() {
}
}

if (direct_fd_) {
if (!register_fds_.empty()) {
io_uring_unregister_files(&ring_);
}
io_uring_queue_exit(&ring_);
Expand All @@ -117,13 +116,8 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) {

msgring_f_ = 0;
poll_first_ = 0;
direct_fd_ = 0;
buf_ring_f_ = 0;

if (kver.kernel > 5 || (kver.kernel == 5 && kver.major >= 15)) {
direct_fd_ = absl::GetFlag(FLAGS_enable_direct_fd); // failswitch to disable direct fds.
}

// If we setup flags that kernel does not recognize, it fails the setup call.
if (kver.kernel > 5 || (kver.kernel == 5 && kver.major >= 19)) {
params.flags |= IORING_SETUP_SUBMIT_ALL;
Expand All @@ -132,6 +126,9 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) {

// io_uring_register_buf_ring is supported since 5.19.
buf_ring_f_ = 1;

// FLAGS_uring_direct_table_len is a failswitch to disable direct fds.
register_fds_.resize(absl::GetFlag(FLAGS_uring_direct_table_len), -1);
}

if (kver.kernel >= 6 && kver.major >= 1) {
Expand Down Expand Up @@ -174,8 +171,7 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) {
int res = io_uring_register_ring_fd(&ring_);
VLOG_IF(1, res < 0) << "io_uring_register_ring_fd failed: " << -res;

if (direct_fd_) {
register_fds_.resize(512, -1);
if (!register_fds_.empty()) {
int res = io_uring_register_files(&ring_, register_fds_.data(), register_fds_.size());
CHECK_EQ(0, res);
}
Expand Down Expand Up @@ -524,34 +520,13 @@ void UringProactor::CancelPeriodicInternal(PeriodicItem* item) {
}

unsigned UringProactor::RegisterFd(int source_fd) {
if (!direct_fd_)
if (register_fds_.empty())
return kInvalidDirectFd;

// TODO: to create a linked list from free fds.
auto next = std::find(register_fds_.begin() + next_free_index_, register_fds_.end(), -1);
if (next == register_fds_.end()) {
size_t prev_sz = register_fds_.size();
DCHECK_GT(prev_sz, 0u);

// enlarge direct fds table.
register_fds_.resize(prev_sz * 2, -1);
register_fds_[prev_sz] = source_fd; // source fd will map to prev_sz index.
next_free_index_ = prev_sz + 1;

// TODO: this does not work because it seems we need to unregister first
// to be able re-register. See
int res = io_uring_register_files(&ring_, register_fds_.data(), register_fds_.size());
if (res < 0) {
LOG(ERROR) << "Error registering files: " << -res << " " << SafeErrorMessage(-res) << " "
<< prev_sz;
register_fds_.resize(prev_sz);
next_free_index_ = prev_sz;
return kInvalidDirectFd;
}
++direct_fds_cnt_;

return prev_sz;
}
if (next == register_fds_.end()) // it is not possible to resize this table.
return kInvalidDirectFd;

*next = source_fd;
next_free_index_ = next - register_fds_.begin();
Expand All @@ -567,17 +542,16 @@ unsigned UringProactor::RegisterFd(int source_fd) {
}

int UringProactor::TranslateDirectFd(unsigned fixed_fd) const {
DCHECK(direct_fd_);
DCHECK_LT(fixed_fd, register_fds_.size());
DCHECK_GE(register_fds_[fixed_fd], 0);

return register_fds_[fixed_fd];
}

int UringProactor::UnregisterFd(unsigned fixed_fd) {
DCHECK(direct_fd_);
DCHECK(!register_fds_.empty());

if (!direct_fd_)
if (register_fds_.empty())
return -1;

DCHECK_LT(fixed_fd, register_fds_.size());
Expand Down
5 changes: 2 additions & 3 deletions util/fibers/uring_proactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class UringProactor : public ProactorBase {
}

bool HasDirectFD() const {
return direct_fd_;
return !register_fds_.empty();
}

int ring_fd() const {
Expand Down Expand Up @@ -151,9 +151,8 @@ class UringProactor : public ProactorBase {

uint8_t msgring_f_ : 1;
uint8_t poll_first_ : 1;
uint8_t direct_fd_ : 1;
uint8_t buf_ring_f_ : 1;
uint8_t : 4;
uint8_t : 5;

EventCount sqe_avail_;

Expand Down
7 changes: 5 additions & 2 deletions util/fibers/uring_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ using nonstd::make_unexpected;

namespace {

// Disable direct fd for sockets due to https://github.com/axboe/liburing/issues/1192
constexpr bool kEnableDirect = false;

inline ssize_t posix_err_wrap(ssize_t res, UringSocket::error_code* ec) {
if (res == -1) {
*ec = UringSocket::error_code(errno, system_category());
Expand Down Expand Up @@ -63,7 +66,7 @@ error_code UringSocket::Create(unsigned short protocol_family) {
CHECK(proactor && is_direct_fd_ == 0);
DCHECK(proactor->InMyThread());

if (proactor->HasDirectFD()) {
if (kEnableDirect && proactor->HasDirectFD()) {
int source_fd = ShiftedFd(); // linux fd.
unsigned direct_fd = proactor->RegisterFd(source_fd);
if (direct_fd != UringProactor::kInvalidDirectFd) {
Expand Down Expand Up @@ -427,7 +430,7 @@ auto UringSocket::native_handle() const -> native_handle_type {
void UringSocket::OnSetProactor() {
UringProactor* proactor = GetProactor();

if (proactor->HasDirectFD() && is_direct_fd_ == 0 && fd_ >= 0) {
if (kEnableDirect && proactor->HasDirectFD() && is_direct_fd_ == 0 && fd_ >= 0) {
// Using direct descriptors has consistent positive impact on CPU usage of the server.
// Checked with echo_server with server side sockets.
int source_fd = ShiftedFd(); // linux fd.
Expand Down

0 comments on commit 9967c5a

Please sign in to comment.