Skip to content

Commit

Permalink
fast track critical pending outtx by reducing re-try interval
Browse files Browse the repository at this point in the history
  • Loading branch information
brewmaster012 committed Aug 11, 2023
1 parent 5fed6e6 commit 3606d01
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package zetaclient
import (
"fmt"
"math"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -123,14 +122,25 @@ func (co *CoreObserver) startSendScheduler() {

supportedChains := GetSupportedChains()
for _, c := range supportedChains {

if c == nil {
co.logger.ZetaChainWatcher.Error().Msg("chain nil")
continue
}
signer := co.signerMap[*c]
chainClient := co.clientMap[*c]
sendList, err := co.bridge.GetAllPendingCctx(uint64(c.ChainId))
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("failed to GetAllPendingCctx for chain %s", c.ChainName.String())
continue
}
res, err := co.bridge.GetAllOutTxTrackerByChain(*c)
if err != nil {
co.logger.ZetaChainWatcher.Warn().Err(err).Msgf("failed to GetAllOutTxTrackerByChain for chain %s", c.ChainName.String())
}
trackerMap := make(map[uint64]bool)
for _, v := range res {
trackerMap[v.Nonce] = true
}

for idx, send := range sendList {
if send.GetCurrentOutTxParam().ReceiverChainId != c.ChainId {
Expand Down Expand Up @@ -180,7 +190,27 @@ func (co *CoreObserver) startSendScheduler() {
co.logger.ZetaChainWatcher.Error().Msgf("unknown ob type on chain %s: type %T", chain, ob)
continue
}
if nonce%interval == currentHeight%interval && !outTxMan.IsOutTxActive(outTxID) {

// determining critical outtx; if it satisfies following criteria
// 1. it's the first pending outtx for this chain
// 2. the following 5 nonces have been in tracker
isCritical := false
criticalInterval := uint64(10) // for critical pending outTx we reduce re-try interval
if nonce%criticalInterval == currentHeight%criticalInterval && idx == 0 {
nextFiveNonce := true
for i := nonce + 1; i <= nonce+5; i++ {
if _, found := trackerMap[i]; !found {
nextFiveNonce = false
break
}
}
if nextFiveNonce {
isCritical = true
break
}
}

if (isCritical || nonce%interval == currentHeight%interval) && !outTxMan.IsOutTxActive(outTxID) {
outTxMan.StartTryProcess(outTxID)
co.logger.ZetaChainWatcher.Debug().Msgf("chain %s: Sign outtx %s with value %d\n", chain, send.Index, send.GetCurrentOutTxParam().Amount)
go signer.TryProcessOutTx(send, outTxMan, outTxID, chainClient, co.bridge, currentHeight)
Expand All @@ -200,44 +230,6 @@ func (co *CoreObserver) startSendScheduler() {
}
}

// trim "bogus" pending sends that are not actually pending
// input sends must be sorted by nonce ascending
func trimSends(sends []*types.CrossChainTx) int {
start := 0
for i := len(sends) - 1; i >= 1; i-- {
// from right to left, if there's a big hole, then before the gap are probably
// bogus "pending" sends that are already processed but not yet confirmed.
if sends[i].GetCurrentOutTxParam().OutboundTxTssNonce > sends[i-1].GetCurrentOutTxParam().OutboundTxTssNonce+1000 {
start = i
break
}
}
return start
}

func SplitAndSortSendListByChain(sendList []*types.CrossChainTx) map[string][]*types.CrossChainTx {
sendMap := make(map[string][]*types.CrossChainTx)
for _, send := range sendList {
targetChain, err := GetTargetChain(send)
if targetChain == "" || err != nil {
continue
}
if _, found := sendMap[targetChain]; !found {
sendMap[targetChain] = make([]*types.CrossChainTx, 0)
}
sendMap[targetChain] = append(sendMap[targetChain], send)
}
for chain, sends := range sendMap {
sort.Slice(sends, func(i, j int) bool {
return sends[i].GetCurrentOutTxParam().OutboundTxTssNonce < sends[j].GetCurrentOutTxParam().OutboundTxTssNonce
})
start := trimSends(sends)
sendMap[chain] = sends[start:]
log.Debug().Msgf("chain %s, start %d, len %d, start nonce %d", chain, start, len(sendMap[chain]), sends[start].GetCurrentOutTxParam().OutboundTxTssNonce)
}
return sendMap
}

func GetTargetChain(send *types.CrossChainTx) (string, error) {
chainID := send.GetCurrentOutTxParam().ReceiverChainId
chain := common.GetChainFromChainID(chainID)
Expand Down

0 comments on commit 3606d01

Please sign in to comment.