Skip to content

Commit

Permalink
fix: important reserve changes same ps index
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon committed Sep 30, 2024
1 parent b69dcc7 commit 4c40e9a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 85 deletions.
132 changes: 58 additions & 74 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,99 +140,83 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

// same chunk address, same batch, same index
if sameAddressOldStamp != nil && bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) {
// same chunk address, same batch
if sameAddressOldStamp != nil {
sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp)
if err != nil {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

// index collision with another chunk
if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) {
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
// same index
if bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) {
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
r.logger.Debug(
"replacing chunk stamp index",
"old_chunk", oldStampIndex.ChunkAddress,
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)
// remove index items and chunk data
err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash)

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
}
// load item to get the binID
err = s.IndexStore().Get(oldBatchRadiusItem)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err)
return err
}
shouldDecrReserveSize = true
}

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
}
// load item to get the binID
err = s.IndexStore().Get(oldBatchRadiusItem)
if err != nil {
return err
}
// delete old chunk index items
err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp),
)
if err != nil {
return err
}

// delete old chunk index items
err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp),
)
if err != nil {
return err
}
binID, err := r.IncBinID(s.IndexStore(), bin)
if err != nil {
return err
}

binID, err := r.IncBinID(s.IndexStore(), bin)
if err != nil {
return err
}
err = errors.Join(
stampindex.Store(s.IndexStore(), reserveScope, chunk),
chunkstamp.Store(s.IndexStore(), reserveScope, chunk),
s.IndexStore().Put(&BatchRadiusItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
StampHash: stampHash,
}),
s.IndexStore().Put(&ChunkBinItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
ChunkType: chunkType,
StampHash: stampHash,
}),
)
if err != nil {
return err
}

err = errors.Join(
stampindex.Store(s.IndexStore(), reserveScope, chunk),
chunkstamp.Store(s.IndexStore(), reserveScope, chunk),
s.IndexStore().Put(&BatchRadiusItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
StampHash: stampHash,
}),
s.IndexStore().Put(&ChunkBinItem{
Bin: bin,
BinID: binID,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
ChunkType: chunkType,
StampHash: stampHash,
}),
)
if err != nil {
return err
}
if chunkType != swarm.ChunkTypeSingleOwner {
return nil
}

if chunkType != swarm.ChunkTypeSingleOwner {
return nil
r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
}

r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
}

// different address, same batch, index collision
if loadedStampIndex {
if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) {
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
Expand Down
45 changes: 34 additions & 11 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ func TestSameChunkAddress(t *testing.T) {
bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
binBinIDs[bin] += 1
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
if err != nil {
t.Fatal(err)
}
bin2 := swarm.Proximity(baseAddr.Bytes(), ch2.Address().Bytes())
binBinIDs[bin2] += 1
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
if size2-size1 != 2 {
t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1)
}
})

Expand Down Expand Up @@ -269,11 +271,20 @@ func TestSameChunkAddress(t *testing.T) {
s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6))
bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
err := r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}
binBinIDs[bin] += 2
replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin])
checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin]-1, ch1)
checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin], ch2)
size2 := r.Size()
if size2-size1 != 1 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
if size2-size1 != 2 {
t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1)
}
})

Expand Down Expand Up @@ -435,16 +446,17 @@ func TestSameChunkAddress(t *testing.T) {
ch3BinID := binBinIDs[bin2]

checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, true)
// different index, same batch
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch3.Stamp().BatchID(), Address: ch3.Address(), StampHash: ch3StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: ch1BinID, StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch3BinID, StampHash: ch3StampHash}, false)

size2 := r.Size()

// (ch1 + ch2) == 2 and then ch3 reduces reserve size by 1
if size2-size1 != 1 {
// (ch1 + ch2) == 2
if size2-size1 != 2 {
t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1)
}
})
Expand Down Expand Up @@ -908,3 +920,14 @@ func getSigner(t *testing.T) crypto.Signer {
}
return crypto.NewDefaultSigner(privKey)
}

func checkChunkInIndexStore(t *testing.T, s storage.Reader, bin uint8, binId uint64, ch swarm.Chunk) {
t.Helper()
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

checkStore(t, s, &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false)
checkStore(t, s, &reserve.ChunkBinItem{Bin: bin, BinID: binId, StampHash: stampHash}, false)
}

0 comments on commit 4c40e9a

Please sign in to comment.