Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: let mset/msetnx/setx/setbit/SDiffstore/SInterstore/SMove/SPop/SUnionstore command use new batch #320

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st
return rocksdb::Status::Corruption("SDiffsotre invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -354,23 +354,23 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down Expand Up @@ -460,7 +460,7 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
return rocksdb::Status::Corruption("SInterstore invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -549,23 +549,23 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down Expand Up @@ -679,7 +679,7 @@ Status Redis::SMembersWithTTL(const Slice& key, std::vector<std::string>* member

rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;

uint64_t version = 0;
Expand Down Expand Up @@ -712,8 +712,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.ModifyCount(-1);
batch.Put(handles_[kSetsMetaCF], base_source.Encode(), meta_value);
batch.Delete(handles_[kSetsDataCF], sets_member_key.Encode());
batch->Put(kSetsMetaCF, base_source.Encode(), meta_value);
batch->Delete(kSetsDataCF, sets_member_key.Encode());
statistic++;
} else if (s.IsNotFound()) {
*ret = 0;
Expand All @@ -736,10 +736,10 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
if (parsed_sets_meta_value.IsStale() || parsed_sets_meta_value.Count() == 0) {
version = parsed_sets_meta_value.InitialMetaValue();
parsed_sets_meta_value.SetCount(1);
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue i_val(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode());
} else {
std::string member_value;
version = parsed_sets_meta_value.Version();
Expand All @@ -751,8 +751,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
}
parsed_sets_meta_value.ModifyCount(1);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
} else if (!s.ok()) {
return s;
}
Expand All @@ -762,14 +762,14 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
EncodeFixed32(str, 1);
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
} else {
return s;
}
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, source.ToString(), 1);
return s;
}
Expand All @@ -778,7 +778,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
std::default_random_engine engine;

std::string meta_value;
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = pstd::NowMicros();
Expand All @@ -801,14 +801,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
auto iter = db_->NewIterator(default_read_options_, handles_[kSetsDataCF]);
for (iter->Seek(sets_member_key.EncodeSeekKey()); iter->Valid() && cur_index < size;
iter->Next(), cur_index++) {
batch.Delete(handles_[kSetsDataCF], iter->key());
batch->Delete(kSetsDataCF, iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
}

// parsed_sets_meta_value.ModifyCount(-cnt);
// batch.Put(handles_[kSetsMetaCF], key, meta_value);
batch.Delete(handles_[kSetsMetaCF], base_meta_key.Encode());
batch->Delete(kSetsMetaCF, base_meta_key.Encode());
delete iter;

} else {
Expand Down Expand Up @@ -838,7 +838,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
}
if (sets_index.find(cur_index) != sets_index.end()) {
del_count++;
batch.Delete(handles_[kSetsDataCF], iter->key());
batch->Delete(kSetsDataCF, iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
}
Expand All @@ -848,14 +848,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.ModifyCount(static_cast<int32_t>(-cnt));
batch.Put(handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_meta_key.Encode(), meta_value);
delete iter;
}
}
} else {
return s;
}
return db_->Write(default_write_options_, &batch);
return batch->Commit();
}

rocksdb::Status Redis::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); }
Expand Down Expand Up @@ -1042,7 +1042,7 @@ rocksdb::Status Redis::SUnionstore(const Slice& destination, const std::vector<s
return rocksdb::Status::Corruption("SUnionstore invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -1097,23 +1097,23 @@ rocksdb::Status Redis::SUnionstore(const Slice& destination, const std::vector<s
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], destination, meta_value);
batch->Put(kSetsMetaCF, destination, meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue i_val(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down
14 changes: 9 additions & 5 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,13 @@ Status Redis::MSet(const std::vector<KeyValue>& kvs) {
}

MultiScopeRecordLock ml(lock_mgr_, keys);
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
for (const auto& kv : kvs) {
BaseKey base_key(kv.key);
StringsValue strings_value(kv.value);
batch.Put(base_key.Encode(), strings_value.Encode());
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
}
return db_->Write(default_write_options_, &batch);
return batch->Commit();
}

Status Redis::MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret) {
Expand Down Expand Up @@ -712,7 +712,9 @@ Status Redis::SetBit(const Slice& key, int64_t offset, int32_t on, int32_t* ret)
}
StringsValue strings_value(data_value);
strings_value.SetEtime(timestamp);
return db_->Put(rocksdb::WriteOptions(), base_key.Encode(), strings_value.Encode());
auto batch = Batch::CreateBatch(this);
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
return batch->Commit();
} else {
return s;
}
Expand All @@ -730,7 +732,9 @@ Status Redis::Setex(const Slice& key, const Slice& value, uint64_t ttl) {

BaseKey base_key(key);
ScopeRecordLock l(lock_mgr_, key);
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
auto batch = Batch::CreateBatch(this);
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
return batch->Commit();
}

Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, const uint64_t ttl) {
Expand Down
Loading
Loading