Skip to content

Commit

Permalink
Update client.go
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Oct 2, 2024
1 parent fe2f291 commit 7bfb700
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"sync"
"time"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"golang.org/x/sync/singleflight"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)
Expand Down Expand Up @@ -158,8 +158,10 @@ func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, m

func (c *Client) LatestBlock(ctx context.Context) (*Head, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
//ctx, cancel, chStopInFlight, _, _ := c.acquireQueryCtx(ctx, c.rpcTimeout)
result, err := c.rpc.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed)
ctx, cancel, chStopInFlight, rawRPC := c.acquireQueryCtx(ctx, c.contextDuration)
defer cancel()

result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, err
}
Expand All @@ -168,16 +170,15 @@ func (c *Client) LatestBlock(ctx context.Context) (*Head, error) {
BlockHeight: &result.Value.LastValidBlockHeight,
BlockHash: &result.Value.Blockhash,
}
c.onNewHead(ctx, c.chStopInFlight, head)
c.onNewHead(ctx, chStopInFlight, head)
return head, nil
}

func (c *Client) LatestFinalizedBlock(ctx context.Context) (*Head, error) {
// TODO: Do we need this?
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
//ctx, cancel, chStopInFlight, _, _ := c.acquireQueryCtx(ctx, c.rpcTimeout)
ctx, cancel, chStopInFlight, rawRPC := c.acquireQueryCtx(ctx, c.contextDuration)
defer cancel()

result, err := c.rpc.GetLatestBlockhash(ctx, rpc.CommitmentFinalized)
result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, err
}
Expand All @@ -186,7 +187,7 @@ func (c *Client) LatestFinalizedBlock(ctx context.Context) (*Head, error) {
BlockHeight: &result.Value.LastValidBlockHeight,
BlockHash: &result.Value.Blockhash,
}
c.onNewFinalizedHead(ctx, c.chStopInFlight, head)
c.onNewFinalizedHead(ctx, chStopInFlight, head)
return head, nil
}

Expand Down Expand Up @@ -227,6 +228,33 @@ func (c *Client) onNewFinalizedHead(ctx context.Context, requestCh <-chan struct
}
}

// makeQueryCtx returns a context that cancels if:
// 1. Passed in ctx cancels
// 2. Passed in channel is closed
// 3. Default timeout is reached (queryTimeout)
func makeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) {
var chCancel, timeoutCancel context.CancelFunc
ctx, chCancel = ch.Ctx(ctx)
ctx, timeoutCancel = context.WithTimeout(ctx, timeout)
cancel := func() {
chCancel()
timeoutCancel()
}
return ctx, cancel
}

func (c *Client) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, raw *rpc.Client) {
// Need to wrap in mutex because state transition can cancel and replace context
c.stateMu.RLock()
chStopInFlight = c.chStopInFlight
cp := *c.rpc
raw = &cp
c.stateMu.RUnlock()
ctx, cancel = makeQueryCtx(parentCtx, chStopInFlight, timeout)
return
}

func (c *Client) Ping(ctx context.Context) error {
version, err := c.rpc.GetVersion(ctx)
if err != nil {
Expand Down

0 comments on commit 7bfb700

Please sign in to comment.