Skip to content

Commit

Permalink
Merge pull request #5793 from Algo-devops-service/relstable3.19.0
Browse files Browse the repository at this point in the history
  • Loading branch information
algojohnlee authored Oct 20, 2023
2 parents 4ed7ec0 + bcdc8e7 commit 7037cb3
Show file tree
Hide file tree
Showing 306 changed files with 56,089 additions and 14,815 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ crypto/libs

# doc intermediates
data/transactions/logic/*.md
!data/transactions/logic/TEAL_opcodes*.md

*.pem

Expand Down
89 changes: 73 additions & 16 deletions agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package agreement
import (
"context"
"fmt"
"time"

"github.com/algorand/go-algorand/logging/logspec"
"github.com/algorand/go-algorand/logging/telemetryspec"
Expand Down Expand Up @@ -65,6 +66,7 @@ type action interface {
do(context.Context, *Service)

String() string
ComparableStr() string
}

type nonpersistent struct{}
Expand All @@ -87,6 +89,8 @@ func (a noopAction) String() string {
return a.t().String()
}

func (a noopAction) ComparableStr() string { return a.String() }

type networkAction struct {
nonpersistent

Expand Down Expand Up @@ -119,6 +123,13 @@ func (a networkAction) String() string {
return fmt.Sprintf("%s: %2v", a.t().String(), a.Tag)
}

func (a networkAction) ComparableStr() string {
if a.Tag == protocol.AgreementVoteTag {
return fmt.Sprintf("%s: %2v: %3v-%2v-%2v", a.t().String(), a.Tag, a.UnauthenticatedVote.R.Round, a.UnauthenticatedVote.R.Period, a.UnauthenticatedVote.R.Step)
}
return a.String()
}

func (a networkAction) do(ctx context.Context, s *Service) {
if a.T == broadcastVotes {
tag := protocol.AgreementVoteTag
Expand Down Expand Up @@ -191,6 +202,18 @@ func (a cryptoAction) String() string {
return a.t().String()
}

func (a cryptoAction) ComparableStr() (s string) {
switch a.T {
case verifyVote:
s = fmt.Sprintf("%s: %3v-%2v TaskIndex %d", a.t().String(), a.Round, a.Period, a.TaskIndex)
case verifyPayload:
s = fmt.Sprintf("%s: %3v-%2v Pinned %v", a.t().String(), a.Round, a.Period, a.Pinned)
case verifyBundle:
s = fmt.Sprintf("%s: %3v-%2v-%2v", a.t().String(), a.Round, a.Period, a.Step)
}
return
}

func (a cryptoAction) do(ctx context.Context, s *Service) {
switch a.T {
case verifyVote:
Expand All @@ -209,6 +232,11 @@ type ensureAction struct {
Payload proposal
// the certificate proving commitment
Certificate Certificate

// The time that the winning proposal-vote was validated, relative to the beginning of the round
voteValidatedAt time.Duration
// The dynamic filter timeout calculated for this round, even if not enabled, for reporting to telemetry.
dynamicFilterTimeout time.Duration
}

func (a ensureAction) t() actionType {
Expand All @@ -219,6 +247,8 @@ func (a ensureAction) String() string {
return fmt.Sprintf("%s: %.5s: %v, %v, %.5s", a.t().String(), a.Payload.Digest().String(), a.Certificate.Round, a.Certificate.Period, a.Certificate.Proposal.BlockDigest.String())
}

func (a ensureAction) ComparableStr() string { return a.String() }

func (a ensureAction) do(ctx context.Context, s *Service) {
logEvent := logspec.AgreementEvent{
Hash: a.Certificate.Proposal.BlockDigest.String(),
Expand All @@ -231,29 +261,33 @@ func (a ensureAction) do(ctx context.Context, s *Service) {
logEvent.Type = logspec.RoundConcluded
s.log.with(logEvent).Infof("committed round %d with pre-validated block %v", a.Certificate.Round, a.Certificate.Proposal)
s.log.EventWithDetails(telemetryspec.Agreement, telemetryspec.BlockAcceptedEvent, telemetryspec.BlockAcceptedEventDetails{
Address: a.Certificate.Proposal.OriginalProposer.String(),
Hash: a.Certificate.Proposal.BlockDigest.String(),
Round: uint64(a.Certificate.Round),
ValidatedAt: a.Payload.validatedAt,
ReceivedAt: a.Payload.receivedAt,
PreValidated: true,
PropBufLen: uint64(len(s.demux.rawProposals)),
VoteBufLen: uint64(len(s.demux.rawVotes)),
Address: a.Certificate.Proposal.OriginalProposer.String(),
Hash: a.Certificate.Proposal.BlockDigest.String(),
Round: uint64(a.Certificate.Round),
ValidatedAt: a.Payload.validatedAt,
ReceivedAt: a.Payload.receivedAt,
VoteValidatedAt: a.voteValidatedAt,
DynamicFilterTimeout: a.dynamicFilterTimeout,
PreValidated: true,
PropBufLen: uint64(len(s.demux.rawProposals)),
VoteBufLen: uint64(len(s.demux.rawVotes)),
})
s.Ledger.EnsureValidatedBlock(a.Payload.ve, a.Certificate)
} else {
block := a.Payload.Block
logEvent.Type = logspec.RoundConcluded
s.log.with(logEvent).Infof("committed round %d with block %v", a.Certificate.Round, a.Certificate.Proposal)
s.log.EventWithDetails(telemetryspec.Agreement, telemetryspec.BlockAcceptedEvent, telemetryspec.BlockAcceptedEventDetails{
Address: a.Certificate.Proposal.OriginalProposer.String(),
Hash: a.Certificate.Proposal.BlockDigest.String(),
Round: uint64(a.Certificate.Round),
ValidatedAt: a.Payload.validatedAt,
ReceivedAt: a.Payload.receivedAt,
PreValidated: false,
PropBufLen: uint64(len(s.demux.rawProposals)),
VoteBufLen: uint64(len(s.demux.rawVotes)),
Address: a.Certificate.Proposal.OriginalProposer.String(),
Hash: a.Certificate.Proposal.BlockDigest.String(),
Round: uint64(a.Certificate.Round),
ValidatedAt: a.Payload.validatedAt,
ReceivedAt: a.Payload.receivedAt,
VoteValidatedAt: a.voteValidatedAt,
DynamicFilterTimeout: a.dynamicFilterTimeout,
PreValidated: false,
PropBufLen: uint64(len(s.demux.rawProposals)),
VoteBufLen: uint64(len(s.demux.rawVotes)),
})
s.Ledger.EnsureBlock(block, a.Certificate)
}
Expand All @@ -278,6 +312,8 @@ func (a stageDigestAction) String() string {
return fmt.Sprintf("%s: %.5s. %v. %v", a.t().String(), a.Certificate.Proposal.BlockDigest.String(), a.Certificate.Round, a.Certificate.Period)
}

func (a stageDigestAction) ComparableStr() string { return a.String() }

func (a stageDigestAction) do(ctx context.Context, service *Service) {
logEvent := logspec.AgreementEvent{
Hash: a.Certificate.Proposal.BlockDigest.String(),
Expand All @@ -304,8 +340,25 @@ func (a rezeroAction) String() string {
return a.t().String()
}

func (a rezeroAction) ComparableStr() string {
return fmt.Sprintf("%s: %d", a.t().String(), a.Round)
}

func (a rezeroAction) do(ctx context.Context, s *Service) {
s.Clock = s.Clock.Zero()
// Preserve the zero time of the new round a.Round (for
// period 0) for future use if a late proposal-vote arrives,
// for late credential tracking.
if _, ok := s.historicalClocks[a.Round]; !ok {
s.historicalClocks[a.Round] = s.Clock
}

// Garbage collect clocks that are too old
for rnd := range s.historicalClocks {
if a.Round > rnd+credentialRoundLag {
delete(s.historicalClocks, rnd)
}
}
}

type pseudonodeAction struct {
Expand All @@ -326,6 +379,8 @@ func (a pseudonodeAction) String() string {
return fmt.Sprintf("%v %3v-%2v-%2v: %.5v", a.t().String(), a.Round, a.Period, a.Step, a.Proposal.BlockDigest.String())
}

func (a pseudonodeAction) ComparableStr() string { return a.String() }

func (a pseudonodeAction) persistent() bool {
return a.T == attest
}
Expand Down Expand Up @@ -518,3 +573,5 @@ func (c checkpointAction) do(ctx context.Context, s *Service) {
func (c checkpointAction) String() string {
return c.t().String()
}

func (c checkpointAction) ComparableStr() string { return c.String() }
83 changes: 83 additions & 0 deletions agreement/credentialArrivalHistory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package agreement

import (
"sort"
"time"
)

// credentialArrivalHistory maintains a circular buffer of time.Duration samples.
type credentialArrivalHistory struct {
history []time.Duration
writePtr int
full bool
}

func makeCredentialArrivalHistory(size int) credentialArrivalHistory {
if size < 0 {
panic("can't create CredentialArrivalHistory with negative size")
}
history := credentialArrivalHistory{history: make([]time.Duration, size)}
history.reset()
return history
}

// store saves a new sample into the circular buffer.
// If the buffer is full, it overwrites the oldest sample.
func (history *credentialArrivalHistory) store(sample time.Duration) {
if len(history.history) == 0 {
return
}

history.history[history.writePtr] = sample
history.writePtr++
if history.writePtr == len(history.history) {
history.full = true
history.writePtr = 0
}
}

// reset marks the history buffer as empty
func (history *credentialArrivalHistory) reset() {
history.writePtr = 0
history.full = false
}

// isFull checks if the circular buffer has been fully populated at least once.
func (history *credentialArrivalHistory) isFull() bool {
return history.full
}

// orderStatistics returns the idx'th time duration in the sorted history array.
// It assumes that history is full and the idx is within the array bounds, and
// panics if either of these assumptions doesn't hold.
func (history *credentialArrivalHistory) orderStatistics(idx int) time.Duration {
if !history.isFull() {
panic("history not full")
}
if idx < 0 || idx >= len(history.history) {
panic("index out of bounds")
}

// if history.history is long, then we could optimize this function to use
// the linear time order statistics algorithm.
sortedArrivals := make([]time.Duration, len(history.history))
copy(sortedArrivals[:], history.history[:])
sort.Slice(sortedArrivals, func(i, j int) bool { return sortedArrivals[i] < sortedArrivals[j] })
return sortedArrivals[idx]
}
Loading

0 comments on commit 7037cb3

Please sign in to comment.