From 629e0f63693e3344b058902cbf7c330ee7c07947 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 3 Oct 2024 20:09:38 +0300 Subject: [PATCH 1/4] feat(sample): swip21 changes --- pkg/node/node.go | 15 ++- pkg/salud/salud.go | 2 +- pkg/salud/salud_test.go | 4 +- pkg/storer/cachestore.go | 4 +- pkg/storer/internal/reserve/reserve.go | 1 - pkg/storer/reserve.go | 10 +- pkg/storer/sample.go | 14 ++- pkg/storer/sample_test.go | 135 ++++++++++++++++++++++++- pkg/storer/storer.go | 35 ++++--- 9 files changed, 178 insertions(+), 42 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 8f767991150..b42e53737cd 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -248,6 +248,12 @@ func NewBee( } }(b) + if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 { + return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1") + } + + reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity + stateStore, stateStoreMetrics, err := InitStateStore(logger, o.DataDir, o.StatestoreCacheCapacity) if err != nil { return nil, err @@ -353,14 +359,6 @@ func NewBee( var batchStore postage.Storer = new(postage.NoOpBatchStore) var evictFn func([]byte) error - var reserveCapacity int - - if o.ReserveCapacityDoubling >= 0 && o.ReserveCapacityDoubling <= 1 { - reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling) - } else { - return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1") - } - if chainEnabled { batchStore, err = batchstore.New( stateStore, @@ -735,6 +733,7 @@ func NewBee( lo.ReserveWakeUpDuration = reserveWakeUpDuration lo.ReserveMinEvictCount = reserveMinEvictCount lo.RadiusSetter = kad + lo.ReserveCapacityDoubling = o.ReserveCapacityDoubling } localStore, err := storer.New(ctx, path, lo) diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d0ada7f74d3..57568e2cd79 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -200,7 +200,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, continue } - if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) { + if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) { s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr) } else if peer.dur.Seconds() > pDur { s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr) diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index b408f06b95c..ecfe7bdfe45 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -37,11 +37,11 @@ func TestSalud(t *testing.T) { {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - // healthy since radius >= most common radius - 1 + // healthy since radius >= most common radius - 2 {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, // radius too low - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 6, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", 
BatchCommitment: 50, ReserveSize: 100}, 1, false}, // dur too long {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false}, diff --git a/pkg/storer/cachestore.go b/pkg/storer/cachestore.go index 5051e975da8..b4a759e57ac 100644 --- a/pkg/storer/cachestore.go +++ b/pkg/storer/cachestore.go @@ -40,8 +40,8 @@ func (db *DB) cacheWorker(ctx context.Context) { } evict := size - capc - if evict < db.opts.cacheMinEvictCount { // evict at least a min count - evict = db.opts.cacheMinEvictCount + if evict < db.reserveOptions.cacheMinEvictCount { // evict at least a min count + evict = db.reserveOptions.cacheMinEvictCount } dur := captureDuration(time.Now()) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 4053ca7cf6f..277e6b7c125 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -502,7 +502,6 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo PrefixAtStart: true, }, func(res storage.Result) (bool, error) { item := res.Entry.(*ChunkBinItem) - stop, err := cb(item) if stop || err != nil { return true, err diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index fe389048344..9983c5c6f80 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -55,7 +55,7 @@ func (db *DB) startReserveWorkers( go db.reserveWorker(ctx) select { - case <-time.After(db.opts.reserveWarmupDuration): + case <-time.After(db.reserveOptions.warmupDuration): case <-db.quit: return } @@ -121,7 +121,7 @@ func (db *DB) reserveWorker(ctx context.Context) { overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity) defer overCapUnsub() - thresholdTicker := time.NewTicker(db.opts.reserveWakeupDuration) + thresholdTicker := time.NewTicker(db.reserveOptions.wakeupDuration) defer thresholdTicker.Stop() _, _ = db.countWithinRadius(ctx) @@ -159,7 +159,7 @@ func (db *DB) reserveWorker(ctx context.Context) { continue } - if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.opts.minimumRadius { + if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius { radius-- if err := db.reserve.SetRadius(radius); err != nil { db.logger.Error(err, "reserve set radius") @@ -362,8 +362,8 @@ func (db *DB) unreserve(ctx context.Context) (err error) { } evict := target - totalEvicted - if evict < int(db.opts.reserveMinEvictCount) { // evict at least a min count - evict = int(db.opts.reserveMinEvictCount) + if evict < int(db.reserveOptions.minEvictCount) { // evict at least a min count + evict = int(db.reserveOptions.minEvictCount) } binEvicted, err := db.evictBatch(ctx, b, evict, radius) diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index 197a6cb773e..1591bd4e817 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -139,6 +139,13 @@ func (db *DB) ReserveSample( allStats.BatchesBelowValueDuration = time.Since(t) + // If the node has doubled their capacity by some factor, sampling process need to only pertain to the + // chunks of the selected neighborhood as determined by the anchor and the "network" radius and NOT the whole reseve. + // The sampling must select chunk with proximity greater than or equal to the regular network radius. + // The regular network storage radius of the network is the sum of the local radius and the doubling factor. 
+ // For example, the regular radius is 11, but the local node has a doubling factor of 3, so the local radius will eventually drop to 8. + neighborhoodProximity := storageRadius + uint8(db.reserveOptions.capacityDoubling) + // Phase 1: Iterate chunk addresses g.Go(func() error { start := time.Now() @@ -149,9 +156,12 @@ func (db *DB) ReserveSample( addStats(stats) }() - err := db.reserve.IterateChunksItems(storageRadius, func(chi *reserve.ChunkBinItem) (bool, error) { + err := db.reserve.IterateChunksItems(storageRadius, func(ch *reserve.ChunkBinItem) (bool, error) { + if swarm.Proximity(ch.Address.Bytes(), anchor) < neighborhoodProximity { + return false, nil + } select { - case chunkC <- chi: + case chunkC <- ch: stats.TotalIterated++ return false, nil case <-ctx.Done(): diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index 522f6807236..3b788d50d21 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -58,13 +58,18 @@ func TestReserveSampler(t *testing.T) { var sample1 storer.Sample + var ( + radius uint8 = 5 + anchor = swarm.RandAddressAt(t, baseAddr, int(radius)).Bytes() + ) + t.Run("reserve sample 1", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) + sample, err := st.ReserveSample(context.TODO(), anchor, radius, timeVar, nil) if err != nil { t.Fatal(err) } - assertValidSample(t, sample) + assertValidSample(t, sample, radius, anchor) assertSampleNoErrors(t, sample) if sample.Stats.NewIgnored != 0 { @@ -92,7 +97,7 @@ func TestReserveSampler(t *testing.T) { // Now we generate another sample with the older timestamp. This should give us // the exact same sample, ensuring that none of the later chunks were considered. t.Run("reserve sample 2", func(t *testing.T) { - sample, err := st.ReserveSample(context.TODO(), []byte("anchor"), 5, timeVar, nil) + sample, err := st.ReserveSample(context.TODO(), anchor, 5, timeVar, nil) if err != nil { t.Fatal(err) } @@ -136,14 +141,131 @@ func TestReserveSampler(t *testing.T) { }) } +func TestReserveSamplerSisterNeighborhood(t *testing.T) { + const ( + chunkCountPerPO = 32 + maxPO = 6 + networkRadius uint8 = 5 + doublingFactor uint8 = 2 + localRadius uint8 = networkRadius - doublingFactor + ) + + randChunks := func(baseAddr swarm.Address, startingRadius int, timeVar uint64) []swarm.Chunk { + var chs []swarm.Chunk + for po := startingRadius; po < maxPO; po++ { + for i := 0; i < chunkCountPerPO; i++ { + ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false) + if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC + ch = chunk.GenerateTestRandomSoChunk(t, ch) + } + + // override stamp timestamp to be before the consensus timestamp + ch = ch.WithStamp(postagetesting.MustNewStampWithTimestamp(timeVar)) + chs = append(chs, ch) + } + } + return chs + } + + testF := func(t *testing.T, baseAddr swarm.Address, st *storer.DB) { + t.Helper() + + timeVar := uint64(time.Now().UnixNano()) + chs := randChunks(baseAddr, int(localRadius), timeVar) + putter := st.ReservePutter() + for _, ch := range chs { + err := putter.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + sisterAnchor := swarm.RandAddressAt(t, baseAddr, int(localRadius)) + + // chunks belonging to the sister neighborhood + chs = randChunks(sisterAnchor, int(localRadius), timeVar) + putter = st.ReservePutter() + for _, ch := range chs { + err := putter.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + t.Run("reserve size", 
reserveSizeTest(st.Reserve(), chunkCountPerPO*maxPO)) + + t.Run("reserve sample", func(t *testing.T) { + sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), 0, timeVar, nil) + if err != nil { + t.Fatal(err) + } + + assertValidSample(t, sample, doublingFactor, baseAddr.Bytes()) + assertSampleNoErrors(t, sample) + + if sample.Stats.NewIgnored != 0 { + t.Fatalf("sample should not have ignored chunks") + } + }) + + t.Run("reserve sample 2", func(t *testing.T) { + sample, err := st.ReserveSample(context.TODO(), sisterAnchor.Bytes(), localRadius, timeVar, nil) + if err != nil { + t.Fatal(err) + } + + assertValidSample(t, sample, localRadius, baseAddr.Bytes()) + assertSampleNoErrors(t, sample) + + for _, s := range sample.Items { + if got := swarm.Proximity(s.ChunkAddress.Bytes(), baseAddr.Bytes()); got != localRadius { + t.Fatalf("promixity must be exactly %d, got %d", localRadius, got) + } + } + + if sample.Stats.NewIgnored != 0 { + t.Fatalf("sample should not have ignored chunks") + } + }) + + } + + t.Run("disk", func(t *testing.T) { + t.Parallel() + baseAddr := swarm.RandAddress(t) + opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) + opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } + opts.ReserveCapacityDoubling = 2 + + storer, err := diskStorer(t, opts)() + if err != nil { + t.Fatal(err) + } + testF(t, baseAddr, storer) + }) + t.Run("mem", func(t *testing.T) { + t.Parallel() + baseAddr := swarm.RandAddress(t) + opts := dbTestOps(baseAddr, 1000, nil, nil, time.Second) + opts.ValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil } + opts.ReserveCapacityDoubling = 2 + + storer, err := memStorer(t, opts)() + if err != nil { + t.Fatal(err) + } + testF(t, baseAddr, storer) + }) +} + func TestRandSample(t *testing.T) { t.Parallel() sample := storer.RandSample(t, nil) - assertValidSample(t, sample) + assertValidSample(t, sample, 0, nil) } -func assertValidSample(t *testing.T, sample storer.Sample) { +func assertValidSample(t *testing.T, sample storer.Sample, minRadius uint8, anchor []byte) { t.Helper() // Assert that sample size is exactly storer.SampleSize @@ -165,6 +287,9 @@ func assertValidSample(t *testing.T, sample storer.Sample) { if item.Stamp == nil { t.Fatalf("sample item [%d]: stamp should be set", i) } + if got := swarm.Proximity(item.ChunkAddress.Bytes(), anchor); got < minRadius { + t.Fatalf("sample item [%d]: chunk should have proximity %d with the anchor, got %d", i, minRadius, got) + } } for i, item := range sample.Items { assertSampleItem(item, i) diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 09803c573a6..d3bf812005d 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -379,9 +379,10 @@ type Options struct { RadiusSetter topology.SetStorageRadiuser StateStore storage.StateStorer - ReserveCapacity int - ReserveWakeUpDuration time.Duration - ReserveMinEvictCount uint64 + ReserveCapacity int + ReserveWakeUpDuration time.Duration + ReserveMinEvictCount uint64 + ReserveCapacityDoubling int CacheCapacity uint64 CacheMinEvictCount uint64 @@ -437,17 +438,18 @@ type DB struct { validStamp postage.ValidStampFn setSyncerOnce sync.Once syncer Syncer - opts workerOpts + reserveOptions reserveOpts pinIntegrity *PinIntegrity } -type workerOpts struct { - reserveWarmupDuration time.Duration - reserveWakeupDuration time.Duration - reserveMinEvictCount uint64 - cacheMinEvictCount uint64 - minimumRadius uint8 +type reserveOpts struct { + warmupDuration time.Duration + wakeupDuration time.Duration + 
minEvictCount uint64 + cacheMinEvictCount uint64 + minimumRadius uint8 + capacityDoubling int } // New returns a newly constructed DB object which implements all the above @@ -534,12 +536,13 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { validStamp: opts.ValidStamp, events: events.NewSubscriber(), reserveBinEvents: events.NewSubscriber(), - opts: workerOpts{ - reserveWarmupDuration: opts.WarmupDuration, - reserveWakeupDuration: opts.ReserveWakeUpDuration, - reserveMinEvictCount: opts.ReserveMinEvictCount, - cacheMinEvictCount: opts.CacheMinEvictCount, - minimumRadius: uint8(opts.MinimumStorageRadius), + reserveOptions: reserveOpts{ + warmupDuration: opts.WarmupDuration, + wakeupDuration: opts.ReserveWakeUpDuration, + minEvictCount: opts.ReserveMinEvictCount, + cacheMinEvictCount: opts.CacheMinEvictCount, + minimumRadius: uint8(opts.MinimumStorageRadius), + capacityDoubling: opts.ReserveCapacityDoubling, }, directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes), pinIntegrity: pinIntegrity, From 802dbab2119b85fb7d343102b4e23f1002122654 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 5 Oct 2024 17:21:13 +0300 Subject: [PATCH 2/4] fix: comment --- pkg/storer/sample.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index 1591bd4e817..b58328ea5f4 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -141,9 +141,9 @@ func (db *DB) ReserveSample( // If the node has doubled their capacity by some factor, sampling process need to only pertain to the // chunks of the selected neighborhood as determined by the anchor and the "network" radius and NOT the whole reseve. - // The sampling must select chunk with proximity greater than or equal to the regular network radius. // The regular network storage radius of the network is the sum of the local radius and the doubling factor. // For example, the regular radius is 11, but the local node has a doubling factor of 3, so the local radius will eventually drop to 8. + // So the sampling must only consider chunks with proximity 11 to the anchor. 
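+	// That is, neighborhoodProximity = storageRadius + capacityDoubling (8 + 3 = 11 in the example above), which is exactly what the next line computes.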
neighborhoodProximity := storageRadius + uint8(db.reserveOptions.capacityDoubling) // Phase 1: Iterate chunk addresses From 877601aa7791904b7a7b61d9672d60ffac75f95f Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 7 Oct 2024 21:51:06 +0300 Subject: [PATCH 3/4] fix: salud and neighborhood stat --- pkg/salud/salud.go | 8 +-- pkg/salud/salud_test.go | 43 +++++++++++++-- pkg/storer/mock/mockreserve.go | 15 +++++- pkg/storer/reserve.go | 95 +++++++++++++++++++++++++++++++++- pkg/storer/reserve_test.go | 91 ++++++++++++++++++++++++++++++++ pkg/storer/sample_test.go | 14 +++-- 6 files changed, 251 insertions(+), 15 deletions(-) diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index 57568e2cd79..d63e813975c 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -42,7 +42,7 @@ type peerStatus interface { type reserve interface { storer.RadiusChecker - ReserveSize() int + ReserveCapacityDoubling() int } type service struct { @@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, } } + networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling()) + selfHealth := true - if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius { + if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius { selfHealth = false - s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius) + s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius) } s.isSelfHealthy.Store(selfHealth) diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index ecfe7bdfe45..4aa7926af90 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -17,6 +17,7 @@ import ( mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" topMock "github.com/ethersphere/bee/v2/pkg/topology/mock" + "github.com/ethersphere/bee/v2/pkg/util/testutil" ) type peer struct { @@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) { ) service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) + testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -127,10 +129,44 @@ func TestSelfUnhealthyRadius(t *testing.T) { if service.IsHealthy() { t.Fatalf("self should NOT be healthy") } +} - if err := service.Close(); err != nil { +func TestSelfHealthyCapacityDoubling(t *testing.T) { + t.Parallel() + peers := []peer{ + // fully healhy + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true}, + } + + statusM := &statusMock{make(map[string]peer)} + addrs := make([]swarm.Address, 0, len(peers)) + for _, p := range peers { + addrs = append(addrs, p.addr) + statusM.peers[p.addr.ByteString()] = p + } + + topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) + + reserve := mockstorer.NewReserve( + mockstorer.WithRadius(6), + mockstorer.WithReserveSize(100), + mockstorer.WithCapacityDoubling(2), + ) + + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) + testutil.CleanupCloser(t, service) + + err := spinlock.Wait(time.Minute, func() bool { + return len(topM.PeersHealth()) == len(peers) 
+ }) + if err != nil { t.Fatal(err) } + + if !service.IsHealthy() { + t.Fatalf("self should be healthy") + } } func TestSubToRadius(t *testing.T) { @@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) + testutil.CleanupCloser(t, service) c, unsub := service.SubscribeNetworkStorageRadius() unsub() @@ -191,10 +228,6 @@ func TestUnsub(t *testing.T) { t.Fatal("should not have received an address") case <-time.After(time.Second): } - - if err := service.Close(); err != nil { - t.Fatal(err) - } } type statusMock struct { diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index fbd27330f9f..05e164c6784 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -78,6 +78,12 @@ func WithReserveSize(s int) Option { }) } +func WithCapacityDoubling(s int) Option { + return optionFunc(func(p *ReserveStore) { + p.capacityDoubling = s + }) +} + func WithPutHook(f func(swarm.Chunk) error) Option { return optionFunc(func(p *ReserveStore) { p.putHook = f @@ -106,8 +112,9 @@ type ReserveStore struct { cursorsErr error epoch uint64 - radius uint8 - reservesize int + radius uint8 + reservesize int + capacityDoubling int subResponses []chunksResponse putHook func(swarm.Chunk) error @@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int { return s.reservesize } +func (s *ReserveStore) ReserveCapacityDoubling() int { + return s.capacityDoubling +} + func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) { return s.cursors, s.epoch, s.cursorsErr } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 9983c5c6f80..c59f2f10373 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math" + "math/bits" "slices" "sync" "sync/atomic" @@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) { radius := db.StorageRadius() evictBatches := make(map[string]bool) - err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) { if ci.Bin >= radius { count++ @@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 { return db.reserve.Radius() } +func (db *DB) ReserveCapacityDoubling() int { + return db.reserveOptions.capacityDoubling +} + func (db *DB) ReserveSize() int { if db.reserve == nil { return 0 @@ -493,6 +497,95 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan }, errC } +type NeighborhoodStat struct { + Address swarm.Address + ChunkCount int +} + +func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) { + + radius := db.StorageRadius() + + networkRadius := radius + uint8(db.reserveOptions.capacityDoubling) + + var neighs []*NeighborhoodStat + for _, n := range neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) { + neighs = append(neighs, &NeighborhoodStat{Address: n}) + } + + err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) { + for _, n := range neighs { + if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius { + n.ChunkCount++ + break + } + } + return false, nil + }) + if err != nil { + return nil, err + } + + return neighs, err +} + +func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address { + bitCombinationsCount := int(math.Pow(2, float64(suffixLength))) + 
bitSuffixes := make([]uint8, bitCombinationsCount) + + for i := 0; i < bitCombinationsCount; i++ { + bitSuffixes[i] = uint8(i) + } + + binPrefixes := make([]swarm.Address, bitCombinationsCount) + + // copy base address + for i := range binPrefixes { + binPrefixes[i] = base.Clone() + } + + for j := range binPrefixes { + pseudoAddrBytes := binPrefixes[j].Bytes() + + // set pseudo suffix + bitSuffixPos := suffixLength - 1 + for l := radius + 0; l < radius+suffixLength+1; l++ { + index, pos := l/8, l%8 + + if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) { + pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) + } else { + pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) + } + + bitSuffixPos-- + } + + // clear rest of the bits + for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ { + index, pos := l/8, l%8 + pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) + } + } + + return binPrefixes +} + +// Clears the bit at pos in n. +func clearBit(n, pos uint8) uint8 { + mask := ^(uint8(1) << pos) + return n & mask +} + +// Sets the bit at pos in the integer n. +func setBit(n, pos uint8) uint8 { + return n | 1<<pos +} + +func hasBit(n, pos uint8) bool { + return n&(1<<pos) > 0 +} + // expiredBatchItem is a storage.Item implementation for expired batches. type expiredBatchItem struct { BatchID []byte diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index ce5d253be96..8fd04a0581e 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -666,6 +666,97 @@ func TestSubscribeBinTrigger(t *testing.T) { }) } +func TestNeighborhoodStats(t *testing.T) { + t.Parallel() + + const ( + chunkCountPerPO = 32 + maxPO = 6 + networkRadius uint8 = 5 + doublingFactor uint8 = 2 + localRadius uint8 = networkRadius - doublingFactor + ) + + mustParse := func(s string) swarm.Address { + addr, err := swarm.ParseBitStrAddress(s) + if err != nil { + t.Fatal(err) + } + return addr + } + + var ( + baseAddr = mustParse("100000") + sister1 = mustParse("100010") + sister2 = mustParse("100100") + sister3 = mustParse("100110") + ) + + putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) { + putter := st.ReservePutter() + for i := 0; i < chunkCountPerPO; i++ { + ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius) + err := putter.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + } + + testF := func(t *testing.T, st *storer.DB) { + t.Helper() + + putChunks(baseAddr, int(networkRadius), st) + putChunks(sister1, int(networkRadius), st) + putChunks(sister2, int(networkRadius), st) + putChunks(sister3, int(networkRadius), st) + + time.Sleep(time.Second) + + neighs, err := st.NeighborhoodsStat(context.Background()) + if err != nil { + t.Fatal(err) + } + + if len(neighs) != (1 << doublingFactor) { + t.Fatalf("number of neighborhoods does not match.
wanted %d, got %d", 1< Date: Tue, 8 Oct 2024 00:12:09 +0300 Subject: [PATCH 4/4] fix: lint --- pkg/storer/reserve.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index c59f2f10373..a3e75ee239b 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -508,9 +508,10 @@ func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error networkRadius := radius + uint8(db.reserveOptions.capacityDoubling) - var neighs []*NeighborhoodStat - for _, n := range neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) { - neighs = append(neighs, &NeighborhoodStat{Address: n}) + prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) + neighs := make([]*NeighborhoodStat, len(prefixes)) + for i, n := range prefixes { + neighs[i] = &NeighborhoodStat{Address: n} } err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
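Taken together, the series rests on one piece of arithmetic: a node that doubles its reserve capacity k times keeps k extra bits of the address space, so its reserve capacity grows by a factor of 1 << k while its estimate of the network radius becomes its local storage radius plus k. node.go uses the first half of that relation; salud.go, sample.go and the new NeighborhoodsStat use the second. The standalone Go sketch below illustrates the arithmetic under stated assumptions: proximity is a simplified stand-in for swarm.Proximity, DefaultReserveCapacity mirrors the 2^22 default implied by the removed 1 << (22 + o.ReserveCapacityDoubling) expression, and the concrete radius values and addresses are illustrative, not taken from the patches.

package main

import "fmt"

// DefaultReserveCapacity mirrors the 2^22-chunk default that node.go now
// scales by (1 << ReserveCapacityDoubling).
const DefaultReserveCapacity = 1 << 22

// proximity counts the leading bits shared by two equal-length addresses;
// a simplified stand-in for swarm.Proximity.
func proximity(x, y []byte) uint8 {
	var p uint8
	for i := range x {
		d := x[i] ^ y[i]
		if d == 0 {
			p += 8
			continue
		}
		for b := 7; b >= 0; b-- {
			if d&(1<<uint(b)) != 0 {
				return p
			}
			p++
		}
	}
	return p
}

func main() {
	const (
		capacityDoubling = 1  // o.ReserveCapacityDoubling; node.go currently allows 0 or 1
		storageRadius    = 10 // local radius a doubled node settles at (illustrative)
	)

	// node.go: reserve capacity grows by a power of two of the doubling factor.
	reserveCapacity := (1 << capacityDoubling) * DefaultReserveCapacity

	// salud.go, sample.go and NeighborhoodsStat: the node's estimate of the
	// network radius is its local storage radius plus the doubling factor.
	networkRadius := storageRadius + capacityDoubling

	fmt.Println("reserve capacity:", reserveCapacity)       // 8388608
	fmt.Println("estimated network radius:", networkRadius) // 11

	// sample.go: a chunk is fed to the sampler only if its proximity to the
	// anchor reaches the estimated network radius, so a doubled node samples
	// one network-sized neighborhood instead of its whole reserve.
	anchor := []byte{0b1011_0110, 0b1010_0000}
	for _, chunkAddr := range [][]byte{
		{0b1011_0110, 0b1011_1111}, // proximity 11 -> sampled
		{0b1011_0110, 0b1100_0000}, // proximity 9  -> skipped
	} {
		sampled := int(proximity(anchor, chunkAddr)) >= networkRadius
		fmt.Printf("chunk %08b%08b sampled: %v\n", chunkAddr[0], chunkAddr[1], sampled)
	}
}

Running the sketch prints a reserve capacity of 8388608 chunks and an estimated network radius of 11, and shows that only the address sharing at least 11 leading bits with the anchor would be handed to the sampler.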