diff --git a/pkg/pubsub/kafka/partitionconsumer.go b/pkg/pubsub/kafka/partitionconsumer.go index 824f66c..cae8f68 100644 --- a/pkg/pubsub/kafka/partitionconsumer.go +++ b/pkg/pubsub/kafka/partitionconsumer.go @@ -2,7 +2,9 @@ package kafka import ( "context" + "time" + "github.com/getsentry/sentry-go" "github.com/twmb/franz-go/pkg/kgo" sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka" @@ -18,6 +20,14 @@ type pconsumer struct { } func (pc *pconsumer) consume(cl *kgo.Client, logger sdklogger.Logger, shouldCommit bool, handler func(*kgo.Record)) { + defer func() { + if rec := recover(); rec != nil { + sentry.CurrentHub().Recover(rec) + sentry.Flush(time.Second * 5) + logger.Fatalf("kafka consumer: panic error: %v", rec) + } + }() + defer close(pc.done) for {