From 7bfb7000b525f1103970df160befd6d40aca0067 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 2 Oct 2024 12:50:17 -0400 Subject: [PATCH] Update client.go --- pkg/solana/client/client.go | 48 +++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index 32454a82c..7a5e0d04f 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -8,14 +8,14 @@ import ( "sync" "time" - mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" - "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "golang.org/x/sync/singleflight" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) @@ -158,8 +158,10 @@ func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, m func (c *Client) LatestBlock(ctx context.Context) (*Head, error) { // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle - //ctx, cancel, chStopInFlight, _, _ := c.acquireQueryCtx(ctx, c.rpcTimeout) - result, err := c.rpc.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed) + ctx, cancel, chStopInFlight, rawRPC := c.acquireQueryCtx(ctx, c.contextDuration) + defer cancel() + + result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed) if err != nil { return nil, err } @@ -168,16 +170,15 @@ func (c *Client) LatestBlock(ctx context.Context) (*Head, error) { BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, } - c.onNewHead(ctx, c.chStopInFlight, head) + c.onNewHead(ctx, chStopInFlight, head) return head, nil } func (c *Client) LatestFinalizedBlock(ctx context.Context) (*Head, error) { - // TODO: Do we need this? - // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle - //ctx, cancel, chStopInFlight, _, _ := c.acquireQueryCtx(ctx, c.rpcTimeout) + ctx, cancel, chStopInFlight, rawRPC := c.acquireQueryCtx(ctx, c.contextDuration) + defer cancel() - result, err := c.rpc.GetLatestBlockhash(ctx, rpc.CommitmentFinalized) + result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentFinalized) if err != nil { return nil, err } @@ -186,7 +187,7 @@ func (c *Client) LatestFinalizedBlock(ctx context.Context) (*Head, error) { BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, } - c.onNewFinalizedHead(ctx, c.chStopInFlight, head) + c.onNewFinalizedHead(ctx, chStopInFlight, head) return head, nil } @@ -227,6 +228,33 @@ func (c *Client) onNewFinalizedHead(ctx context.Context, requestCh <-chan struct } } +// makeQueryCtx returns a context that cancels if: +// 1. Passed in ctx cancels +// 2. Passed in channel is closed +// 3. Default timeout is reached (queryTimeout) +func makeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) { + var chCancel, timeoutCancel context.CancelFunc + ctx, chCancel = ch.Ctx(ctx) + ctx, timeoutCancel = context.WithTimeout(ctx, timeout) + cancel := func() { + chCancel() + timeoutCancel() + } + return ctx, cancel +} + +func (c *Client) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, + chStopInFlight chan struct{}, raw *rpc.Client) { + // Need to wrap in mutex because state transition can cancel and replace context + c.stateMu.RLock() + chStopInFlight = c.chStopInFlight + cp := *c.rpc + raw = &cp + c.stateMu.RUnlock() + ctx, cancel = makeQueryCtx(parentCtx, chStopInFlight, timeout) + return +} + func (c *Client) Ping(ctx context.Context) error { version, err := c.rpc.GetVersion(ctx) if err != nil {