Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add proxy module #441

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ bool BaseCmd::CheckArg(size_t num) const {

std::vector<std::string> BaseCmd::CurrentKey(PClient* client) const { return std::vector<std::string>{client->Key()}; }

void BaseCmd::ProxyExecute(PClient* client) {
DEBUG("proxy execute command: {}", client->CmdName());

if (g_config.use_raft.load()) {

}
}

void BaseCmd::Execute(PClient* client) {
DEBUG("execute command: {}", client->CmdName());

Expand Down
2 changes: 2 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
// 后续如果需要拓展,在这个函数里面拓展
// 对外部调用者来说,只暴露这个函数,其他的都是内部实现
void Execute(PClient* client);

void ProxyExecute(PClient* client);

// binlog 相关的函数,我对这块不熟悉,就没有移植,后面binlog应该可以在Execute里面调用
virtual std::string ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
Expand Down
2 changes: 2 additions & 0 deletions src/net/tcp_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ using TcpMessageCallback = std::function<int(TcpConnection*, const char* data, i
using TcpConnectionFailCallback = std::function<void(EventLoop*, const char* peer_ip, int port)>;
// called when a connection being reset
using TcpDisconnectCallback = std::function<void(TcpConnection*)>;
// called when got task callback
using TcpTaskCallback = std::function<void(TcpConnection*)>;

// After client connects the server or the server accepts a new client,
// the pikiwidb will create a TcpConnection to handle the connection.
Expand Down
16 changes: 16 additions & 0 deletions src/proxy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "proxy.h"

namespace pikiwidb {

PProxy& PProxy::Instance() {
static PProxy inst_;
return inst_;
}

bool ParseArgs(int ac, char* av[]) {
// TODO: Parse ip from cfg file
}



}
41 changes: 41 additions & 0 deletions src/proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <memory>
#include "common.h"
#include "net/http_server.h"
#include "net/tcp_connection.h"

namespace pikiwidb {

class PProxy final {
public:
PProxy() = default;
~PProxy() = default;

PProxy& Instance();

bool ParseArgs(int ac, char *av[]);
const PString& GetConfigName() const { return cfg_file_; }

bool Init();
void Run();

void Stop();

void OnNewConnection(TcpConnection* obj);

public:
PString cfg_file_;
uint16_t port_{0};
PString log_level_;

PString master_;
uint16_t master_port_{0};

static const uint32_t kRunidSize;

private:
uint32_t cmd_id_ = 0;

};
}
65 changes: 65 additions & 0 deletions src/proxy/brpc_redis.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "brpc_redis.h"
#include <bthread/bthread.h>
#include <algorithm>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
#include "config.h"
#include "proxy_base_cmd.h"

namespace pikiwidb {

void* thread_entry(void* arg) {
auto func = static_cast<std::function<void()>*>(arg);
(*func)();
delete func;
return nullptr;
}

void BrpcRedis::Open() {

}

void BrpcRedis::PushRedisTask(const std::shared_ptr<ProxyBaseCmd>& task) {
std::lock_guard<std::mutex> lock(lock__);
tasks_.push_back(task);
}

void SetResponse(const brpc::RedisResponse& response, const std::shared_ptr<ProxyBaseCmd>& task, size_t index) {
// TODO: write callback
LOG(INFO) << response.reply(index);



}

void BrpcRedis::Commit() {
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
std::vector<std::shared_ptr<ProxyBaseCmd>> task_batch;
size_t batch_size = std::min((size_t) tasks_.size(), batch_size_);

{
std::lock_guard<std::mutex> lock(lock__);
task_batch.assign(tasks_.begin(), tasks_.begin() + batch_size);
tasks_.erase(tasks_.begin(), tasks_.begin() + batch_size);
}

for (auto& task : task_batch) {
request.AddCommand(task->GetCommand());
}

auto callback = new std::function<void()>([&]() {
channel_.CallMethod(nullptr, &cntl, &request, &response, nullptr);
for (size_t i = 0; i < task_batch.size(); i++) {
SetResponse(response, task_batch[i], i);
}
});

callback();
}

} // namespace pikiwidb
41 changes: 41 additions & 0 deletions src/proxy/brpc_redis.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once
#include <brpc/channel.h>
#include <brpc/redis.h>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

#include "brpc/redis_reply.h"
#include "redis.h"
#include "proxy_base_cmd.h"

namespace pikiwidb {

class BrpcRedis : public Redis {
public:
void Init() { options.protocol = brpc::PROTOCOL_REDIS; }

void Open();

void PushRedisTask(const std::shared_ptr<ProxyBaseCmd>& task);

void Commit();

brpc::Channel GetChannel() { return channel_; }
brpc::ChannelOptions GetOptions() { return options; }

BrpcRedis() { this->Init(); }

private:
void SetResponse(const brpc::RedisResponse& resp, const std::shared_ptr<ProxyBaseCmd>& task, size_t index);

brpc::Channel channel_;
brpc::ChannelOptions options;
std::mutex lock__;
std::vector<std::shared_ptr<ProxyBaseCmd>> tasks_;
size_t batch_size_ = 5;
};
}

8 changes: 8 additions & 0 deletions src/proxy/proxy_base_cmd.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#include "proxy_base_cmd.h"
#include "log.h"
#include "client.h"

namespace pikiwidb {
// PS: no function need to be defined

}
72 changes: 72 additions & 0 deletions src/proxy/proxy_base_cmd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include <atomic>
#include <map>
#include <memory>
#include <span>
#include <string>
#include <unordered_map>
#include <vector>

#include "proxy_base_cmd.h"
#include "client.h"
#include "store.h"

namespace pikiwidb {

const std::string kTaskNamePing = "ping";

enum TaskFlags {
kTaskFlagsWrite = (1 << 0),

};

class ProxyBaseCmd : public std::enable_shared_from_this<ProxyBaseCmd> {
public:
enum class OpType {
kGet,
kSet,
kDel,
kIncr,
kDecr,
kUnknown,
};

enum class KeyType {
kGKey,
kMKey,
kUnknown,
};

enum class TaskType {
kExecute,
kCallback,
kUnknown,
};

ProxyBaseCmd() = default;
virtual ~ProxyBaseCmd() = default;

virtual void Execute() = 0;
virtual void CallBack() = 0;

virtual std::string GetCommand() = 0;

OpType GetOpType() const { return op_type_; }
KeyType GetKeyType() const { return key_type_; }
TaskType GetTaskType() const { return task_type_; }
virtual std::shared_ptr<PClient> Client() = 0;
virtual std::string GetKey() = 0;

protected:
OpType op_type_ = OpType::kUnknown;
KeyType key_type_ = KeyType::kUnknown;
TaskType task_type_ = TaskType::kUnknown;

// TODO(Tangruilin): counter for mget or mset

private:
virtual bool DoInitial(PClient* client) = 0;
};

}
45 changes: 45 additions & 0 deletions src/proxy/proxy_cmd_kv.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "proxy_cmd_kv.h"
#include "base_cmd.h"
#include <memory>
#include <utility>
#include "pikiwidb.h"

// proxy_cmd_kv.cc
namespace pikiwidb {

std::string SetProxyCmd::GetCommand() {
return "set " + key_ + " " + value_;
}

bool SetProxyCmd::DoInitial(PClient* client) {
// client
client_.reset(client);
}

//
void SetProxyCmd::Execute() {
// TODO: route, (leave interface for mget or mset,
// split task and combine callback)
// Codis: add sub task for mget or mset

// route class: manage all brpc_redis
// task -> route -> brpc_redis_
// Commit might be launch from timer (better) or route
// route::forward(cmd)

ROUTER.forward(shared_from_this());
}

void SetProxyCmd::CallBack() {
// same as cmd_kv.cc
// after DoCmd ?

client_->SetRes(CmdRes::kOK);
}

} // namespace pikiwidb

// TODO:
// 1. 解析 config 文件,知道后台有多少 pikiwidb
// 2. flag 以 proxy 模式启动
// 3. client
33 changes: 33 additions & 0 deletions src/proxy/proxy_cmd_kv.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once
#include "proxy_base_cmd.h"
#include "brpc_redis.h"
#include "router.h"

namespace pikiwidb {

class SetProxyCmd : public ProxyBaseCmd {
public:
enum SetCondition { kNONE, kNX, kXX, kEXORPX };
SetProxyCmd(std::string key, std::string value) : key_(key), value_(value) {};
std::shared_ptr<PClient> Client() { return client_; }

protected:
void Execute() override;
void CallBack() override;
bool DoInitial(PClient* client) override;
std::string GetCommand() override;

private:
std::string key_;
std::string value_;
int64_t sec_ = 0;
std::shared_ptr<PClient> client_; // TODO: need to discuss
Router* router_;

SetProxyCmd::SetCondition condition_{kNONE};

std::unique_ptr<BrpcRedis> brpc_redis_;
};


} // namespace pikiwidb
Loading
Loading