Implement locking in CommitMultiStore #46

Merged · 2 commits · May 30, 2024
2 changes: 1 addition & 1 deletion go.mod
@@ -200,4 +200,4 @@ retract (

replace github.com/cometbft/cometbft => github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540

-replace cosmossdk.io/store => github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018
+replace cosmossdk.io/store => github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac
4 changes: 2 additions & 2 deletions go.sum
@@ -199,8 +199,8 @@ github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540 h1:pkYQbAdOAAoZBSId9kLupCgZHj8YvA9LzM31fVYpjlw=
github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540/go.mod h1:REQN+ObgfYxi39TcYR/Hv95C9bPxY3sYJCvghryj7vY=
-github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018 h1:Dn08pzQTajFp1GHaZFd0istbjl793PaT50vfj4mVKNs=
-github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018/go.mod h1:zMcD3hfNwd0WMTpdRUhS3QxoCoEtBXWeoKsu3iaLBbQ=
+github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac h1:frUaYZlrs9/Tk8fAHjMhcrpk73UEZ36fD7s+megReKQ=
+github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac/go.mod h1:zMcD3hfNwd0WMTpdRUhS3QxoCoEtBXWeoKsu3iaLBbQ=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
363 changes: 363 additions & 0 deletions store/cachemulti/locking_test.go
@@ -0,0 +1,363 @@
package cachemulti_test

import (
"sync"
"testing"
"time"

"cosmossdk.io/log"
"cosmossdk.io/store/metrics"
pruningtypes "cosmossdk.io/store/pruning/types"
"cosmossdk.io/store/rootmulti"
"cosmossdk.io/store/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStore_LinearizeReadsAndWrites(t *testing.T) {
	key := []byte("kv_store_key")
	storeKey := types.NewKVStoreKey("store1")
	lockKey := []byte("a")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	wg := sync.WaitGroup{}
	wg.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer wg.Done()

			lockingCms.Lock([][]byte{lockKey})
			defer lockingCms.Unlock([][]byte{lockKey})
			kvStore := lockingCms.GetKVStore(storeKey)
			v := kvStore.Get(key)
			if v == nil {
				kvStore.Set(key, []byte{1})
			} else {
				v[0]++
				kvStore.Set(key, v)
			}
			lockingCms.Write()
		}()
	}

	wg.Wait()
	require.Equal(t, []byte{100}, lockingCms.GetKVStore(storeKey).Get(key))
}

func TestStore_LockOrderToPreventDeadlock(t *testing.T) {
	key := []byte("kv_store_key")
	storeKey := types.NewKVStoreKey("store1")
	lockKeyA := []byte("a")
	lockKeyB := []byte("b")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	// Acquire keys in two different orders ensuring that we don't reach deadlock.
	wg := sync.WaitGroup{}
	wg.Add(200)
	for i := 0; i < 100; i++ {
		go func() {
			defer wg.Done()

			lockingCms.Lock([][]byte{lockKeyA, lockKeyB})
			defer lockingCms.Unlock([][]byte{lockKeyA, lockKeyB})
			kvStore := lockingCms.GetKVStore(storeKey)
			v := kvStore.Get(key)
			if v == nil {
				kvStore.Set(key, []byte{1})
			} else {
				v[0]++
				kvStore.Set(key, v)
			}
			lockingCms.Write()
		}()

		go func() {
			defer wg.Done()

			lockingCms.Lock([][]byte{lockKeyB, lockKeyA})
			defer lockingCms.Unlock([][]byte{lockKeyB, lockKeyA})
			kvStore := lockingCms.GetKVStore(storeKey)
			v := kvStore.Get(key)
			if v == nil {
				kvStore.Set(key, []byte{1})
			} else {
				v[0]++
				kvStore.Set(key, v)
			}
			lockingCms.Write()
		}()
	}

	wg.Wait()
	require.Equal(t, []byte{200}, lockingCms.GetKVStore(storeKey).Get(key))
}

func TestStore_AllowForParallelUpdates(t *testing.T) {
	storeKey := types.NewKVStoreKey("store1")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	wg := sync.WaitGroup{}
	wg.Add(100)

	for i := byte(0); i < 100; i++ {
		k := []byte{i}
		go func() {
			defer wg.Done()

			// We specifically don't unlock the keys during processing so that we can show that we must process all
			// of these in parallel before the wait group is done.
			lockingCms.Lock([][]byte{k})
			lockingCms.GetKVStore(storeKey).Set(k, k)
			lockingCms.Write()
		}()
	}

	wg.Wait()
	for i := byte(0); i < 100; i++ {
		lockingCms.Unlock([][]byte{{i}})
	}
	for i := byte(0); i < 100; i++ {
		require.Equal(t, []byte{i}, lockingCms.GetKVStore(storeKey).Get([]byte{i}))
	}
}

func TestStore_AddLocksDuringTransaction(t *testing.T) {
Reviewer: For my understanding, doesn't this mean that lock ordering can be non-deterministic across txns, which can lead to deadlocks? Am I missing something obvious?

Author: You're correct. I noted this in the comment in cachemulti/store.go: it is the user's responsibility to acquire these locks in an order that won't result in deadlock.

For example, I might make it an invariant to always take clob-id-based locks before subaccount-based locks for some set of goroutines that may run concurrently. It would also follow that taking subaccount-based locks in two different Lock() calls would be unsafe (unless I can further partition the subaccount keys and guarantee that each key will always be in either the first or the second Lock() call).
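A minimal sketch of that discipline is below. It is illustrative only: lockingCMS, clobLockKey, subaccountLockKey, and processForClob are hypothetical names, not part of this PR. Note that a single Lock() call with multiple keys is safe regardless of the order in which the keys are listed (TestStore_LockOrderToPreventDeadlock below exercises exactly that), so the ordering invariant only needs to hold across separate Lock() calls.

```go
package lockingexample

import "fmt"

// lockingCMS is a minimal stand-in for the store returned by
// LockingCacheMultiStore(); only the methods used in this sketch are listed.
type lockingCMS interface {
	Lock(keys [][]byte)
	Unlock(keys [][]byte)
}

// Hypothetical key-construction helpers, not part of this PR.
func clobLockKey(clobID uint32) []byte {
	return []byte(fmt.Sprintf("clob/%d", clobID))
}

func subaccountLockKey(owner string) []byte {
	return []byte("subaccount/" + owner)
}

// processForClob follows the invariant: the clob-id-based lock is always taken
// before any subaccount-based locks, and all subaccount locks are taken in a
// single Lock() call, so two goroutines can never wait on each other in a cycle.
func processForClob(cms lockingCMS, clobID uint32, owners []string, body func()) {
	cms.Lock([][]byte{clobLockKey(clobID)})
	defer cms.Unlock([][]byte{clobLockKey(clobID)})

	subKeys := make([][]byte, 0, len(owners))
	for _, owner := range owners {
		subKeys = append(subKeys, subaccountLockKey(owner))
	}
	cms.Lock(subKeys)
	defer cms.Unlock(subKeys)

	body()
}
```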

	key := []byte("kv_store_key")
	storeKey := types.NewKVStoreKey("store1")
	lockKey := []byte("lockkey")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	wg := sync.WaitGroup{}
	wg.Add(100)
	for i := byte(0); i < 100; i++ {
		k := []byte{i}
		go func() {
			defer wg.Done()

			lockingCms.Lock([][]byte{k})
			defer lockingCms.Unlock([][]byte{k})
			lockingCms.GetKVStore(storeKey).Set(k, k)

			lockingCms.Lock([][]byte{lockKey})
			defer lockingCms.Unlock([][]byte{lockKey})
			kvStore := lockingCms.GetKVStore(storeKey)
			v := kvStore.Get(key)
			if v == nil {
				kvStore.Set(key, []byte{1})
			} else {
				v[0]++
				kvStore.Set(key, v)
			}
			lockingCms.Write()
		}()
	}

	wg.Wait()
	for i := byte(0); i < 100; i++ {
		require.Equal(t, []byte{i}, lockingCms.GetKVStore(storeKey).Get([]byte{i}))
	}
	require.Equal(t, []byte{100}, lockingCms.GetKVStore(storeKey).Get(key))
}

func TestStore_MaintainLockOverMultipleTransactions(t *testing.T) {
Reviewer: For my understanding, what is the intended use case for this?

Author: Example: order placement involves matching with the best order, applying the balance changes for that match, and then repeating for the next order. However, applying the balance changes requires modifying the fee collector, and I don't want to hold a lock on the fee collector for the majority of the transaction; it would kill parallelism.

So I separate this into an individual commit per potential maker order. This allows me to take the fee collector lock at the very end of each commit and release it before going into the next potential match.

However, that would mean that individual matches for a clob pair might be interleaved with matches from another taker order, so I hold a clob pair lock for the whole sequence of transactions to prevent this.
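A rough sketch of that pattern, mirroring the outer-lock/inner-commit structure of the test below; lockingCMS, matchTakerOrder, clobPairLockKey, feeCollectorLockKey, and applyMatch are hypothetical names, not part of this PR:

```go
package lockingexample

// lockingCMS is a minimal stand-in for the store returned by
// LockingCacheMultiStore(); only the methods used in this sketch are listed.
type lockingCMS interface {
	Lock(keys [][]byte)
	Unlock(keys [][]byte)
	Write()
}

// matchTakerOrder holds the clob pair lock across a whole sequence of
// per-match commits, so matches for this clob pair cannot be interleaved with
// matches from another taker order, while the fee collector lock is only held
// briefly inside each commit.
func matchTakerOrder(
	cms lockingCMS,
	clobPairLockKey []byte,
	feeCollectorLockKey []byte,
	makerOrders []string,
	applyMatch func(maker string),
) {
	cms.Lock([][]byte{clobPairLockKey})
	defer cms.Unlock([][]byte{clobPairLockKey})

	for _, maker := range makerOrders {
		func() {
			// Take the fee collector lock only for this one match and release
			// it before moving on to the next potential match.
			cms.Lock([][]byte{feeCollectorLockKey})
			defer cms.Unlock([][]byte{feeCollectorLockKey})

			applyMatch(maker) // apply the balance changes for this match
			cms.Write()       // commit this match before releasing the lock
		}()
	}
}
```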

	keyA := []byte("kv_store_key_a")
	keyB := []byte("kv_store_key_b")
	storeKey := types.NewKVStoreKey("store1")
	lockKeyA := []byte("lockkeya")
	lockKeyB := []byte("lockkeyb")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	// Key A is set differently in the first and second transaction so we can check it
	// to see what transaction was run last.
	lockingCms.GetKVStore(storeKey).Set(keyA, []byte{0})
	lockingCms.GetKVStore(storeKey).Set(keyB, []byte{0})

	wg := sync.WaitGroup{}
	wg.Add(100)
	for i := byte(0); i < 100; i++ {
		k := []byte{i}
		go func() {
			defer wg.Done()

			lockingCms.Lock([][]byte{k})
			defer lockingCms.Unlock([][]byte{k})
			lockingCms.GetKVStore(storeKey).Set(k, k)

			lockingCms.Lock([][]byte{lockKeyA})
			defer lockingCms.Unlock([][]byte{lockKeyA})

			func() {
				lockingCms.Lock([][]byte{lockKeyB})
				defer lockingCms.Unlock([][]byte{lockKeyB})

				assert.Equal(t, []byte{0}, lockingCms.GetKVStore(storeKey).Get(keyA))
				lockingCms.GetKVStore(storeKey).Set(keyA, []byte{1})
				v := lockingCms.GetKVStore(storeKey).Get(keyB)
				v[0]++
				lockingCms.GetKVStore(storeKey).Set(keyB, v)
				lockingCms.Write()
			}()

			func() {
				lockingCms.Lock([][]byte{lockKeyB})
				defer lockingCms.Unlock([][]byte{lockKeyB})

				assert.Equal(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(keyA))
				lockingCms.GetKVStore(storeKey).Set(keyA, []byte{0})
				v := lockingCms.GetKVStore(storeKey).Get(keyB)
				v[0]++
				lockingCms.GetKVStore(storeKey).Set(keyB, v)
				lockingCms.Write()
			}()
		}()
	}

	wg.Wait()
	require.Equal(t, []byte{200}, lockingCms.GetKVStore(storeKey).Get(keyB))
}

func TestStore_ReadWriteLock(t *testing.T) {
	numReadersKey := []byte("kv_store_key_a")
	numWritersKey := []byte("kv_store_key_b")
	maxNumReadersKey := []byte("kv_store_key_c")
	maxNumWritersKey := []byte("kv_store_key_d")
	storeKey := types.NewKVStoreKey("store1")
	rwLockKey := []byte("lockkeya")
	lockKey := []byte("lockkeyb")

	db := dbm.NewMemDB()
	store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics())
	store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db)
	err := store.LoadLatestVersion()
	assert.NoError(t, err)
	lockingCms := store.LockingCacheMultiStore()

	lockingCms.GetKVStore(storeKey).Set(numReadersKey, []byte{0})
	lockingCms.GetKVStore(storeKey).Set(numWritersKey, []byte{0})
	lockingCms.GetKVStore(storeKey).Set(maxNumReadersKey, []byte{0})
	lockingCms.GetKVStore(storeKey).Set(maxNumWritersKey, []byte{0})

	wg := sync.WaitGroup{}
	wg.Add(200)
	// Start 100 readers and 100 writers. Record the maximum number of readers and writers seen.
	for i := 0; i < 100; i++ {
		go func() {
			defer wg.Done()

			lockingCms.RLockRW([][]byte{rwLockKey})
			defer lockingCms.RUnlockRW([][]byte{rwLockKey})

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				v := lockingCms.GetKVStore(storeKey).Get(numReadersKey)
				v[0]++
				lockingCms.GetKVStore(storeKey).Set(numReadersKey, v)
				lockingCms.Write()
			}()

			time.Sleep(100 * time.Millisecond)

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				numReaders := lockingCms.GetKVStore(storeKey).Get(numReadersKey)[0]
				maxNumReaders := lockingCms.GetKVStore(storeKey).Get(maxNumReadersKey)[0]
				if numReaders > maxNumReaders {
					lockingCms.GetKVStore(storeKey).Set(maxNumReadersKey, []byte{numReaders})
				}
				lockingCms.Write()
			}()

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				v := lockingCms.GetKVStore(storeKey).Get(numReadersKey)
				v[0]--
				lockingCms.GetKVStore(storeKey).Set(numReadersKey, v)
				lockingCms.Write()
			}()
		}()

		go func() {
			defer wg.Done()

			lockingCms.LockRW([][]byte{rwLockKey})
			defer lockingCms.UnlockRW([][]byte{rwLockKey})

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				v := lockingCms.GetKVStore(storeKey).Get(numWritersKey)
				v[0]++
				lockingCms.GetKVStore(storeKey).Set(numWritersKey, v)
				lockingCms.Write()
			}()

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				numWriters := lockingCms.GetKVStore(storeKey).Get(numWritersKey)[0]
				maxNumWriters := lockingCms.GetKVStore(storeKey).Get(maxNumWritersKey)[0]
				if numWriters > maxNumWriters {
					lockingCms.GetKVStore(storeKey).Set(maxNumWritersKey, []byte{numWriters})
				}
				lockingCms.Write()
			}()

			func() {
				lockingCms.Lock([][]byte{lockKey})
				defer lockingCms.Unlock([][]byte{lockKey})
				v := lockingCms.GetKVStore(storeKey).Get(numWritersKey)
				v[0]--
				lockingCms.GetKVStore(storeKey).Set(numWritersKey, v)
				lockingCms.Write()
			}()
		}()
	}

	wg.Wait()
	// At some point there should be more than one reader. If this test is flaky, sleep time
	// can be added to the reader to deflake.
	require.Less(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(maxNumReadersKey))

Reviewer: Test is failing here; possibly flaky.

	// There must be at most one writer at once.
	require.Equal(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(maxNumWritersKey))
}