diff --git a/cmd/query.go b/cmd/query.go index 55d707b8..735aa63f 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -199,7 +199,7 @@ func queryUnrelayedPackets(ctx *config.Context) *cobra.Command { if err != nil { return err } - sp, err := st.UnrelayedPackets(c[src], c[dst], sh) + sp, err := st.UnrelayedPackets(c[src], c[dst], sh, true) if err != nil { return err } @@ -248,7 +248,7 @@ func queryUnrelayedAcknowledgements(ctx *config.Context) *cobra.Command { return err } - sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh) + sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh, true) if err != nil { return err } diff --git a/cmd/tx.go b/cmd/tx.go index 77ed1bcd..4fb9658f 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -182,7 +182,7 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command { return err } - sp, err := st.UnrelayedPackets(c[src], c[dst], sh) + sp, err := st.UnrelayedPackets(c[src], c[dst], sh, false) if err != nil { return err } @@ -224,7 +224,7 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command { // sp.Src contains all sequences acked on SRC but acknowledgement not processed on DST // sp.Dst contains all sequences acked on DST but acknowledgement not processed on SRC - sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh) + sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh, false) if err != nil { return err } diff --git a/core/naive-strategy.go b/core/naive-strategy.go index 6260231c..b455bdb5 100644 --- a/core/naive-strategy.go +++ b/core/naive-strategy.go @@ -39,24 +39,32 @@ func (st NaiveStrategy) SetupRelay(ctx context.Context, src, dst *ProvableChain) return nil } -func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) { +func getQueryContext(chain *ProvableChain, sh SyncHeaders, useFinalizedHeader bool) (QueryContext, error) { + if useFinalizedHeader { + return sh.GetQueryContext(chain.ChainID()), nil + } else { + height, err := chain.LatestHeight() + if err != nil { + return nil, err + } + return NewQueryContext(context.TODO(), height), nil + } +} + +func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) { var ( eg = new(errgroup.Group) srcPackets PacketInfoList dstPackets PacketInfoList ) - srcCtx := sh.GetQueryContext(src.ChainID()) - dstCtx := sh.GetQueryContext(dst.ChainID()) - - var srcLatestCtx, dstLatestCtx QueryContext - if srcHeight, err := src.LatestHeight(); err != nil { + srcCtx, err := getQueryContext(src, sh, true) + if err != nil { return nil, err - } else if dstHeight, err := dst.LatestHeight(); err != nil { + } + dstCtx, err := getQueryContext(dst, sh, true) + if err != nil { return nil, err - } else { - srcLatestCtx = NewQueryContext(context.TODO(), srcHeight) - dstLatestCtx = NewQueryContext(context.TODO(), dstHeight) } eg.Go(func() error { @@ -83,26 +91,40 @@ func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders return nil, err } - eg.Go(func() error { - seqs, err := dst.QueryUnreceivedPackets(dstLatestCtx, srcPackets.ExtractSequenceList()) + // If includeRelayedButUnfinalized is true, this function should return packets of which RecvPacket is not finalized yet. + // In this case, filtering packets by QueryUnreceivedPackets is not needed because QueryUnfinalizedRelayPackets + // has already returned packets that completely match this condition. + if !includeRelayedButUnfinalized { + srcCtx, err := getQueryContext(src, sh, false) if err != nil { - return err + return nil, err } - srcPackets = srcPackets.Filter(seqs) - return nil - }) - - eg.Go(func() error { - seqs, err := src.QueryUnreceivedPackets(srcLatestCtx, dstPackets.ExtractSequenceList()) + dstCtx, err := getQueryContext(dst, sh, false) if err != nil { - return err + return nil, err } - dstPackets = dstPackets.Filter(seqs) - return nil - }) - if err := eg.Wait(); err != nil { - return nil, err + eg.Go(func() error { + seqs, err := dst.QueryUnreceivedPackets(dstCtx, srcPackets.ExtractSequenceList()) + if err != nil { + return err + } + srcPackets = srcPackets.Filter(seqs) + return nil + }) + + eg.Go(func() error { + seqs, err := src.QueryUnreceivedPackets(srcCtx, dstPackets.ExtractSequenceList()) + if err != nil { + return err + } + dstPackets = dstPackets.Filter(seqs) + return nil + }) + + if err := eg.Wait(); err != nil { + return nil, err + } } return &RelayPackets{ @@ -111,7 +133,7 @@ func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders }, nil } -func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, sh SyncHeaders) error { +func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error { // set the maximum relay transaction constraints msgs := &RelayMsgs{ Src: []sdk.Msg{}, @@ -131,7 +153,7 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, return err } - if len(sp.Src) > 0 { + if len(rp.Src) > 0 { hs, err := sh.SetupHeadersForUpdate(src, dst) if err != nil { return err @@ -141,7 +163,7 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, } } - if len(sp.Dst) > 0 { + if len(rp.Dst) > 0 { hs, err := sh.SetupHeadersForUpdate(dst, src) if err != nil { return err @@ -151,11 +173,11 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, } } - packetsForDst, err := collectPackets(srcCtx, src, sp.Src, dstAddress) + packetsForDst, err := collectPackets(srcCtx, src, rp.Src, dstAddress) if err != nil { return err } - packetsForSrc, err := collectPackets(dstCtx, dst, sp.Dst, srcAddress) + packetsForSrc, err := collectPackets(dstCtx, dst, rp.Dst, srcAddress) if err != nil { return err } @@ -182,24 +204,20 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, return nil } -func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) { +func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) { var ( eg = new(errgroup.Group) srcAcks PacketInfoList dstAcks PacketInfoList ) - srcCtx := sh.GetQueryContext(src.ChainID()) - dstCtx := sh.GetQueryContext(dst.ChainID()) - - var srcCtxLatest, dstCtxLatest QueryContext - if srcHeight, err := src.LatestHeight(); err != nil { + srcCtx, err := getQueryContext(src, sh, true) + if err != nil { return nil, err - } else if dstHeight, err := dst.LatestHeight(); err != nil { + } + dstCtx, err := getQueryContext(dst, sh, true) + if err != nil { return nil, err - } else { - srcCtxLatest = NewQueryContext(context.TODO(), srcHeight) - dstCtxLatest = NewQueryContext(context.TODO(), dstHeight) } eg.Go(func() error { @@ -228,26 +246,40 @@ func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh Sy return nil, err } - eg.Go(func() error { - seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtxLatest, srcAcks.ExtractSequenceList()) + // If includeRelayedButUnfinalized is true, this function should return packets of which AcknowledgePacket is not finalized yet. + // In this case, filtering packets by QueryUnreceivedAcknowledgements is not needed because QueryUnfinalizedRelayAcknowledgements + // has already returned packets that completely match this condition. + if !includeRelayedButUnfinalized { + srcCtx, err := getQueryContext(src, sh, false) if err != nil { - return err + return nil, err } - srcAcks = srcAcks.Filter(seqs) - return nil - }) - - eg.Go(func() error { - seqs, err := src.QueryUnreceivedAcknowledgements(srcCtxLatest, dstAcks.ExtractSequenceList()) + dstCtx, err := getQueryContext(dst, sh, false) if err != nil { - return err + return nil, err } - dstAcks = dstAcks.Filter(seqs) - return nil - }) - if err := eg.Wait(); err != nil { - return nil, err + eg.Go(func() error { + seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtx, srcAcks.ExtractSequenceList()) + if err != nil { + return err + } + srcAcks = srcAcks.Filter(seqs) + return nil + }) + + eg.Go(func() error { + seqs, err := src.QueryUnreceivedAcknowledgements(srcCtx, dstAcks.ExtractSequenceList()) + if err != nil { + return err + } + dstAcks = dstAcks.Filter(seqs) + return nil + }) + + if err := eg.Wait(); err != nil { + return nil, err + } } return &RelayPackets{ @@ -278,7 +310,7 @@ func logPacketsRelayed(src, dst Chain, num int) { num, dst.ChainID(), dst.Path().PortID, src.ChainID(), src.Path().PortID) } -func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *RelayPackets, sh SyncHeaders) error { +func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error { // set the maximum relay transaction constraints msgs := &RelayMsgs{ Src: []sdk.Msg{}, @@ -298,7 +330,7 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay return err } - if len(sp.Src) > 0 { + if len(rp.Src) > 0 { hs, err := sh.SetupHeadersForUpdate(src, dst) if err != nil { return err @@ -308,7 +340,7 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay } } - if len(sp.Dst) > 0 { + if len(rp.Dst) > 0 { hs, err := sh.SetupHeadersForUpdate(dst, src) if err != nil { return err @@ -318,11 +350,11 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay } } - acksForDst, err := collectAcks(srcCtx, src, sp.Src, dstAddress) + acksForDst, err := collectAcks(srcCtx, src, rp.Src, dstAddress) if err != nil { return err } - acksForSrc, err := collectAcks(dstCtx, dst, sp.Dst, srcAddress) + acksForSrc, err := collectAcks(dstCtx, dst, rp.Dst, srcAddress) if err != nil { return err } diff --git a/core/service.go b/core/service.go index 6c1cbee9..7be3374e 100644 --- a/core/service.go +++ b/core/service.go @@ -65,7 +65,7 @@ func (srv *RelayService) Serve(ctx context.Context) error { // relay packets if unrelayed seqs exist - pseqs, err := srv.st.UnrelayedPackets(srv.src, srv.dst, srv.sh) + pseqs, err := srv.st.UnrelayedPackets(srv.src, srv.dst, srv.sh, false) if err != nil { return err } @@ -75,7 +75,7 @@ func (srv *RelayService) Serve(ctx context.Context) error { // relay acks if unrelayed seqs exist - aseqs, err := srv.st.UnrelayedAcknowledgements(srv.src, srv.dst, srv.sh) + aseqs, err := srv.st.UnrelayedAcknowledgements(srv.src, srv.dst, srv.sh, false) if err != nil { return err } diff --git a/core/strategies.go b/core/strategies.go index 9d81c5f9..c76bb5e6 100644 --- a/core/strategies.go +++ b/core/strategies.go @@ -7,12 +7,25 @@ import ( // StrategyI defines type StrategyI interface { + // GetType returns the type of the strategy GetType() string + + // SetupRelay performs chain-specific setup for `src` and `dst` before starting the relay SetupRelay(ctx context.Context, src, dst *ProvableChain) error - UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) + + // UnrelayedPackets returns packets to execute RecvPacket to on `src` and `dst`. + // `includeRelayedButUnfinalized` decides if the result includes packets of which recvPacket has been executed but not finalized + UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) + + // RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`). RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error - UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) - RelayAcknowledgements(src, dst *ProvableChain, ra *RelayPackets, sh SyncHeaders) error + + // UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`. + // `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized + UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) + + // RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`). + RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error } // StrategyCfg defines which relaying strategy to take for a given path