diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go
index 57568e2cd79..d63e813975c 100644
--- a/pkg/salud/salud.go
+++ b/pkg/salud/salud.go
@@ -42,7 +42,7 @@ type peerStatus interface {
 
 type reserve interface {
 	storer.RadiusChecker
-	ReserveSize() int
+	ReserveCapacityDoubling() int
 }
 
 type service struct {
@@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
 		}
 	}
 
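+	// A node that has doubled its reserve capacity n times stores chunks n
+	// proximity orders shallower than the network-wide radius, so the doubling
+	// exponent is added back to make the local radius comparable to the radius
+	// reported by peers.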
+	networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())
+
 	selfHealth := true
-	if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius {
+	if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
 		selfHealth = false
-		s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius)
+		s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
 	}
 
 	s.isSelfHealthy.Store(selfHealth)
diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go
index ecfe7bdfe45..4aa7926af90 100644
--- a/pkg/salud/salud_test.go
+++ b/pkg/salud/salud_test.go
@@ -17,6 +17,7 @@ import (
 	mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
 	"github.com/ethersphere/bee/v2/pkg/swarm"
 	topMock "github.com/ethersphere/bee/v2/pkg/topology/mock"
+	"github.com/ethersphere/bee/v2/pkg/util/testutil"
 )
 
 type peer struct {
@@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
 	)
 
 	service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
 
 	err := spinlock.Wait(time.Minute, func() bool {
 		return len(topM.PeersHealth()) == len(peers)
@@ -127,10 +129,44 @@ func TestSelfUnhealthyRadius(t *testing.T) {
 	if service.IsHealthy() {
 		t.Fatalf("self should NOT be healthy")
 	}
+}
 
-	if err := service.Close(); err != nil {
+func TestSelfHealthyCapacityDoubling(t *testing.T) {
+	t.Parallel()
+	peers := []peer{
+		// fully healthy
+		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+		{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+	}
+
+	statusM := &statusMock{make(map[string]peer)}
+	addrs := make([]swarm.Address, 0, len(peers))
+	for _, p := range peers {
+		addrs = append(addrs, p.addr)
+		statusM.peers[p.addr.ByteString()] = p
+	}
+
+	topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))
+
+	reserve := mockstorer.NewReserve(
+		mockstorer.WithRadius(6),
+		mockstorer.WithReserveSize(100),
+		mockstorer.WithCapacityDoubling(2),
+	)
+
+	service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
+
+	err := spinlock.Wait(time.Minute, func() bool {
+		return len(topM.PeersHealth()) == len(peers)
+	})
+	if err != nil {
 		t.Fatal(err)
 	}
+
+	if !service.IsHealthy() {
+		t.Fatalf("self should be healthy")
+	}
 }
 
 func TestSubToRadius(t *testing.T) {
@@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) {
 	topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))
 
 	service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
+	testutil.CleanupCloser(t, service)
 
 	c, unsub := service.SubscribeNetworkStorageRadius()
 	unsub()
@@ -191,10 +228,6 @@ func TestUnsub(t *testing.T) {
 		t.Fatal("should not have received an address")
 	case <-time.After(time.Second):
 	}
-
-	if err := service.Close(); err != nil {
-		t.Fatal(err)
-	}
 }
 
 type statusMock struct {
diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go
index fbd27330f9f..05e164c6784 100644
--- a/pkg/storer/mock/mockreserve.go
+++ b/pkg/storer/mock/mockreserve.go
@@ -78,6 +78,12 @@ func WithReserveSize(s int) Option {
 	})
 }
 
+func WithCapacityDoubling(s int) Option {
+	return optionFunc(func(p *ReserveStore) {
+		p.capacityDoubling = s
+	})
+}
+
 func WithPutHook(f func(swarm.Chunk) error) Option {
 	return optionFunc(func(p *ReserveStore) {
 		p.putHook = f
@@ -106,8 +112,9 @@ type ReserveStore struct {
 	cursorsErr error
 	epoch      uint64
 
-	radius      uint8
-	reservesize int
+	radius           uint8
+	reservesize      int
+	capacityDoubling int
 
 	subResponses []chunksResponse
 	putHook      func(swarm.Chunk) error
@@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int {
 	return s.reservesize
 }
 
+func (s *ReserveStore) ReserveCapacityDoubling() int {
+	return s.capacityDoubling
+}
+
 func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
 	return s.cursors, s.epoch, s.cursorsErr
 }
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index 9983c5c6f80..c59f2f10373 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -10,6 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/bits"
 	"slices"
 	"sync"
 	"sync/atomic"
@@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
 	radius := db.StorageRadius()
 
 	evictBatches := make(map[string]bool)
-
 	err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) {
 		if ci.Bin >= radius {
 			count++
@@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 {
 	return db.reserve.Radius()
 }
 
+func (db *DB) ReserveCapacityDoubling() int {
+	return db.reserveOptions.capacityDoubling
+}
+
 func (db *DB) ReserveSize() int {
 	if db.reserve == nil {
 		return 0
@@ -493,6 +497,95 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
 	}, errC
 }
 
+type NeighborhoodStat struct {
+	Address    swarm.Address
+	ChunkCount int
+}
+
+func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) {
+
+	radius := db.StorageRadius()
+
+	networkRadius := radius + uint8(db.reserveOptions.capacityDoubling)
+
+	var neighs []*NeighborhoodStat
+	for _, n := range neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) {
+		neighs = append(neighs, &NeighborhoodStat{Address: n})
+	}
+
+	err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
+		for _, n := range neighs {
+			if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
+				n.ChunkCount++
+				break
+			}
+		}
+		return false, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return neighs, err
+}
+
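+// neighborhoodPrefixes enumerates the 2^suffixLength sister neighborhoods of
+// base: every address that shares the first radius bits of base and differs in
+// the suffixLength bits that follow. Address bits are indexed MSB-first, while
+// the setBit/clearBit/hasBit helpers below index LSB-first, hence each byte is
+// passed through bits.Reverse8 before and after a bit is written.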
+func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address {
+	bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
+	bitSuffixes := make([]uint8, bitCombinationsCount)
+
+	for i := 0; i < bitCombinationsCount; i++ {
+		bitSuffixes[i] = uint8(i)
+	}
+
+	binPrefixes := make([]swarm.Address, bitCombinationsCount)
+
+	// copy base address
+	for i := range binPrefixes {
+		binPrefixes[i] = base.Clone()
+	}
+
+	for j := range binPrefixes {
+		pseudoAddrBytes := binPrefixes[j].Bytes()
+
+		// set pseudo suffix
+		bitSuffixPos := suffixLength - 1
+		for l := radius + 0; l < radius+suffixLength+1; l++ {
+			index, pos := l/8, l%8
+
+			if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
+				pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+			} else {
+				pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+			}
+
+			bitSuffixPos--
+		}
+
+		// clear rest of the bits
+		for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
+			index, pos := l/8, l%8
+			pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+		}
+	}
+
+	return binPrefixes
+}
+
+// Clears the bit at pos in n.
+func clearBit(n, pos uint8) uint8 {
+	mask := ^(uint8(1) << pos)
+	return n & mask
+}
+
+// Sets the bit at pos in the integer n.
+func setBit(n, pos uint8) uint8 {
+	return n | 1<<pos
+}
+
+func hasBit(n, pos uint8) bool {
+	return n&(1<<pos) > 0
+}
+
 // expiredBatchItem is a storage.Item implementation for expired batches.
 type expiredBatchItem struct {
 	BatchID []byte
diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go
index ce5d253be96..8fd04a0581e 100644
--- a/pkg/storer/reserve_test.go
+++ b/pkg/storer/reserve_test.go
@@ -666,6 +666,97 @@ func TestSubscribeBinTrigger(t *testing.T) {
 	})
 }
 
+func TestNeighborhoodStats(t *testing.T) {
+	t.Parallel()
+
+	const (
+		chunkCountPerPO       = 32
+		maxPO                 = 6
+		networkRadius   uint8 = 5
+		doublingFactor  uint8 = 2
+		localRadius     uint8 = networkRadius - doublingFactor
+	)
+
+	mustParse := func(s string) swarm.Address {
+		addr, err := swarm.ParseBitStrAddress(s)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return addr
+	}
+
+	var (
+		baseAddr = mustParse("100000")
+		sister1  = mustParse("100010")
+		sister2  = mustParse("100100")
+		sister3  = mustParse("100110")
+	)
+
+	putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) {
+		putter := st.ReservePutter()
+		for i := 0; i < chunkCountPerPO; i++ {
+			ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius)
+			err := putter.Put(context.Background(), ch)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+	}
+
+	testF := func(t *testing.T, st *storer.DB) {
+		t.Helper()
+
+		putChunks(baseAddr, int(networkRadius), st)
+		putChunks(sister1, int(networkRadius), st)
+		putChunks(sister2, int(networkRadius), st)
+		putChunks(sister3, int(networkRadius), st)
+
+		time.Sleep(time.Second)
+
+		neighs, err := st.NeighborhoodsStat(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if len(neighs) != (1 << doublingFactor) {
+			t.Fatalf("number of neighborhoods does not match. wanted %d, got %d", 1<<doublingFactor, len(neighs))
+		}
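+		// baseAddr and the three sisters share the localRadius-bit prefix "100"
+		// and cover all four two-bit suffixes, one address per expected neighborhood.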