Commit

fix: salud and neighborhood stat
istae committed Oct 7, 2024
1 parent 802dbab commit 877601a
Showing 6 changed files with 251 additions and 15 deletions.
8 changes: 5 additions & 3 deletions pkg/salud/salud.go
@@ -42,7 +42,7 @@ type peerStatus interface {

type reserve interface {
storer.RadiusChecker
ReserveSize() int
ReserveCapacityDoubling() int
}

type service struct {
@@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())

selfHealth := true
if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius {
if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
selfHealth = false
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius)
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
}

s.isSelfHealthy.Store(selfHealth)
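With reserve capacity doubling, a node stores chunks at a radius that is `doubling` levels shallower than the network radius, so the health check has to add the doubling exponent back before comparing against the radius reported by peers. A minimal sketch of the new predicate, using hypothetical values rather than the service's actual state:

package main

import "fmt"

func main() {
	// Hypothetical values: a node that doubled its reserve twice (4x capacity)
	// stores at radius 6 while the network agrees on radius 8.
	var (
		storageRadius    uint8 = 6
		capacityDoubling uint8 = 2
		networkRadius    uint8 = 8
		nHoodRadius            = networkRadius
	)

	// Mirrors the updated check in salud(): the node's estimate of the
	// network radius is its local radius plus the doubling exponent.
	networkRadiusEstimation := storageRadius + capacityDoubling

	selfHealth := true
	if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
		selfHealth = false
	}
	fmt.Println(selfHealth) // true: 6 + 2 == 8
}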
43 changes: 38 additions & 5 deletions pkg/salud/salud_test.go
@@ -17,6 +17,7 @@ import (
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
topMock "github.com/ethersphere/bee/v2/pkg/topology/mock"
"github.com/ethersphere/bee/v2/pkg/util/testutil"
)

type peer struct {
@@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -127,10 +129,44 @@
if service.IsHealthy() {
t.Fatalf("self should NOT be healthy")
}
}

if err := service.Close(); err != nil {
func TestSelfHealthyCapacityDoubling(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healthy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
}

statusM := &statusMock{make(map[string]peer)}
addrs := make([]swarm.Address, 0, len(peers))
for _, p := range peers {
addrs = append(addrs, p.addr)
statusM.peers[p.addr.ByteString()] = p
}

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

reserve := mockstorer.NewReserve(
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
})
if err != nil {
t.Fatal(err)
}

if !service.IsHealthy() {
t.Fatalf("self should be healthy")
}
}
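Note the arithmetic this test exercises: the mock reserve reports a local radius of 6 and a capacity doubling of 2, so the node's network radius estimate is 6 + 2 = 8, matching the StorageRadius of 8 reported by both peers. Under the previous check, which compared the raw storage radius against the network radius, this node would have been wrongly flagged unhealthy.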

func TestSubToRadius(t *testing.T) {
@@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) {
topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
unsub()
Expand All @@ -191,10 +228,6 @@ func TestUnsub(t *testing.T) {
t.Fatal("should not have received an address")
case <-time.After(time.Second):
}

if err := service.Close(); err != nil {
t.Fatal(err)
}
}

type statusMock struct {
15 changes: 13 additions & 2 deletions pkg/storer/mock/mockreserve.go
@@ -78,6 +78,12 @@ func WithReserveSize(s int) Option {
})
}

func WithCapacityDoubling(s int) Option {
return optionFunc(func(p *ReserveStore) {
p.capacityDoubling = s
})
}

func WithPutHook(f func(swarm.Chunk) error) Option {
return optionFunc(func(p *ReserveStore) {
p.putHook = f
@@ -106,8 +112,9 @@ type ReserveStore struct {
cursorsErr error
epoch uint64

radius uint8
reservesize int
radius uint8
reservesize int
capacityDoubling int

subResponses []chunksResponse
putHook func(swarm.Chunk) error
@@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int {
return s.reservesize
}

func (s *ReserveStore) ReserveCapacityDoubling() int {
return s.capacityDoubling
}

func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
return s.cursors, s.epoch, s.cursorsErr
}
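The new WithCapacityDoubling option follows the functional-options pattern used throughout this mock. The Option/optionFunc plumbing sits outside the hunks shown here; it presumably has the standard shape below (a sketch, not the file's verbatim contents):

// Option configures a ReserveStore mock.
type Option interface {
	apply(*ReserveStore)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*ReserveStore)

func (f optionFunc) apply(r *ReserveStore) { f(r) }

// NewReserve applies the options in order, so later options win.
func NewReserve(opts ...Option) *ReserveStore {
	r := &ReserveStore{}
	for _, o := range opts {
		o.apply(r)
	}
	return r
}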
95 changes: 94 additions & 1 deletion pkg/storer/reserve.go
@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"math/bits"
"slices"
"sync"
"sync/atomic"
@@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
radius := db.StorageRadius()

evictBatches := make(map[string]bool)

err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) {
if ci.Bin >= radius {
count++
@@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 {
return db.reserve.Radius()
}

func (db *DB) ReserveCapacityDoubling() int {
return db.reserveOptions.capacityDoubling
}

func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0
@@ -493,6 +497,95 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
}, errC
}

type NeighborhoodStat struct {
Address swarm.Address
ChunkCount int
}

func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) {

radius := db.StorageRadius()

networkRadius := radius + uint8(db.reserveOptions.capacityDoubling)

var neighs []*NeighborhoodStat

[GitHub Actions / Lint — check failure on line 511 in pkg/storer/reserve.go: Consider pre-allocating `neighs` (prealloc)]
for _, n := range neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) {
neighs = append(neighs, &NeighborhoodStat{Address: n})
}

err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
for _, n := range neighs {
if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
n.ChunkCount++
break
}
}
return false, nil
})
if err != nil {
return nil, err
}

return neighs, err
}
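A minimal sketch of how a caller might consume the new stat, assuming an already-initialized *storer.DB (printNeighborhoodsStat is a hypothetical helper, not part of this change):

package example

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/v2/pkg/storer"
)

// printNeighborhoodsStat lists, for each neighborhood the node covers after
// capacity doubling, how many reserve chunks fall inside it.
func printNeighborhoodsStat(ctx context.Context, db *storer.DB) error {
	stats, err := db.NeighborhoodsStat(ctx)
	if err != nil {
		return err
	}
	for _, n := range stats {
		fmt.Printf("neighborhood %s: %d chunks\n", n.Address, n.ChunkCount)
	}
	return nil
}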

func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address {
bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
bitSuffixes := make([]uint8, bitCombinationsCount)

for i := 0; i < bitCombinationsCount; i++ {
bitSuffixes[i] = uint8(i)
}

binPrefixes := make([]swarm.Address, bitCombinationsCount)

// copy base address
for i := range binPrefixes {
binPrefixes[i] = base.Clone()
}

for j := range binPrefixes {
pseudoAddrBytes := binPrefixes[j].Bytes()

// set pseudo suffix
bitSuffixPos := suffixLength - 1
for l := radius + 0; l < radius+suffixLength+1; l++ {
index, pos := l/8, l%8

if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
} else {
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}

bitSuffixPos--
}

// clear rest of the bits
for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
index, pos := l/8, l%8
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
}

return binPrefixes
}
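To make the enumeration concrete: with the values from TestNeighborhoodStats further down (radius 3, suffixLength 2, base address starting 100000...), the function yields four prefixes that differ only in the two bits just past the radius boundary, with everything after them cleared. The bit-string sketch below mirrors the net behavior of the byte-level code above (illustration only; the real implementation works on swarm.Address bytes):

package main

import "fmt"

func main() {
	base := "100000" // leading bits of the node's address
	radius, suffixLength := 3, 2

	// Enumerate all 2^suffixLength suffixes at positions
	// radius..radius+suffixLength-1, MSB of the suffix first.
	for i := 0; i < 1<<suffixLength; i++ {
		b := []byte(base)
		for j := 0; j < suffixLength; j++ {
			if i&(1<<(suffixLength-1-j)) != 0 {
				b[radius+j] = '1'
			} else {
				b[radius+j] = '0'
			}
		}
		fmt.Println(string(b))
	}
	// Output: 100000, 100010, 100100, 100110
}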

// Clears the bit at pos in n.
func clearBit(n, pos uint8) uint8 {
mask := ^(uint8(1) << pos)
return n & mask
}

// Sets the bit at pos in the integer n.
func setBit(n, pos uint8) uint8 {
return n | 1<<pos
}

func hasBit(n, pos uint8) bool {
return n&(1<<pos) > 0
}
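These helpers index bits LSB-first (bit pos has value 2^pos), while address bytes are read MSB-first; wrapping each byte in bits.Reverse8 before and after the operation converts between the two conventions, which is why the loops above reverse every byte around each set/clear. A quick self-contained check of that trick:

package main

import (
	"fmt"
	"math/bits"
)

func setBit(n, pos uint8) uint8 { return n | 1<<pos }

func main() {
	// Set the bit at MSB-first position 2 of a zero byte.
	var b uint8
	b = bits.Reverse8(setBit(bits.Reverse8(b), 2))
	fmt.Printf("%08b\n", b) // 00100000
}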

// expiredBatchItem is a storage.Item implementation for expired batches.
type expiredBatchItem struct {
BatchID []byte
91 changes: 91 additions & 0 deletions pkg/storer/reserve_test.go
@@ -666,6 +666,97 @@ func TestSubscribeBinTrigger(t *testing.T) {
})
}

func TestNeighborhoodStats(t *testing.T) {
t.Parallel()

const (
chunkCountPerPO = 32
maxPO = 6
networkRadius uint8 = 5
doublingFactor uint8 = 2
localRadius uint8 = networkRadius - doublingFactor
)

mustParse := func(s string) swarm.Address {
addr, err := swarm.ParseBitStrAddress(s)
if err != nil {
t.Fatal(err)
}
return addr
}

var (
baseAddr = mustParse("100000")
sister1 = mustParse("100010")
sister2 = mustParse("100100")
sister3 = mustParse("100110")
)

putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) {
putter := st.ReservePutter()
for i := 0; i < chunkCountPerPO; i++ {
ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius)
err := putter.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
}
}

testF := func(t *testing.T, st *storer.DB) {
t.Helper()

putChunks(baseAddr, int(networkRadius), st)
putChunks(sister1, int(networkRadius), st)
putChunks(sister2, int(networkRadius), st)
putChunks(sister3, int(networkRadius), st)

time.Sleep(time.Second)

neighs, err := st.NeighborhoodsStat(context.Background())
if err != nil {
t.Fatal(err)
}

if len(neighs) != (1 << doublingFactor) {
t.Fatalf("number of neighborhoods does not matche. wanted %d, got %d", 1<<doublingFactor, len(neighs))
}

for _, n := range neighs {
if n.ChunkCount != chunkCountPerPO {
t.Fatalf("chunk count does not match. wanted %d, got %d", chunkCountPerPO, n.ChunkCount)
}
}

if !neighs[0].Address.Equal(baseAddr) || !neighs[1].Address.Equal(sister1) || !neighs[2].Address.Equal(sister2) || !neighs[3].Address.Equal(sister3) {
t.Fatal("chunk addresses do not match")
}
}

t.Run("disk", func(t *testing.T) {
t.Parallel()
opts := dbTestOps(baseAddr, 10000, nil, nil, time.Minute)
opts.ReserveCapacityDoubling = int(doublingFactor)
storer, err := diskStorer(t, opts)()
if err != nil {
t.Fatal(err)
}
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(localRadius))
testF(t, storer)
})
t.Run("mem", func(t *testing.T) {
t.Parallel()
opts := dbTestOps(baseAddr, 10000, nil, nil, time.Minute)
opts.ReserveCapacityDoubling = int(doublingFactor)
storer, err := memStorer(t, opts)()
if err != nil {
t.Fatal(err)
}
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(localRadius))
testF(t, storer)
})
}

func reserveSizeTest(rs *reserve.Reserve, want int) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
