Skip to content

Commit

Permalink
chore: WrapSds from family_utils.h (#3818)
Browse files Browse the repository at this point in the history
* chore: WrapSds from family_utils.h
---------

Signed-off-by: Roman Gershman <[email protected]>
Signed-off-by: Kostas Kyrimis  <[email protected]>
Co-authored-by: Roman Gershman <[email protected]>
Co-authored-by: Kostas Kyrimis <[email protected]>
  • Loading branch information
3 people authored Oct 1, 2024
1 parent fa288c1 commit beb2ec2
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 216 deletions.
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ endif()

add_library(dragonfly_lib bloom_family.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc engine_shard.cc
engine_shard_set.cc
engine_shard_set.cc family_utils.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
Expand Down
31 changes: 0 additions & 31 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,37 +423,6 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state) {
return os << GlobalStateName(state);
}

NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
CHECK_GT(max_range, RandomPick(0));
}

RandomPick NonUniquePicksGenerator::Generate() {
return absl::Uniform(bitgen_, 0u, max_range_);
}

UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
CHECK_GE(max_range, picks_count);
current_random_limit_ = max_range - picks_count;
}

RandomPick UniquePicksGenerator::Generate() {
DCHECK_GT(remaining_picks_count_, 0u);

remaining_picks_count_--;

const RandomPick max_index = current_random_limit_++;
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);

const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
if (random_index_is_picked) {
return random_index;
}

picked_indexes_.insert(max_index);
return max_index;
}

ThreadLocalMutex::ThreadLocalMutex() {
shard_ = EngineShard::tlocal();
}
Expand Down
44 changes: 0 additions & 44 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include <absl/random/random.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_cat.h>
#include <absl/types/span.h>
Expand Down Expand Up @@ -318,49 +317,6 @@ struct MemoryBytesFlag {
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);

using RandomPick = std::uint32_t;

class PicksGenerator {
public:
virtual RandomPick Generate() = 0;
virtual ~PicksGenerator() = default;
};

class NonUniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
NonUniquePicksGenerator(RandomPick max_range);

RandomPick Generate() override;

private:
const RandomPick max_range_;
absl::BitGen bitgen_{};
};

/*
* Generates unique index in O(1).
*
* picks_count specifies the number of random indexes to be generated.
* In other words, this is the number of times the Generate() function is called.
*
* The class uses Robert Floyd's sampling algorithm
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
* */
class UniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range);

RandomPick Generate() override;

private:
RandomPick current_random_limit_;
std::uint32_t remaining_picks_count_;
std::unordered_set<RandomPick> picked_indexes_;
absl::BitGen bitgen_{};
};

// Helper class used to guarantee atomicity between serialization of buckets
class ABSL_LOCKABLE ThreadLocalMutex {
public:
Expand Down
6 changes: 0 additions & 6 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,17 +376,11 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap),
shard_id_(pb->GetPoolIndex()) {
tmp_str1 = sdsempty();

defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
}

EngineShard::~EngineShard() {
sdsfree(tmp_str1);
}

void EngineShard::Shutdown() {
DVLOG(1) << "EngineShard::Shutdown";

Expand Down
6 changes: 0 additions & 6 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ class EngineShard {
Stats& operator+=(const Stats&);
};

// EngineShard() is private down below.
~EngineShard();

// Sets up a new EngineShard in the thread.
// If update_db_time is true, initializes periodic time update for its db_slice.
static void InitThreadLocal(util::ProactorBase* pb);
Expand Down Expand Up @@ -122,9 +119,6 @@ class EngineShard {
return shard_search_indices_.get();
}

// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1;

// Moving average counters.
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };

Expand Down
43 changes: 43 additions & 0 deletions src/server/family_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include "server/family_utils.h"

#include "base/logging.h"

namespace dfly {

sds WrapSds(std::string_view s) {
static thread_local sds tmp_sds = sdsempty();
return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length());
}

NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
CHECK_GT(max_range, RandomPick(0));
}

RandomPick NonUniquePicksGenerator::Generate() {
return absl::Uniform(bitgen_, 0u, max_range_);
}

UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
CHECK_GE(max_range, picks_count);
current_random_limit_ = max_range - picks_count;
}

RandomPick UniquePicksGenerator::Generate() {
DCHECK_GT(remaining_picks_count_, 0u);

remaining_picks_count_--;

const RandomPick max_index = current_random_limit_++;
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);

const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
if (random_index_is_picked) {
return random_index;
}

picked_indexes_.insert(max_index);
return max_index;
}

} // namespace dfly
63 changes: 63 additions & 0 deletions src/server/family_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/container/flat_hash_set.h>
#include <absl/random/random.h>

#include <cstdint>

extern "C" {
#include "redis/sds.h"
}
namespace dfly {

// Copy str to thread local sds instance. Valid until next WrapSds call on thread
sds WrapSds(std::string_view str);

using RandomPick = uint32_t;

class PicksGenerator {
public:
virtual RandomPick Generate() = 0;
virtual ~PicksGenerator() = default;
};

class NonUniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
NonUniquePicksGenerator(RandomPick max_range);

RandomPick Generate() override;

private:
const RandomPick max_range_;
absl::BitGen bitgen_{};
};

/*
* Generates unique index in O(1).
*
* picks_count specifies the number of random indexes to be generated.
* In other words, this is the number of times the Generate() function is called.
*
* The class uses Robert Floyd's sampling algorithm
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
* */
class UniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
UniquePicksGenerator(uint32_t picks_count, RandomPick max_range);

RandomPick Generate() override;

private:
RandomPick current_random_limit_;
uint32_t remaining_picks_count_;
absl::flat_hash_set<RandomPick> picked_indexes_;
absl::BitGen bitgen_{};
};

} // namespace dfly
6 changes: 3 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/server_state.h"
#include "server/family_utils.h"
#include "server/transaction.h"

/**
Expand Down Expand Up @@ -283,8 +283,8 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

for (string_view v : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size());
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
}

if (res.is_new) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "server/set_family.h"

#include "server/family_utils.h"

extern "C" {
#include "redis/intset.h"
#include "redis/redis_aux.h"
Expand Down
Loading

0 comments on commit beb2ec2

Please sign in to comment.