Merge pull request #5852 from Algo-devops-service/relstable3.20.1
algojohnlee authored Dec 5, 2023
2 parents 7037cb3 + 14c0d8d commit 6a6a15d
Showing 167 changed files with 6,570 additions and 2,679 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -96,7 +96,7 @@ Please refer to our [CONTRIBUTING](CONTRIBUTING.md) document.

## Project Layout

`go-algorand` is split into various subsystems containing varius packages.
`go-algorand` is split into various subsystems containing various packages.

### Core

2 changes: 1 addition & 1 deletion buildnumber.dat
@@ -1 +1 @@
0
1
42 changes: 24 additions & 18 deletions catchup/catchpointService.go
@@ -24,6 +24,7 @@ import (

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
@@ -370,9 +371,11 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error

attemptsCount := 0
var blk *bookkeeping.Block
var cert *agreement.Certificate
// check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything.
if ledgerBlock, err := cs.ledger.Block(blockRound); err == nil {
if ledgerBlock, ledgerCert, err0 := cs.ledger.BlockCert(blockRound); err0 == nil {
blk = &ledgerBlock
cert = &ledgerCert
}
var protoParams config.ConsensusParams
var ok bool
@@ -384,7 +387,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
if stop {
return err
} else if blk == nil {
@@ -462,7 +465,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err))
}

err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk, cert)
if err != nil {
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
// try again.
@@ -542,29 +545,32 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
prevBlock := &topBlock
blocksFetched := uint64(1) // we already got the first block in the previous step.
var blk *bookkeeping.Block
var cert *agreement.Certificate
for retryCount := uint64(1); blocksFetched <= lookback; {
if err := cs.ctx.Err(); err != nil {
return cs.stopOrAbort()
}

blk = nil
cert = nil
// check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything.
if ledgerBlock, err := cs.ledger.Block(topBlock.Round() - basics.Round(blocksFetched)); err == nil {
if ledgerBlock, ledgerCert, err0 := cs.ledger.BlockCert(topBlock.Round() - basics.Round(blocksFetched)); err0 == nil {
blk = &ledgerBlock
cert = &ledgerCert
} else {
switch err.(type) {
switch err0.(type) {
case ledgercore.ErrNoEntry:
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err)
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
}
}

var psp *peerSelectorPeer
blockDownloadDuration := time.Duration(0)
if blk == nil {
var stop bool
blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
if stop {
return err
} else if blk == nil {
@@ -624,7 +630,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

// all good, persist and move on.
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk, cert)
if err != nil {
cs.log.Warnf("processStageBlocksDownload failed to store downloaded staging block for round %d", blk.Round())
cs.updateBlockRetrievalStatistics(-1, -1)
@@ -649,17 +655,17 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
// fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempts to fetch the requested block from that peer.
// The method returns stop=true if the caller should exit the current operation.
// If the method returns a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is possible on startup, since the network package might not have retrieved the list of peers yet.
time.Sleep(noPeersAvailableSleepInterval)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from : %w", err)
return nil, time.Duration(0), psp, true, cs.abort(err)
return nil, nil, time.Duration(0), psp, true, cs.abort(err)
}
peer := psp.Peer

@@ -669,26 +675,26 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
}
fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
blk, cert, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
if err != nil {
if cs.ctx.Err() != nil {
return nil, time.Duration(0), psp, true, cs.stopOrAbort()
return nil, nil, time.Duration(0), psp, true, cs.stopOrAbort()
}
if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
// try again.
cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
return nil, time.Duration(0), psp, false, nil
return nil, nil, time.Duration(0), psp, false, nil
}
return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
}
// success
return blk, downloadDuration, psp, false, nil
return blk, cert, downloadDuration, psp, false, nil
}

// processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swaps in the new tables, and restarts the node functionality.
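The catchpointService.go changes above thread an agreement.Certificate alongside every block: the service first asks the local ledger via BlockCert, falls back to downloading only when the round is missing, and then stores block and certificate together. Below is a minimal, self-contained Go sketch of that pattern; the types and names (Block, Certificate, memLedger, obtainAndStore, fetch, store) are simplified stand-ins for illustration, not the go-algorand APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-ins for bookkeeping.Block and agreement.Certificate.
type Block struct{ Round uint64 }
type Certificate struct{ Round uint64 }

var errNoEntry = errors.New("no entry")

// memLedger plays the role of the ledger's BlockCert lookup: rounds that are
// already present locally should not be downloaded again.
type memLedger struct {
	blocks map[uint64]Block
	certs  map[uint64]Certificate
}

func (l *memLedger) BlockCert(round uint64) (Block, Certificate, error) {
	blk, ok := l.blocks[round]
	if !ok {
		return Block{}, Certificate{}, errNoEntry
	}
	return blk, l.certs[round], nil
}

// obtainAndStore mirrors the flow of the change: prefer the local block and
// certificate, otherwise fetch both together, then persist them as a pair.
func obtainAndStore(ctx context.Context, l *memLedger, round uint64,
	fetch func(uint64) (*Block, *Certificate, error),
	store func(context.Context, *Block, *Certificate) error) error {

	var blk *Block
	var cert *Certificate
	if b, c, err := l.BlockCert(round); err == nil {
		blk, cert = &b, &c // already in the ledger: no download needed
	} else if !errors.Is(err, errNoEntry) {
		return err // unexpected lookup error
	}
	if blk == nil {
		var err error
		if blk, cert, err = fetch(round); err != nil {
			return err
		}
	}
	// Block and certificate are always stored together; a nil certificate
	// would leave the staged block without the votes that attest to it.
	return store(ctx, blk, cert)
}

func main() {
	l := &memLedger{
		blocks: map[uint64]Block{7: {Round: 7}},
		certs:  map[uint64]Certificate{7: {Round: 7}},
	}
	fetch := func(r uint64) (*Block, *Certificate, error) {
		return &Block{Round: r}, &Certificate{Round: r}, nil
	}
	store := func(_ context.Context, b *Block, c *Certificate) error {
		fmt.Printf("stored round %d, cert present: %v\n", b.Round, c != nil)
		return nil
	}
	_ = obtainAndStore(context.Background(), l, 7, fetch, store) // served from the ledger
	_ = obtainAndStore(context.Background(), l, 8, fetch, store) // "downloaded"
}
```

The sketch keeps the same ordering as the diff: local lookup first, network fetch only on a no-entry miss, then a single store call that always receives both values.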
70 changes: 67 additions & 3 deletions catchup/catchpointService_test.go
@@ -22,34 +22,37 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)

type catchpointCatchupLedger struct {
}

func (l *catchpointCatchupLedger) Block(rnd basics.Round) (blk bookkeeping.Block, err error) {
func (l *catchpointCatchupLedger) BlockCert(rnd basics.Round) (blk bookkeeping.Block, cert agreement.Certificate, err error) {
blk = bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
UpgradeState: bookkeeping.UpgradeState{
CurrentProtocol: protocol.ConsensusCurrentVersion,
},
},
}
cert = agreement.Certificate{}
commitments, err := blk.PaysetCommit()
if err != nil {
return blk, err
return blk, cert, err
}
blk.TxnCommitments = commitments

return blk, nil
return blk, cert, nil
}

func (l *catchpointCatchupLedger) GenesisHash() (d crypto.Digest) {
@@ -95,3 +98,64 @@ func TestCatchpointServicePeerRank(t *testing.T) {
err := cs.processStageLatestBlockDownload()
require.NoError(t, err)
}

type catchpointAccessorMock struct {
mocks.MockCatchpointCatchupAccessor
t *testing.T
topBlk bookkeeping.Block
}

func (m *catchpointAccessorMock) EnsureFirstBlock(ctx context.Context) (blk bookkeeping.Block, err error) {
return m.topBlk, nil
}

func (m *catchpointAccessorMock) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) {
require.NotNil(m.t, blk)
require.NotNil(m.t, cert)
return nil
}

type catchpointCatchupLedger2 struct {
catchpointCatchupLedger
blk bookkeeping.Block
}

func (l *catchpointCatchupLedger2) BlockCert(rnd basics.Round) (blk bookkeeping.Block, cert agreement.Certificate, err error) {
return l.blk, agreement.Certificate{}, nil
}

// TestProcessStageBlocksDownloadNilCert ensures StoreBlock does not receive a nil certificate when the ledger already has the block.
// It uses two mocks, catchpointAccessorMock and catchpointCatchupLedger2, plus pre-crafted blocks to drive a single iteration of processStageBlocksDownload.
func TestProcessStageBlocksDownloadNilCert(t *testing.T) {
partitiontest.PartitionTest(t)

var err error
blk1 := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: 1,
UpgradeState: bookkeeping.UpgradeState{
CurrentProtocol: protocol.ConsensusCurrentVersion,
},
},
}
blk1.TxnCommitments, err = blk1.PaysetCommit()
require.NoError(t, err)

blk2 := blk1
blk2.BlockHeader.Round = 2
blk2.BlockHeader.Branch = blk1.Hash()
blk2.TxnCommitments, err = blk2.PaysetCommit()
require.NoError(t, err)

ctx, cf := context.WithCancel(context.Background())
cs := CatchpointCatchupService{
ctx: ctx,
cancelCtxFunc: cf,
ledgerAccessor: &catchpointAccessorMock{topBlk: blk2, t: t},
ledger: &catchpointCatchupLedger2{blk: blk1},
log: logging.TestingLog(t),
}

err = cs.processStageBlocksDownload()
require.NoError(t, err)
}
50 changes: 31 additions & 19 deletions catchup/peerSelector.go
@@ -51,6 +51,10 @@ const (
peerRank4LowBlockTime = 801
peerRank4HighBlockTime = 999

// peerRankNoBlockForRound is used for responses that failed because no block was available for the requested round.
// This indicates that the peer is behind, that the block has not been produced yet, or that the peer does not have a block that old.
peerRankNoBlockForRound = 2000

// peerRankDownloadFailed is used for responses which could be temporary, such as missing files, or for which we don't
// have a clear resolution
peerRankDownloadFailed = 10000
@@ -110,11 +114,13 @@ type peerPool struct {
// client to provide feedback regarding the peer's performance, and to have the subsequent
// query(s) take advantage of that intel.
type peerSelector struct {
mu deadlock.Mutex
net peersRetriever
mu deadlock.Mutex
net peersRetriever
// peerClasses is the list of peer classes we want to have in the peerSelector.
peerClasses []peerClass
pools []peerPool
counter uint64
// pools is the list of peer pools; each pool contains a list of peers with the same rank.
pools []peerPool
counter uint64
}

// historicStats stores the past windowSize ranks for the peer passed
@@ -141,7 +147,7 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats {
// that will determine the rank of the peer.
hs := historicStats{
windowSize: windowSize,
rankSamples: make([]int, windowSize, windowSize),
rankSamples: make([]int, windowSize),
requestGaps: make([]uint64, 0, windowSize),
rankSum: uint64(class.initialRank) * uint64(windowSize),
gapSum: 0.0}
@@ -227,18 +233,24 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera

// Download may fail for various reasons. Give it additional tries
// and see if it recovers/improves.
if value == peerRankDownloadFailed {
factor := float64(1.0)
switch value {
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
case peerRankNoBlockForRound:
// for no-block errors, apply a much gentler rank increase
factor = 0.1
fallthrough
case peerRankDownloadFailed:
hs.downloadFailures++
// - Set the rank to the class upper bound multiplied
// by the number of downloadFailures.
// - Each downloadFailure increments the counter, and
// each non-failure decrements it, until it gets to 0.
// - When the peer is consistently failing to
// download, the value added to rankSum will
// increase at an increasing rate to evict the peer
// from the class sooner.
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)))
} else {
value = upperBound(class) * int(math.Exp2(float64(hs.downloadFailures)*factor))
default:
if hs.downloadFailures > 0 {
hs.downloadFailures--
}
@@ -250,12 +262,12 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
// The average performance of the peer
average := float64(hs.rankSum) / float64(len(hs.rankSamples))

if int(average) > upperBound(class) && initialRank == peerRankDownloadFailed {
if int(average) > upperBound(class) && (initialRank == peerRankDownloadFailed || initialRank == peerRankNoBlockForRound) {
// peerRankDownloadFailed will be delayed, to give the peer
// additional time to improve. If it does not improve over time,
// the average will exceed the class limit. At this point,
// it will be pushed down to download failed class.
return peerRankDownloadFailed
return initialRank
}

// A penalty is added relative to how frequently the peer is used
@@ -468,7 +480,7 @@ func (ps *peerSelector) refreshAvailablePeers() {
for peerIdx := len(pool.peers) - 1; peerIdx >= 0; peerIdx-- {
peer := pool.peers[peerIdx].peer
if peerAddress := peerAddress(peer); peerAddress != "" {
if toRemove, _ := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
if toRemove := existingPeers[pool.peers[peerIdx].class.peerClass][peerAddress]; toRemove {
// need to be removed.
pool.peers = append(pool.peers[:peerIdx], pool.peers[peerIdx+1:]...)
}
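In the peerSelector.go changes above, the new peerRankNoBlockForRound case reuses the download-failure escalation formula from historicStats.push — upperBound(class) * int(math.Exp2(downloadFailures * factor)) — but with factor 0.1 instead of 1.0, so peers that merely lack the requested round are demoted far more slowly than peers whose downloads fail outright. The following is a small self-contained sketch of that arithmetic; the escalate helper and the 999 upper bound are illustrative values for this example, not identifiers from peerSelector.go.

```go
package main

import (
	"fmt"
	"math"
)

// escalate mirrors the penalty formula in historicStats.push: the pushed rank
// becomes the class upper bound scaled by 2^(downloadFailures*factor), so a
// consistently failing peer is pushed out of its class at an increasing rate.
func escalate(upperBound, downloadFailures int, factor float64) int {
	return upperBound * int(math.Exp2(float64(downloadFailures)*factor))
}

func main() {
	const upperBound = 999 // illustrative class upper bound for this sketch

	fmt.Println("failures  downloadFailed (factor 1.0)  noBlockForRound (factor 0.1)")
	for failures := 1; failures <= 12; failures++ {
		fmt.Printf("%8d  %27d  %28d\n",
			failures,
			escalate(upperBound, failures, 1.0), // doubles on every failure
			// integer truncation keeps this multiplier at 1 until ~10 misses
			escalate(upperBound, failures, 0.1),
		)
	}
}
```

Running the sketch shows the hard-failure column blowing past the class bound after a couple of failures, while the no-block column stays at the class bound for roughly ten consecutive misses before stepping up, which matches the intent of the gentler factor.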