diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index b6e48b0264..d482e8cc32 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -3,7 +3,6 @@ package zetaclient import ( "fmt" "math" - "sort" "strings" "time" @@ -123,7 +122,10 @@ func (co *CoreObserver) startSendScheduler() { supportedChains := GetSupportedChains() for _, c := range supportedChains { - + if c == nil { + co.logger.ZetaChainWatcher.Error().Msg("chain nil") + continue + } signer := co.signerMap[*c] chainClient := co.clientMap[*c] sendList, err := co.bridge.GetAllPendingCctx(uint64(c.ChainId)) @@ -131,6 +133,14 @@ func (co *CoreObserver) startSendScheduler() { co.logger.ZetaChainWatcher.Error().Err(err).Msgf("failed to GetAllPendingCctx for chain %s", c.ChainName.String()) continue } + res, err := co.bridge.GetAllOutTxTrackerByChain(*c) + if err != nil { + co.logger.ZetaChainWatcher.Warn().Err(err).Msgf("failed to GetAllOutTxTrackerByChain for chain %s", c.ChainName.String()) + } + trackerMap := make(map[uint64]bool) + for _, v := range res { + trackerMap[v.Nonce] = true + } for idx, send := range sendList { if send.GetCurrentOutTxParam().ReceiverChainId != c.ChainId { @@ -180,7 +190,27 @@ func (co *CoreObserver) startSendScheduler() { co.logger.ZetaChainWatcher.Error().Msgf("unknown ob type on chain %s: type %T", chain, ob) continue } - if nonce%interval == currentHeight%interval && !outTxMan.IsOutTxActive(outTxID) { + + // determining critical outtx; if it satisfies following criteria + // 1. it's the first pending outtx for this chain + // 2. the following 5 nonces have been in tracker + isCritical := false + criticalInterval := uint64(10) // for critical pending outTx we reduce re-try interval + if nonce%criticalInterval == currentHeight%criticalInterval && idx == 0 { + nextFiveNonce := true + for i := nonce + 1; i <= nonce+5; i++ { + if _, found := trackerMap[i]; !found { + nextFiveNonce = false + break + } + } + if nextFiveNonce { + isCritical = true + break + } + } + + if (isCritical || nonce%interval == currentHeight%interval) && !outTxMan.IsOutTxActive(outTxID) { outTxMan.StartTryProcess(outTxID) co.logger.ZetaChainWatcher.Debug().Msgf("chain %s: Sign outtx %s with value %d\n", chain, send.Index, send.GetCurrentOutTxParam().Amount) go signer.TryProcessOutTx(send, outTxMan, outTxID, chainClient, co.bridge, currentHeight) @@ -200,44 +230,6 @@ func (co *CoreObserver) startSendScheduler() { } } -// trim "bogus" pending sends that are not actually pending -// input sends must be sorted by nonce ascending -func trimSends(sends []*types.CrossChainTx) int { - start := 0 - for i := len(sends) - 1; i >= 1; i-- { - // from right to left, if there's a big hole, then before the gap are probably - // bogus "pending" sends that are already processed but not yet confirmed. - if sends[i].GetCurrentOutTxParam().OutboundTxTssNonce > sends[i-1].GetCurrentOutTxParam().OutboundTxTssNonce+1000 { - start = i - break - } - } - return start -} - -func SplitAndSortSendListByChain(sendList []*types.CrossChainTx) map[string][]*types.CrossChainTx { - sendMap := make(map[string][]*types.CrossChainTx) - for _, send := range sendList { - targetChain, err := GetTargetChain(send) - if targetChain == "" || err != nil { - continue - } - if _, found := sendMap[targetChain]; !found { - sendMap[targetChain] = make([]*types.CrossChainTx, 0) - } - sendMap[targetChain] = append(sendMap[targetChain], send) - } - for chain, sends := range sendMap { - sort.Slice(sends, func(i, j int) bool { - return sends[i].GetCurrentOutTxParam().OutboundTxTssNonce < sends[j].GetCurrentOutTxParam().OutboundTxTssNonce - }) - start := trimSends(sends) - sendMap[chain] = sends[start:] - log.Debug().Msgf("chain %s, start %d, len %d, start nonce %d", chain, start, len(sendMap[chain]), sends[start].GetCurrentOutTxParam().OutboundTxTssNonce) - } - return sendMap -} - func GetTargetChain(send *types.CrossChainTx) (string, error) { chainID := send.GetCurrentOutTxParam().ReceiverChainId chain := common.GetChainFromChainID(chainID)