feat(sample): swip21 changes #4848

Merged · 4 commits · Oct 8, 2024
15 changes: 7 additions & 8 deletions pkg/node/node.go
@@ -248,6 +248,12 @@ func NewBee(
}
}(b)

+ if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 {
+ return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
+ }
+
+ reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity
+
stateStore, stateStoreMetrics, err := InitStateStore(logger, o.DataDir, o.StatestoreCacheCapacity)
if err != nil {
return nil, err
@@ -353,14 +359,6 @@ func NewBee(
var batchStore postage.Storer = new(postage.NoOpBatchStore)
var evictFn func([]byte) error

- var reserveCapacity int
-
- if o.ReserveCapacityDoubling >= 0 && o.ReserveCapacityDoubling <= 1 {
- reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling)
- } else {
- return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
- }
-
if chainEnabled {
batchStore, err = batchstore.New(
stateStore,
@@ -735,6 +733,7 @@ func NewBee(
lo.ReserveWakeUpDuration = reserveWakeUpDuration
lo.ReserveMinEvictCount = reserveMinEvictCount
lo.RadiusSetter = kad
+ lo.ReserveCapacityDoubling = o.ReserveCapacityDoubling
}

localStore, err := storer.New(ctx, path, lo)
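The net effect of the node.go change: reserve capacity is now derived from `storer.DefaultReserveCapacity` instead of the removed hard-coded `1 << (22 + doubling)`. A minimal sketch of the arithmetic, assuming `DefaultReserveCapacity` is `1 << 22` chunks (which the removed expression implies); the constant and names here are illustrative, not the package's API:

```go
package main

import "fmt"

// Assumption for this sketch: storer.DefaultReserveCapacity equals 1 << 22
// (4,194,304 chunks), as the removed `1 << (22 + doubling)` form implies.
const defaultReserveCapacity = 1 << 22

func main() {
	for doubling := 0; doubling <= 1; doubling++ {
		newForm := (1 << doubling) * defaultReserveCapacity // form added by this PR
		oldForm := 1 << (22 + doubling)                     // form removed by this PR
		fmt.Println(doubling, newForm, newForm == oldForm)  // 0 4194304 true; 1 8388608 true
	}
}
```

With the only accepted values being 0 and 1, the reserve holds either the default capacity or double it, and the validation now runs before any stores are constructed.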
10 changes: 6 additions & 4 deletions pkg/salud/salud.go
@@ -42,7 +42,7 @@ type peerStatus interface {

type reserve interface {
storer.RadiusChecker
- ReserveSize() int
+ ReserveCapacityDoubling() int
}

type service struct {
@@ -200,7 +200,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
continue
}

- if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
+ if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
@@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

+ networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())
+
selfHealth := true
- if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius {
+ if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
selfHealth = false
- s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius)
+ s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
}

s.isSelfHealthy.Store(selfHealth)
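For context: a node that doubles its reserve capacity d times stores chunks at a storage radius d levels shallower than the network's, so salud now adds the doubling exponent back in before checking for a radius discrepancy. A minimal sketch of the adjusted self-health rule, with illustrative names and uint8 radii (the actual service reads these from its reserve and from peer snapshots):

```go
// radiusHealthy sketches the self-health rule above: the node's effective
// network radius is its storage radius plus its capacity-doubling exponent,
// and only that normalized value is compared against the observed network radius.
func radiusHealthy(storageRadius, capacityDoubling, nHoodRadius, networkRadius uint8) bool {
	networkRadiusEstimation := storageRadius + capacityDoubling
	if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
		return false // storage radius discrepancy: unhealthy
	}
	return true
}
```

The peer-facing check is relaxed in the same spirit, from `networkRadius-1` to `networkRadius-2`, presumably so peers running one doubling behind the most common radius are still counted as healthy.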
47 changes: 40 additions & 7 deletions pkg/salud/salud_test.go
@@ -17,6 +17,7 @@ import (
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
topMock "github.com/ethersphere/bee/v2/pkg/topology/mock"
"github.com/ethersphere/bee/v2/pkg/util/testutil"
)

type peer struct {
@@ -37,11 +38,11 @@ func TestSalud(t *testing.T) {
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

- // healthy since radius >= most common radius - 1
+ // healthy since radius >= most common radius - 2
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

// radius too low
- {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 6, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
+ {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},

// dur too long
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
@@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+ testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -127,10 +129,44 @@
if service.IsHealthy() {
t.Fatalf("self should NOT be healthy")
}
}

- if err := service.Close(); err != nil {
+ func TestSelfHealthyCapacityDoubling(t *testing.T) {
+ t.Parallel()
+ peers := []peer{
+ // fully healthy
+ {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+ {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
+ }
+
+ statusM := &statusMock{make(map[string]peer)}
+ addrs := make([]swarm.Address, 0, len(peers))
+ for _, p := range peers {
+ addrs = append(addrs, p.addr)
+ statusM.peers[p.addr.ByteString()] = p
+ }
+
+ topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))
+
+ reserve := mockstorer.NewReserve(
+ mockstorer.WithRadius(6),
+ mockstorer.WithReserveSize(100),
+ mockstorer.WithCapacityDoubling(2),
+ )
+
+ service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
+ testutil.CleanupCloser(t, service)
+
+ err := spinlock.Wait(time.Minute, func() bool {
+ return len(topM.PeersHealth()) == len(peers)
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !service.IsHealthy() {
+ t.Fatalf("self should be healthy")
+ }
+ }

func TestSubToRadius(t *testing.T) {
@@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) {
topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
+ testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
unsub()
@@ -191,10 +228,6 @@
t.Fatal("should not have received an address")
case <-time.After(time.Second):
}

- if err := service.Close(); err != nil {
- t.Fatal(err)
- }
}

type statusMock struct {
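A note on the arithmetic in TestSelfHealthyCapacityDoubling: the mock reserve reports storage radius 6 and capacity doubling 2, so the node's estimated network radius is 6 + 2 = 8, which matches the radius 8 reported by both peers. The node is therefore judged healthy even though its raw storage radius trails the network by two, which is exactly the case the old `s.reserve.StorageRadius() != networkRadius` check would have flagged as unhealthy.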
4 changes: 2 additions & 2 deletions pkg/storer/cachestore.go
@@ -40,8 +40,8 @@ func (db *DB) cacheWorker(ctx context.Context) {
}

evict := size - capc
- if evict < db.opts.cacheMinEvictCount { // evict at least a min count
- evict = db.opts.cacheMinEvictCount
+ if evict < db.reserveOptions.cacheMinEvictCount { // evict at least a min count
+ evict = db.reserveOptions.cacheMinEvictCount
}

dur := captureDuration(time.Now())
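The cachestore hunk is only a field rename (`db.opts` becomes `db.reserveOptions`), but the clamp it touches is worth spelling out: eviction is batched so the worker does not churn on tiny overflows. A sketch of the rule, with illustrative names:

```go
// evictionTarget returns how many cache entries to evict: the overflow past
// capacity, raised to a configured minimum so evictions happen in batches
// rather than one entry at a time.
func evictionTarget(size, capacity, minEvictCount int) int {
	evict := size - capacity
	if evict < minEvictCount { // evict at least a min count
		evict = minEvictCount
	}
	return evict
}
```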
1 change: 0 additions & 1 deletion pkg/storer/internal/reserve/reserve.go
@@ -502,7 +502,6 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo
PrefixAtStart: true,
}, func(res storage.Result) (bool, error) {
item := res.Entry.(*ChunkBinItem)
-
stop, err := cb(item)
if stop || err != nil {
return true, err
15 changes: 13 additions & 2 deletions pkg/storer/mock/mockreserve.go
@@ -78,6 +78,12 @@ func WithReserveSize(s int) Option {
})
}

+ func WithCapacityDoubling(s int) Option {
+ return optionFunc(func(p *ReserveStore) {
+ p.capacityDoubling = s
+ })
+ }
+
func WithPutHook(f func(swarm.Chunk) error) Option {
return optionFunc(func(p *ReserveStore) {
p.putHook = f
@@ -106,8 +112,9 @@ type ReserveStore struct {
cursorsErr error
epoch uint64

- radius uint8
- reservesize int
+ radius uint8
+ reservesize int
+ capacityDoubling int

subResponses []chunksResponse
putHook func(swarm.Chunk) error
@@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int {
return s.reservesize
}

+ func (s *ReserveStore) ReserveCapacityDoubling() int {
+ return s.capacityDoubling
+ }
+
func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
return s.cursors, s.epoch, s.cursorsErr
}
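The mock extends the functional-options pattern the file already uses: `WithCapacityDoubling` records the value on the `ReserveStore` under construction, and `ReserveCapacityDoubling` echoes it back, which is all that salud's narrowed `reserve` interface now requires. See TestSelfHealthyCapacityDoubling above for a usage example.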
106 changes: 100 additions & 6 deletions pkg/storer/reserve.go
@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"math/bits"
"slices"
"sync"
"sync/atomic"
@@ -55,7 +56,7 @@ func (db *DB) startReserveWorkers(
go db.reserveWorker(ctx)

select {
- case <-time.After(db.opts.reserveWarmupDuration):
+ case <-time.After(db.reserveOptions.warmupDuration):
case <-db.quit:
return
}
@@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
radius := db.StorageRadius()

evictBatches := make(map[string]bool)
-
err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) {
if ci.Bin >= radius {
count++
@@ -121,7 +121,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
defer overCapUnsub()

- thresholdTicker := time.NewTicker(db.opts.reserveWakeupDuration)
+ thresholdTicker := time.NewTicker(db.reserveOptions.wakeupDuration)
defer thresholdTicker.Stop()

_, _ = db.countWithinRadius(ctx)
@@ -159,7 +159,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
continue
}

- if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.opts.minimumRadius {
+ if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius {
radius--
if err := db.reserve.SetRadius(radius); err != nil {
db.logger.Error(err, "reserve set radius")
@@ -362,8 +362,8 @@ func (db *DB) unreserve(ctx context.Context) (err error) {
}

evict := target - totalEvicted
- if evict < int(db.opts.reserveMinEvictCount) { // evict at least a min count
- evict = int(db.opts.reserveMinEvictCount)
+ if evict < int(db.reserveOptions.minEvictCount) { // evict at least a min count
+ evict = int(db.reserveOptions.minEvictCount)
}

binEvicted, err := db.evictBatch(ctx, b, evict, radius)
@@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 {
return db.reserve.Radius()
}

+ func (db *DB) ReserveCapacityDoubling() int {
+ return db.reserveOptions.capacityDoubling
+ }
+
func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0
@@ -493,6 +497,96 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
}, errC
}

+ type NeighborhoodStat struct {
+ Address swarm.Address
+ ChunkCount int
+ }
+
+ func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) {
+
+ radius := db.StorageRadius()
+
+ networkRadius := radius + uint8(db.reserveOptions.capacityDoubling)
+
+ prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling)
+ neighs := make([]*NeighborhoodStat, len(prefixes))
+ for i, n := range prefixes {
+ neighs[i] = &NeighborhoodStat{Address: n}
+ }
+
+ err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
+ for _, n := range neighs {
+ if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
+ n.ChunkCount++
+ break
+ }
+ }
+ return false, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return neighs, err
+ }
+
+ func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address {
+ bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
+ bitSuffixes := make([]uint8, bitCombinationsCount)
+
+ for i := 0; i < bitCombinationsCount; i++ {
+ bitSuffixes[i] = uint8(i)
+ }
+
+ binPrefixes := make([]swarm.Address, bitCombinationsCount)
+
+ // copy base address
+ for i := range binPrefixes {
+ binPrefixes[i] = base.Clone()
+ }
+
+ for j := range binPrefixes {
+ pseudoAddrBytes := binPrefixes[j].Bytes()
+
+ // set pseudo suffix
+ bitSuffixPos := suffixLength - 1
+ for l := radius + 0; l < radius+suffixLength+1; l++ {
+ index, pos := l/8, l%8
+
+ if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
+ pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+ } else {
+ pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+ }
+
+ bitSuffixPos--
+ }
+
+ // clear rest of the bits
+ for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
+ index, pos := l/8, l%8
+ pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
+ }
+ }
+
+ return binPrefixes
+ }
+
+ // Clears the bit at pos in n.
+ func clearBit(n, pos uint8) uint8 {
+ mask := ^(uint8(1) << pos)
+ return n & mask
+ }
+
+ // Sets the bit at pos in the integer n.
+ func setBit(n, pos uint8) uint8 {
+ return n | 1<<pos
+ }
+
+ func hasBit(n, pos uint8) bool {
+ return n&(1<<pos) > 0
+ }
+
// expiredBatchItem is a storage.Item implementation for expired batches.
type expiredBatchItem struct {
BatchID []byte
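How neighborhoodPrefixes works: with storage radius r and capacity doubling d, the node covers the 2^d sibling neighborhoods of depth r + d that share its first r address bits. Each candidate address keeps those r bits, writes one of the 2^d bit combinations immediately after them (MSB-first, which is why the code round-trips through bits.Reverse8 around the LSB-indexed set/clear helpers), and zeroes everything deeper. A worked example under simplifying assumptions: 1-byte addresses (real Swarm addresses are 32 bytes) and direct MSB arithmetic instead of the Reverse8 trick:

```go
package main

import "fmt"

func main() {
	base := byte(0b10110000) // the node's address; its first two bits "10" form the prefix
	radius, doubling := 2, 2
	for i := 0; i < 1<<doubling; i++ {
		addr := base & ^byte(0xFF>>radius)         // keep the first `radius` bits, clear the rest
		addr |= byte(i) << (8 - radius - doubling) // write suffix i right after the prefix
		fmt.Printf("%08b\n", addr)
	}
	// Prints: 10000000, 10010000, 10100000, 10110000
}
```

NeighborhoodsStat then attributes each reserve chunk to the first prefix whose proximity is at least radius + capacityDoubling, i.e. it counts the chunks each sub-neighborhood is responsible for.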