Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: fast track critical pending outtx by reducing re-try interval #958

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 32 additions & 41 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package zetaclient
import (
"fmt"
"math"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -123,14 +122,25 @@ func (co *CoreObserver) startSendScheduler() {

supportedChains := GetSupportedChains()
for _, c := range supportedChains {

if c == nil {
co.logger.ZetaChainWatcher.Error().Msg("chain nil")
continue
}
signer := co.signerMap[*c]
chainClient := co.clientMap[*c]
sendList, err := co.bridge.GetAllPendingCctx(uint64(c.ChainId))
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("failed to GetAllPendingCctx for chain %s", c.ChainName.String())
continue
}
res, err := co.bridge.GetAllOutTxTrackerByChain(*c)
if err != nil {
co.logger.ZetaChainWatcher.Warn().Err(err).Msgf("failed to GetAllOutTxTrackerByChain for chain %s", c.ChainName.String())
}
trackerMap := make(map[uint64]bool)
for _, v := range res {
trackerMap[v.Nonce] = true
}

for idx, send := range sendList {
if send.GetCurrentOutTxParam().ReceiverChainId != c.ChainId {
Expand Down Expand Up @@ -180,7 +190,26 @@ func (co *CoreObserver) startSendScheduler() {
co.logger.ZetaChainWatcher.Error().Msgf("unknown ob type on chain %s: type %T", chain, ob)
continue
}
if nonce%interval == currentHeight%interval && !outTxMan.IsOutTxActive(outTxID) {

// determining critical outtx; if it satisfies following criteria
// 1. it's the first pending outtx for this chain
// 2. the following 5 nonces have been in tracker
isCritical := false
criticalInterval := uint64(10) // for critical pending outTx we reduce re-try interval
if nonce%criticalInterval == currentHeight%criticalInterval && idx == 0 {
nextFiveNonce := true
for i := nonce + 1; i <= nonce+5; i++ {
if _, found := trackerMap[i]; !found {
nextFiveNonce = false
break
}
}
if nextFiveNonce {
isCritical = true
}
}

if (isCritical || nonce%interval == currentHeight%interval) && !outTxMan.IsOutTxActive(outTxID) {
outTxMan.StartTryProcess(outTxID)
co.logger.ZetaChainWatcher.Debug().Msgf("chain %s: Sign outtx %s with value %d\n", chain, send.Index, send.GetCurrentOutTxParam().Amount)
go signer.TryProcessOutTx(send, outTxMan, outTxID, chainClient, co.bridge, currentHeight)
Expand All @@ -200,44 +229,6 @@ func (co *CoreObserver) startSendScheduler() {
}
}

// trim "bogus" pending sends that are not actually pending
// input sends must be sorted by nonce ascending
func trimSends(sends []*types.CrossChainTx) int {
start := 0
for i := len(sends) - 1; i >= 1; i-- {
// from right to left, if there's a big hole, then before the gap are probably
// bogus "pending" sends that are already processed but not yet confirmed.
if sends[i].GetCurrentOutTxParam().OutboundTxTssNonce > sends[i-1].GetCurrentOutTxParam().OutboundTxTssNonce+1000 {
start = i
break
}
}
return start
}

func SplitAndSortSendListByChain(sendList []*types.CrossChainTx) map[string][]*types.CrossChainTx {
sendMap := make(map[string][]*types.CrossChainTx)
for _, send := range sendList {
targetChain, err := GetTargetChain(send)
if targetChain == "" || err != nil {
continue
}
if _, found := sendMap[targetChain]; !found {
sendMap[targetChain] = make([]*types.CrossChainTx, 0)
}
sendMap[targetChain] = append(sendMap[targetChain], send)
}
for chain, sends := range sendMap {
sort.Slice(sends, func(i, j int) bool {
return sends[i].GetCurrentOutTxParam().OutboundTxTssNonce < sends[j].GetCurrentOutTxParam().OutboundTxTssNonce
})
start := trimSends(sends)
sendMap[chain] = sends[start:]
log.Debug().Msgf("chain %s, start %d, len %d, start nonce %d", chain, start, len(sendMap[chain]), sends[start].GetCurrentOutTxParam().OutboundTxTssNonce)
}
return sendMap
}

func GetTargetChain(send *types.CrossChainTx) (string, error) {
chainID := send.GetCurrentOutTxParam().ReceiverChainId
chain := common.GetChainFromChainID(chainID)
Expand Down
Loading