Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lndmon: cache closedChannels response #104

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions collectors/channels_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/lndclient"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/prometheus/client_golang/prometheus"
)

// cacheRefreshInterval is how often the closed channels cache is refreshed
// from lnd.
const cacheRefreshInterval = 10 * time.Minute

// ChannelsCollector is a collector that keeps track of channel information.
type ChannelsCollector struct {
channelBalanceDesc *prometheus.Desc
Expand Down Expand Up @@ -51,17 +56,25 @@ type ChannelsCollector struct {
// errChan is a channel that we send any errors that we encounter into.
// This channel should be buffered so that it does not block sends.
errChan chan<- error

// quit is a channel that we use to signal for graceful shutdown.
quit chan struct{}

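// closedChannelsCache holds the most recent ClosedChannels response. It
// is refreshed in the background on a ticker to reduce gRPC server load
// on lnd, and access to it is guarded by cacheMutex.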
closedChannelsCache []lndclient.ClosedChannel
cacheMutex sync.RWMutex
}

// NewChannelsCollector returns a new instance of the ChannelsCollector for the
// target lnd client.
func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error,
cfg *MonitoringConfig) *ChannelsCollector {
quitChan chan struct{}, cfg *MonitoringConfig) *ChannelsCollector {

// Our set of labels, status should either be active or inactive. The
// initiator is "true" if we are the initiator, and "false" otherwise.
labels := []string{"chan_id", "status", "initiator", "peer"}
return &ChannelsCollector{
collector := &ChannelsCollector{
channelBalanceDesc: prometheus.NewDesc(
"lnd_channels_open_balance_sat",
"total balance of channels in satoshis",
Expand Down Expand Up @@ -174,10 +187,49 @@ func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error,
[]string{"amount"}, nil,
),

lnd: lnd,
primaryNode: cfg.PrimaryNode,
errChan: errChan,
lnd: lnd,
primaryNode: cfg.PrimaryNode,
closedChannelsCache: nil,
errChan: errChan,
djkazic marked this conversation as resolved.
Show resolved Hide resolved
quit: quitChan,
}

// Refresh the closed channels cache immediately, then once per
// cacheRefreshInterval until the quit channel is closed.
go func() {
ticker := time.NewTicker(cacheRefreshInterval)
defer ticker.Stop()

for {
err := collector.refreshClosedChannelsCache()
if err != nil {
errChan <- err
}

select {
case <-ticker.C:
continue

case <-collector.quit:
djkazic marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}()

return collector
}

// refreshClosedChannelsCache fetches the list of closed channels from lnd and
// stores it in closedChannelsCache while holding the write lock on cacheMutex.
//
// If the call to lnd fails, the cache is left untouched and the error is
// returned wrapped with context naming the collector and the failing call, so
// that it can be identified once it is forwarded on the shared error channel.
func (c *ChannelsCollector) refreshClosedChannelsCache() error {
	// Perform the RPC before taking the lock so that readers of the cache
	// are never blocked on a network round trip.
	data, err := c.lnd.ClosedChannels(context.Background())
	if err != nil {
		return fmt.Errorf("ChannelsCollector ClosedChannels "+
			"failed with: %w", err)
	}

	c.cacheMutex.Lock()
	c.closedChannelsCache = data
	c.cacheMutex.Unlock()

	return nil
}

// Describe sends the super-set of all possible descriptors of metrics
Expand Down Expand Up @@ -452,12 +504,9 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {
)

// Get the list of closed channels.
closedChannelsResp, err := c.lnd.ClosedChannels(context.Background())
if err != nil {
c.errChan <- fmt.Errorf("ChannelsCollector ClosedChannels "+
"failed with: %v", err)
return
}
c.cacheMutex.RLock()
closedChannelsResp := c.closedChannelsCache
c.cacheMutex.RUnlock()
closeCounts := make(map[string]int)
for _, channel := range closedChannelsResp {
typeString, ok := closeTypeLabelMap[channel.CloseType]
Expand Down
10 changes: 6 additions & 4 deletions collectors/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func DefaultConfig() *PrometheusConfig {
// NewPrometheusExporter makes a new instance of the PrometheusExporter given
// the address to listen for Prometheus on and an lnd gRPC client.
func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,
monitoringCfg *MonitoringConfig) *PrometheusExporter {
monitoringCfg *MonitoringConfig, quitChan chan struct{}) *PrometheusExporter {

// We have six collectors and a htlc monitor running, so we buffer our
// error channel by 7 so that we do not need to consume all errors from
Expand All @@ -94,11 +94,13 @@ func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,

htlcMonitor := newHtlcMonitor(lnd.Router, errChan)

chanCollector := NewChannelsCollector(
lnd.Client, errChan, quitChan, monitoringCfg,
)

collectors := []prometheus.Collector{
NewChainCollector(lnd.Client, errChan),
NewChannelsCollector(
lnd.Client, errChan, monitoringCfg,
),
chanCollector,
NewWalletCollector(lnd, errChan),
NewPeerCollector(lnd.Client, errChan),
NewInfoCollector(lnd.Client, errChan),
Expand Down
4 changes: 3 additions & 1 deletion lndmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func start() error {
return err
}

quit := make(chan struct{})
interceptor, err := signal.Intercept()
if err != nil {
return fmt.Errorf("could not intercept signal: %v", err)
Expand Down Expand Up @@ -72,7 +73,7 @@ func start() error {
// Start our Prometheus exporter. This exporter spawns a goroutine
// that pulls metrics from our lnd client on a set interval.
exporter := collectors.NewPrometheusExporter(
cfg.Prometheus, &lnd.LndServices, &monitoringCfg,
cfg.Prometheus, &lnd.LndServices, &monitoringCfg, quit,
)
if err := exporter.Start(); err != nil {
return err
Expand All @@ -83,6 +84,7 @@ func start() error {
var stopErr error
select {
case <-interceptor.ShutdownChannel():
close(quit)
fmt.Println("Exiting lndmon.")

case stopErr = <-exporter.Errors():
Expand Down
Loading