From 4c40e9a307eb13e92d4e97554c1332301de44c0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 30 Sep 2024 11:26:56 +0200 Subject: [PATCH] fix: important reserve changes same ps index --- pkg/storer/internal/reserve/reserve.go | 132 +++++++++----------- pkg/storer/internal/reserve/reserve_test.go | 45 +++++-- 2 files changed, 92 insertions(+), 85 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 8480f2b1c9b..e2ca74a381f 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -140,99 +140,83 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - // same chunk address, same batch, same index - if sameAddressOldStamp != nil && bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) { + // same chunk address, same batch + if sameAddressOldStamp != nil { sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) if err != nil { return err } - prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev >= curr { - return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) - } - // index collision with another chunk - if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) { - prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) + // same index + if bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) { + prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) + curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } - r.logger.Debug( - "replacing chunk stamp index", - "old_chunk", oldStampIndex.ChunkAddress, - "new_chunk", chunk.Address(), - "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), - ) - // remove index items and chunk data - err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash) + + oldBatchRadiusItem := &BatchRadiusItem{ + Bin: bin, + Address: chunk.Address(), + BatchID: sameAddressOldStampIndex.BatchID, + StampHash: sameAddressOldStampIndex.StampHash, + } + // load item to get the binID + err = s.IndexStore().Get(oldBatchRadiusItem) if err != nil { - return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) + return err } - shouldDecrReserveSize = true - } - oldBatchRadiusItem := &BatchRadiusItem{ - Bin: bin, - Address: chunk.Address(), - BatchID: sameAddressOldStampIndex.BatchID, - StampHash: sameAddressOldStampIndex.StampHash, - } - // load item to get the binID - err = s.IndexStore().Get(oldBatchRadiusItem) - if err != nil { - return err - } + // delete old chunk index items + err = errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp), + ) + if err != nil { + return err + } - // delete old chunk index items - err = errors.Join( - s.IndexStore().Delete(oldBatchRadiusItem), - s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp), - ) - if err != nil { - return err - } + binID, err := r.IncBinID(s.IndexStore(), bin) + if err != nil { + return err + } - binID, err := r.IncBinID(s.IndexStore(), bin) - if err != nil { - return err - } + err = errors.Join( + stampindex.Store(s.IndexStore(), reserveScope, chunk), + chunkstamp.Store(s.IndexStore(), reserveScope, chunk), + s.IndexStore().Put(&BatchRadiusItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + StampHash: stampHash, + }), + s.IndexStore().Put(&ChunkBinItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + ChunkType: chunkType, + StampHash: stampHash, + }), + ) + if err != nil { + return err + } - err = errors.Join( - stampindex.Store(s.IndexStore(), reserveScope, chunk), - chunkstamp.Store(s.IndexStore(), reserveScope, chunk), - s.IndexStore().Put(&BatchRadiusItem{ - Bin: bin, - BinID: binID, - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - StampHash: stampHash, - }), - s.IndexStore().Put(&ChunkBinItem{ - Bin: bin, - BinID: binID, - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - ChunkType: chunkType, - StampHash: stampHash, - }), - ) - if err != nil { - return err - } + if chunkType != swarm.ChunkTypeSingleOwner { + return nil + } - if chunkType != swarm.ChunkTypeSingleOwner { - return nil + r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) + return s.ChunkStore().Replace(ctx, chunk) } - - r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) - return s.ChunkStore().Replace(ctx, chunk) } // different address, same batch, index collision - if loadedStampIndex { + if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) { prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index ad7dd13d019..d06bb86ca46 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -197,12 +197,14 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) binBinIDs[bin] += 1 err = r.Put(ctx, ch2) - if !errors.Is(err, storage.ErrOverwriteNewerChunk) { - t.Fatal("expected error") + if err != nil { + t.Fatal(err) } + bin2 := swarm.Proximity(baseAddr.Bytes(), ch2.Address().Bytes()) + binBinIDs[bin2] += 1 size2 := r.Size() - if size2-size1 != 1 { - t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + if size2-size1 != 2 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) } }) @@ -269,11 +271,20 @@ func TestSameChunkAddress(t *testing.T) { s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6)) bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } binBinIDs[bin] += 2 - replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin]) + checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin]-1, ch1) + checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin], ch2) size2 := r.Size() - if size2-size1 != 1 { - t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + if size2-size1 != 2 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) } }) @@ -435,16 +446,17 @@ func TestSameChunkAddress(t *testing.T) { ch3BinID := binBinIDs[bin2] checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, true) + // different index, same batch + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch3.Stamp().BatchID(), Address: ch3.Address(), StampHash: ch3StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: ch1BinID, StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch3BinID, StampHash: ch3StampHash}, false) size2 := r.Size() - // (ch1 + ch2) == 2 and then ch3 reduces reserve size by 1 - if size2-size1 != 1 { + // (ch1 + ch2) == 2 + if size2-size1 != 2 { t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) } }) @@ -908,3 +920,14 @@ func getSigner(t *testing.T) crypto.Signer { } return crypto.NewDefaultSigner(privKey) } + +func checkChunkInIndexStore(t *testing.T, s storage.Reader, bin uint8, binId uint64, ch swarm.Chunk) { + t.Helper() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + checkStore(t, s, &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, s, &reserve.ChunkBinItem{Bin: bin, BinID: binId, StampHash: stampHash}, false) +}