diff --git a/src/base_cmd.cc b/src/base_cmd.cc
index 3c09991e7..22bf9b816 100644
--- a/src/base_cmd.cc
+++ b/src/base_cmd.cc
@@ -31,6 +31,14 @@ bool BaseCmd::CheckArg(size_t num) const {
 
 std::vector BaseCmd::CurrentKey(PClient* client) const { return std::vector{client->Key()}; }
 
+void BaseCmd::ProxyExecute(PClient* client) {
+  DEBUG("proxy execute command: {}", client->CmdName());
+  // NOTE(review): stub -- so far this only logs the command name (above).
+  if (g_config.use_raft.load()) {
+    // Placeholder: nothing is done yet when g_config.use_raft is set.
+  }
+}
+
 void BaseCmd::Execute(PClient* client) {
   DEBUG("execute command: {}", client->CmdName());
 
diff --git a/src/base_cmd.h b/src/base_cmd.h
index 6bb77dfa2..8bd7bf068 100644
--- a/src/base_cmd.h
+++ b/src/base_cmd.h
@@ -271,6 +271,8 @@ class BaseCmd : public std::enable_shared_from_this {
   // If extensions are needed later, add them inside this function.
   // External callers only see this function; everything else is internal implementation.
   void Execute(PClient* client);
+  // Defined in base_cmd.cc; currently it only logs the command name of `client`.
+  void ProxyExecute(PClient* client);
 
   // Binlog-related function. I am not familiar with this part, so it was not ported; later the binlog should be callable from inside Execute.
   virtual std::string ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
diff --git a/src/net/tcp_connection.h b/src/net/tcp_connection.h
index 5bfb7f354..e2629bcfd 100644
--- a/src/net/tcp_connection.h
+++ b/src/net/tcp_connection.h
@@ -23,6 +23,8 @@ using TcpMessageCallback = std::function;
 
 // called when a connection being reset
 using TcpDisconnectCallback = std::function;
+// called when a task callback is received
+using TcpTaskCallback = std::function;
 
 // After client connects the server or the server accepts a new client,
 // the pikiwidb will create a TcpConnection to handle the connection.
diff --git a/src/proxy.cc b/src/proxy.cc
new file mode 100644
index 000000000..f666379cf
--- /dev/null
+++ b/src/proxy.cc
@@ -0,0 +1,18 @@
+#include "proxy.h"
+
+namespace pikiwidb {
+
+// Returns the process-wide PProxy instance (a function-local static).
+PProxy& PProxy::Instance() {
+  static PProxy inst_;
+  return inst_;
+}
+
+// Parses the command-line arguments. Nothing is parsed yet, so both arguments
+// are ignored and the call always reports success.
+bool PProxy::ParseArgs(int /*ac*/, char* /*av*/[]) {
+  // TODO: Parse ip from cfg file
+  return true;
+}
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy.h b/src/proxy.h
new file mode 100644
index 000000000..c0c5b935c
--- /dev/null
+++ b/src/proxy.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <cstdint>
+#include "common.h"
+#include "net/http_server.h"
+#include "net/tcp_connection.h"
+
+namespace pikiwidb {
+
+// Proxy front end; Instance() hands out the shared process-wide object.
+class PProxy final {
+ public:
+  PProxy() = default;
+  ~PProxy() = default;
+
+  // Static, so the shared instance can be reached without already holding a PProxy.
+  static PProxy& Instance();
+
+  bool ParseArgs(int ac, char* av[]);
+  const PString& GetConfigName() const { return cfg_file_; }
+
+  bool Init();
+  void Run();
+
+  void Stop();
+
+  void OnNewConnection(TcpConnection* obj);
+
+ public:
+  PString cfg_file_;
+  uint16_t port_{0};
+  PString log_level_;
+
+  PString master_;
+  uint16_t master_port_{0};
+
+  static const uint32_t kRunidSize;
+
+ private:
+  uint32_t cmd_id_ = 0;
+};
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/brpc_redis.cc b/src/proxy/brpc_redis.cc
new file mode 100644
index 000000000..bfaba2587
--- /dev/null
+++ b/src/proxy/brpc_redis.cc
@@ -0,0 +1,77 @@
+#include "brpc_redis.h"
+#include <butil/logging.h>
+#include <algorithm>
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+#include "config.h"
+#include "proxy_base_cmd.h"
+
+namespace pikiwidb {
+
+// Trampoline with a `void* (*)(void*)` signature: `arg` must point to a heap-allocated
+// std::function<void()>, which is invoked once and then deleted here.
+void* thread_entry(void* arg) {
+  auto func = static_cast<std::function<void()>*>(arg);
+  (*func)();
+  delete func;
+  return nullptr;
+}
+
+void BrpcRedis::Open() {}
+
+// Appends `task` to the pending queue; lock__ guards the queue against Commit().
+void BrpcRedis::PushRedisTask(const std::shared_ptr<ProxyBaseCmd>& task) {
+  std::lock_guard<std::mutex> lock(lock__);
+  tasks_.push_back(task);
+}
+
+// Consumes reply number `index` of a committed batch on behalf of `task`.
+// Defined as a member to match the declaration in brpc_redis.h.
+void BrpcRedis::SetResponse(const brpc::RedisResponse& response, const std::shared_ptr<ProxyBaseCmd>& task,
+                            size_t index) {
+  // TODO: write callback
+  (void)task;  // unused until the callback is written
+  LOG(INFO) << response.reply(static_cast<int>(index));
+}
+
+// Sends up to batch_size_ queued tasks to the backend as one request and passes each
+// reply to SetResponse(). Returns immediately when nothing is queued.
+void BrpcRedis::Commit() {
+  std::vector<std::shared_ptr<ProxyBaseCmd>> task_batch;
+  {
+    // The batch is sized and cut out under the lock because PushRedisTask() may run concurrently.
+    std::lock_guard<std::mutex> lock(lock__);
+    const size_t batch_size = std::min(tasks_.size(), batch_size_);
+    if (batch_size == 0) {
+      return;
+    }
+    const auto batch_end = tasks_.begin() + static_cast<std::ptrdiff_t>(batch_size);
+    task_batch.assign(tasks_.begin(), batch_end);
+    tasks_.erase(tasks_.begin(), batch_end);
+  }
+
+  brpc::RedisRequest request;
+  for (const auto& task : task_batch) {
+    request.AddCommand(task->GetCommand());
+  }
+
+  // A null `done` closure makes CallMethod() block until the reply arrives, so the
+  // request/response locals can be used directly afterwards.
+  brpc::RedisResponse response;
+  brpc::Controller cntl;
+  channel_.CallMethod(nullptr, &cntl, &request, &response, nullptr);
+  if (cntl.Failed()) {
+    // TODO: report the failure to the tasks of this batch; they have already been dequeued.
+    LOG(ERROR) << "Fail to commit redis batch: " << cntl.ErrorText();
+    return;
+  }
+
+  for (size_t i = 0; i < task_batch.size(); i++) {
+    SetResponse(response, task_batch[i], i);
+  }
+}
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/brpc_redis.h b/src/proxy/brpc_redis.h
new file mode 100644
index 000000000..9c8b1c436
--- /dev/null
+++ b/src/proxy/brpc_redis.h
@@ -0,0 +1,44 @@
+#pragma once
+#include <brpc/channel.h>
+#include <brpc/redis.h>
+#include <cstddef>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "brpc/redis_reply.h"
+#include "redis.h"
+#include "proxy_base_cmd.h"
+
+namespace pikiwidb {
+
+// One backend endpoint: queues proxy tasks and sends them over channel_ in batches.
+class BrpcRedis : public Redis {
+ public:
+  void Init() { options.protocol = brpc::PROTOCOL_REDIS; }
+
+  void Open();
+
+  void PushRedisTask(const std::shared_ptr<ProxyBaseCmd>& task);
+
+  void Commit();
+
+  // Both accessors return references: brpc::Channel is not copyable, and callers must
+  // initialise the member channel itself (with a pointer to the member options).
+  brpc::Channel& GetChannel() { return channel_; }
+  brpc::ChannelOptions& GetOptions() { return options; }
+
+  BrpcRedis() { this->Init(); }
+
+ private:
+  void SetResponse(const brpc::RedisResponse& resp, const std::shared_ptr<ProxyBaseCmd>& task, size_t index);
+
+  brpc::Channel channel_;
+  brpc::ChannelOptions options;
+  std::mutex lock__;
+  std::vector<std::shared_ptr<ProxyBaseCmd>> tasks_;
+  size_t batch_size_ = 5;
+};
+
+}  // namespace pikiwidb
+
diff --git a/src/proxy/proxy_base_cmd.cc b/src/proxy/proxy_base_cmd.cc
new file mode 100644
index 000000000..8e048a193
--- /dev/null
+++ b/src/proxy/proxy_base_cmd.cc
@@ -0,0 +1,8 @@
+#include "proxy_base_cmd.h"
+#include "log.h"
+#include "client.h"
+
+namespace pikiwidb {
+// PS: no function need to be defined
+
+}
\ No newline
at end of file
diff --git a/src/proxy/proxy_base_cmd.h b/src/proxy/proxy_base_cmd.h
new file mode 100644
index 000000000..b12214294
--- /dev/null
+++ b/src/proxy/proxy_base_cmd.h
@@ -0,0 +1,69 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "client.h"
+#include "store.h"
+
+namespace pikiwidb {
+
+const std::string kTaskNamePing = "ping";
+
+enum TaskFlags {
+  kTaskFlagsWrite = (1 << 0),
+
+};
+
+// Interface of a command handled by the proxy. Concrete commands supply the backend
+// command text, the routing key and the client the result belongs to.
+class ProxyBaseCmd : public std::enable_shared_from_this<ProxyBaseCmd> {
+ public:
+  enum class OpType {
+    kGet,
+    kSet,
+    kDel,
+    kIncr,
+    kDecr,
+    kUnknown,
+  };
+
+  enum class KeyType {
+    kGKey,
+    kMKey,
+    kUnknown,
+  };
+
+  // Tells a task worker whether to call Execute() or CallBack() on the command.
+  enum class TaskType {
+    kExecute,
+    kCallback,
+    kUnknown,
+  };
+
+  ProxyBaseCmd() = default;
+  virtual ~ProxyBaseCmd() = default;
+
+  virtual void Execute() = 0;
+  virtual void CallBack() = 0;
+
+  virtual std::string GetCommand() = 0;
+
+  OpType GetOpType() const { return op_type_; }
+  KeyType GetKeyType() const { return key_type_; }
+  TaskType GetTaskType() const { return task_type_; }
+  virtual std::shared_ptr<PClient> Client() = 0;
+  virtual std::string GetKey() = 0;
+
+ protected:
+  OpType op_type_ = OpType::kUnknown;
+  KeyType key_type_ = KeyType::kUnknown;
+  TaskType task_type_ = TaskType::kUnknown;
+
+  // TODO(Tangruilin): counter for mget or mset
+
+ private:
+  virtual bool DoInitial(PClient* client) = 0;
+};
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/proxy_cmd_kv.cc b/src/proxy/proxy_cmd_kv.cc
new file mode 100644
index 000000000..1e283325f
--- /dev/null
+++ b/src/proxy/proxy_cmd_kv.cc
@@ -0,0 +1,51 @@
+#include "proxy_cmd_kv.h"
+#include "base_cmd.h"
+#include <memory>
+#include <string>
+#include "pikiwidb.h"
+
+// proxy_cmd_kv.cc
+namespace pikiwidb {
+
+// Builds the command text sent to the backend.
+// NOTE(review): key_ and value_ are joined unescaped; a value containing whitespace would
+// presumably be split into several arguments by the backend parser -- TODO confirm.
+std::string SetProxyCmd::GetCommand() {
+  return "set " + key_ + " " + value_;
+}
+
+// Binds the command to the client that issued it. Always returns true.
+bool SetProxyCmd::DoInitial(PClient* client) {
+  // NOTE(review): reset() makes client_ an owner of `client`. If the connection layer owns the
+  // object as well it will be destroyed twice -- ownership still needs to be settled.
+  client_.reset(client);
+  return true;
+}
+
+// Hands this command to the router, which queues it on the backend chosen for its key.
+void SetProxyCmd::Execute() {
+  // TODO: route, (leave interface for mget or mset,
+  // split task and combine callback)
+  // Codis: add sub task for mget or mset
+
+  // route class: manage all brpc_redis
+  // task -> route -> brpc_redis_
+  // Commit might be launch from timer (better) or route
+  // route::forward(cmd)
+
+  ROUTER.forward(shared_from_this());
+}
+
+void SetProxyCmd::CallBack() {
+  // same as cmd_kv.cc
+  // after DoCmd ?
+
+  client_->SetRes(CmdRes::kOK);
+}
+
+}  // namespace pikiwidb
+
+// TODO:
+// 1. Parse the config file to learn how many pikiwidb backends exist
+// 2. Add a flag to start in proxy mode
+// 3. client
\ No newline at end of file
diff --git a/src/proxy/proxy_cmd_kv.h b/src/proxy/proxy_cmd_kv.h
new file mode 100644
index 000000000..9d379d8f4
--- /dev/null
+++ b/src/proxy/proxy_cmd_kv.h
@@ -0,0 +1,40 @@
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "proxy_base_cmd.h"
+#include "brpc_redis.h"
+#include "router.h"
+
+namespace pikiwidb {
+
+// Proxy-side SET command holding one key/value pair.
+class SetProxyCmd : public ProxyBaseCmd {
+ public:
+  enum SetCondition { kNONE, kNX, kXX, kEXORPX };
+  SetProxyCmd(std::string key, std::string value) : key_(std::move(key)), value_(std::move(value)) {}
+  std::shared_ptr<PClient> Client() override { return client_; }
+  // GetKey() is pure virtual in ProxyBaseCmd; without this override the class could not be instantiated.
+  std::string GetKey() override { return key_; }
+
+ protected:
+  void Execute() override;
+  void CallBack() override;
+  bool DoInitial(PClient* client) override;
+  std::string GetCommand() override;
+
+ private:
+  std::string key_;
+  std::string value_;
+  int64_t sec_ = 0;
+  std::shared_ptr<PClient> client_;  // TODO: need to discuss
+  Router* router_ = nullptr;
+
+  SetProxyCmd::SetCondition condition_{kNONE};
+
+  std::unique_ptr<BrpcRedis> brpc_redis_;
+};
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/router.cc b/src/proxy/router.cc
new file mode 100644
index 000000000..e94305255
--- /dev/null
+++ b/src/proxy/router.cc
@@ -0,0 +1,61 @@
+#include "router.h"
+#include "config.h"
+
+namespace pikiwidb {
+
+extern PConfig g_config;
+
+Router::~Router() {
+  // Init() may never have been called, in which case there is no wheel to stop.
+  if (timer_wheel_) {
+    timer_wheel_->Stop();
+  }
+  INFO("ROUTER is closing...");
+}
+
+Router& Router::Instance() {
+  static Router router;
+  return router;
+}
+
+// Opens one channel per address in g_config.proxy_ip (';'-separated) and schedules a
+// Commit() for every backend whose channel could be initialised.
+void Router::Init() {
+  // TODO: config.cc main()...
+  // TODO: register brpc_redis from cfg_file_
+  const auto ip_group = SplitString(g_config.proxy_ip.ToString(), ';');
+
+  for (const auto& ip : ip_group) {
+    // brpc::Channel is not copyable, so each backend lives on the heap and is shared with
+    // its timer task instead of being copied into the vector.
+    auto brpc_redis = std::make_shared<BrpcRedis>();
+    // assumes SplitString() yields std::string elements -- TODO confirm
+    if (brpc_redis->GetChannel().Init(ip.c_str(), &brpc_redis->GetOptions()) != 0) {
+      LOG(ERROR) << "Fail to init channel to pikiwidb, ip " << ip;
+      continue;
+    }
+    brpc_redis_.push_back(std::move(brpc_redis));
+  }
+
+  brpc_redis_num_ = brpc_redis_.size();
+  timer_wheel_ = std::make_unique<TimerWheel>(10, 1000);
+  for (const auto& brpc : brpc_redis_) {
+    timer_wheel_->AddTask(1000, [brpc]() { brpc->Commit(); });
+  }
+  timer_wheel_->Start();
+}
+
+// Queues `task` on the backend selected by the hash of its key.
+void Router::forward(std::shared_ptr<ProxyBaseCmd> task) {
+  // TODO (Tangruilin): add pd
+  const size_t backend_num = brpc_redis_num_.load();
+  if (backend_num == 0) {
+    // Without a backend the modulo below would divide by zero.
+    LOG(ERROR) << "Fail to forward task, no backend available";
+    return;
+  }
+  // hash key
+  brpc_redis_[hasher_(task->GetKey()) % backend_num]->PushRedisTask(task);
+}
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/router.h b/src/proxy/router.h
new file mode 100644
index 000000000..1e1cb4012
--- /dev/null
+++ b/src/proxy/router.h
@@ -0,0 +1,119 @@
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+#include "proxy_base_cmd.h"
+#include "brpc_redis.h"
+
+// Timing wheel driven by one background thread. Every `interval_ms` the wheel advances by one
+// slot and runs the tasks stored in it. Tasks are never removed, so every stored copy runs again
+// once per full revolution.
+class TimerWheel {
+ public:
+  using Task = std::function<void()>;
+
+  explicit TimerWheel(size_t wheel_size, int interval_ms)
+      : wheel_size_(wheel_size),
+        interval_ms_(interval_ms),
+        wheel_(wheel_size),
+        current_index_(0) {}
+
+  ~TimerWheel() {
+    Stop();
+  }
+
+  // Starts the ticking thread; calling it again while running does nothing.
+  void Start() {
+    if (running_.exchange(true)) return;
+    // The thread is kept joinable (not detached): it uses `this`, so Stop() must be able to
+    // wait for it before the wheel is destroyed.
+    thread_ = std::thread([this]() {
+      while (running_) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms_));
+        if (!running_) break;  // stopped while sleeping
+        Tick();
+      }
+    });
+  }
+
+  // Stops the ticking thread and waits for it to exit.
+  void Stop() {
+    if (!running_.exchange(false)) return;
+    if (thread_.joinable()) thread_.join();
+  }
+
+  // Stores `task` in every N-th slot counted from the current position, where
+  // N = timeout_ms / interval_ms (at least 1), so that it runs about every `timeout_ms`.
+  // A timeout longer than one revolution gets a single slot, i.e. one run per revolution.
+  void AddTask(int timeout_ms, Task task) {
+    if (wheel_size_ == 0 || interval_ms_ <= 0) return;  // no slot to use / nothing to divide by
+    std::lock_guard<std::mutex> lock(mutex_);
+    // Never less than one tick, otherwise the stride below would not advance.
+    const size_t ticks = timeout_ms > interval_ms_ ? static_cast<size_t>(timeout_ms / interval_ms_) : 1;
+    size_t offset = ticks;
+    do {
+      wheel_[(current_index_ + offset) % wheel_size_].push_back(task);
+      offset += ticks;
+    } while (offset <= wheel_size_);
+  }
+
+ private:
+  // Runs the tasks of the current slot and advances the wheel by one slot.
+  void Tick() {
+    std::vector<Task> due;
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (wheel_.empty()) return;
+      due = wheel_[current_index_];
+      current_index_ = (current_index_ + 1) % wheel_size_;
+    }
+    // Run outside the lock so that a task may call AddTask() without deadlocking.
+    for (const auto& task : due) {
+      task();
+    }
+  }
+
+ private:
+  size_t wheel_size_;
+  int interval_ms_;
+  std::vector<std::vector<Task>> wheel_;
+  size_t current_index_;
+  std::atomic<bool> running_{false};
+  std::thread thread_;
+  std::mutex mutex_;
+};
+
+namespace pikiwidb {
+// Singleton that owns the backend connections and picks one per task by hashing the task key.
+class Router {
+ public:
+  static Router& Instance();
+
+  Router(const Router&) = delete;
+  void operator=(const Router&) = delete;
+  ~Router();
+
+  void Init();
+  void forward(std::shared_ptr<ProxyBaseCmd> task);
+
+ private:
+  Router() = default;
+  std::atomic<size_t> t_counter_{0};
+  std::atomic<size_t> brpc_redis_num_{0};
+  std::vector<std::shared_ptr<BrpcRedis>> brpc_redis_;
+  std::hash<std::string> hasher_;
+
+  // Owned here; empty until Init() has run.
+  std::unique_ptr<TimerWheel> timer_wheel_;
+};
+
+#define ROUTER Router::Instance()
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/task_manager.cc b/src/proxy/task_manager.cc
new file mode 100644
index 000000000..d9c56006d
--- /dev/null
+++ b/src/proxy/task_manager.cc
@@ -0,0 +1,198 @@
+#include <cassert>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "task_manager.h"
+#include "pstd/log.h"
+#include "util.h"
+#include "pikiwidb.h"
+
+namespace pikiwidb {
+
+const size_t TaskManager::kMaxWorkers = 8;
+
+bool TaskManager::SetWorkerNum(size_t num) {
+  if (num <= 1) return true;
+  if (state_ != State::kNone) {
+    ERROR("can only called before application run");
+    return false;
+  }
+  if (!worker_loops_.empty()) {
+    ERROR("can only called once, not empty loops size: {}", worker_loops_.size());
+    return false;
+  }
+  worker_num_.store(num);
+  worker_threads_.reserve(num);
+  worker_loops_.reserve(num);
+  return true;
+}
+
+bool TaskManager::Init() {
+  base_.Init();
+  INFO("base loop {} {}, g_baseLoop {}", base_.GetName(), static_cast<void*>(&base_),
+       static_cast<void*>(pikiwidb::EventLoop::Self()));
+
+  return true;
+}
+
+void TaskManager::Run(int ac, char* av[]) {
+  assert(state_ == State::kNone);
+  INFO("Process {} starting...", name_);
+
+  StartWorkers();
+  base_.Run();
+
+  for (auto& w : worker_threads_) {
+    if (w.joinable()) {
+      w.join();
+    }
+  }
+
+  worker_threads_.clear();
+
+  INFO("Process {} stopped, goodbye...", name_);
+}
+
+// Stops the base and worker event loops, then wakes, joins and discards the task threads.
+// Event-loop threads themselves are joined by Run() once base_.Run() returns.
+void TaskManager::Exit() {
+  state_ = State::kStopped;
+
+  BaseLoop()->Stop();
+  for (const auto& worker_loops : worker_loops_) {
+    EventLoop* loop = worker_loops.get();
+    loop->Stop();
+  }
+
+  TaskRunning_ = false;
+  for (size_t i = 0; i < TaskCond_.size(); ++i) {
+    // Notify under the queue mutex so a worker cannot miss the wake-up between its check and its wait.
+    std::unique_lock<std::mutex> lock(*TaskMutex_[i]);
+    TaskCond_[i]->notify_all();
+  }
+  for (auto& wt : TaskThreads_) {
+    if (wt.joinable()) {
+      wt.join();
+    }
+  }
+
+  TaskThreads_.clear();
+  TaskCond_.clear();
+  TaskQueue_.clear();
+  TaskMutex_.clear();
+}
+
+bool TaskManager::IsExit() const { return state_ == State::kStopped; }
+
+EventLoop* TaskManager::BaseLoop() { return &base_; }
+
+EventLoop* TaskManager::ChooseNextWorkerEventLoop() {
+  if (worker_loops_.empty())
+    return BaseLoop();
+
+  auto& loop = worker_loops_[current_work_loop_++ % worker_loops_.size()];
+  return loop.get();
+}
+
+void TaskManager::StartWorkers() {
+  assert(state_ == State::kNone);
+
+  size_t index = 1;
+  while (worker_loops_.size() < worker_num_) {
+    std::unique_ptr<EventLoop> loop = std::make_unique<EventLoop>();
+    if (!name_.empty()) {
+      loop->SetName(name_ + std::to_string(index++));
+      INFO("loop {}, name {}", static_cast<void*>(loop.get()), loop->GetName().c_str());
+    }
+    worker_loops_.push_back(std::move(loop));
+  }
+
+  for (index = 0; index < worker_loops_.size(); ++index) {
+    EventLoop* loop = worker_loops_[index].get();
+    std::thread t([loop]() {
+      loop->Init();
+      loop->Run();
+    });
+    INFO("thread {}, thread loop {}, loop name {}", index, static_cast<void*>(loop), loop->GetName().c_str());
+    worker_threads_.push_back(std::move(t));
+  }
+
+  state_ = State::kStarted;
+
+  const size_t task_thread_num = worker_num_.load();
+  TaskThreads_.reserve(task_thread_num);
+  TaskMutex_.reserve(task_thread_num);
+  TaskCond_.reserve(task_thread_num);
+  TaskQueue_.reserve(task_thread_num);
+
+  // Create every queue before the first task thread starts, so that no thread reads the
+  // vectors while they are still growing.
+  for (size_t queue_id = 0; queue_id < task_thread_num; ++queue_id) {
+    TaskMutex_.emplace_back(std::make_unique<std::mutex>());
+    TaskCond_.emplace_back(std::make_unique<std::condition_variable>());
+    TaskQueue_.emplace_back();
+  }
+
+  for (size_t queue_id = 0; queue_id < task_thread_num; ++queue_id) {
+    // The thread object is stored in TaskThreads_: destroying a joinable std::thread calls
+    // std::terminate(), and Exit() needs the handle to join it.
+    TaskThreads_.emplace_back([this, queue_id]() {
+      while (true) {
+        std::shared_ptr<ProxyBaseCmd> task;
+        {
+          std::unique_lock<std::mutex> lock(*TaskMutex_[queue_id]);
+          TaskCond_[queue_id]->wait(lock,
+                                    [this, queue_id]() { return !TaskRunning_ || !TaskQueue_[queue_id].empty(); });
+          if (!TaskRunning_) break;
+          task = std::move(TaskQueue_[queue_id].front());
+          TaskQueue_[queue_id].pop_front();
+        }
+
+        // The task runs without the queue lock held, so PushTask() never waits for a command.
+        switch (task->GetTaskType()) {
+          case ProxyBaseCmd::TaskType::kExecute:
+            task->Execute();
+            break;
+          case ProxyBaseCmd::TaskType::kCallback:
+            task->CallBack();
+            g_pikiwidb->PushWriteTask(task->Client());
+            // TODO: is anything returned after DoCmd, and is the client maintained by an event loop?
+            break;
+          default:
+            ERROR("unsupported task type...");
+            break;
+        }
+      }
+      INFO("worker write thread {}, goodbye...", queue_id);
+    });
+
+    INFO("worker write thread {}, starting...", queue_id);
+  }
+}
+
+void TaskManager::SetName(const std::string& name) { name_ = name; }
+
+void TaskManager::Reset() {
+  state_ = State::kNone;
+  BaseLoop()->Reset();
+}
+
+// Appends `task` to one of the task queues, chosen round-robin, and wakes that queue's thread.
+void TaskManager::PushTask(std::shared_ptr<ProxyBaseCmd> task) {
+  const size_t queue_num = TaskQueue_.size();
+  if (queue_num == 0) {
+    // No task thread exists (StartWorkers() has not run, Exit() already ran, or worker_num_ is 0).
+    ERROR("no task thread available, task dropped");
+    return;
+  }
+  auto pos = (++t_counter_) % queue_num;
+  std::unique_lock<std::mutex> lock(*TaskMutex_[pos]);
+
+  TaskQueue_[pos].emplace_back(std::move(task));
+  TaskCond_[pos]->notify_one();
+}
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/proxy/task_manager.h b/src/proxy/task_manager.h
new file mode 100644
index 000000000..a2e2160fb
--- /dev/null
+++ b/src/proxy/task_manager.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <cstddef>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "client.h"
+#include "proxy_base_cmd.h"
+#include "cmd_thread_pool.h"
+#include "net/event_loop.h"
+#include "base_cmd.h"
+#include "cmd_table_manager.h"
+
+namespace pikiwidb {
+
+// Runs the event loops plus one task thread per configured worker; PushTask() spreads commands
+// over the task threads round-robin. Call Exit() before destruction so the task threads are joined.
+class TaskManager {
+ public:
+  TaskManager() = default;
+  virtual ~TaskManager() = default;
+
+  static size_t GetMaxWorkerNum() { return kMaxWorkers; }
+
+  bool Init();
+  void Run(int argc, char* argv[]);
+  void Exit();
+  void PushTask(std::shared_ptr<ProxyBaseCmd> task);
+  bool IsExit() const;
+  void SetName(const std::string& name);
+
+  EventLoop* BaseLoop();
+
+  EventLoop* ChooseNextWorkerEventLoop();
+
+  bool SetWorkerNum(size_t n);
+
+  void Reset();
+
+ protected:
+  virtual void StartWorkers();
+
+  static const size_t kMaxWorkers;
+
+  std::string name_;
+
+  EventLoop base_;
+
+  std::atomic<size_t> worker_num_{0};
+  std::vector<std::thread> worker_threads_;
+  std::vector<std::unique_ptr<EventLoop>> worker_loops_;
+  mutable std::atomic<size_t> current_work_loop_{0};
+
+  enum class State {
+    kNone,
+    kStarted,
+    kStopped,
+  };
+  std::atomic<State> state_{State::kNone};
+
+  pikiwidb::CmdTableManager cmd_table_manager_;
+
+ private:
+  // One mutex / condition variable / queue per task thread, all indexed by the same position.
+  std::vector<std::thread> TaskThreads_;
+  std::vector<std::unique_ptr<std::mutex>> TaskMutex_;
+  std::vector<std::unique_ptr<std::condition_variable>> TaskCond_;
+  std::vector<std::deque<std::shared_ptr<ProxyBaseCmd>>> TaskQueue_;
+
+  std::atomic<size_t> t_counter_{0};
+  // Atomic: written by Exit() while the task threads read it.
+  std::atomic<bool> TaskRunning_{true};
+};
+
+}  // namespace pikiwidb
\ No newline at end of file
diff --git a/tests/assets/default.conf b/tests/assets/default.conf index f80c86d5c..3f23870e6 100644 --- a/tests/assets/default.conf +++ b/tests/assets/default.conf @@ -10,7 +10,7 @@ port 9221 # If you want you can bind a single interface, if the bind option is not # specified all the interfaces will listen for incoming connections. # -# bind 127.0.0.1 +ip 127.0.0.1 # Close the connection after a client is idle for N seconds (0 to disable) @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 3 +databases 16 ################################ SNAPSHOTTING ################################# # @@ -261,7 +261,6 @@ maxmemory-policy noeviction # maxmemory-samples 5 - ################################ THREADED I/O ################################# # So for instance if you have a four cores boxes, try to use 2 or 3 I/O # threads, if you have a 8 cores, try to use 6 threads.
In order to @@ -315,37 +314,35 @@ slowlog-log-slower-than 10000 # You can reclaim memory used by the slow log with SLOWLOG RESET. slowlog-max-len 128 -############################### ADVANCED CONFIG ############################### - -# Redis calls an internal function to perform many background tasks, like -# closing connections of clients in timeot, purging expired keys that are -# never requested, and so forth. -# -# Not all tasks are perforemd with the same frequency, but Redis checks for -# tasks to perform accordingly to the specified "hz" value. -# -# By default "hz" is set to 10. Raising the value will use more CPU when -# Redis is idle, but at the same time will make Redis more responsive when -# there are many keys expiring at the same time, and timeouts may be -# handled with more precision. -# -# The range is between 1 and 500, however a value over 100 is usually not -# a good idea. Most users should use the default of 10 and raise this up to -# 100 only in environments where very low latency is required. -hz 10 ############################### BACKENDS CONFIG ############################### -# PikiwiDB is a in memory database, though it has aof and rdb for dump data to disk, it -# is very limited. Try use leveldb for real storage, pikiwidb as cache. The cache algorithm -# is like linux page cache, please google or read your favorite linux book -# 0 is default, no backend -# 1 is RocksDB, currently only support RocksDB -backend 1 -backendpath dump -# the frequency of dump to backend per second -backendhz 10 -# the rocksdb number per db -db-instance-num 5 +# PikiwiDB uses RocksDB as the underlying storage engine, and the data belonging +# to the same DB is distributed among several RocksDB instances. 
+ +# RocksDB instances number per DB +db-instance-num 3 +# default is 86400 * 7 +small-compaction-threshold 604800 +# default is 86400 * 3 +small-compaction-duration-threshold 259200 + +############################### ROCKSDB CONFIG ############################### +rocksdb-max-subcompactions 2 +rocksdb-max-background-jobs 4 +rocksdb-max-write-buffer-number 2 +rocksdb-min-write-buffer-number-to-merge 2 +# default is 64M +rocksdb-write-buffer-size 67108864 +rocksdb-level0-file-num-compaction-trigger 4 +rocksdb-number-levels 7 +rocksdb-enable-pipelined-write no +rocksdb-level0-slowdown-writes-trigger 20 +rocksdb-level0-stop-writes-trigger 36 # default 86400 * 7 rocksdb-ttl-second 604800 # default 86400 * 3 -rocksdb-periodic-second 259200 +rocksdb-periodic-second 259200 + +############################### RAFT ############################### +use-raft no +# Braft relies on brpc to communicate via the default port number plus the port offset +raft-port-offset 10