diff --git a/.golangci.yml b/.golangci.yml index 2780090..db2cd9a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -33,7 +33,7 @@ run: skip-files: - .*mock.*\.go$ - version.go - - example_test.go + - .*\_test\.go$ - generate.go modules-download-mode: readonly linters-settings: diff --git a/xkafka/middleware/prometheus/consumer_middleware.go b/xkafka/middleware/prometheus/consumer_middleware.go deleted file mode 100644 index 4bbbcc0..0000000 --- a/xkafka/middleware/prometheus/consumer_middleware.go +++ /dev/null @@ -1,74 +0,0 @@ -package prometheus - -import ( - "context" - "time" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/gojekfarm/xtools/xkafka" -) - -var consumerLabels = []string{ - LabelGroup, - LabelStatus, - LabelTopic, -} - -var consumerLag = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: MetricMessageLagSeconds, - Subsystem: SubsystemConsumer, - Help: "What is the lag of the consumer, partitioned by group, topic and partition.", -}, []string{LabelGroup, LabelTopic, LabelPartition}) - -var consumerCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: MetricMessagesTotal, - Subsystem: SubsystemConsumer, - Help: "How many messages consumed, partitioned by group and status.", -}, consumerLabels) - -var consumerInflightMessages = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: MetricMessagesInFlight, - Subsystem: SubsystemConsumer, - Help: "Current number of messages being processed by consumer, partitioned by group.", -}, []string{LabelGroup, LabelTopic}) - -var consumerProcessingDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: MetricMessageDurationSeconds, - Subsystem: SubsystemConsumer, - Help: "How long it took to process the message, partitioned by group and status.", -}, consumerLabels) - -var allConsumerCollectors = []prometheus.Collector{ - consumerLag, - consumerCounter, - consumerInflightMessages, - consumerProcessingDuration, -} - -// ConsumerMiddleware adds prometheus 
instrumentation for xkafka.Consumer. -func ConsumerMiddleware(next xkafka.Handler) xkafka.Handler { - return xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { - startTime := time.Now() - - consumerLag.WithLabelValues(m.Group, m.Topic, string(m.Partition)). - Observe(time.Since(m.Timestamp).Seconds()) - - inflight := consumerInflightMessages.WithLabelValues(m.Group, m.Topic) - inflight.Inc() - - m.AddCallback(func(ackMsg *xkafka.Message) { - consumerCounter. - WithLabelValues(ackMsg.Group, ackMsg.Status.String(), ackMsg.Topic). - Inc() - - consumerProcessingDuration. - WithLabelValues(ackMsg.Group, ackMsg.Status.String(), ackMsg.Topic). - Observe(time.Since(startTime).Seconds()) - - inflight.Dec() - }) - - return next.Handle(ctx, m) - }) -} diff --git a/xkafka/middleware/prometheus/consumer_middleware_test.go b/xkafka/middleware/prometheus/consumer_middleware_test.go deleted file mode 100644 index c8f95f1..0000000 --- a/xkafka/middleware/prometheus/consumer_middleware_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package prometheus - -import ( - "context" - "strings" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - - "github.com/gojekfarm/xtools/xkafka" -) - -func TestConsumerMiddleware(t *testing.T) { - msg := &xkafka.Message{ - Topic: "test-topic", - Group: "test-group", - Partition: 2, - Key: []byte("key"), - Value: []byte("value"), - } - - consumerHandler := xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { - time.Sleep(1 * time.Second) - - m.AckSuccess() - - return nil - }) - - reg := prometheus.NewRegistry() - err := RegisterConsumerMetrics(reg) - assert.NoError(t, err) - - instrumentedHandler := ConsumerMiddleware(consumerHandler) - err = instrumentedHandler.Handle(context.TODO(), msg) - assert.NoError(t, err) - - time.Sleep(50 * time.Millisecond) - - expected := ` - # HELP 
kafka_consumer_messages_in_flight Current number of messages being processed by consumer, partitioned by group. - # TYPE kafka_consumer_messages_in_flight gauge - kafka_consumer_messages_in_flight{group="test-group",topic="test-topic"} 0 - # HELP kafka_consumer_messages_total How many messages consumed, partitioned by group and status. - # TYPE kafka_consumer_messages_total counter - kafka_consumer_messages_total{group="test-group",status="SUCCESS",topic="test-topic"} 1 -` - expectedMetrics := []string{ - "kafka_consumer_messages_in_flight", - "kafka_consumer_messages_total", - } - - err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...) - assert.NoError(t, err) -} diff --git a/xkafka/middleware/prometheus/doc.go b/xkafka/middleware/prometheus/doc.go deleted file mode 100644 index 52b41f8..0000000 --- a/xkafka/middleware/prometheus/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package prometheus provides a Prometheus metrics middleware for -// xkafka.Producer and xkafka.Consumer. -package prometheus diff --git a/xkafka/middleware/prometheus/metrics.go b/xkafka/middleware/prometheus/metrics.go deleted file mode 100644 index c7008f6..0000000 --- a/xkafka/middleware/prometheus/metrics.go +++ /dev/null @@ -1,27 +0,0 @@ -package prometheus - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -// RegisterConsumerMetrics registers all prometheus.Collector(s) for consumer with given prometheus.Registerer. -func RegisterConsumerMetrics(r prometheus.Registerer) error { - for _, c := range allConsumerCollectors { - if err := r.Register(c); err != nil { - return err - } - } - - return nil -} - -// RegisterProducerMetrics registers all prometheus.Collector(s) for producer with given prometheus.Registerer. 
-func RegisterProducerMetrics(r prometheus.Registerer) error { - for _, p := range allProducerCollectors { - if err := r.Register(p); err != nil { - return err - } - } - - return nil -} diff --git a/xkafka/middleware/prometheus/metrics_test.go b/xkafka/middleware/prometheus/metrics_test.go deleted file mode 100644 index 237183d..0000000 --- a/xkafka/middleware/prometheus/metrics_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package prometheus - -import ( - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" -) - -func TestRegisterConsumerMetrics(t *testing.T) { - r := prometheus.NewRegistry() - - assert.NoError(t, RegisterConsumerMetrics(r)) - assert.Error(t, RegisterConsumerMetrics(r)) -} - -func TestRegisterProducerMetrics(t *testing.T) { - r := prometheus.NewRegistry() - - assert.NoError(t, RegisterProducerMetrics(r)) - assert.Error(t, RegisterProducerMetrics(r)) -} diff --git a/xkafka/middleware/prometheus/producer_middleware.go b/xkafka/middleware/prometheus/producer_middleware.go deleted file mode 100644 index 7fa25cd..0000000 --- a/xkafka/middleware/prometheus/producer_middleware.go +++ /dev/null @@ -1,64 +0,0 @@ -package prometheus - -import ( - "context" - "time" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/gojekfarm/xtools/xkafka" -) - -var producerLabels = []string{ - LabelGroup, - LabelStatus, - LabelTopic, -} - -var producerCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: MetricMessagesTotal, - Subsystem: SubsystemProducer, - Help: "How many messages produced, partitioned by group, status and topic.", -}, producerLabels) - -var producerRunDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: MetricMessageDurationSeconds, - Subsystem: SubsystemProducer, - Help: "How long it took to process the message, partitioned by group, status and topic.", -}, producerLabels) - -var producerInflightMessages = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: 
MetricMessagesInFlight, - Subsystem: SubsystemProducer, - Help: "Current number of messages being processed by producer, partitioned by group and topic.", -}, []string{LabelGroup, LabelTopic}) - -var allProducerCollectors = []prometheus.Collector{ - producerCounter, - producerRunDuration, - producerInflightMessages, -} - -// ProducerMiddleware adds prometheus instrumentation for xkafka.Producer. -func ProducerMiddleware(next xkafka.Handler) xkafka.Handler { - return xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { - start := time.Now() - - inflight := producerInflightMessages.WithLabelValues(m.Group, m.Topic) - inflight.Inc() - - m.AddCallback(func(ackMsg *xkafka.Message) { - producerCounter. - WithLabelValues(ackMsg.Group, ackMsg.Status.String(), ackMsg.Topic). - Inc() - - producerRunDuration. - WithLabelValues(ackMsg.Group, ackMsg.Status.String(), ackMsg.Topic). - Observe(time.Since(start).Seconds()) - - inflight.Dec() - }) - - return next.Handle(ctx, m) - }) -} diff --git a/xkafka/middleware/prometheus/producer_middleware_test.go b/xkafka/middleware/prometheus/producer_middleware_test.go deleted file mode 100644 index 11b4559..0000000 --- a/xkafka/middleware/prometheus/producer_middleware_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package prometheus - -import ( - "context" - "strings" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - - "github.com/gojekfarm/xtools/xkafka" -) - -func TestProducerMiddleware(t *testing.T) { - msg := &xkafka.Message{ - Topic: "test-topic", - Group: "test-group", - Key: []byte("key"), - Value: []byte("value"), - } - - producerHandler := xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { - m.AckSuccess() - - return nil - }) - - reg := prometheus.NewRegistry() - err := RegisterProducerMetrics(reg) - assert.NoError(t, err) - - instrumentedHandler := 
ProducerMiddleware(producerHandler) - err = instrumentedHandler.Handle(context.TODO(), msg) - assert.NoError(t, err) - - time.Sleep(50 * time.Millisecond) - - expected := ` - # HELP kafka_producer_messages_in_flight Current number of messages being processed by producer, partitioned by group and topic. - # TYPE kafka_producer_messages_in_flight gauge - kafka_producer_messages_in_flight{group="test-group",topic="test-topic"} 0 - # HELP kafka_producer_messages_total How many messages produced, partitioned by group, status and topic. - # TYPE kafka_producer_messages_total counter - kafka_producer_messages_total{group="test-group",status="SUCCESS",topic="test-topic"} 1 -` - expectedMetrics := []string{ - "kafka_producer_messages_in_flight", - "kafka_producer_messages_total", - } - - err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...) - assert.NoError(t, err) -} diff --git a/xkafka/middleware/prometheus/semconv.go b/xkafka/middleware/prometheus/semconv.go deleted file mode 100644 index 09ee2ea..0000000 --- a/xkafka/middleware/prometheus/semconv.go +++ /dev/null @@ -1,25 +0,0 @@ -package prometheus - -// This file contains the semantic conventions for the Prometheus metrics. - -// Labels -const ( - LabelTopic = "topic" - LabelPartition = "partition" - LabelGroup = "group" - LabelStatus = "status" -) - -// Metric names -const ( - MetricMessageLagSeconds = "message_lag_seconds" - MetricMessagesTotal = "messages_total" - MetricMessagesInFlight = "messages_in_flight" - MetricMessageDurationSeconds = "message_duration_seconds" -) - -// Subsystem names -const ( - SubsystemConsumer = "kafka_consumer" - SubsystemProducer = "kafka_producer" -) diff --git a/xprom/doc.go b/xprom/doc.go new file mode 100644 index 0000000..2d0f66f --- /dev/null +++ b/xprom/doc.go @@ -0,0 +1,3 @@ +// Package xprom provides a prometheus metrics that follow +// the OpenTelemetry semantic conventions. 
+package xprom diff --git a/xprom/go.mod b/xprom/go.mod new file mode 100644 index 0000000..49f5ee5 --- /dev/null +++ b/xprom/go.mod @@ -0,0 +1,3 @@ +module github.com/gojekfarm/xtools/xprom + +go 1.21 diff --git a/xprom/semconv/common.go b/xprom/semconv/common.go new file mode 100644 index 0000000..741c14b --- /dev/null +++ b/xprom/semconv/common.go @@ -0,0 +1,12 @@ +package semconv + +// Server labels. +const ( + ServerAddress = "server_address" + ServerPort = "server_port" +) + +// Error labels. +const ( + ErrorType = "error_type" +) diff --git a/xprom/semconv/doc.go b/xprom/semconv/doc.go new file mode 100644 index 0000000..b463b0f --- /dev/null +++ b/xprom/semconv/doc.go @@ -0,0 +1,3 @@ +// Package semconv provides standard OpenTelemetry semantic +// conventions for Prometheus metrics. +package semconv diff --git a/xprom/semconv/go.mod b/xprom/semconv/go.mod new file mode 100644 index 0000000..90209e8 --- /dev/null +++ b/xprom/semconv/go.mod @@ -0,0 +1,3 @@ +module github.com/gojekfarm/xtools/xprom/semconv + +go 1.21 diff --git a/xprom/semconv/kafka.go b/xprom/semconv/kafka.go new file mode 100644 index 0000000..5b5695d --- /dev/null +++ b/xprom/semconv/kafka.go @@ -0,0 +1,43 @@ +package semconv + +// System names. +const ( + SystemKafka = "kafka" +) + +// Metric names. +// https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md +const ( + MessagingClientOperationDuration = "messaging_client_operation_duration" + MessagingClientPublishedMessages = "messaging_client_published_messages" + MessagingClientConsumedMessages = "messaging_client_consumed_messages" +) + +// Custom metrics. +const ( + MessagingInflightMessages = "messaging_inflight_messages" +) + +// Labels. 
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md +const ( + MessagingSystem = "messaging_system" + MessagingOperationName = "messaging_operation_name" + MessagingDestinationName = "messaging_destination_name" + MessagingConsumerGroupName = "messaging_consumer_group_name" + MessagingDestinationPartitionID = "messaging_destination_partition_id" +) + +// Kafka aliases. +const ( + MessagingKafkaTopic = MessagingDestinationName + MessagingKafkaPartition = MessagingDestinationPartitionID + MessagingKafkaConsumerGroup = MessagingConsumerGroupName + MessagingKafkaMessageStatus = "messaging_kafka_message_status" +) + +// Operation names. +const ( + OperationPublish = "publish" + OperationConsume = "consume" +) diff --git a/xprom/xpromkafka/collector.go b/xprom/xpromkafka/collector.go new file mode 100644 index 0000000..0b99b3b --- /dev/null +++ b/xprom/xpromkafka/collector.go @@ -0,0 +1,242 @@ +package xpromkafka + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/gojekfarm/xtools/xkafka" + "github.com/gojekfarm/xtools/xprom/semconv" +) + +var ( + defaultLatencyBuckets = []float64{ + 0.001, // 1ms + 0.002, // 2ms + 0.005, // 5ms + 0.01, // 10ms + 0.02, // 20ms + 0.05, // 50ms + 0.1, // 100ms + 0.2, // 200ms + 0.5, // 500ms + 1, // 1s + 2, // 2s + 5, // 5s + 10, // 10s + } +) + +// Collector provides metrics for xkafka.Producer and xkafka.Consumer. +type Collector struct { + opts options + duration *prometheus.HistogramVec + inflight *prometheus.GaugeVec + published *prometheus.CounterVec + consumed *prometheus.CounterVec +} + +// NewCollector creates a new Collector. 
+func NewCollector(opts ...Option) *Collector { + o := options{ + latencyBuckets: defaultLatencyBuckets, + } + + for _, opt := range opts { + opt.apply(&o) + } + + constLabels := prometheus.Labels{ + semconv.MessagingSystem: semconv.SystemKafka, + } + + return &Collector{ + opts: o, + duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: semconv.MessagingClientOperationDuration, + Help: "Message processing duration.", + ConstLabels: constLabels, + Buckets: o.latencyBuckets, + }, []string{ + semconv.MessagingOperationName, + semconv.ServerAddress, + semconv.ServerPort, + semconv.MessagingKafkaConsumerGroup, + semconv.MessagingKafkaTopic, + semconv.MessagingKafkaPartition, + semconv.MessagingKafkaMessageStatus, + semconv.ErrorType, + }), + inflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: semconv.MessagingInflightMessages, + Help: "Messages currently being processed.", + ConstLabels: constLabels, + }, []string{ + semconv.MessagingOperationName, + semconv.ServerAddress, + semconv.ServerPort, + semconv.MessagingKafkaConsumerGroup, + semconv.MessagingKafkaTopic, + semconv.MessagingKafkaPartition, + }), + published: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: semconv.MessagingClientPublishedMessages, + Help: "Messages published.", + ConstLabels: constLabels, + }, []string{ + semconv.MessagingOperationName, + semconv.ServerAddress, + semconv.ServerPort, + semconv.MessagingKafkaConsumerGroup, + semconv.MessagingKafkaPartition, + semconv.MessagingKafkaTopic, + semconv.MessagingKafkaMessageStatus, + semconv.ErrorType, + }), + consumed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: semconv.MessagingClientConsumedMessages, + Help: "Messages consumed.", + ConstLabels: constLabels, + }, []string{ + semconv.MessagingOperationName, + semconv.ServerAddress, + semconv.ServerPort, + semconv.MessagingKafkaConsumerGroup, + semconv.MessagingKafkaTopic, + semconv.MessagingKafkaPartition, + semconv.MessagingKafkaMessageStatus, + 
semconv.ErrorType, + }), + } +} + +// Register registers the metrics with the provided registry. +func (c *Collector) Register(registry prometheus.Registerer) error { + if err := registry.Register(c.duration); err != nil { + return err + } + + if err := registry.Register(c.inflight); err != nil { + return err + } + + if err := registry.Register(c.published); err != nil { + return err + } + + if err := registry.Register(c.consumed); err != nil { + return err + } + + return nil +} + +// ConsumerMiddleware returns a middleware that instruments xkafka.Consumer. +// Options passed to this function will override the Collector options. +func (c *Collector) ConsumerMiddleware(opts ...Option) xkafka.MiddlewareFunc { + mwopts := &options{ + errFn: c.opts.errFn, + address: c.opts.address, + port: c.opts.port, + } + + for _, opt := range opts { + opt.apply(mwopts) + } + + return func(next xkafka.Handler) xkafka.Handler { + return xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error { + start := time.Now() + labels := prometheus.Labels{ + semconv.MessagingOperationName: semconv.OperationConsume, + semconv.ServerAddress: mwopts.address, + // server_port stays empty when no port is configured; With panics if a declared label is missing. + semconv.ServerPort: "", + semconv.MessagingKafkaConsumerGroup: msg.Group, + semconv.MessagingKafkaTopic: msg.Topic, + semconv.MessagingKafkaPartition: fmt.Sprintf("%d", msg.Partition), + } + + if mwopts.port != 0 { + labels[semconv.ServerPort] = fmt.Sprintf("%d", mwopts.port) + } + + inflight := c.inflight.With(labels) + + inflight.Inc() + defer inflight.Dec() + + msg.AddCallback(func(ackMsg *xkafka.Message) { + labels[semconv.MessagingKafkaMessageStatus] = ackMsg.Status.String() + labels[semconv.ErrorType] = "" + + if ackMsg.Err() != nil && mwopts.errFn != nil { + labels[semconv.ErrorType] = mwopts.errFn(ackMsg.Err()) + } + + c.duration.With(labels).Observe(time.Since(start).Seconds()) + c.consumed.With(labels).Inc() + }) + + return next.Handle(ctx, msg) + }) + } +} + +// ProducerMiddleware returns a middleware that instruments
xkafka.Producer. +// Options passed to this function will override the Collector options. +func (c *Collector) ProducerMiddleware(opts ...Option) xkafka.MiddlewareFunc { + mwopts := &options{ + errFn: c.opts.errFn, + address: c.opts.address, + port: c.opts.port, + } + + for _, opt := range opts { + opt.apply(mwopts) + } + + return func(next xkafka.Handler) xkafka.Handler { + return xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error { + start := time.Now() + labels := prometheus.Labels{ + semconv.MessagingOperationName: semconv.OperationPublish, + semconv.ServerAddress: mwopts.address, + // server_port stays empty when no port is configured; With panics if a declared label is missing. + semconv.ServerPort: "", + semconv.MessagingKafkaTopic: msg.Topic, + semconv.MessagingKafkaPartition: "", + semconv.MessagingKafkaConsumerGroup: msg.Group, + } + + if mwopts.port != 0 { + labels[semconv.ServerPort] = fmt.Sprintf("%d", mwopts.port) + } + + inflight := c.inflight.With(labels) + + inflight.Inc() + defer inflight.Dec() + + msg.AddCallback(func(ackMsg *xkafka.Message) { + if ackMsg.Partition != -1 { + labels[semconv.MessagingKafkaPartition] = fmt.Sprintf("%d", ackMsg.Partition) + } + + labels[semconv.MessagingKafkaMessageStatus] = ackMsg.Status.String() + labels[semconv.ErrorType] = "" + + if ackMsg.Err() != nil && mwopts.errFn != nil { + labels[semconv.ErrorType] = mwopts.errFn(ackMsg.Err()) + } + + c.duration.With(labels).Observe(time.Since(start).Seconds()) + c.published.With(labels).Inc() + }) + + return next.Handle(ctx, msg) + }) + } +} diff --git a/xprom/xpromkafka/collector_test.go b/xprom/xpromkafka/collector_test.go new file mode 100644 index 0000000..26b715e --- /dev/null +++ b/xprom/xpromkafka/collector_test.go @@ -0,0 +1,115 @@ +package xpromkafka + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + + "github.com/gojekfarm/xtools/xkafka" +) + +func TestConsumerMiddleware(t *testing.T) {
+ msg := &xkafka.Message{ + Topic: "test-topic", + Group: "test-group", + Key: []byte("key"), + Value: []byte("value"), + Partition: 12, + } + + handler := xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { + err := errors.New("some error") + + m.AckFail(err) + + return err + }) + + reg := prometheus.NewRegistry() + collector := NewCollector( + LatencyBuckets{0.1, 0.5, 1, 2, 5}, + ) + + collector.Register(reg) + + instrumentedHandler := collector.ConsumerMiddleware( + Address("localhost"), + Port(9092), + ErrorClassifer(func(err error) string { + return "CustomError" + }), + ).Middleware(handler) + + err := instrumentedHandler.Handle(context.TODO(), msg) + assert.Error(t, err) + + expectedMetrics := []string{ + "messaging_inflight_messages", + "messaging_client_consumed_messages", + } + expected := ` + # HELP messaging_client_consumed_messages Messages consumed. + # TYPE messaging_client_consumed_messages counter + messaging_client_consumed_messages{error_type="CustomError",messaging_consumer_group_name="test-group",messaging_destination_name="test-topic",messaging_destination_partition_id="12",messaging_kafka_message_status="FAIL",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 1 + # HELP messaging_inflight_messages Messages currently being processed. + # TYPE messaging_inflight_messages gauge + messaging_inflight_messages{messaging_consumer_group_name="test-group",messaging_destination_name="test-topic",messaging_destination_partition_id="12",messaging_operation_name="consume",messaging_system="kafka",server_address="localhost",server_port="9092"} 0 + ` + + err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...) 
+ assert.NoError(t, err) +} + +func TestProducerMiddleware(t *testing.T) { + msg := &xkafka.Message{ + Topic: "test-topic", + Group: "test-group", + Key: []byte("key"), + Value: []byte("value"), + } + + handler := xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { + m.Partition = 12 + err := errors.New("some error") + m.AckFail(err) + return err + }) + + reg := prometheus.NewRegistry() + collector := NewCollector( + LatencyBuckets{0.1, 0.5, 1, 2, 5}, + ) + + collector.Register(reg) + + instrumentedHandler := collector.ProducerMiddleware( + Address("localhost"), + Port(9092), + ErrorClassifer(func(err error) string { + return "CustomError" + }), + ).Middleware(handler) + + err := instrumentedHandler.Handle(context.TODO(), msg) + assert.Error(t, err) + + expectedMetrics := []string{ + "messaging_inflight_messages", + "messaging_client_published_messages", + } + expected := `# HELP messaging_client_published_messages Messages published. + # TYPE messaging_client_published_messages counter + messaging_client_published_messages{error_type="CustomError",messaging_consumer_group_name="test-group",messaging_destination_name="test-topic",messaging_destination_partition_id="12",messaging_kafka_message_status="FAIL",messaging_operation_name="publish",messaging_system="kafka",server_address="localhost",server_port="9092"} 1 + # HELP messaging_inflight_messages Messages currently being processed. + # TYPE messaging_inflight_messages gauge + messaging_inflight_messages{messaging_consumer_group_name="test-group",messaging_destination_name="test-topic",messaging_destination_partition_id="",messaging_operation_name="publish",messaging_system="kafka",server_address="localhost",server_port="9092"} 0 + ` + + err = testutil.GatherAndCompare(reg, strings.NewReader(expected), expectedMetrics...) 
+ assert.NoError(t, err) +} diff --git a/xprom/xpromkafka/doc.go b/xprom/xpromkafka/doc.go new file mode 100644 index 0000000..730ed39 --- /dev/null +++ b/xprom/xpromkafka/doc.go @@ -0,0 +1,3 @@ +// Package xpromkafka provides a Prometheus metrics middleware for +// xkafka.Producer and xkafka.Consumer. +package xpromkafka diff --git a/xprom/xpromkafka/example_test.go b/xprom/xpromkafka/example_test.go new file mode 100644 index 0000000..01e2b6a --- /dev/null +++ b/xprom/xpromkafka/example_test.go @@ -0,0 +1,132 @@ +package xpromkafka_test + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/gojekfarm/xtools/xkafka" + "github.com/gojekfarm/xtools/xprom/xpromkafka" +) + +var handler = xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error { + // Handle message. + return nil +}) + +func ExampleCollector_ConsumerMiddleware() { + consumer, _ := xkafka.NewConsumer( + "test-group", + handler, + xkafka.Brokers{"localhost:9092"}, + xkafka.Topics{"test-topic"}, + ) + + reg := prometheus.NewRegistry() + collector := xpromkafka.NewCollector( + xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, + xpromkafka.Address("localhost:9092"), + xpromkafka.ErrorClassifer(func(err error) string { + // Classify errors. + return "CustomError" + }), + ) + + collector.Register(reg) + consumer.Use(collector.ConsumerMiddleware()) + + // Start consuming messages. +} + +func ExampleCollector_ConsumerMiddleware_multipleConsumers() { + reg := prometheus.NewRegistry() + collector := xpromkafka.NewCollector( + xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, + xpromkafka.ErrorClassifer(func(err error) string { + // Classify errors. 
+ return "CustomError" + }), + ) + + collector.Register(reg) + + consumer1, _ := xkafka.NewConsumer( + "test-group-1", + handler, + xkafka.Brokers{"localhost:9092"}, + xkafka.Topics{"test-topic-1"}, + ) + + consumer1.Use(collector.ConsumerMiddleware( + xpromkafka.Address("localhost:9092"), + )) + + consumer2, _ := xkafka.NewConsumer( + "test-group-2", + handler, + xkafka.Brokers{"another-host:9092"}, + xkafka.Topics{"test-topic-2"}, + ) + + consumer2.Use(collector.ConsumerMiddleware( + xpromkafka.Address("another-host:9092"), + )) + + // Start consuming messages. +} + +func ExampleCollector_ProducerMiddleware() { + producer, _ := xkafka.NewProducer( + "test-publisher", + xkafka.Brokers{"localhost:9092"}, + ) + + reg := prometheus.NewRegistry() + collector := xpromkafka.NewCollector( + xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, + xpromkafka.Address("localhost:9092"), + xpromkafka.ErrorClassifer(func(err error) string { + // Classify errors. + return "CustomError" + }), + ) + + collector.Register(reg) + + producer.Use(collector.ProducerMiddleware()) + + // Produce messages. +} + +func ExampleCollector_ProducerMiddleware_multipleProducers() { + reg := prometheus.NewRegistry() + collector := xpromkafka.NewCollector( + xpromkafka.LatencyBuckets{0.1, 0.5, 1, 2, 5}, + xpromkafka.ErrorClassifer(func(err error) string { + // Classify errors. + return "CustomError" + }), + ) + + collector.Register(reg) + + producer1, _ := xkafka.NewProducer( + "test-publisher-1", + xkafka.Brokers{"localhost:9092"}, + ) + + producer1.Use(collector.ProducerMiddleware( + xpromkafka.Address("localhost:9092"), + )) + + producer2, _ := xkafka.NewProducer( + "test-publisher-2", + xkafka.Brokers{"another-host:9092"}, + ) + + producer2.Use(collector.ProducerMiddleware( + xpromkafka.Address("another-host:9092"), + )) + + // Produce messages. 
+} diff --git a/xkafka/middleware/prometheus/go.mod b/xprom/xpromkafka/go.mod similarity index 80% rename from xkafka/middleware/prometheus/go.mod rename to xprom/xpromkafka/go.mod index f23a356..ee25b8f 100644 --- a/xkafka/middleware/prometheus/go.mod +++ b/xprom/xpromkafka/go.mod @@ -1,13 +1,15 @@ -module github.com/gojekfarm/xtools/xkafka/middleware/prometheus +module github.com/gojekfarm/xtools/xprom/xpromkafka go 1.21 -toolchain go1.21.0 - -replace github.com/gojekfarm/xtools/xkafka => ../../ +replace ( + github.com/gojekfarm/xtools/xkafka => ../../xkafka + github.com/gojekfarm/xtools/xprom/semconv => ../semconv +) require ( github.com/gojekfarm/xtools/xkafka v0.8.1 + github.com/gojekfarm/xtools/xprom/semconv v0.0.0-00010101000000-000000000000 github.com/prometheus/client_golang v1.14.0 github.com/stretchr/testify v1.8.1 ) diff --git a/xkafka/middleware/prometheus/go.sum b/xprom/xpromkafka/go.sum similarity index 100% rename from xkafka/middleware/prometheus/go.sum rename to xprom/xpromkafka/go.sum diff --git a/xprom/xpromkafka/options.go b/xprom/xpromkafka/options.go new file mode 100644 index 0000000..6ac890d --- /dev/null +++ b/xprom/xpromkafka/options.go @@ -0,0 +1,43 @@ +package xpromkafka + +// Option configures the collector. +type Option interface { + apply(*options) +} + +type optionFunc func(*options) + +func (f optionFunc) apply(o *options) { f(o) } + +type errorClassifier func(error) string + +type options struct { + latencyBuckets []float64 + errFn errorClassifier + address string + port int +} + +// LatencyBuckets configures the latency buckets. +type LatencyBuckets []float64 + +func (l LatencyBuckets) apply(o *options) { + o.latencyBuckets = l +} + +// Address sets `server_address` label. +type Address string + +func (a Address) apply(o *options) { o.address = string(a) } + +// Port sets `server_port` label. +type Port int + +func (p Port) apply(o *options) { o.port = int(p) } + +// ErrorClassifer classifies errors types for `error_type` label. 
+func ErrorClassifer(fn func(error) string) Option { + setErrFn := func(o *options) { o.errFn = fn } + + return optionFunc(setErrFn) +}