Skip to content

Commit

Permalink
modify list cmd with cache
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahashen committed Aug 20, 2024
1 parent 41c5962 commit 3e3b0e7
Show file tree
Hide file tree
Showing 23 changed files with 1,372 additions and 981 deletions.
11 changes: 6 additions & 5 deletions cmake/rediscache.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ SET(REDISCACHE_LIBRARIES "${LIB_INSTALL_DIR}/librediscache.a" CACHE FILEPATH "re
ExternalProject_Add(
extern_rediscache
${EXTERNAL_PROJECT_LOG_ARGS}
URL https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz
URL_HASH MD5=02c8aadc018dd8d4d3803cc420d1d75b
#URL https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz
#URL_HASH MD5=02c8aadc018dd8d4d3803cc420d1d75b
#temp used
GIT_REPOSITORY [email protected]:hahahashen/rediscache.git
GIT_TAG feat/removeUseTcMallocMacroDefinition
CMAKE_ARGS
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
-DCMAKE_BUILD_TYPE=Debug
-DCMAKE_INSTALL_PREFIX=${LIB_INSTALL_PREFIX}
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DPROJECT_BINARY_DIR=${LIB_INSTALL_PREFIX}
-DCMAKE_FIND_LIBRARY_SUFFIXES=${LIB_INSTALL_PREFIX}
-DSNAPPY_BUILD_TESTS=OFF
#-DWITH_JEMALLOC=${JEMALLOC_ON}
#rediscache 通过这个变量设置安装后,头文件的目录 设置这个变量后,deps-release/include路径下就有rediscache了,很神奇
-DCMAKE_INSTALL_INCLUDEDIR=${REDISCACHE_INCLUDE_DIR}
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
BUILD_COMMAND make -j${CPU_CORE}
Expand Down
9 changes: 4 additions & 5 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,15 @@ void BaseCmd::Execute(PClient* client) {
return;
}

if (IsNeedCacheDo()
&& PCACHE_NONE != g_config.cache_mode.load()
&& PSTORE.GetBackend(dbIndex)->GetCache()->CacheStatus() == PCACHE_STATUS_OK) {
if (IsNeedCacheDo() && PCACHE_NONE != g_config.cache_mode.load() &&
PSTORE.GetBackend(dbIndex)->GetCache()->CacheStatus() == PCACHE_STATUS_OK) {
if (IsNeedReadCache()) {
ReadCache(client);
}
if ( HasFlag(kCmdFlagsReadonly)&& client->CacheMiss()) {
if (HasFlag(kCmdFlagsReadonly) && client->CacheMiss()) {
//@tobechecked 下面这行是pika实现中会用到的,pikiwidb中cmd层不用上key锁,因为storage层上了
//所以不需要加上这行,但是涉及锁所以再次确认比较好
//pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
// pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
DoThroughDB(client);
if (IsNeedUpdateCache()) {
DoUpdateCache(client);
Expand Down
47 changes: 23 additions & 24 deletions src/cache/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.


#pragma once

#include <cstdint>
Expand All @@ -14,19 +13,19 @@ namespace cache {

/* Redis maxmemory strategies */
enum RedisMaxmemoryPolicy {
CACHE_VOLATILE_LRU = 0,
CACHE_ALLKEYS_LRU = 1,
CACHE_VOLATILE_LFU = 2,
CACHE_ALLKEYS_LFU = 3,
CACHE_VOLATILE_RANDOM = 4,
CACHE_ALLKEYS_RANDOM = 5,
CACHE_VOLATILE_TTL = 6,
CACHE_NO_EVICTION = 7
CACHE_VOLATILE_LRU = 0,
CACHE_ALLKEYS_LRU = 1,
CACHE_VOLATILE_LFU = 2,
CACHE_ALLKEYS_LFU = 3,
CACHE_VOLATILE_RANDOM = 4,
CACHE_ALLKEYS_RANDOM = 5,
CACHE_VOLATILE_TTL = 6,
CACHE_NO_EVICTION = 7
};

#define CACHE_DEFAULT_MAXMEMORY CONFIG_DEFAULT_MAXMEMORY // 10G
#define CACHE_DEFAULT_MAXMEMORY CONFIG_DEFAULT_MAXMEMORY // 10G
#define CACHE_DEFAULT_MAXMEMORY_SAMPLES CONFIG_DEFAULT_MAXMEMORY_SAMPLES
#define CACHE_DEFAULT_LFU_DECAY_TIME CONFIG_DEFAULT_LFU_DECAY_TIME
#define CACHE_DEFAULT_LFU_DECAY_TIME CONFIG_DEFAULT_LFU_DECAY_TIME

/*
* cache start pos
Expand All @@ -39,20 +38,20 @@ constexpr int CACHE_START_FROM_END = -1;
#define DEFAULT_CACHE_ITEMS_PER_KEY 512

struct CacheConfig {
uint64_t maxmemory; /* Can used max memory */
int32_t maxmemory_policy; /* Policy for key eviction */
int32_t maxmemory_samples; /* Precision of random sampling */
int32_t lfu_decay_time; /* LFU counter decay factor. */
int32_t zset_cache_start_direction;
int32_t zset_cache_field_num_per_key;
uint64_t maxmemory; /* Can used max memory */
int32_t maxmemory_policy; /* Policy for key eviction */
int32_t maxmemory_samples; /* Precision of random sampling */
int32_t lfu_decay_time; /* LFU counter decay factor. */
int32_t zset_cache_start_direction;
int32_t zset_cache_field_num_per_key;

CacheConfig()
: maxmemory(CACHE_DEFAULT_MAXMEMORY)
, maxmemory_policy(CACHE_NO_EVICTION)
, maxmemory_samples(CACHE_DEFAULT_MAXMEMORY_SAMPLES)
, lfu_decay_time(CACHE_DEFAULT_LFU_DECAY_TIME)
, zset_cache_start_direction(CACHE_START_FROM_BEGIN)
, zset_cache_field_num_per_key(DEFAULT_CACHE_ITEMS_PER_KEY){}
: maxmemory(CACHE_DEFAULT_MAXMEMORY),
maxmemory_policy(CACHE_NO_EVICTION),
maxmemory_samples(CACHE_DEFAULT_MAXMEMORY_SAMPLES),
lfu_decay_time(CACHE_DEFAULT_LFU_DECAY_TIME),
zset_cache_start_direction(CACHE_START_FROM_BEGIN),
zset_cache_field_num_per_key(DEFAULT_CACHE_ITEMS_PER_KEY) {}

CacheConfig& operator=(const CacheConfig& obj) {
maxmemory = obj.maxmemory;
Expand All @@ -64,4 +63,4 @@ struct CacheConfig {
return *this;
}
};
} // namespace cache
} // namespace cache
273 changes: 273 additions & 0 deletions src/cache/list.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "pstd_defer.h"
#include "redisCache.h"

namespace cache {

Status RedisCache::LIndex(std::string &key, int64_t index, std::string *element) {
sds val;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcLIndex(cache_, kobj, index, &val);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
return Status::NotFound("index not exist");
}
return Status::Corruption("RcLIndex failed");
}

element->clear();
element->assign(val, sdslen(val));
sdsfree(val);

return Status::OK();
}

Status RedisCache::LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot,
std::string &value) {
int ret = RcFreeMemoryIfNeeded(cache_);
if (C_OK != ret) {
return Status::Corruption("[error] Free memory faild !");
}

int where = (before_or_after == storage::Before) ? REDIS_LIST_HEAD : REDIS_LIST_TAIL;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj *pobj = createObject(OBJ_STRING, sdsnewlen(pivot.data(), pivot.size()));
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));
DEFER { DecrObjectsRefCount(kobj, pobj, vobj); };
int res = RcLInsert(cache_, kobj, where, pobj, vobj);
if (C_OK != res) {
if (REDIS_KEY_NOT_EXIST == res) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLInsert failed");
}

return Status::OK();
}

Status RedisCache::LLen(const std::string &key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcLLen(cache_, kobj, reinterpret_cast<unsigned long *>(len));
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLLen failed");
}

return Status::OK();
}

Status RedisCache::LPop(std::string &key, std::string *element) {
sds val;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcLPop(cache_, kobj, &val);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLPop failed");
}

element->clear();
element->assign(val, sdslen(val));
sdsfree(val);

return Status::OK();
}

Status RedisCache::LPush(std::string &key, std::vector<std::string> &values) {
int ret = RcFreeMemoryIfNeeded(cache_);
if (C_OK != ret) {
return Status::Corruption("[error] Free memory faild !");
}

robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
for (unsigned int i = 0; i < values.size(); ++i) {
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
}
DEFER {
FreeObjectList(vals, values.size());
DecrObjectsRefCount(kobj);
};
int res = RcLPush(cache_, kobj, vals, values.size());
if (C_OK != res) {
return Status::Corruption("RcLPush failed");
}

return Status::OK();
}

Status RedisCache::LPushx(std::string &key, std::vector<std::string> &values) {
int ret = RcFreeMemoryIfNeeded(cache_);
if (C_OK != ret) {
return Status::Corruption("[error] Free memory faild !");
}

robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
for (unsigned int i = 0; i < values.size(); ++i) {
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
}
DEFER {
FreeObjectList(vals, values.size());
DecrObjectsRefCount(kobj);
};
int res = RcLPushx(cache_, kobj, vals, values.size());
if (C_OK != res) {
if (REDIS_KEY_NOT_EXIST == res) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLPushx failed");
}

return Status::OK();
}

Status RedisCache::LRange(std::string &key, int64_t start, int64_t stop, std::vector<std::string> *values) {
sds *vals = nullptr;
uint64_t vals_size = 0;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcLRange(cache_, kobj, start, stop, &vals, reinterpret_cast<unsigned long *>(&vals_size));
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLRange failed");
}

for (uint64_t i = 0; i < vals_size; ++i) {
values->push_back(std::string(vals[i], sdslen(vals[i])));
}

FreeSdsList(vals, vals_size);
return Status::OK();
}

Status RedisCache::LRem(std::string &key, int64_t count, std::string &value) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));
DEFER { DecrObjectsRefCount(kobj, vobj); };
int ret = RcLRem(cache_, kobj, count, vobj);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcLRem failed");
}

return Status::OK();
}

Status RedisCache::LSet(std::string &key, int64_t index, std::string &value) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj *vobj = createObject(OBJ_STRING, sdsnewlen(value.data(), value.size()));
DEFER { DecrObjectsRefCount(kobj, vobj); };
int ret = RcLSet(cache_, kobj, index, vobj);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
} else if (REDIS_ITEM_NOT_EXIST == ret) {
return Status::NotFound("item not exist");
}
return Status::Corruption("RcLSet failed");
}

return Status::OK();
}

Status RedisCache::LTrim(std::string &key, int64_t start, int64_t stop) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcLTrim(cache_, kobj, start, stop);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
} else {
return Status::Corruption("RcLTrim failed");
}
}

return Status::OK();
}

Status RedisCache::RPop(std::string &key, std::string *element) {
sds val;
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER { DecrObjectsRefCount(kobj); };
int ret = RcRPop(cache_, kobj, &val);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcRPop failed");
}

element->clear();
element->assign(val, sdslen(val));
sdsfree(val);

return Status::OK();
}

Status RedisCache::RPush(std::string &key, std::vector<std::string> &values) {
int res = RcFreeMemoryIfNeeded(cache_);
if (C_OK != res) {
return Status::Corruption("[error] Free memory faild !");
}

robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
for (unsigned int i = 0; i < values.size(); ++i) {
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
}
DEFER {
FreeObjectList(vals, values.size());
DecrObjectsRefCount(kobj);
};
int ret = RcRPush(cache_, kobj, vals, values.size());
if (C_OK != ret) {
return Status::Corruption("RcRPush failed");
}

return Status::OK();
}

Status RedisCache::RPushx(std::string &key, std::vector<std::string> &values) {
int res = RcFreeMemoryIfNeeded(cache_);
if (C_OK != res) {
return Status::Corruption("[error] Free memory faild !");
}

robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
robj **vals = (robj **)zcallocate(sizeof(robj *) * values.size());
for (unsigned int i = 0; i < values.size(); ++i) {
vals[i] = createObject(OBJ_STRING, sdsnewlen(values[i].data(), values[i].size()));
}
DEFER {
FreeObjectList(vals, values.size());
DecrObjectsRefCount(kobj);
};
int ret = RcRPushx(cache_, kobj, vals, values.size());
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcRPushx failed");
}

return Status::OK();
}

} // namespace cache
Loading

0 comments on commit 3e3b0e7

Please sign in to comment.