Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add defrag logic for zsets #3836

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ pair<void*, bool> DefragIntSet(intset* is, float ratio) {
return {replacement, true};
}

// Defragments a sorted map in place via SortedMap::DefragIfNeeded().
// The map object itself is never replaced, so the returned pointer is always `sm`;
// the flag reports whether DefragIfNeeded() re-allocated anything.
pair<void*, bool> DefragSortedMap(detail::SortedMap* sm, float ratio) {
  return {sm, sm->DefragIfNeeded(ratio)};
}

// Iterates over allocations of internal hash data structures and re-allocates
// them if their pages are underutilized.
// Returns pointer to new object ptr and whether any re-allocations happened.
Expand Down Expand Up @@ -208,6 +213,23 @@ pair<void*, bool> DefragSet(unsigned encoding, void* ptr, float ratio) {
}
}

// Dispatches sorted-set defragmentation on the object's encoding.
// Returns the (possibly new) object pointer and whether any re-allocation happened.
pair<void*, bool> DefragZSet(unsigned encoding, void* ptr, float ratio) {
  switch (encoding) {
    case OBJ_ENCODING_LISTPACK:
      // A listpack is one contiguous buffer, so it is handled as a single allocation.
      return DefragListPack(static_cast<uint8_t*>(ptr), ratio);

    case OBJ_ENCODING_SKIPLIST:
      // Despite the name, this encoding is backed by detail::SortedMap here.
      return DefragSortedMap(static_cast<detail::SortedMap*>(ptr), ratio);

    default:
      ABSL_UNREACHABLE();
  }
}

inline void FreeObjStream(void* ptr) {
freeStream((stream*)ptr);
}
Expand Down Expand Up @@ -420,19 +442,23 @@ void RobjWrapper::SetString(string_view s, MemoryResource* mr) {
}

// Re-allocates the wrapped object when its memory pages are underutilized with
// respect to `ratio`. Strings, hashes, sets and sorted sets are handled; every
// other type is left untouched.
// Returns true if any re-allocation happened.
bool RobjWrapper::DefragIfNeeded(float ratio) {
  // Runs a type-specific defrag routine and stores the pointer it hands back,
  // since the routine may return a replacement object instead of the original.
  auto do_defrag = [this, ratio](auto defrag_fun) {
    auto [new_ptr, realloced] = defrag_fun(encoding_, inner_obj_, ratio);
    inner_obj_ = new_ptr;
    return realloced;
  };

  if (type() == OBJ_STRING) {
    if (zmalloc_page_is_underutilized(inner_obj(), ratio)) {
      ReallocateString(tl.local_mr);
      return true;
    }
  } else if (type() == OBJ_HASH) {
    return do_defrag(DefragHash);
  } else if (type() == OBJ_SET) {
    return do_defrag(DefragSet);
  } else if (type() == OBJ_ZSET) {
    return do_defrag(DefragZSet);
  }
  return false;
}
Expand Down
27 changes: 27 additions & 0 deletions src/core/score_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,31 @@ detail::SdsScorePair ScoreMap::iterator::BreakToPair(void* obj) {
return detail::SdsScorePair(f, GetValue(f));
}

bool ScoreMap::iterator::ReallocIfNeeded(float ratio) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also //todo for me, mock this and test for sanity

// Unwrap all links to correctly call SetObject()
auto* ptr = curr_entry_;
while (ptr->IsLink())
ptr = ptr->AsLink();

auto* obj = ptr->GetObject();
auto [new_obj, realloced] = static_cast<ScoreMap*>(owner_)->ReallocIfNeeded(obj, ratio);
ptr->SetObject(new_obj);

return realloced;
}

// Moves the sds `obj` into a fresh allocation when zmalloc_page_is_underutilized()
// reports its page as underutilized for `ratio`, and frees the old allocation.
// Returns the pointer to use from now on and whether a re-allocation happened.
// TODO: this is almost identical to the StringMap counterpart; extract a shared
// helper in a follow-up refactor.
pair<sds, bool> ScoreMap::ReallocIfNeeded(void* obj, float ratio) {
  // Extra bytes allocated alongside the key; presumably the score payload - confirm.
  constexpr size_t kExtraSpace = 8;

  sds old_key = static_cast<sds>(obj);
  if (!zmalloc_page_is_underutilized(old_key, ratio))
    return {old_key, false};

  const size_t key_len = sdslen(old_key);
  sds fresh_key = AllocSdsWithSpace(key_len, kExtraSpace);

  // Copy the key bytes, the extra space and one more byte (key_len + 8 + 1 in total).
  memcpy(fresh_key, old_key, key_len + kExtraSpace + 1);
  sdsfree(old_key);

  return {fresh_key, true};
}

} // namespace dfly
8 changes: 8 additions & 0 deletions src/core/score_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class ScoreMap : public DenseSet {
return BreakToPair(ptr);
}

// Tries to reduce memory fragmentation by re-allocating the object stored in the entry
// this iterator currently points at. `ratio` is the utilization threshold forwarded to
// the owning map's re-allocation check.
// Returns true if re-allocation happened.
bool ReallocIfNeeded(float ratio);

iterator& operator++() {
Advance();
return *this;
Expand Down Expand Up @@ -118,6 +122,10 @@ class ScoreMap : public DenseSet {
}

private:
// Re-allocates the sds object `obj` if its page is underutilized with respect to `ratio`.
// The key and the extra bytes stored with it live in one allocation and are moved together.
// Returns the new pointer (the same pointer if utilization is sufficient) and whether
// re-allocation happened. When it did, the old allocation has been freed.
std::pair<sds, bool> ReallocIfNeeded(void* obj, float ratio);

uint64_t Hash(const void* obj, uint32_t cookie) const final;
bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const final;
size_t ObjectAllocSize(const void* obj) const final;
Expand Down
52 changes: 52 additions & 0 deletions src/core/score_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "core/score_map.h"

#include <mimalloc.h>

#include "base/gtest.h"
#include "base/logging.h"
#include "core/mi_memory_resource.h"
Expand Down Expand Up @@ -84,4 +86,54 @@ TEST_F(ScoreMapTest, EmptyFind) {
EXPECT_EQ(nullopt, sm_->Find("bar"));
}

// Checks that re-allocating the surviving entries of a sparsely populated map
// significantly reduces the memory mimalloc reports as committed but unused,
// and that every surviving entry is still reachable afterwards.
TEST_F(ScoreMapTest, ReallocIfNeeded) {
  constexpr size_t kTotal = 10'000;
  constexpr size_t kKeepEvery = 10;
  constexpr float kRatio = 0.9f;

  auto build_str = [](size_t i) { return to_string(i) + string(131, 'a'); };

  // Heap-area visitor: adds each area's committed-but-unused bytes to the counter passed
  // through `arg`. Using `arg` instead of a global keeps the test re-runnable
  // (e.g. with --gtest_repeat) without stale state leaking between measurements.
  auto count_waste = [](const mi_heap_t* /*heap*/, const mi_heap_area_t* area, void* /*block*/,
                        size_t block_size, void* arg) {
    const size_t used = block_size * area->used;
    *static_cast<uint64_t*>(arg) += area->committed - used;
    return true;
  };

  // Collects the backing heap and returns the total wasted bytes across its areas.
  auto measure_waste = [&count_waste]() {
    uint64_t wasted = 0;
    mi_heap_collect(mi_heap_get_backing(), true);
    mi_heap_visit_blocks(mi_heap_get_backing(), false, count_waste, &wasted);
    return wasted;
  };

  for (size_t i = 0; i < kTotal; i++) {
    sm_->AddOrUpdate(build_str(i), i);
  }

  // Keep only every 10th entry so that the remaining ones sit on sparsely used pages.
  for (size_t i = 0; i < kTotal; i++) {
    if (i % kKeepEvery == 0)
      continue;
    sm_->Erase(build_str(i));
  }

  const uint64_t wasted_before = measure_waste();

  size_t underutilized = 0;
  for (auto it = sm_->begin(); it != sm_->end(); ++it) {
    underutilized += zmalloc_page_is_underutilized(it->first, kRatio);
    it.ReallocIfNeeded(kRatio);
  }
  // Check there are underutilized pages
  ASSERT_GT(underutilized, 0u);

  const uint64_t wasted_after = measure_waste();

  // Check we waste significantly less now
  EXPECT_GT(wasted_before, wasted_after * 2);

  ASSERT_EQ(sm_->UpperBoundSize(), kTotal / kKeepEvery);
  for (size_t i = 0; i < kTotal; i += kKeepEvery) {
    auto res = sm_->Find(build_str(i));
    ASSERT_TRUE(res.has_value());
    ASSERT_EQ(static_cast<size_t>(*res), i);
  }
}

} // namespace dfly
11 changes: 11 additions & 0 deletions src/core/sorted_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -769,5 +769,16 @@ SortedMap* SortedMap::FromListPack(PMR_NS::memory_resource* res, const uint8_t*
return zs;
}

// Walks every entry of the score map and re-allocates those whose pages are
// underutilized with respect to `ratio`. Returns true if at least one entry moved.
//
// The B+ tree is deliberately not traversed: doing so would need a delete plus an
// insert (log n each) per moved entry. It can be added later if it proves worthwhile.
// NOTE(review): if the tree holds pointers to the same objects that are re-allocated
// here, those pointers would become stale - confirm before merging.
bool SortedMap::DefragIfNeeded(float ratio) {
  bool any_moved = false;

  auto cursor = score_map->begin();
  while (cursor != score_map->end()) {
    if (cursor.ReallocIfNeeded(ratio))
      any_moved = true;
    ++cursor;
  }

  return any_moved;
}

} // namespace detail
} // namespace dfly
2 changes: 2 additions & 0 deletions src/core/sorted_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class SortedMap {
uint8_t* ToListPack() const;
static SortedMap* FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp);

// Re-allocates score-map entries whose memory pages are underutilized with respect
// to `ratio`. Returns true if at least one entry was re-allocated.
bool DefragIfNeeded(float ratio);

private:
using ScoreTree = BPTree<ScoreSds, ScoreSdsPolicy>;

Expand Down
Loading