Skip to content

Commit

Permalink
Merge pull request #94 from siburu/fix-query-unrelayed
Browse files Browse the repository at this point in the history
Fix StrategyI::Unrelayed{Packet,Acknowledgements}
  • Loading branch information
siburu authored Aug 8, 2023
2 parents 9f64d28 + c57a8bc commit 30449b9
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func queryUnrelayedPackets(ctx *config.Context) *cobra.Command {
if err != nil {
return err
}
sp, err := st.UnrelayedPackets(c[src], c[dst], sh)
sp, err := st.UnrelayedPackets(c[src], c[dst], sh, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func queryUnrelayedAcknowledgements(ctx *config.Context) *cobra.Command {
return err
}

sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh)
sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh, true)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {
return err
}

sp, err := st.UnrelayedPackets(c[src], c[dst], sh)
sp, err := st.UnrelayedPackets(c[src], c[dst], sh, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {

// sp.Src contains all sequences acked on SRC but acknowledgement not processed on DST
// sp.Dst contains all sequences acked on DST but acknowledgement not processed on SRC
sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh)
sp, err := st.UnrelayedAcknowledgements(c[src], c[dst], sh, false)
if err != nil {
return err
}
Expand Down
152 changes: 92 additions & 60 deletions core/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,32 @@ func (st NaiveStrategy) SetupRelay(ctx context.Context, src, dst *ProvableChain)
return nil
}

func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) {
func getQueryContext(chain *ProvableChain, sh SyncHeaders, useFinalizedHeader bool) (QueryContext, error) {
if useFinalizedHeader {
return sh.GetQueryContext(chain.ChainID()), nil
} else {
height, err := chain.LatestHeight()
if err != nil {
return nil, err
}
return NewQueryContext(context.TODO(), height), nil
}
}

func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) {
var (
eg = new(errgroup.Group)
srcPackets PacketInfoList
dstPackets PacketInfoList
)

srcCtx := sh.GetQueryContext(src.ChainID())
dstCtx := sh.GetQueryContext(dst.ChainID())

var srcLatestCtx, dstLatestCtx QueryContext
if srcHeight, err := src.LatestHeight(); err != nil {
srcCtx, err := getQueryContext(src, sh, true)
if err != nil {
return nil, err
} else if dstHeight, err := dst.LatestHeight(); err != nil {
}
dstCtx, err := getQueryContext(dst, sh, true)
if err != nil {
return nil, err
} else {
srcLatestCtx = NewQueryContext(context.TODO(), srcHeight)
dstLatestCtx = NewQueryContext(context.TODO(), dstHeight)
}

eg.Go(func() error {
Expand All @@ -83,26 +91,40 @@ func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders
return nil, err
}

eg.Go(func() error {
seqs, err := dst.QueryUnreceivedPackets(dstLatestCtx, srcPackets.ExtractSequenceList())
// If includeRelayedButUnfinalized is true, this function should return packets of which RecvPacket is not finalized yet.
// In this case, filtering packets by QueryUnreceivedPackets is not needed because QueryUnfinalizedRelayPackets
// has already returned packets that completely match this condition.
if !includeRelayedButUnfinalized {
srcCtx, err := getQueryContext(src, sh, false)
if err != nil {
return err
return nil, err
}
srcPackets = srcPackets.Filter(seqs)
return nil
})

eg.Go(func() error {
seqs, err := src.QueryUnreceivedPackets(srcLatestCtx, dstPackets.ExtractSequenceList())
dstCtx, err := getQueryContext(dst, sh, false)
if err != nil {
return err
return nil, err
}
dstPackets = dstPackets.Filter(seqs)
return nil
})

if err := eg.Wait(); err != nil {
return nil, err
eg.Go(func() error {
seqs, err := dst.QueryUnreceivedPackets(dstCtx, srcPackets.ExtractSequenceList())
if err != nil {
return err
}
srcPackets = srcPackets.Filter(seqs)
return nil
})

eg.Go(func() error {
seqs, err := src.QueryUnreceivedPackets(srcCtx, dstPackets.ExtractSequenceList())
if err != nil {
return err
}
dstPackets = dstPackets.Filter(seqs)
return nil
})

if err := eg.Wait(); err != nil {
return nil, err
}
}

return &RelayPackets{
Expand All @@ -111,7 +133,7 @@ func (st NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders
}, nil
}

func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets, sh SyncHeaders) error {
func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []sdk.Msg{},
Expand All @@ -131,7 +153,7 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets,
return err
}

if len(sp.Src) > 0 {
if len(rp.Src) > 0 {
hs, err := sh.SetupHeadersForUpdate(src, dst)
if err != nil {
return err
Expand All @@ -141,7 +163,7 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets,
}
}

if len(sp.Dst) > 0 {
if len(rp.Dst) > 0 {
hs, err := sh.SetupHeadersForUpdate(dst, src)
if err != nil {
return err
Expand All @@ -151,11 +173,11 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets,
}
}

packetsForDst, err := collectPackets(srcCtx, src, sp.Src, dstAddress)
packetsForDst, err := collectPackets(srcCtx, src, rp.Src, dstAddress)
if err != nil {
return err
}
packetsForSrc, err := collectPackets(dstCtx, dst, sp.Dst, srcAddress)
packetsForSrc, err := collectPackets(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
return err
}
Expand All @@ -182,24 +204,20 @@ func (st NaiveStrategy) RelayPackets(src, dst *ProvableChain, sp *RelayPackets,
return nil
}

func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error) {
func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) {
var (
eg = new(errgroup.Group)
srcAcks PacketInfoList
dstAcks PacketInfoList
)

srcCtx := sh.GetQueryContext(src.ChainID())
dstCtx := sh.GetQueryContext(dst.ChainID())

var srcCtxLatest, dstCtxLatest QueryContext
if srcHeight, err := src.LatestHeight(); err != nil {
srcCtx, err := getQueryContext(src, sh, true)
if err != nil {
return nil, err
} else if dstHeight, err := dst.LatestHeight(); err != nil {
}
dstCtx, err := getQueryContext(dst, sh, true)
if err != nil {
return nil, err
} else {
srcCtxLatest = NewQueryContext(context.TODO(), srcHeight)
dstCtxLatest = NewQueryContext(context.TODO(), dstHeight)
}

eg.Go(func() error {
Expand Down Expand Up @@ -228,26 +246,40 @@ func (st NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh Sy
return nil, err
}

eg.Go(func() error {
seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtxLatest, srcAcks.ExtractSequenceList())
// If includeRelayedButUnfinalized is true, this function should return packets of which AcknowledgePacket is not finalized yet.
// In this case, filtering packets by QueryUnreceivedAcknowledgements is not needed because QueryUnfinalizedRelayAcknowledgements
// has already returned packets that completely match this condition.
if !includeRelayedButUnfinalized {
srcCtx, err := getQueryContext(src, sh, false)
if err != nil {
return err
return nil, err
}
srcAcks = srcAcks.Filter(seqs)
return nil
})

eg.Go(func() error {
seqs, err := src.QueryUnreceivedAcknowledgements(srcCtxLatest, dstAcks.ExtractSequenceList())
dstCtx, err := getQueryContext(dst, sh, false)
if err != nil {
return err
return nil, err
}
dstAcks = dstAcks.Filter(seqs)
return nil
})

if err := eg.Wait(); err != nil {
return nil, err
eg.Go(func() error {
seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtx, srcAcks.ExtractSequenceList())
if err != nil {
return err
}
srcAcks = srcAcks.Filter(seqs)
return nil
})

eg.Go(func() error {
seqs, err := src.QueryUnreceivedAcknowledgements(srcCtx, dstAcks.ExtractSequenceList())
if err != nil {
return err
}
dstAcks = dstAcks.Filter(seqs)
return nil
})

if err := eg.Wait(); err != nil {
return nil, err
}
}

return &RelayPackets{
Expand Down Expand Up @@ -278,7 +310,7 @@ func logPacketsRelayed(src, dst Chain, num int) {
num, dst.ChainID(), dst.Path().PortID, src.ChainID(), src.Path().PortID)
}

func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *RelayPackets, sh SyncHeaders) error {
func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []sdk.Msg{},
Expand All @@ -298,7 +330,7 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay
return err
}

if len(sp.Src) > 0 {
if len(rp.Src) > 0 {
hs, err := sh.SetupHeadersForUpdate(src, dst)
if err != nil {
return err
Expand All @@ -308,7 +340,7 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay
}
}

if len(sp.Dst) > 0 {
if len(rp.Dst) > 0 {
hs, err := sh.SetupHeadersForUpdate(dst, src)
if err != nil {
return err
Expand All @@ -318,11 +350,11 @@ func (st NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, sp *Relay
}
}

acksForDst, err := collectAcks(srcCtx, src, sp.Src, dstAddress)
acksForDst, err := collectAcks(srcCtx, src, rp.Src, dstAddress)
if err != nil {
return err
}
acksForSrc, err := collectAcks(dstCtx, dst, sp.Dst, srcAddress)
acksForSrc, err := collectAcks(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (srv *RelayService) Serve(ctx context.Context) error {

// relay packets if unrelayed seqs exist

pseqs, err := srv.st.UnrelayedPackets(srv.src, srv.dst, srv.sh)
pseqs, err := srv.st.UnrelayedPackets(srv.src, srv.dst, srv.sh, false)
if err != nil {
return err
}
Expand All @@ -75,7 +75,7 @@ func (srv *RelayService) Serve(ctx context.Context) error {

// relay acks if unrelayed seqs exist

aseqs, err := srv.st.UnrelayedAcknowledgements(srv.src, srv.dst, srv.sh)
aseqs, err := srv.st.UnrelayedAcknowledgements(srv.src, srv.dst, srv.sh, false)
if err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions core/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,25 @@ import (

// StrategyI defines
type StrategyI interface {
// GetType returns the type of the strategy
GetType() string

// SetupRelay performs chain-specific setup for `src` and `dst` before starting the relay
SetupRelay(ctx context.Context, src, dst *ProvableChain) error
UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error)

// UnrelayedPackets returns packets to execute RecvPacket to on `src` and `dst`.
// `includeRelayedButUnfinalized` decides if the result includes packets of which recvPacket has been executed but not finalized
UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error
UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders) (*RelayPackets, error)
RelayAcknowledgements(src, dst *ProvableChain, ra *RelayPackets, sh SyncHeaders) error

// UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`.
// `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized
UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) error
}

// StrategyCfg defines which relaying strategy to take for a given path
Expand Down

0 comments on commit 30449b9

Please sign in to comment.