Skip to content

Commit

Permalink
feat(rdb_load): add support for loading huge sets (#3807)
Browse files Browse the repository at this point in the history
* feat(rdb_load): add support for loading huge sets
  • Loading branch information
andydunstall authored Sep 29, 2024
1 parent 73002dc commit 520dea0
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 67 deletions.
220 changes: 153 additions & 67 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ using namespace tiering::literals;
namespace {

constexpr size_t kYieldPeriod = 50000;
constexpr size_t kMaxBlobLen = 1ULL << 16;
constexpr size_t kMaxBlobLen = 1ULL << 12;
constexpr char kErrCat[] = "dragonfly.rdbload";

inline void YieldIfNeeded(size_t i) {
Expand Down Expand Up @@ -384,7 +384,8 @@ io::Result<base::IoBuf*> Lz4Decompress::Decompress(std::string_view data) {

class RdbLoaderBase::OpaqueObjLoader {
public:
OpaqueObjLoader(int rdb_type, PrimeValue* pv) : rdb_type_(rdb_type), pv_(pv) {
OpaqueObjLoader(int rdb_type, PrimeValue* pv, LoadConfig config)
: rdb_type_(rdb_type), pv_(pv), config_(config) {
}

void operator()(long long val) {
Expand Down Expand Up @@ -428,6 +429,7 @@ class RdbLoaderBase::OpaqueObjLoader {
int rdb_type_;
base::PODArray<char> tset_blob_;
PrimeValue* pv_;
LoadConfig config_;
};

RdbLoaderBase::RdbLoaderBase() : origin_mem_buf_{16_KB} {
Expand Down Expand Up @@ -512,10 +514,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
auto cleanup = absl::MakeCleanup([&] {
if (sdsele)
sdsfree(sdsele);
if (is_intset) {
zfree(inner_obj);
} else {
CompactObj::DeleteMR<StringSet>(inner_obj);
if (inner_obj) {
if (is_intset) {
zfree(inner_obj);
} else {
CompactObj::DeleteMR<StringSet>(inner_obj);
}
}
});

Expand All @@ -535,15 +539,31 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
return true;
});
} else {
StringSet* set = CompactObj::AllocateMR<StringSet>();
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
inner_obj = set;
StringSet* set;
if (config_.append) {
// Note we only use config_.append when the set size exceeds kMaxBlobLen,
// which is greater than SetFamily::MaxIntsetEntries, so we'll always use
// a string set, not an int set.
if (pv_->ObjType() != OBJ_SET) {
LOG(ERROR) << "Invalid RDB type " << pv_->ObjType();
ec_ = RdbError(errc::invalid_rdb_type);
return;
}
if (pv_->Encoding() != kEncodingStrMap2) {
LOG(ERROR) << "Invalid encoding " << pv_->Encoding();
ec_ = RdbError(errc::invalid_encoding);
return;
}

// TODO: to move this logic to set_family similarly to ConvertToStrSet.
set = static_cast<StringSet*>(pv_->RObjPtr());
} else {
set = CompactObj::AllocateMR<StringSet>();
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
inner_obj = set;

/* It's faster to expand the dict to the right size asap in order
* to avoid rehashing */
set->Reserve(len);
// Expand the set up front to avoid rehashing.
set->Reserve((config_.reserve > len) ? config_.reserve : len);
}

size_t increment = 1;
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
Expand Down Expand Up @@ -583,7 +603,10 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {

if (ec_)
return;
pv_->InitRobj(OBJ_SET, is_intset ? kEncodingIntSet : kEncodingStrMap2, inner_obj);

if (!config_.append) {
pv_->InitRobj(OBJ_SET, is_intset ? kEncodingIntSet : kEncodingStrMap2, inner_obj);
}
std::move(cleanup).Cancel();
}

Expand Down Expand Up @@ -1487,25 +1510,36 @@ auto RdbLoaderBase::ReadLzf() -> io::Result<LzfString> {

auto RdbLoaderBase::ReadSet(int rdbtype) -> io::Result<OpaqueObj> {
size_t len;
SET_OR_UNEXPECT(LoadLen(NULL), len);

if (rdbtype == RDB_TYPE_SET_WITH_EXPIRY) {
len *= 2;
if (pending_read_.remaining > 0) {
len = pending_read_.remaining;
} else {
SET_OR_UNEXPECT(LoadLen(NULL), len);
if (rdbtype == RDB_TYPE_SET_WITH_EXPIRY) {
len *= 2;
}
pending_read_.reserve = len;
}

// Limit each read to kMaxBlobLen elements.
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
for (size_t i = 0; i < load_trace->arr.size(); i++) {
size_t n = std::min(len - i * kMaxBlobLen, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; j++) {
error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
if (ec) {
return make_unexpected(ec);
}
load_trace->arr.resize(1);
size_t n = std::min(len, kMaxBlobLen);
load_trace->arr[0].resize(n);
for (size_t i = 0; i < n; i++) {
error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
if (ec) {
return make_unexpected(ec);
}
}

// If there are still unread elements, cache the number of remaining
// elements, or clear if the full object has been read.
if (len > n) {
pending_read_.remaining = len - n;
} else if (pending_read_.remaining > 0) {
pending_read_.remaining = 0;
}

return OpaqueObj{std::move(load_trace), rdbtype};
}

Expand Down Expand Up @@ -2473,7 +2507,12 @@ void RdbLoader::FlushAllShards() {
}

std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv);
return RdbLoaderBase::FromOpaque(opaque, LoadConfig{}, pv);
}

std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config,
CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv, config);
std::visit(visitor, opaque.obj);

return visitor.ec();
Expand All @@ -2491,7 +2530,20 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {

for (const auto* item : ib) {
PrimeValue pv;
if (ec_ = FromOpaque(item->val, &pv); ec_) {
PrimeValue* pv_ptr = &pv;

// If we're appending the item to an existing key, first load the
// object.
if (item->load_config.append) {
auto res = db_slice.FindMutable(db_cntx, item->key);
if (!IsValid(res.it)) {
LOG(ERROR) << "Count not to find append key '" << item->key << "' in DB " << db_ind;
continue;
}
pv_ptr = &res.it->second;
}

if (ec_ = FromOpaque(item->val, item->load_config, pv_ptr); ec_) {
if ((*ec_).value() == errc::empty_key) {
auto error = error_msg(item, db_ind);
if (RdbTypeAllowedEmpty(item->val.rdb_type)) {
Expand All @@ -2505,6 +2557,9 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
stop_early_ = true;
break;
}
if (item->load_config.append) {
continue;
}
// We need this extra check because we don't return empty_key
if (!pv.TagAllowsEmptyValue() && pv.Size() == 0) {
LOG(WARNING) << error_msg(item, db_ind);
Expand Down Expand Up @@ -2549,29 +2604,79 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
// load with different number of shards which makes database resizing unfeasible.
}

// Loads the next key/val pair.
//
// Huge objects may be loaded in parts, where only a subset of elements are
// loaded at a time. This reduces the memory required to load huge objects and
// prevents LoadItemsBuffer blocking. (Note so far only RDB_TYPE_SET and
// RDB_TYPE_SET_WITH_EXPIRY support partial reads).
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
// We return the item in LoadItemsBuffer.
Item* item = item_queue_.Pop();
std::string key;
SET_OR_RETURN(ReadKey(), key);

if (item == nullptr) {
item = new Item;
}
auto cleanup = absl::Cleanup([item] { delete item; });
do {
// If there is a cached Item in the free pool, take it, otherwise allocate
// a new Item (LoadItemsBuffer returns free items).
Item* item = item_queue_.Pop();
if (item == nullptr) {
item = new Item;
}
// Delete the item if we fail to load the key/val pair.
auto cleanup = absl::Cleanup([item] { delete item; });

// Read key
SET_OR_RETURN(ReadKey(), item->key);
item->load_config.append = pending_read_.remaining > 0;

// Read value
error_code ec = ReadObj(type, &item->val);
if (ec) {
VLOG(1) << "ReadObj error " << ec << " for key " << item->key;
return ec;
}
error_code ec = ReadObj(type, &item->val);
if (ec) {
VLOG(1) << "ReadObj error " << ec << " for key " << key;
return ec;
}

// If the key can be discarded, we must still continue to read the
// object from the RDB so we can read the next key.
if (ShouldDiscardKey(key, settings)) {
continue;
}

if (pending_read_.remaining > 0) {
item->key = key;
} else {
// Avoid copying the key if this is the last read of the object.
item->key = std::move(key);
}

item->load_config.reserve = pending_read_.reserve;
// Clear 'reserve' as we must only set when the object is first
// initialized.
pending_read_.reserve = 0;

item->is_sticky = settings->is_sticky;
item->has_mc_flags = settings->has_mc_flags;
item->mc_flags = settings->mc_flags;

ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];

out_buf.emplace_back(std::move(item));
std::move(cleanup).Cancel();

constexpr size_t kBufSize = 64;
if (out_buf.size() >= kBufSize) {
// Despite being async, this function can block if the shard queue is full.
FlushShardAsync(sid);
}
} while (pending_read_.remaining > 0);

return kOk;
}

bool RdbLoader::ShouldDiscardKey(std::string_view key, ObjSettings* settings) const {
if (!load_unowned_slots_ && cluster::IsClusterEnabled()) {
const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config();
if (cluster_config != nullptr && !cluster_config->IsMySlot(item->key)) {
return kOk; // Ignoring item
if (cluster_config != nullptr && !cluster_config->IsMySlot(key)) {
return true;
}
}

Expand All @@ -2583,31 +2688,12 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
* Similarly if the RDB is the preamble of an AOF file, we want to
* load all the keys as they are, since the log of operations later
* assume to work in an exact keyspace state. */

if (ServerState::tlocal()->is_master && settings->has_expired) {
VLOG(2) << "Expire key: " << item->key;
return kOk;
}

item->is_sticky = settings->is_sticky;
item->has_mc_flags = settings->has_mc_flags;
item->mc_flags = settings->mc_flags;

ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];

out_buf.emplace_back(item);
std::move(cleanup).Cancel();

constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {
// Despite being async, this function can block if the shard queue is full.
FlushShardAsync(sid);
VLOG(2) << "Expire key: " << key;
return true;
}

return kOk;
return false;
}

void RdbLoader::LoadScriptFromAux(string&& body) {
Expand Down
33 changes: 33 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,40 @@ class RdbLoaderBase {
}
};

// State of a partial read that is still in progress.
//
// Huge objects are loaded in parts, with only a subset of their elements
// read at a time (see LoadKeyValPair); this carries the state from one
// part to the next.
struct PendingRead {
  // Element count to reserve for the object. The elements are reserved up
  // front, and the parts loaded afterwards are appended.
  size_t reserve{0};

  // How many elements of the object have not been read yet.
  size_t remaining{0};
};

// Per-item options that control how a loaded value is materialized.
struct LoadConfig {
  // Element count to reserve for the object. The elements of a huge object
  // are reserved up front, and later loads append to it.
  size_t reserve{0};

  // True to append to the already existing object, false to initialize a
  // new one.
  bool append{false};
};

class OpaqueObjLoader;

io::Result<uint8_t> FetchType();

template <typename T> io::Result<T> FetchInt();

static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv);
static std::error_code FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv);

io::Result<uint64_t> LoadLen(bool* is_encoded);
std::error_code FetchBuf(size_t size, void* dest);
Expand Down Expand Up @@ -173,6 +200,7 @@ class RdbLoaderBase {
JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;
RdbVersion rdb_version_ = RDB_VERSION;
PendingRead pending_read_;
};

class RdbLoader : protected RdbLoaderBase {
Expand Down Expand Up @@ -247,6 +275,8 @@ class RdbLoader : protected RdbLoaderBase {
bool has_mc_flags = false;
uint32_t mc_flags = 0;

LoadConfig load_config;

friend void MPSC_intrusive_store_next(Item* dest, Item* nxt) {
dest->next.store(nxt, std::memory_order_release);
}
Expand All @@ -261,6 +291,8 @@ class RdbLoader : protected RdbLoaderBase {
struct ObjSettings;

std::error_code LoadKeyValPair(int type, ObjSettings* settings);
// Returns whether to discard the read key pair.
bool ShouldDiscardKey(std::string_view key, ObjSettings* settings) const;
void ResizeDb(size_t key_num, size_t expire_num);
std::error_code HandleAux();

Expand Down Expand Up @@ -301,6 +333,7 @@ class RdbLoader : protected RdbLoaderBase {
// Callback when receiving RDB_OPCODE_FULLSYNC_END
std::function<void()> full_sync_cut_cb;

// A free pool of allocated unused items.
base::MPSCIntrusiveQueue<Item> item_queue_;
};

Expand Down
Loading

0 comments on commit 520dea0

Please sign in to comment.