Skip to content

Commit

Permalink
Not omit error in logger (#1096)
Browse files Browse the repository at this point in the history
Add explicit errors as arguments so anyone can handle them in the logger implementation.
  • Loading branch information
itechdima authored May 5, 2023
1 parent ceb95ed commit fcf85ac
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {

commit := func() {
if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
} else {
offsets.reset()
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
}
r.stats.errors.observe(1)
r.withErrorLogger(func(l Logger) {
l.Printf(err.Error())
l.Printf("%v", err)
})
// Continue with next attempt...
}
Expand Down Expand Up @@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) {

case errors.Is(err, UnknownTopicOrPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err)
})

conn.Close()
Expand All @@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) {

case errors.Is(err, NotLeaderForPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
})

conn.Close()
Expand All @@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
// Timeout on the kafka side, this can be safely retried.
errcount = 0
r.withLogger(func(log Logger) {
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
})
r.stats.timeouts.observe(1)
continue
Expand Down

0 comments on commit fcf85ac

Please sign in to comment.