Skip to content

Commit

Permalink
Consolidate tx broadcast methods (#1117)
Browse files Browse the repository at this point in the history
* Consolidate tx broadcast methods into one

* check config var

* catch sdk errors after broadcast

* Update comment
  • Loading branch information
agouin authored Mar 10, 2023
1 parent 8a5a9a0 commit 1270279
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 238 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/cosmos/relayer/v2
go 1.19

require (
cosmossdk.io/api v0.3.1
cosmossdk.io/errors v1.0.0-beta.7
cosmossdk.io/math v1.0.0-beta.6.0.20230216172121-959ce49135e4
github.com/avast/retry-go/v4 v4.3.2
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.12.0 // indirect
cloud.google.com/go/storage v1.29.0 // indirect
cosmossdk.io/api v0.3.1 // indirect
cosmossdk.io/core v0.5.1 // indirect
cosmossdk.io/depinject v1.0.0-alpha.3 // indirect
cosmossdk.io/tools/rosetta v0.2.1 // indirect
Expand Down
118 changes: 0 additions & 118 deletions relayer/chains/cosmos/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
package cosmos

import (
"context"
"errors"
"fmt"
"time"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

const (
Expand All @@ -21,115 +12,6 @@ type _err string

func (e _err) Error() string { return string(e) }

type rpcTxBroadcaster interface {
Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error)
BroadcastTxSync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error)

// TODO: implement commit and async as well
// BroadcastTxCommit(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTxCommit, error)
// BroadcastTxAsync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error)
}

func (cc *CosmosProvider) BroadcastTx(ctx context.Context, tx []byte) (*sdk.TxResponse, error) {
var (
blockTimeout = defaultBroadcastWaitTimeout
err error
)

if cc.PCfg.BlockTimeout != "" {
blockTimeout, err = time.ParseDuration(cc.PCfg.BlockTimeout)
if err != nil {
// Did you call Validate() method on ChainClientConfig struct
// before coming here?
return nil, err
}
}

return broadcastTx(
ctx,
cc.RPCClient,
cc.Codec.TxConfig.TxDecoder(),
tx,
blockTimeout,
)
}

// broadcastTx broadcasts a TX and then waits for the TX to be included in the block.
// The waiting will either be canceled after the waitTimeout has run out or the context
// exited.
func broadcastTx(
ctx context.Context,
broadcaster rpcTxBroadcaster,
txDecoder sdk.TxDecoder,
tx []byte,
waitTimeout time.Duration,
) (*sdk.TxResponse, error) {
// broadcast tx sync waits for check tx to pass
// NOTE: this can return w/ a timeout
// need to investigate if this will leave the tx
// in the mempool or we can retry the broadcast at that
// point

syncRes, err := broadcaster.BroadcastTxSync(ctx, tx)
if err != nil {
if syncRes == nil {
// There are some cases where BroadcastTxSync will return an error but the associated
// ResultBroadcastTx will be nil.
return nil, err
}
return &sdk.TxResponse{
Code: syncRes.Code,
Codespace: syncRes.Codespace,
TxHash: syncRes.Hash.String(),
}, err
}

// ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace
// This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go
err = errors.Unwrap(sdkerrors.ABCIError(syncRes.Codespace, syncRes.Code, "error broadcasting transaction"))
if err.Error() != errUnknown {
return nil, err
}

// TODO: maybe we need to check if the node has tx indexing enabled?
// if not, we need to find a new way to block until inclusion in a block

// wait for tx to be included in a block
exitAfter := time.After(waitTimeout)
for {
select {
case <-exitAfter:
return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast)
// TODO: this is potentially less than optimal and may
// be better as something configurable
case <-time.After(time.Millisecond * 100):
resTx, err := broadcaster.Tx(ctx, syncRes.Hash, false)
if err == nil {
return mkTxResult(txDecoder, resTx)
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

func mkTxResult(txDecoder sdk.TxDecoder, resTx *ctypes.ResultTx) (*sdk.TxResponse, error) {
txb, err := txDecoder(resTx.Tx)
if err != nil {
return nil, err
}
p, ok := txb.(intoAny)
if !ok {
return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txb)
}
any := p.AsAny()
// TODO: maybe don't make up the time here?
// we can fetch the block for the block time buts thats
// more round trips
// TODO: logs get rendered as base64 encoded, need to fix this somehow
return sdk.NewResponseResultTx(resTx, any, time.Now().Format(time.RFC3339)), nil
}

// Deprecated: this interface is used only internally for scenario we are
// deprecating (StdTxConfig support)
type intoAny interface {
Expand Down
46 changes: 43 additions & 3 deletions relayer/chains/cosmos/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"fmt"
"reflect"
"strconv"
"sync"
"time"

abci "github.com/cometbft/cometbft/abci/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/relayer/v2/relayer/provider"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
Expand All @@ -16,6 +19,7 @@ import (
"google.golang.org/grpc/status"

"github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
Expand Down Expand Up @@ -148,13 +152,49 @@ func (cc *CosmosProvider) TxServiceBroadcast(ctx context.Context, req *tx.Broadc
return nil, status.Error(codes.InvalidArgument, "invalid empty tx")
}

resp, err := cc.BroadcastTx(ctx, req.TxBytes)
if err != nil {
var (
blockTimeout = defaultBroadcastWaitTimeout
err error
rlyResp *provider.RelayerTxResponse
callbackErr error
wg sync.WaitGroup
)

if cc.PCfg.BlockTimeout != "" {
blockTimeout, err = time.ParseDuration(cc.PCfg.BlockTimeout)
if err != nil {
// Did you call Validate() method on CosmosProviderConfig struct
// before coming here?
return nil, err
}
}

callback := func(rtr *provider.RelayerTxResponse, err error) {
rlyResp = rtr
callbackErr = err
wg.Done()
}

wg.Add(1)

if err := cc.broadcastTx(ctx, req.TxBytes, nil, nil, ctx, blockTimeout, callback); err != nil {
return nil, err
}

wg.Wait()

if callbackErr != nil {
return nil, callbackErr
}

return &tx.BroadcastTxResponse{
TxResponse: resp,
TxResponse: &sdk.TxResponse{
Height: rlyResp.Height,
TxHash: rlyResp.TxHash,
Codespace: rlyResp.Codespace,
Code: rlyResp.Code,
Data: rlyResp.Data,
},
}, nil
}

Expand Down
Loading

0 comments on commit 1270279

Please sign in to comment.