Skip to content

Commit

Permalink
Improve handling the CrowdSec logrus logs with zap
Browse files Browse the repository at this point in the history
The CrowdSec `go-cs-bouncer` uses `logrus` as the logging package.
We're now using a hook to capture all `logrus` log entries, and turn
them into `zap` log entries. The original `logrus` output is set
to be discarded.
  • Loading branch information
hslatman committed Aug 18, 2023
1 parent 272820b commit dc19d1c
Showing 1 changed file with 72 additions and 33 deletions.
105 changes: 72 additions & 33 deletions internal/bouncer/bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bouncer

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/sirupsen/logrus"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Bouncer is a custom CrowdSec bouncer backed by an immutable radix tree
Expand All @@ -37,24 +39,64 @@ type Bouncer struct {
shouldFailHard bool
}

type logrusErrorHook struct {
fn func(entry *logrus.Entry) error
type zapAdapterHook struct {
logger *zap.Logger
shouldFailHard bool
address string
}

func (lh *logrusErrorHook) Levels() []logrus.Level {
return []logrus.Level{logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel}
func (zh *zapAdapterHook) Levels() []logrus.Level {
return logrus.AllLevels
}

func (lh *logrusErrorHook) Fire(entry *logrus.Entry) error {
if lh.fn == nil {
func (zh *zapAdapterHook) Fire(entry *logrus.Entry) error {
if zh == nil || zh.logger == nil {
return nil
}
return lh.fn(entry)

if entry == nil {
return nil
}

// TODO: extract details from entry.Data? But doesn't seem to be used by CrowdSec today.

msg := entry.Message
fields := []zapcore.Field{zap.String("address", zh.address)}
switch {
case entry.Level <= logrus.ErrorLevel: // error, fatal, panic
fields = append(fields, zap.Error(errors.New(msg)))
if zh.shouldFailHard {
// TODO: if we keep this Fatal and the "shouldFailhard" around, ensure we
// shut the bouncer down nicely
zh.logger.Fatal(msg, fields...)
} else {
zh.logger.Error(msg, fields...)
}
default:
level := zapcore.DebugLevel
if l, ok := levelAdapter[entry.Level]; ok {
level = l
}
zh.logger.Log(level, msg, fields...)
}

return nil
}

var _ logrus.Hook = (*logrusErrorHook)(nil)
var levelAdapter = map[logrus.Level]zapcore.Level{
logrus.TraceLevel: zapcore.DebugLevel, // no trace level in zap
logrus.DebugLevel: zapcore.DebugLevel,
logrus.InfoLevel: zapcore.InfoLevel,
logrus.WarnLevel: zapcore.WarnLevel,
logrus.ErrorLevel: zapcore.ErrorLevel,
logrus.FatalLevel: zapcore.FatalLevel,
logrus.PanicLevel: zapcore.PanicLevel,
}

var _ logrus.Hook = (*zapAdapterHook)(nil)

// New creates a new (streaming) Bouncer with a storage based on immutable radix tree
// TODO: take a configuration struct instead, because more options will be added.
func New(apiKey, apiURL, tickerInterval string, logger *zap.Logger) (*Bouncer, error) {
userAgent := "caddy-cs-bouncer/v0.3.2"
insecureSkipVerify := false
Expand Down Expand Up @@ -96,24 +138,11 @@ func (b *Bouncer) Init() error {
logrus.SetOutput(io.Discard)

// catch error log entries and log them using the *zap.Logger instead
errorHook := &logrusErrorHook{
fn: func(entry *logrus.Entry) error {
// TODO: extract from entry.Data? But doesn't seem to be used by CrowdSec today.
if entry == nil {
return nil
}
msg := entry.Message
if b.shouldFailHard {
b.logger.Fatal(msg, zap.String("address", b.streamingBouncer.APIUrl))
} else {
b.logger.Error(msg, zap.String("address", b.streamingBouncer.APIUrl))
}
return nil
},
}
logrus.AddHook(errorHook)

// TODO: catch other logrus levels too?
logrus.AddHook(&zapAdapterHook{
logger: b.logger,
shouldFailHard: b.shouldFailHard,
address: b.streamingBouncer.APIUrl,
})

// initialize the CrowdSec streaming bouncer
return b.streamingBouncer.Init()
Expand All @@ -130,6 +159,10 @@ func (b *Bouncer) Run() {
return
}

// TODO: wait with processing until we know we're successfully connected to
// the CrowdSec API? The bouncer/client doesn't seem to give us that information
// directly, but we could use the heartbeat service before starting to run?
// That can also be useful for testing the LiveBouncer at startup.
go func() {
b.logger.Info("start processing new and deleted decisions")
for decisions := range b.streamingBouncer.Stream {
Expand All @@ -140,20 +173,22 @@ func (b *Bouncer) Run() {
// TODO: process in separate goroutines/waitgroup?
for _, decision := range decisions.Deleted {
if err := b.delete(decision); err != nil {
b.logger.Error(fmt.Sprintf("unable to delete decision for '%s': %s", *decision.Value, err))
b.logger.Error(fmt.Sprintf("unable to delete decision for %q: %s", *decision.Value, err))
} else {
b.logger.Debug(fmt.Sprintf("deleted '%s' (scope: %s)", *decision.Value, *decision.Scope))
b.logger.Debug(fmt.Sprintf("deleted %q (scope: %s)", *decision.Value, *decision.Scope))
}
}
if len(decisions.New) > 0 {
b.logger.Debug(fmt.Sprintf("processing %d new decisions", len(decisions.New)))
}
// TODO: process in separate goroutines/waitgroup?
// TODO: don't log all additions separately when there's a large number "X" of them to not
// clutter the logs
for _, decision := range decisions.New {
if err := b.add(decision); err != nil {
b.logger.Error(fmt.Sprintf("unable to insert decision for '%s': %s", *decision.Value, err))
b.logger.Error(fmt.Sprintf("unable to insert decision for %q: %s", *decision.Value, err))
} else {
b.logger.Debug(fmt.Sprintf("adding '%s' (scope: %s) for '%s'", *decision.Value, *decision.Scope, *decision.Duration))
b.logger.Debug(fmt.Sprintf("adding %q (scope: %s) for %q", *decision.Value, *decision.Scope, *decision.Duration))
}
}
}
Expand All @@ -165,6 +200,7 @@ func (b *Bouncer) Run() {
// ShutDown stops the Bouncer
func (b *Bouncer) ShutDown() error {
// TODO: persist the current state of the radix tree in some way, so that it can be used in startup again?
// TODO: clean shutdown of the streaming bouncer channel reading
b.store = nil
return nil
}
Expand Down Expand Up @@ -216,10 +252,14 @@ func (b *Bouncer) retrieveDecision(ip net.IP) (*models.Decision, error) {

decision, err := b.liveBouncer.Get(ip.String())
if err != nil {
fields := []zapcore.Field{
zap.String("address", b.streamingBouncer.APIUrl),
zap.Error(err),
}
if b.shouldFailHard {
b.logger.Fatal(err.Error())
b.logger.Fatal(err.Error(), fields...)
} else {
b.logger.Error(err.Error())
b.logger.Error(err.Error(), fields...)
}
return nil, nil // when not failing hard, we return no error
}
Expand All @@ -229,5 +269,4 @@ func (b *Bouncer) retrieveDecision(ip net.IP) (*models.Decision, error) {
}

return nil, nil

}

0 comments on commit dc19d1c

Please sign in to comment.