diff --git a/go.mod b/go.mod index da86bb21b0c1..6d1c95f1a07a 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 + github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc diff --git a/go.sum b/go.sum index 62b957a7ce01..23b284612fe1 100644 --- a/go.sum +++ b/go.sum @@ -1042,8 +1042,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA= -github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa h1:qn6vlh4ZPBgyMTGUoSS6LewOmZk4JJDeLIfU3o6mYHo= +github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/tools/lambda-promtail/go.mod b/tools/lambda-promtail/go.mod index 961038477cdf..294450eb1167 100644 --- a/tools/lambda-promtail/go.mod +++ b/tools/lambda-promtail/go.mod @@ -10,7 +10,7 @@ require ( github.com/go-kit/log v0.2.1 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 - github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf + github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa github.com/grafana/loki/v3 v3.0.0-20240809103847-9315b3d03d79 github.com/prometheus/common v0.55.0 github.com/stretchr/testify v1.9.0 diff --git a/tools/lambda-promtail/go.sum b/tools/lambda-promtail/go.sum index f7251a0d3370..0a360b51dc41 100644 --- a/tools/lambda-promtail/go.sum +++ b/tools/lambda-promtail/go.sum @@ -216,8 +216,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf h1:ZafqZwIpdCCMifH9Ok6C98rYaCh5OZeyyHLbU0FPedg= -github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa h1:qn6vlh4ZPBgyMTGUoSS6LewOmZk4JJDeLIfU3o6mYHo= +github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+rwdVg8hAuGKP29ndRSzJAwhxKldkP8oQ= diff --git a/vendor/github.com/grafana/dskit/concurrency/worker.go b/vendor/github.com/grafana/dskit/concurrency/worker.go index f40f0334800b..10a59e60023e 100644 --- a/vendor/github.com/grafana/dskit/concurrency/worker.go +++ b/vendor/github.com/grafana/dskit/concurrency/worker.go @@ -5,12 +5,18 @@ package concurrency // If all workers are busy, Go() will spawn a new goroutine to run the workload. func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { p := &ReusableGoroutinesPool{ - jobs: make(chan func()), + jobs: make(chan func()), + closed: make(chan struct{}), } for i := 0; i < size; i++ { go func() { - for f := range p.jobs { - f() + for { + select { + case f := <-p.jobs: + f() + case <-p.closed: + return + } } }() } @@ -18,7 +24,8 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { } type ReusableGoroutinesPool struct { - jobs chan func() + jobs chan func() + closed chan struct{} } // Go will run the given function in a worker of the pool. @@ -32,7 +39,9 @@ func (p *ReusableGoroutinesPool) Go(f func()) { } // Close stops the workers of the pool. -// No new Do() calls should be performed after calling Close(). +// No new Go() calls should be performed after calling Close(). // Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. // Close is intended to be used in tests to ensure that no goroutines are leaked. -func (p *ReusableGoroutinesPool) Close() { close(p.jobs) } +func (p *ReusableGoroutinesPool) Close() { + close(p.closed) +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 1d96363fe3fa..452798e04e7f 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -137,6 +137,7 @@ type KVConfig struct { GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` + NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") + f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") @@ -251,6 +253,10 @@ type KV struct { watchers map[string][]chan string prefixWatchers map[string][]chan string + // Delayed notifications for watchers + notifMu sync.Mutex + keyNotifications map[string]struct{} + // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex @@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - workersChannels: make(map[string]chan valueUpdate), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + keyNotifications: make(map[string]struct{}), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error { return errFailedToJoinCluster } + if m.cfg.NotifyInterval > 0 { + // Start delayed key notifications. + notifTicker := time.NewTicker(m.cfg.NotifyInterval) + defer notifTicker.Stop() + go m.monitorKeyNotifications(ctx, notifTicker.C) + } + var tickerChan <-chan time.Time if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { t := time.NewTicker(m.cfg.RejoinInterval) @@ -905,7 +919,59 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st } } +// notifyWatchers sends notification to all watchers of given key. If delay is +// enabled, it accumulates them for later sending. func (m *KV) notifyWatchers(key string) { + if m.cfg.NotifyInterval <= 0 { + m.notifyWatchersSync(key) + return + } + + m.notifMu.Lock() + defer m.notifMu.Unlock() + m.keyNotifications[key] = struct{}{} +} + +// monitorKeyNotifications sends accumulated notifications to all watchers of +// respective keys when the given channel ticks. +func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { + if m.cfg.NotifyInterval <= 0 { + panic("sendNotifications called with NotifyInterval <= 0") + } + + for { + select { + case <-tickChan: + m.sendKeyNotifications() + case <-ctx.Done(): + return + } + } +} + +// sendKeyNotifications sends accumulated notifications to watchers of respective keys. +func (m *KV) sendKeyNotifications() { + newNotifs := func() map[string]struct{} { + // Grab and clear accumulated notifications. + m.notifMu.Lock() + defer m.notifMu.Unlock() + + if len(m.keyNotifications) == 0 { + return nil + } + newMap := make(map[string]struct{}) + notifs := m.keyNotifications + m.keyNotifications = newMap + return notifs + } + + for key := range newNotifs() { + m.notifyWatchersSync(key) + } +} + +// notifyWatcherSync immediately sends notification to all watchers of given key. +func (m *KV) notifyWatchersSync(key string) { m.watchersMu.Lock() defer m.watchersMu.Unlock() diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 751ad1163a91..241d25b71740 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" @@ -52,7 +51,13 @@ type TCPTransportConfig struct { // Timeout for writing packet data. Zero = no timeout. PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` - // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on + // Maximum number of concurrent writes to other nodes. + MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"` + + // Timeout for acquiring one of the concurrent write slots. + AcquireWriterTimeout time.Duration `yaml:"acquire_writer_timeout" category:"advanced"` + + // Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-" category:"advanced"` // Where to put custom metrics. nil = don't register. @@ -73,12 +78,19 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 2*time.Second, "Timeout used when connecting to other nodes to send packet.") f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.") + f.IntVar(&cfg.MaxConcurrentWrites, prefix+"memberlist.max-concurrent-writes", 3, "Maximum number of concurrent writes to other nodes.") + f.DurationVar(&cfg.AcquireWriterTimeout, prefix+"memberlist.acquire-writer-timeout", 250*time.Millisecond, "Timeout for acquiring one of the concurrent write slots. After this time, the message will be dropped.") f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.") f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.") cfg.TLS.RegisterFlagsWithPrefix(prefix+"memberlist", f) } +type writeRequest struct { + b []byte + addr string +} + // TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream // operations ("packet" and "stream" are terms used by memberlist). // It uses a new TCP connections for each operation. There is no connection reuse. @@ -91,7 +103,11 @@ type TCPTransport struct { tcpListeners []net.Listener tlsConfig *tls.Config - shutdown atomic.Int32 + shutdownMu sync.RWMutex + shutdown bool + writeCh chan writeRequest // this channel is protected by shutdownMu + + writeWG sync.WaitGroup advertiseMu sync.RWMutex advertiseAddr string @@ -107,6 +123,7 @@ type TCPTransport struct { sentPackets prometheus.Counter sentPacketsBytes prometheus.Counter sentPacketsErrors prometheus.Counter + droppedPackets prometheus.Counter unknownConnections prometheus.Counter } @@ -119,11 +136,21 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr // Build out the new transport. var ok bool + concurrentWrites := config.MaxConcurrentWrites + if concurrentWrites <= 0 { + concurrentWrites = 1 + } t := TCPTransport{ cfg: config, logger: log.With(logger, "component", "memberlist TCPTransport"), packetCh: make(chan *memberlist.Packet), connCh: make(chan net.Conn), + writeCh: make(chan writeRequest), + } + + for i := 0; i < concurrentWrites; i++ { + t.writeWG.Add(1) + go t.writeWorker() } var err error @@ -205,7 +232,10 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) { for { conn, err := tcpLn.Accept() if err != nil { - if s := t.shutdown.Load(); s == 1 { + t.shutdownMu.RLock() + isShuttingDown := t.shutdown + t.shutdownMu.RUnlock() + if isShuttingDown { break } @@ -424,29 +454,50 @@ func (t *TCPTransport) getAdvertisedAddr() string { // WriteTo is a packet-oriented interface that fires off the given // payload to the given address. func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { - t.sentPackets.Inc() - t.sentPacketsBytes.Add(float64(len(b))) - - err := t.writeTo(b, addr) - if err != nil { - t.sentPacketsErrors.Inc() - - logLevel := level.Warn(t.logger) - if strings.Contains(err.Error(), "connection refused") { - // The connection refused is a common error that could happen during normal operations when a node - // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. - logLevel = t.debugLog() - } - logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) + t.shutdownMu.RLock() + defer t.shutdownMu.RUnlock() // Unlock at the end to protect the chan + if t.shutdown { + return time.Time{}, errors.New("transport is shutting down") + } + // Send the packet to the write workers + // If this blocks for too long (as configured), abort and log an error. + select { + case <-time.After(t.cfg.AcquireWriterTimeout): + // Dropped packets are not an issue, the memberlist protocol will retry later. + level.Debug(t.logger).Log("msg", "WriteTo failed to acquire a writer. Dropping message", "timeout", t.cfg.AcquireWriterTimeout, "addr", addr) + t.droppedPackets.Inc() // WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors, // but memberlist library doesn't seem to cope with that very well. That is why we return nil instead. return time.Now(), nil + case t.writeCh <- writeRequest{b: b, addr: addr}: + // OK } return time.Now(), nil } +func (t *TCPTransport) writeWorker() { + defer t.writeWG.Done() + for req := range t.writeCh { + b, addr := req.b, req.addr + t.sentPackets.Inc() + t.sentPacketsBytes.Add(float64(len(b))) + err := t.writeTo(b, addr) + if err != nil { + t.sentPacketsErrors.Inc() + + logLevel := level.Warn(t.logger) + if strings.Contains(err.Error(), "connection refused") { + // The connection refused is a common error that could happen during normal operations when a node + // shutdown (or crash). It shouldn't be considered a warning condition on the sender side. + logLevel = t.debugLog() + } + logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err) + } + } +} + func (t *TCPTransport) writeTo(b []byte, addr string) error { // Open connection, write packet header and data, data hash, close. Simple. c, err := t.getConnection(addr, t.cfg.PacketDialTimeout) @@ -559,17 +610,31 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn { // Shutdown is called when memberlist is shutting down; this gives the // transport a chance to clean up any listeners. +// This will avoid log spam about errors when we shut down. func (t *TCPTransport) Shutdown() error { + t.shutdownMu.Lock() // This will avoid log spam about errors when we shut down. - t.shutdown.Store(1) + if t.shutdown { + t.shutdownMu.Unlock() + return nil // already shut down + } + + // Set the shutdown flag and close the write channel. + t.shutdown = true + close(t.writeCh) + t.shutdownMu.Unlock() // Rip through all the connections and shut them down. for _, conn := range t.tcpListeners { _ = conn.Close() } + // Wait until all write workers have finished. + t.writeWG.Wait() + // Block until all the listener threads have died. t.wg.Wait() + return nil } @@ -618,6 +683,13 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) { Help: "Number of errors when receiving memberlist packets", }) + t.droppedPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: t.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "packets_dropped_total", + Help: "Number of dropped memberlist packets. These packets were not sent due to timeout waiting for a writer.", + }) + t.sentPackets = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: t.cfg.MetricsNamespace, Subsystem: subsystem, diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index c8db7da50c61..d47eb8fe256e 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -215,13 +215,13 @@ type Ring struct { // Number of registered instances per zone. instancesCountPerZone map[string]int - // Nubmber of registered instances with tokens per zone. + // Number of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int // Number of registered instances are writable and have tokens. writableInstancesWithTokensCount int - // Nubmber of registered instances with tokens per zone that are writable. + // Number of registered instances with tokens per zone that are writable. writableInstancesWithTokensCountPerZone map[string]int // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 8daad995c95c..f32bce6f6bc0 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -1,8 +1,13 @@ +// Provenance-includes-location: https://github.com/go-kit/log/blob/main/value.go +// Provenance-includes-license: MIT +// Provenance-includes-copyright: Go kit + package spanlogger import ( "context" "runtime" + "strconv" "strings" "go.uber.org/atomic" // Really just need sync/atomic but there is a lint rule preventing it. @@ -163,9 +168,6 @@ func (s *SpanLogger) getLogger() log.Logger { logger = log.With(logger, "trace_id", traceID) } - // Replace the default valuer for the 'caller' attribute with one that gets the caller of the methods in this file. - logger = log.With(logger, "caller", spanLoggerAwareCaller()) - // If the value has been set by another goroutine, fetch that other value and discard the one we made. if !s.logger.CompareAndSwap(nil, &logger) { pLogger := s.logger.Load() @@ -188,46 +190,64 @@ func (s *SpanLogger) SetSpanAndLogTag(key string, value interface{}) { s.logger.Store(&wrappedLogger) } -// spanLoggerAwareCaller is like log.Caller, but ensures that the caller information is -// that of the caller to SpanLogger, not SpanLogger itself. -func spanLoggerAwareCaller() log.Valuer { - valuer := atomic.NewPointer[log.Valuer](nil) - +// Caller is like github.com/go-kit/log's Caller, but ensures that the caller information is +// that of the caller to SpanLogger (if SpanLogger is being used), not SpanLogger itself. +// +// defaultStackDepth should be the number of stack frames to skip by default, as would be +// passed to github.com/go-kit/log's Caller method. +func Caller(defaultStackDepth int) log.Valuer { return func() interface{} { - // If we've already determined the correct stack depth, use it. - existingValuer := valuer.Load() - if existingValuer != nil { - return (*existingValuer)() - } - - // We haven't been called before, determine the correct stack depth to - // skip the configured logger's internals and the SpanLogger's internals too. - // - // Note that we can't do this in spanLoggerAwareCaller() directly because we - // need to do this when invoked by the configured logger - otherwise we cannot - // measure the stack depth of the logger's internals. - - stackDepth := 3 // log.DefaultCaller uses a stack depth of 3, so start searching for the correct stack depth there. + stackDepth := defaultStackDepth + 1 // +1 to account for this method. + seenSpanLogger := false + pc := make([]uintptr, 1) for { - _, file, _, ok := runtime.Caller(stackDepth) + function, file, line, ok := caller(stackDepth, pc) if !ok { // We've run out of possible stack frames. Give up. - valuer.Store(&unknownCaller) - return unknownCaller() + return "" } - if strings.HasSuffix(file, "spanlogger/spanlogger.go") { - stackValuer := log.Caller(stackDepth + 2) // Add one to skip the stack frame for the SpanLogger method, and another to skip the stack frame for the valuer which we'll invoke below. - valuer.Store(&stackValuer) - return stackValuer() + // If we're in a SpanLogger method, we need to continue searching. + // + // Matching on the exact function name like this does mean this will break if we rename or refactor SpanLogger, but + // the tests should catch this. In the worst case scenario, we'll log incorrect caller information, which isn't the + // end of the world. + if function == "github.com/grafana/dskit/spanlogger.(*SpanLogger).Log" || function == "github.com/grafana/dskit/spanlogger.(*SpanLogger).DebugLog" { + seenSpanLogger = true + stackDepth++ + continue } - stackDepth++ + // We need to check for go-kit/log stack frames like this because using log.With, log.WithPrefix or log.WithSuffix + // (including the various level methods like level.Debug, level.Info etc.) to wrap a SpanLogger introduce an + // additional context.Log stack frame that calls into the SpanLogger. This is because the use of SpanLogger + // as the logger means the optimisation to avoid creating a new logger in + // https://github.com/go-kit/log/blob/c7bf81493e581feca11e11a7672b14be3591ca43/log.go#L141-L146 used by those methods + // can't be used, and so the SpanLogger is wrapped in a new logger. + if seenSpanLogger && function == "github.com/go-kit/log.(*context).Log" { + stackDepth++ + continue + } + + return formatCallerInfoForLog(file, line) } } } -var unknownCaller log.Valuer = func() interface{} { - return "" +// caller is like runtime.Caller, but modified to allow reuse of the uintptr slice and return the function name. +func caller(stackDepth int, pc []uintptr) (function string, file string, line int, ok bool) { + n := runtime.Callers(stackDepth+1, pc) + if n < 1 { + return "", "", 0, false + } + + frame, _ := runtime.CallersFrames(pc).Next() + return frame.Function, frame.File, frame.Line, frame.PC != 0 +} + +// This is based on github.com/go-kit/log's Caller, but modified for use by Caller above. +func formatCallerInfoForLog(file string, line int) string { + idx := strings.LastIndexByte(file, '/') + return file[idx+1:] + ":" + strconv.Itoa(line) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8e8e074487f9..792c2ee4062b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -980,7 +980,7 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 +# github.com/grafana/dskit v0.0.0-20241015200741-21f60cf427aa ## explicit; go 1.21 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff