Skip to content

Commit

Permalink
monitor: add StreamingDisabled option to force disable streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Oct 6, 2024
1 parent a0af007 commit 2923bdc
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ var DefaultOptions = Options{
Logger: logger.NewLogger(logger.LogLevel_WARN),
PollingInterval: 1500 * time.Millisecond,
ExpectedBlockInterval: 15 * time.Second,
StreamingErrorResetInterval: 15 * time.Minute,
StreamingRetryAfter: 20 * time.Minute,
StreamingErrorResetInterval: 10 * time.Minute,
StreamingRetryAfter: 15 * time.Minute,
StreamingErrNumToSwitchToPolling: 3,
StreamingDisabled: false,
UnsubscribeOnStop: false,
Timeout: 20 * time.Second,
StartBlockNumber: nil, // latest
Expand Down Expand Up @@ -63,6 +64,9 @@ type Options struct {
// StreamingErrNumToSwitchToPolling is the number of errors before switching to polling
StreamingErrNumToSwitchToPolling int

// StreamingDisabled forces streaming off, even if the provider supports it
StreamingDisabled bool

// Auto-unsubscribe on monitor stop or error
UnsubscribeOnStop bool

Expand Down Expand Up @@ -304,21 +308,37 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
nextBlock := make(chan uint64)

go func() {
var streamingErrorCount int
var streamingErrorLastTime time.Time
var streamingErrCount int
var streamingErrLastTime time.Time

reconnect:
// reset the latest head block
latestHeadBlock.Store(0)

// if we have too many streaming errors, we'll switch to polling
streamingErrorCount++
if time.Since(streamingErrorLastTime) > m.options.StreamingErrorResetInterval {
streamingErrorCount = 0
streamingErrCount++
if time.Since(streamingErrLastTime) > m.options.StreamingErrorResetInterval {
streamingErrCount = 0
}

// TODO: even if streaming is enabled and it's running, we still need to add a
// "streamHealthCheck" that checks if the stream is still running, as perhaps the
// upstream service has a problem (which happens).
//
// The way to check this is: every 10 seconds we ask the node for the latest block,
// and if the latest block from the stream is different from the latest block from the
// node, then we switch to polling mode. After the reset interval we will try streaming
// again. We also need to call alerter.Alert() to notify the user that the stream is down.
//
// TODO: maybe we should add a way to RawInterface() to "inform" / "notify" that the provider
// is producing errors, i.e. for the Node or WS provider, so that we can tell the upstream
// that the provider is having issues: ReportFault(err), ReportFaultWS(err)
//
// NOTE: the node-gateway already does a similar check, but we could do it in
// the monitor directly too.

// listen for new heads either via streaming or polling
if m.provider.IsStreamingEnabled() && streamingErrorCount < m.options.StreamingErrNumToSwitchToPolling {
if !m.options.StreamingDisabled && m.provider.IsStreamingEnabled() && streamingErrCount < m.options.StreamingErrNumToSwitchToPolling {
// Streaming mode if available, where we listen for new heads
// and push the new block number to the nextBlock channel.
m.log.Info("ethmonitor: starting stream head listener")
Expand All @@ -329,8 +349,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
m.log.Warnf("ethmonitor (chain %s): websocket connect failed: %v", m.chainID.String(), err)
m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket connect failed: %v", m.chainID.String(), err)
time.Sleep(2000 * time.Millisecond)

streamingErrorLastTime = time.Now()
streamingErrLastTime = time.Now()
goto reconnect
}

Expand All @@ -351,7 +370,9 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket subscription error: %v", m.chainID.String(), err)
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
// TODO: call provider.ReportFaultWS(err)

streamingErrLastTime = time.Now()
blockTimer.Stop()
goto reconnect

Expand All @@ -360,7 +381,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..")
sub.Unsubscribe()

streamingErrorLastTime = time.Now()
streamingErrLastTime = time.Now()
goto reconnect

case newHead := <-newHeads:
Expand All @@ -386,7 +407,7 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
case <-retryStreamingTimer.C:
// retry streaming
m.log.Info("ethmonitor: retrying streaming...")
streamingErrorLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2)
streamingErrLastTime = time.Now().Add(-m.options.StreamingErrorResetInterval * 2)
goto reconnect
default:
// non-blocking
Expand Down

0 comments on commit 2923bdc

Please sign in to comment.