diff --git a/pkg/node/node.go b/pkg/node/node.go
index 8f767991150..b42e53737cd 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -248,6 +248,12 @@ func NewBee(
 		}
 	}(b)
 
+	if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 {
+		return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
+	}
+
+	reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity
+
 	stateStore, stateStoreMetrics, err := InitStateStore(logger, o.DataDir, o.StatestoreCacheCapacity)
 	if err != nil {
 		return nil, err
@@ -353,14 +359,6 @@ func NewBee(
 	var batchStore postage.Storer = new(postage.NoOpBatchStore)
 	var evictFn func([]byte) error
 
-	var reserveCapacity int
-
-	if o.ReserveCapacityDoubling >= 0 && o.ReserveCapacityDoubling <= 1 {
-		reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling)
-	} else {
-		return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
-	}
-
 	if chainEnabled {
 		batchStore, err = batchstore.New(
 			stateStore,
@@ -735,6 +733,7 @@ func NewBee(
 		lo.ReserveWakeUpDuration = reserveWakeUpDuration
 		lo.ReserveMinEvictCount = reserveMinEvictCount
 		lo.RadiusSetter = kad
+		lo.ReserveCapacityDoubling = o.ReserveCapacityDoubling
 	}
 
 	localStore, err := storer.New(ctx, path, lo)
diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go
index d0ada7f74d3..d63e813975c 100644
--- a/pkg/salud/salud.go
+++ b/pkg/salud/salud.go
@@ -42,7 +42,7 @@ type peerStatus interface {
 
 type reserve interface {
 	storer.RadiusChecker
-	ReserveSize() int
+	ReserveCapacityDoubling() int
 }
 
 type service struct {
@@ -200,7 +200,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
 			continue
 		}
 
-		if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
+		if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
 			s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
 		} else if peer.dur.Seconds() > pDur {
 			s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
@@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
 		}
 	}
 
+	networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())
+
 	selfHealth := true
-	if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius {
+	if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
 		selfHealth = false
-		s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius)
+		s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
 	}
 
 	s.isSelfHealthy.Store(selfHealth)
diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go
index b408f06b95c..4aa7926af90 100644
--- a/pkg/salud/salud_test.go
+++ b/pkg/salud/salud_test.go
@@ -17,6 +17,7 @@ import (
 	mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
 	"github.com/ethersphere/bee/v2/pkg/swarm"
 	topMock "github.com/ethersphere/bee/v2/pkg/topology/mock"
+	"github.com/ethersphere/bee/v2/pkg/util/testutil"
 )
 
 type peer struct {
@@ -37,11 +38,11 @@ func TestSalud(t *testing.T) {
 		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
 		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
 
-		// healthy since radius >= most common radius - 1
+		// healthy since radius >= most common radius - 2
 		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
 
 		// radius too low
-		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 6, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
+		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
 
 		// dur too long
 		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
@@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
 	)
 
 	service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
 
 	err := spinlock.Wait(time.Minute, func() bool {
 		return len(topM.PeersHealth()) == len(peers)
@@ -127,10 +129,44 @@ func TestSelfUnhealthyRadius(t *testing.T) {
 	if service.IsHealthy() {
 		t.Fatalf("self should NOT be healthy")
 	}
+}
 
-	if err := service.Close(); err != nil {
+func TestSelfHealthyCapacityDoubling(t *testing.T) {
+	t.Parallel()
+	peers := []peer{
+		// fully healthy
+		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+	}
+
+	statusM := &statusMock{make(map[string]peer)}
+	addrs := make([]swarm.Address, 0, len(peers))
+	for _, p := range peers {
+		addrs = append(addrs, p.addr)
+		statusM.peers[p.addr.ByteString()] = p
+	}
+
+	topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))
+
+	reserve := mockstorer.NewReserve(
+		mockstorer.WithRadius(6),
+		mockstorer.WithReserveSize(100),
+		mockstorer.WithCapacityDoubling(2),
+	)
+
+	service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
+
+	err := spinlock.Wait(time.Minute, func() bool {
+		return len(topM.PeersHealth()) == len(peers)
+	})
+	if err != nil {
 		t.Fatal(err)
 	}
+
+	if !service.IsHealthy() {
+		t.Fatalf("self should be healthy")
+	}
 }
 
 func TestSubToRadius(t *testing.T) {
@@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) {
 	topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))
 
 	service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
 
 	c, unsub := service.SubscribeNetworkStorageRadius()
 	unsub()
@@ -191,10 +228,6 @@ func TestUnsub(t *testing.T) {
 		t.Fatal("should not have received an address")
 	case <-time.After(time.Second):
 	}
-
-	if err := service.Close(); err != nil {
-		t.Fatal(err)
-	}
 }
 
 type statusMock struct {
diff --git a/pkg/storer/cachestore.go b/pkg/storer/cachestore.go
index 5051e975da8..b4a759e57ac 100644
--- a/pkg/storer/cachestore.go
+++ b/pkg/storer/cachestore.go
@@ -40,8 +40,8 @@ func (db *DB) cacheWorker(ctx context.Context) {
 			}
 
 			evict := size - capc
-			if evict < db.opts.cacheMinEvictCount { // evict at least a min count
-				evict = db.opts.cacheMinEvictCount
+			if evict < db.reserveOptions.cacheMinEvictCount { // evict at least a min count
+				evict = db.reserveOptions.cacheMinEvictCount
 			}
 
 			dur := captureDuration(time.Now())
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index 4053ca7cf6f..277e6b7c125 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -502,7 +502,6 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo
 		PrefixAtStart: true,
 	}, func(res storage.Result) (bool, error) {
 		item := res.Entry.(*ChunkBinItem)
-
 		stop, err := cb(item)
 		if stop || err != nil {
 			return true, err
diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go
index fbd27330f9f..05e164c6784 100644
--- a/pkg/storer/mock/mockreserve.go
+++ b/pkg/storer/mock/mockreserve.go
@@ -78,6 +78,12 @@ func WithReserveSize(s int) Option {
 	})
 }
 
+func WithCapacityDoubling(s int) Option {
+	return optionFunc(func(p *ReserveStore) {
+		p.capacityDoubling = s
+	})
+}
+
 func WithPutHook(f func(swarm.Chunk) error) Option {
 	return optionFunc(func(p *ReserveStore) {
 		p.putHook = f
@@ -106,8 +112,9 @@ type ReserveStore struct {
 	cursorsErr error
 	epoch      uint64
 
-	radius      uint8
-	reservesize int
+	radius           uint8
+	reservesize      int
+	capacityDoubling int
 
 	subResponses []chunksResponse
 	putHook      func(swarm.Chunk) error
@@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int {
 	return s.reservesize
 }
 
+func (s *ReserveStore) ReserveCapacityDoubling() int {
+	return s.capacityDoubling
+}
+
 func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
 	return s.cursors, s.epoch, s.cursorsErr
 }
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index fe389048344..a3e75ee239b 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -10,6 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/bits"
 	"slices"
 	"sync"
 	"sync/atomic"
@@ -55,7 +56,7 @@ func (db *DB) startReserveWorkers(
 	go db.reserveWorker(ctx)
 
 	select {
-	case <-time.After(db.opts.reserveWarmupDuration):
+	case <-time.After(db.reserveOptions.warmupDuration):
 	case <-db.quit:
 		return
 	}
@@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
 	radius := db.StorageRadius()
 
 	evictBatches := make(map[string]bool)
-
 	err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) {
 		if ci.Bin >= radius {
 			count++
@@ -121,7 +121,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
 	overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
 	defer overCapUnsub()
 
-	thresholdTicker := time.NewTicker(db.opts.reserveWakeupDuration)
+	thresholdTicker := time.NewTicker(db.reserveOptions.wakeupDuration)
 	defer thresholdTicker.Stop()
 
 	_, _ = db.countWithinRadius(ctx)
@@ -159,7 +159,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
 				continue
 			}
 
-			if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.opts.minimumRadius {
+			if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius {
 				radius--
 				if err := db.reserve.SetRadius(radius); err != nil {
 					db.logger.Error(err, "reserve set radius")
@@ -362,8 +362,8 @@ func (db *DB) unreserve(ctx context.Context) (err error) {
 			}
 
 			evict := target - totalEvicted
-			if evict < int(db.opts.reserveMinEvictCount) { // evict at least a min count
-				evict = int(db.opts.reserveMinEvictCount)
+			if evict < int(db.reserveOptions.minEvictCount) { // evict at least a min count
+				evict = int(db.reserveOptions.minEvictCount)
 			}
 
 			binEvicted, err := db.evictBatch(ctx, b, evict, radius)
@@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 {
 	return db.reserve.Radius()
 }
 
+func (db *DB) ReserveCapacityDoubling() int {
+	return db.reserveOptions.capacityDoubling
+}
+
 func (db *DB) ReserveSize() int {
 	if db.reserve == nil {
 		return 0
@@ -493,6 +497,96 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
 	}, errC
 }
 
+type NeighborhoodStat struct {
+	Address    swarm.Address
+	ChunkCount int
+}
+
+func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) {
+
+	radius := db.StorageRadius()
+
+	networkRadius := radius + uint8(db.reserveOptions.capacityDoubling)
+
+	prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling)
+	neighs := make([]*NeighborhoodStat, len(prefixes))
+	for i, n := range prefixes {
+		neighs[i] = &NeighborhoodStat{Address: n}
+	}
+
+	err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
+		for _, n := range neighs {
+			if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
+				n.ChunkCount++
+				break
+			}
+		}
+		return false, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return neighs, err
+}
+
+func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address {
+	bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
+	bitSuffixes := make([]uint8, bitCombinationsCount)
+
+	for i := 0; i < bitCombinationsCount; i++ {
+		bitSuffixes[i] = uint8(i)
+	}
+
+	binPrefixes := make([]swarm.Address, bitCombinationsCount)
+
+	// copy base address
+	for i := range binPrefixes {
+		binPrefixes[i] = base.Clone()
+	}
+
+	for j := range binPrefixes {
+		pseudoAddrBytes := binPrefixes[j].Bytes()
+
+		// set pseudo suffix
+		bitSuffixPos := suffixLength - 1
+		for l := radius + 0; l < radius+suffixLength+1; l++ {
+			index, pos := l/8, l%8
+
+			if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
+				pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+			} else {
+				pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+			}
+
+			bitSuffixPos--
+		}
+
+		// clear rest of the bits
+		for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
+			index, pos := l/8, l%8
+			pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+		}
+	}
+
+	return binPrefixes
+}
+
+// Clears the bit at pos in n.
+func clearBit(n, pos uint8) uint8 {
+	mask := ^(uint8(1) << pos)
+	return n & mask
+}
+
+// Sets the bit at pos in the integer n.
+func setBit(n, pos uint8) uint8 {
+	return n | 1<<pos
+}
+
+func hasBit(n, pos uint8) bool {
+	return n&(1<<pos) > 0
+}
+
 // expiredBatchItem is a storage.Item implementation for expired batches.
 type expiredBatchItem struct {
 	BatchID []byte
diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go
index ce5d253be96..8fd04a0581e 100644
--- a/pkg/storer/reserve_test.go
+++ b/pkg/storer/reserve_test.go
@@ -666,6 +666,97 @@ func TestSubscribeBinTrigger(t *testing.T) {
 	})
 }
 
+func TestNeighborhoodStats(t *testing.T) {
+	t.Parallel()
+
+	const (
+		chunkCountPerPO       = 32
+		maxPO                 = 6
+		networkRadius   uint8 = 5
+		doublingFactor  uint8 = 2
+		localRadius     uint8 = networkRadius - doublingFactor
+	)
+
+	mustParse := func(s string) swarm.Address {
+		addr, err := swarm.ParseBitStrAddress(s)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return addr
+	}
+
+	var (
+		baseAddr = mustParse("100000")
+		sister1  = mustParse("100010")
+		sister2  = mustParse("100100")
+		sister3  = mustParse("100110")
+	)
+
+	putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) {
+		putter := st.ReservePutter()
+		for i := 0; i < chunkCountPerPO; i++ {
+			ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius)
+			err := putter.Put(context.Background(), ch)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+
+	testF := func(t *testing.T, st *storer.DB) {
+		t.Helper()
+
+		putChunks(baseAddr, int(networkRadius), st)
+		putChunks(sister1, int(networkRadius), st)
+		putChunks(sister2, int(networkRadius), st)
+		putChunks(sister3, int(networkRadius), st)
+
+		time.Sleep(time.Second)
+
+		neighs, err := st.NeighborhoodsStat(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if len(neighs) != (1 << doublingFactor) {
+			t.Fatalf("number of neighborhoods does not match. wanted %d, got %d", 1<<doublingFactor, len(neighs))
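
Standalone sketch (not part of the patch) of the capacity-doubling arithmetic the diff wires through node.go and salud.go: each doubling step multiplies the reserve capacity by two and lets the node keep a storage radius that many bins shallower, while salud recovers the network-equivalent radius for the health check. The defaultReserveCapacity value is an assumption inferred from the removed `1 << (22 + o.ReserveCapacityDoubling)` expression; all other names are local to the example.

package main

import "fmt"

// Assumption: storer.DefaultReserveCapacity is 1 << 22 chunks, as implied by the
// removed expression `reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling)`.
const defaultReserveCapacity = 1 << 22

func main() {
	const networkRadius uint8 = 8

	for doubling := uint8(0); doubling <= 1; doubling++ {
		// node.go: reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity
		reserveCapacity := (1 << doubling) * defaultReserveCapacity

		// A doubled reserve stores `doubling` extra sister neighborhoods, so the node's
		// storage radius can sit that many bins below the network radius.
		storageRadius := networkRadius - doubling

		// salud.go: networkRadiusEstimation := StorageRadius() + uint8(ReserveCapacityDoubling())
		estimation := storageRadius + doubling

		fmt.Printf("doubling=%d reserve=%d chunks storageRadius=%d estimatedNetworkRadius=%d\n",
			doubling, reserveCapacity, storageRadius, estimation)
	}
}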