From eb19156f4c466f4ab4a0c5fe472aec02c52da7c1 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 5 Aug 2023 16:09:24 +0000 Subject: [PATCH 01/11] Add metrics to Kafka Source Signed-off-by: Krishna Kondaka --- .../consumer/KafkaSourceCustomConsumer.java | 95 +++++++++++---- .../plugins/kafka/source/KafkaSource.java | 4 +- .../util/KafkaSourceSecurityConfigurer.java | 2 +- .../plugins/kafka/util/KafkaTopicMetrics.java | 109 ++++++++++++++++++ 4 files changed, 183 insertions(+), 27 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index e1a071ffcb..5f818997d0 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -24,6 +24,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -34,17 +35,20 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Map; -import java.util.HashMap; +import java.time.Instant; import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import org.apache.commons.lang3.Range; @@ -55,9 +59,15 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; - static final String POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME = "positiveAcknowledgementSetCounter"; - static final String NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME = "negativeAcknowledgementSetCounter"; + static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; + static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; + static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; + static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors"; + static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; + static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; + static final String 
NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; static final String DEFAULT_KEY = "message"; + static final int METRICS_UPDATE_INTERVAL = 60; private volatile long lastCommitTime; private KafkaConsumer consumer= null; @@ -74,10 +84,16 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private Set partitionsToReset; private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; - private final Counter positiveAcknowledgementSetCounter; - private final Counter negativeAcknowledgementSetCounter; + private Integer numberOfPositiveAcknowledgements; + private Integer numberOfNegativeAcknowledgements; + private Integer numberOfRecordsFailedToParse; + private Integer numberOfDeserializationErrors; + private Integer numberOfBufferSizeOverflows; + private Integer numberOfPollAuthErrors; private final boolean acknowledgementsEnabled; private final Duration acknowledgementsTimeout; + private final KafkaTopicMetrics topicMetrics; + private long metricsUpdatedTime; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -86,26 +102,33 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final TopicConfig topicConfig, final String schemaType, final AcknowledgementSetManager acknowledgementSetManager, - final PluginMetrics pluginMetrics) { + KafkaTopicMetrics topicMetrics) { this.topicName = topicConfig.getName(); this.topicConfig = topicConfig; this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; this.buffer = buffer; + this.numberOfRecordsFailedToParse = 0; + this.numberOfDeserializationErrors = 0; + this.numberOfBufferSizeOverflows = 0; + this.numberOfPollAuthErrors = 0; + this.numberOfPositiveAcknowledgements = 0; + this.numberOfNegativeAcknowledgements = 0; + this.topicMetrics = topicMetrics; this.offsetsToCommit = new HashMap<>(); + this.metricsUpdatedTime = Instant.now().getEpochSecond(); + this.topicMetrics.register(consumer); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); // If the timeout value is different from default value, then enable acknowledgements automatically. 
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; this.partitionCommitTrackerMap = new HashMap<>(); - this.partitionsToReset = new HashSet<>(); + this.partitionsToReset = Collections.synchronizedSet(new HashSet<>()); this.schema = MessageFormat.getByMessageFormatByName(schemaType); Duration bufferTimeout = Duration.ofSeconds(1); this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); this.lastCommitTime = System.currentTimeMillis(); - this.positiveAcknowledgementSetCounter = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME); - this.negativeAcknowledgementSetCounter = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME); } public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) { @@ -121,7 +144,7 @@ private AcknowledgementSet createAcknowledgementSet(Map { if (result == true) { - positiveAcknowledgementSetCounter.increment(); + numberOfPositiveAcknowledgements++; offsets.forEach((partition, offsetRange) -> { try { int partitionId = partition.partition(); @@ -137,21 +160,15 @@ private AcknowledgementSet createAcknowledgementSet(Map { - synchronized(partitionsToReset) { - partitionsToReset.add(partition); - } + partitionsToReset.add(partition); }); } }, acknowledgementsTimeout); return acknowledgementSet; } - double getPositiveAcknowledgementsCount() { - return positiveAcknowledgementSetCounter.count(); - } - public void consumeRecords() throws Exception { try { ConsumerRecords records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); @@ -170,17 +187,19 @@ public void consumeRecords() throws Exception { } } } catch (AuthenticationException e) { - LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e); + LOG.warn("Authentication error while doing poll(). 
Will retry after 10 seconds", e); + numberOfPollAuthErrors++; Thread.sleep(10000); } catch (RecordDeserializationException e) { - LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record", + LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record", e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); + numberOfDeserializationErrors++; consumer.seek(e.topicPartition(), e.offset()+1); } } private void resetOrCommitOffsets() { - synchronized(partitionsToReset) { + if (partitionsToReset.size() > 0) { partitionsToReset.forEach(partition -> { try { final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); @@ -216,6 +235,22 @@ Map getOffsetsToCommit() { return offsetsToCommit; } + public void updateMetrics() { + long curTime = Instant.now().getEpochSecond(); + if (curTime - metricsUpdatedTime >= METRICS_UPDATE_INTERVAL) { + topicMetrics.update(consumer); + topicMetrics.update(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); + topicMetrics.update(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); + topicMetrics.update(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); + topicMetrics.update(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); + topicMetrics.update(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); + topicMetrics.update(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); + topicMetrics.update(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0); + + metricsUpdatedTime = curTime; + } + } + @Override public void run() { consumer.subscribe(Arrays.asList(topicName)); @@ -223,8 +258,9 @@ public void run() { try { resetOrCommitOffsets(); consumeRecords(); + updateMetrics(); } catch (Exception exp) { - LOG.error("Error while reading the records from the topic...", exp); + LOG.error("Error while reading the records from the topic {}", topicName, exp); } } } @@ -251,6 +287,7 @@ private Record getRecord(ConsumerRecord consumerRecord, in } } catch (Exception e){ LOG.error("Failed to parse JSON or AVRO record", e); + numberOfRecordsFailedToParse++; } if (!plainTextMode) { if (!(value instanceof Map)) { @@ -293,7 +330,15 @@ private void iterateRecordPartitions(ConsumerRecords records, fin if (acknowledgementSet != null) { acknowledgementSet.add(record.getData()); } - bufferAccumulator.add(record); + while (true) { + try { + bufferAccumulator.add(record); + break; + } catch (SizeOverflowException e) { + numberOfBufferSizeOverflows++; + Thread.sleep(100); + } + } } } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 21af54160d..e41906870e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -45,6 +45,7 @@ import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import 
org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; @@ -118,6 +119,7 @@ public void start(Buffer> buffer) { KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); Properties consumerProperties = getConsumerProperties(topic, authProperties); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { @@ -136,7 +138,7 @@ public void start(Buffer> buffer) { kafkaConsumer = new KafkaConsumer(consumerProperties); break; } - consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, pluginMetrics); + consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); executorService.submit(consumer); }); } catch (Exception e) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index 245056e06e..4a6aaf30da 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ -212,7 +212,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth try { result = kafkaClient.getBootstrapBrokers(request); } catch (KafkaException | StsException e) { - LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e); + LOG.info("Failed to get bootstrap server information from MSK. 
Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e); try { Thread.sleep(10000); } catch (InterruptedException exp) {} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java new file mode 100644 index 0000000000..809ae9477a --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Instant; +import java.util.Objects; +import java.util.Map; +import java.util.HashMap; + +public class KafkaTopicMetrics { + private int metricUpdateInterval; + private final String topicName; + private long updateTime; + private Map> consumerMetricsMap; + private Map camelCaseMap; + private final PluginMetrics pluginMetrics; + + public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { + this.pluginMetrics = pluginMetrics; + this.topicName = topicName; + this.consumerMetricsMap = new HashMap<>(); + this.updateTime = Instant.now().getEpochSecond(); + this.metricUpdateInterval = 60; //seconds + this.camelCaseMap = new HashMap<>(); + camelCaseMap.put("bytes-consumed-total", "bytesConsumedTotal"); + camelCaseMap.put("records-consumed-total", "recordsConsumedTotal"); + camelCaseMap.put("bytes-consumed-rate", "bytesConsumedRate"); + camelCaseMap.put("records-consumed-rate", "recordsConsumedRate"); + camelCaseMap.put("records-lag-max", "recordsLagMax"); + camelCaseMap.put("records-lead-min", "recordsLeadMin"); + camelCaseMap.put("commit-rate", "commitRate"); + camelCaseMap.put("join-rate", "joinRate"); + camelCaseMap.put("incoming-byte-rate", "incomingByteRate"); + camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate"); + camelCaseMap.put("assigned-partitions", "outgoingByteRate"); + } + + public void register(KafkaConsumer consumer) { + this.consumerMetricsMap.put(consumer, new HashMap<>()); + } + + private String getCamelCaseName(final String name) { + String camelCaseName = camelCaseMap.get(name); + if (Objects.isNull(camelCaseName)) { + return name; + } + return camelCaseName; + } + + public void update(final KafkaConsumer consumer, final String metricName, Integer metricValue) { + synchronized(consumerMetricsMap) { + Map cmetrics = consumerMetricsMap.get(consumer); + if (cmetrics != null) { + cmetrics.put(metricName, (double)metricValue); + } + } + } + + public void update(final KafkaConsumer consumer) { + Map cmetrics = null; + synchronized(consumerMetricsMap) { + cmetrics = consumerMetricsMap.get(consumer); + } + if (cmetrics == null) { + return; + } + Map metrics = consumer.metrics(); + for (Map.Entry entry : metrics.entrySet()) { + MetricName metric = entry.getKey(); + Metric value = entry.getValue(); + String metricName = metric.name(); + String metricGroup = metric.group(); + if ((metricName.contains("consumed")) || + ((!metric.tags().containsKey("partition")) && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) || + (metricName.equals("commit-rate") || 
metricName.equals("join-rate")) || + (metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) { + cmetrics.put(metricName, value.metricValue()); + } + } + synchronized (consumerMetricsMap) { + long curTime = Instant.now().getEpochSecond(); + if (curTime - updateTime > metricUpdateInterval) { + final Map aggregatedMetrics = new HashMap<>(); + consumerMetricsMap.forEach((c, metricsMap) -> { + Double value = 0.0; + metricsMap.forEach((metricName, metricValue) -> { + if (metricValue instanceof Double) { + aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0)); + } + }); + }); + aggregatedMetrics.forEach((name, value) -> { + System.out.println("__METRIC__topic."+topicName+"."+getCamelCaseName(name)+"___VALUE__"+value); + pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); + }); + updateTime = curTime; + } + } + } +} From 7190bee9e8bcc2e74752cc1ac0f5de10eb871839 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 5 Aug 2023 16:13:21 +0000 Subject: [PATCH 02/11] Removed debug print statement Signed-off-by: Krishna Kondaka --- .../dataprepper/plugins/kafka/util/KafkaTopicMetrics.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 809ae9477a..8aec89c864 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -99,7 +99,6 @@ public void update(final KafkaConsumer consumer) { }); }); aggregatedMetrics.forEach((name, value) -> { - System.out.println("__METRIC__topic."+topicName+"."+getCamelCaseName(name)+"___VALUE__"+value); pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); }); updateTime = curTime; From 6e91aac274218352fde9c9c4de741f9ea7aa87d0 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 7 Aug 2023 23:14:20 +0000 Subject: [PATCH 03/11] Fixed failing test case Signed-off-by: Krishna Kondaka --- .../kafka/consumer/KafkaSourceCustomConsumerTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index da84536a82..d2cb989aae 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -23,6 +22,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import 
org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import io.micrometer.core.instrument.Counter; @@ -77,7 +77,7 @@ public class KafkaSourceCustomConsumerTest { private TopicConfig topicConfig; @Mock - private PluginMetrics pluginMetrics; + private KafkaTopicMetrics topicMetrics; private KafkaSourceCustomConsumer consumer; @@ -100,7 +100,7 @@ public class KafkaSourceCustomConsumerTest { @BeforeEach public void setUp() { kafkaConsumer = mock(KafkaConsumer.class); - pluginMetrics = mock(PluginMetrics.class); + topicMetrics = mock(KafkaTopicMetrics.class); counter = mock(Counter.class); topicConfig = mock(TopicConfig.class); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); @@ -108,7 +108,6 @@ public void setUp() { when(topicConfig.getAutoCommit()).thenReturn(false); when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null); - when(pluginMetrics.counter(anyString())).thenReturn(counter); doAnswer((i)-> {return null;}).when(counter).increment(); callbackExecutor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); @@ -122,7 +121,7 @@ public void setUp() { public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled); when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); - return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, pluginMetrics); + return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); } private BlockingBuffer> getBuffer() { From f4a727d81bc590dd1370b564e5b0461d153df3fe Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 8 Aug 2023 00:32:40 +0000 Subject: [PATCH 04/11] Added total committed metric and fixed tests Signed-off-by: Krishna Kondaka --- .../plugins/kafka/source/KafkaSourceJsonTypeIT.java | 1 + .../plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java | 1 + .../dataprepper/plugins/kafka/util/KafkaTopicMetrics.java | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 584ee24eb5..dff3d2b943 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -119,6 +119,7 @@ public void setup() { when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); when(jsonTopic.getWorkers()).thenReturn(1); + when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); 
when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(jsonTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 6d52bba0ea..6179ba4f57 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -130,6 +130,7 @@ public void setup() { when(plainTextTopic.getName()).thenReturn(testTopic); when(plainTextTopic.getGroupId()).thenReturn(testGroup); when(plainTextTopic.getWorkers()).thenReturn(1); + when(plainTextTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(plainTextTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 8aec89c864..6b240635d2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -37,6 +37,7 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri camelCaseMap.put("records-lag-max", "recordsLagMax"); camelCaseMap.put("records-lead-min", "recordsLeadMin"); camelCaseMap.put("commit-rate", "commitRate"); + camelCaseMap.put("commit-total", "commitTotal"); camelCaseMap.put("join-rate", "joinRate"); camelCaseMap.put("incoming-byte-rate", "incomingByteRate"); camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate"); @@ -81,7 +82,7 @@ public void update(final KafkaConsumer consumer) { if ((metricName.contains("consumed")) || ((!metric.tags().containsKey("partition")) && (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) || - (metricName.equals("commit-rate") || metricName.equals("join-rate")) || + (metricName.equals("commit-rate") || metricName.equals("join-rate") || metricName.equals("commit-total")) || (metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) { cmetrics.put(metricName, value.metricValue()); } From 3272a8fd6c3dfac49c9a1b255a9b8b0434c2806f Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 8 Aug 2023 15:02:48 +0000 Subject: [PATCH 05/11] Addressed review comments Signed-off-by: Krishna Kondaka --- .../configuration/KafkaSourceConfig.java | 8 +++ .../consumer/KafkaSourceCustomConsumer.java | 19 +++--- .../plugins/kafka/source/KafkaSource.java | 2 +- .../plugins/kafka/util/KafkaTopicMetrics.java | 62 ++++++++++--------- 4 files changed, 53 insertions(+), 38 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index 922c8f0d08..d1d6dbc19f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -37,6 +37,7 @@ public boolean getInsecure() { } public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30); + public static final Duration DEFAULT_METRICS_UPDATE_INTERVAL = Duration.ofSeconds(60); @JsonProperty("bootstrap_servers") private List bootStrapServers; @@ -67,6 +68,9 @@ public boolean getInsecure() { @JsonProperty("acknowledgments_timeout") private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; + @JsonProperty("metrics_update_interval") + private Duration metricsUpdateInterval = DEFAULT_METRICS_UPDATE_INTERVAL; + @JsonProperty("client_dns_lookup") private String clientDnsLookup; @@ -82,6 +86,10 @@ public Duration getAcknowledgementsTimeout() { return acknowledgementsTimeout; } + public Duration getMetricsUpdateInterval() { + return metricsUpdateInterval; + } + public List getTopics() { return topics; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 5f818997d0..3ef562a50c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -67,7 +67,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; static final String DEFAULT_KEY = "message"; - static final int METRICS_UPDATE_INTERVAL = 60; private volatile long lastCommitTime; private KafkaConsumer consumer= null; @@ -94,6 +93,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; private long metricsUpdatedTime; + private long metricsUpdateInterval; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -119,6 +119,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.metricsUpdatedTime = Instant.now().getEpochSecond(); this.topicMetrics.register(consumer); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); + this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds(); // If the timeout value is different from default value, then enable acknowledgements automatically. 
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; this.acknowledgementSetManager = acknowledgementSetManager; @@ -237,15 +238,15 @@ Map getOffsetsToCommit() { public void updateMetrics() { long curTime = Instant.now().getEpochSecond(); - if (curTime - metricsUpdatedTime >= METRICS_UPDATE_INTERVAL) { + if (curTime - metricsUpdatedTime >= metricsUpdateInterval) { topicMetrics.update(consumer); - topicMetrics.update(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); - topicMetrics.update(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); - topicMetrics.update(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); - topicMetrics.update(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); - topicMetrics.update(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); - topicMetrics.update(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); - topicMetrics.update(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0); + topicMetrics.setMetric(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); + topicMetrics.setMetric(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); + topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); + topicMetrics.setMetric(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); + topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); + topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); + topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 
1 : 0); metricsUpdatedTime = curTime; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index e41906870e..1e23b2be02 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -119,7 +119,7 @@ public void start(Buffer> buffer) { KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); - KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics, sourceConfig.getMetricsUpdateInterval().getSeconds()); Properties consumerProperties = getConsumerProperties(topic, authProperties); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 6b240635d2..3c8cc7b6ed 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -16,19 +16,19 @@ import java.util.HashMap; public class KafkaTopicMetrics { - private int metricUpdateInterval; + private long metricUpdateInterval; private final String topicName; private long updateTime; private Map> consumerMetricsMap; private Map camelCaseMap; private final PluginMetrics pluginMetrics; - public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { + public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics, final long metricUpdateInterval) { this.pluginMetrics = pluginMetrics; this.topicName = topicName; this.consumerMetricsMap = new HashMap<>(); this.updateTime = Instant.now().getEpochSecond(); - this.metricUpdateInterval = 60; //seconds + this.metricUpdateInterval = metricUpdateInterval; this.camelCaseMap = new HashMap<>(); camelCaseMap.put("bytes-consumed-total", "bytesConsumedTotal"); camelCaseMap.put("records-consumed-total", "recordsConsumedTotal"); @@ -56,7 +56,7 @@ private String getCamelCaseName(final String name) { return camelCaseName; } - public void update(final KafkaConsumer consumer, final String metricName, Integer metricValue) { + public void setMetric(final KafkaConsumer consumer, final String metricName, Integer metricValue) { synchronized(consumerMetricsMap) { Map cmetrics = consumerMetricsMap.get(consumer); if (cmetrics != null) { @@ -65,12 +65,30 @@ public void update(final KafkaConsumer consumer, final String metricName, Intege } } + private void aggregateMetrics() { + synchronized (consumerMetricsMap) { + final Map aggregatedMetrics = new HashMap<>(); + consumerMetricsMap.forEach((c, metricsMap) -> { + Double value = 0.0; + metricsMap.forEach((metricName, metricValue) -> { + if (metricValue instanceof Double) { + aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 
0.0)); + } + }); + }); + aggregatedMetrics.forEach((name, value) -> { + pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); + }); + } + } + public void update(final KafkaConsumer consumer) { Map cmetrics = null; synchronized(consumerMetricsMap) { cmetrics = consumerMetricsMap.get(consumer); } - if (cmetrics == null) { + // This should not happen... + if (Objects.isNull(cmetrics)) { return; } Map metrics = consumer.metrics(); @@ -79,31 +97,19 @@ public void update(final KafkaConsumer consumer) { Metric value = entry.getValue(); String metricName = metric.name(); String metricGroup = metric.group(); - if ((metricName.contains("consumed")) || - ((!metric.tags().containsKey("partition")) && - (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) || - (metricName.equals("commit-rate") || metricName.equals("join-rate") || metricName.equals("commit-total")) || - (metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) { - cmetrics.put(metricName, value.metricValue()); + if (Objects.nonNull(camelCaseMap.get(metricName))) { + if (metric.tags().containsKey("partition") && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { + continue; + } + cmetrics.put(metricName, value.metricValue()); } } - synchronized (consumerMetricsMap) { - long curTime = Instant.now().getEpochSecond(); - if (curTime - updateTime > metricUpdateInterval) { - final Map aggregatedMetrics = new HashMap<>(); - consumerMetricsMap.forEach((c, metricsMap) -> { - Double value = 0.0; - metricsMap.forEach((metricName, metricValue) -> { - if (metricValue instanceof Double) { - aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0)); - } - }); - }); - aggregatedMetrics.forEach((name, value) -> { - pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); - }); - updateTime = curTime; - } + + long curTime = Instant.now().getEpochSecond(); + if (curTime - updateTime > metricUpdateInterval) { + aggregateMetrics(); + updateTime = curTime; } } } From b4cca7d18f73f86296084e378e3fbe95ebe3eba8 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 8 Aug 2023 16:53:38 +0000 Subject: [PATCH 06/11] Fixed number of committed records stat. 
Also fixed bug when acknowledgements enabled Signed-off-by: Krishna Kondaka --- .../consumer/KafkaSourceCustomConsumer.java | 62 +++++++++++++------ .../plugins/kafka/util/KafkaTopicMetrics.java | 5 +- .../KafkaSourceCustomConsumerTest.java | 2 +- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 3ef562a50c..b1794dec1f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; -import io.micrometer.core.instrument.Counter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -66,6 +65,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; + static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted"; static final String DEFAULT_KEY = "message"; private volatile long lastCommitTime; @@ -83,12 +83,14 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private Set partitionsToReset; private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; + private List>> acknowledgedOffsets; private Integer numberOfPositiveAcknowledgements; private Integer numberOfNegativeAcknowledgements; private Integer numberOfRecordsFailedToParse; private Integer numberOfDeserializationErrors; private Integer numberOfBufferSizeOverflows; private Integer numberOfPollAuthErrors; + private long numberOfRecordsCommitted; private final boolean acknowledgementsEnabled; private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; @@ -108,6 +110,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; this.buffer = buffer; + this.numberOfRecordsCommitted = 0; this.numberOfRecordsFailedToParse = 0; this.numberOfDeserializationErrors = 0; this.numberOfBufferSizeOverflows = 0; @@ -118,6 +121,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.offsetsToCommit = new HashMap<>(); this.metricsUpdatedTime = Instant.now().getEpochSecond(); this.topicMetrics.register(consumer); + this.acknowledgedOffsets = new ArrayList<>(); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds(); // If the timeout value is different from default value, then enable acknowledgements automatically. 
@@ -132,7 +136,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.lastCommitTime = System.currentTimeMillis(); } - public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) { + public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { + long min = offsetRange.getMinimum(); + long max = offsetRange.getMaximum(); + numberOfRecordsCommitted += (max - min + 1); if (Objects.isNull(offsetAndMetadata)) { return; } @@ -146,20 +153,9 @@ private AcknowledgementSet createAcknowledgementSet(Map { if (result == true) { numberOfPositiveAcknowledgements++; - offsets.forEach((partition, offsetRange) -> { - try { - int partitionId = partition.partition(); - if (!partitionCommitTrackerMap.containsKey(partitionId)) { - OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); - Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; - partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); - } - OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); - updateOffsetsToCommit(partition, offsetAndMetadata); - } catch (Exception e) { - LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e); - } - }); + synchronized(acknowledgedOffsets) { + acknowledgedOffsets.add(offsets); + } } else { numberOfNegativeAcknowledgements++; offsets.forEach((partition, offsetRange) -> { @@ -182,7 +178,7 @@ public void consumeRecords() throws Exception { iterateRecordPartitions(records, acknowledgementSet, offsets); if (!acknowledgementsEnabled) { offsets.forEach((partition, offsetRange) -> - updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1))); + updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange)); } else { acknowledgementSet.complete(); } @@ -199,7 +195,7 @@ public void consumeRecords() throws Exception { } } - private void resetOrCommitOffsets() { + private void resetOffsets() { if (partitionsToReset.size() > 0) { partitionsToReset.forEach(partition -> { try { @@ -211,9 +207,35 @@ private void resetOrCommitOffsets() { }); partitionsToReset.clear(); } + } + + void processAcknowledgedOffsets() { + synchronized(acknowledgedOffsets) { + acknowledgedOffsets.forEach(offsets -> { + offsets.forEach((partition, offsetRange) -> { + try { + int partitionId = partition.partition(); + if (!partitionCommitTrackerMap.containsKey(partitionId)) { + OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); + Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? 
committedOffsetAndMetadata.offset() : null; + partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); + } + OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); + updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange); + } catch (Exception e) { + LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e); + } + }); + }); + acknowledgedOffsets.clear(); + } + } + + private void commitOffsets() { if (topicConfig.getAutoCommit()) { return; } + processAcknowledgedOffsets(); long currentTimeMillis = System.currentTimeMillis(); if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { return; @@ -247,6 +269,7 @@ public void updateMetrics() { topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0); + topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_COMMITTED, numberOfRecordsCommitted); metricsUpdatedTime = curTime; } @@ -257,7 +280,8 @@ public void run() { consumer.subscribe(Arrays.asList(topicName)); while (!shutdownInProgress.get()) { try { - resetOrCommitOffsets(); + resetOffsets(); + commitOffsets(); consumeRecords(); updateMetrics(); } catch (Exception exp) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 3c8cc7b6ed..fe336c1d19 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -37,7 +37,6 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri camelCaseMap.put("records-lag-max", "recordsLagMax"); camelCaseMap.put("records-lead-min", "recordsLeadMin"); camelCaseMap.put("commit-rate", "commitRate"); - camelCaseMap.put("commit-total", "commitTotal"); camelCaseMap.put("join-rate", "joinRate"); camelCaseMap.put("incoming-byte-rate", "incomingByteRate"); camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate"); @@ -56,11 +55,11 @@ private String getCamelCaseName(final String name) { return camelCaseName; } - public void setMetric(final KafkaConsumer consumer, final String metricName, Integer metricValue) { + public void setMetric(final KafkaConsumer consumer, final String metricName, Number metricValue) { synchronized(consumerMetricsMap) { Map cmetrics = consumerMetricsMap.get(consumer); if (cmetrics != null) { - cmetrics.put(metricName, (double)metricValue); + cmetrics.put(metricName, metricValue.doubleValue()); } } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index d2cb989aae..1495a5bbcd 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.MatcherAssert.assertThat; @@ -202,6 +201,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted Thread.sleep(10000); } catch (Exception e){} + consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); Assertions.assertEquals(offsetsToCommit.size(), 1); offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> { From 0b45113da92ade9bd2e11aa4f6db567e7ad37528 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 9 Aug 2023 14:12:32 +0000 Subject: [PATCH 07/11] Addressed review comments. Fixed acknowledgements related bug Signed-off-by: Krishna Kondaka --- .../configuration/KafkaSourceConfig.java | 7 - .../consumer/KafkaSourceCustomConsumer.java | 62 +----- .../consumer/TopicPartitionCommitTracker.java | 2 +- .../plugins/kafka/source/KafkaSource.java | 4 +- .../plugins/kafka/util/KafkaTopicMetrics.java | 210 ++++++++++++------ .../KafkaSourceCustomConsumerTest.java | 5 + .../TopicPartitionCommitTrackerTest.java | 2 +- 7 files changed, 157 insertions(+), 135 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index d1d6dbc19f..b88f2051fb 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -68,9 +68,6 @@ public boolean getInsecure() { @JsonProperty("acknowledgments_timeout") private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; - @JsonProperty("metrics_update_interval") - private Duration metricsUpdateInterval = DEFAULT_METRICS_UPDATE_INTERVAL; - @JsonProperty("client_dns_lookup") private String clientDnsLookup; @@ -86,10 +83,6 @@ public Duration getAcknowledgementsTimeout() { return acknowledgementsTimeout; } - public Duration getMetricsUpdateInterval() { - return metricsUpdateInterval; - } - public List getTopics() { return topics; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index b1794dec1f..d083b4e98b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.avro.generic.GenericRecord; import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import 
org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -58,14 +57,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; - static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; - static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; - static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; - static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors"; - static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; - static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; - static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; - static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted"; static final String DEFAULT_KEY = "message"; private volatile long lastCommitTime; @@ -73,7 +64,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private AtomicBoolean shutdownInProgress; private final String topicName; private final TopicConfig topicConfig; - private PluginMetrics pluginMetrics= null; private MessageFormat schema; private final BufferAccumulator> bufferAccumulator; private final Buffer> buffer; @@ -84,18 +74,10 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; private List>> acknowledgedOffsets; - private Integer numberOfPositiveAcknowledgements; - private Integer numberOfNegativeAcknowledgements; - private Integer numberOfRecordsFailedToParse; - private Integer numberOfDeserializationErrors; - private Integer numberOfBufferSizeOverflows; - private Integer numberOfPollAuthErrors; - private long numberOfRecordsCommitted; private final boolean acknowledgementsEnabled; private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; private long metricsUpdatedTime; - private long metricsUpdateInterval; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -110,24 +92,15 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; this.buffer = buffer; - this.numberOfRecordsCommitted = 0; - this.numberOfRecordsFailedToParse = 0; - this.numberOfDeserializationErrors = 0; - this.numberOfBufferSizeOverflows = 0; - this.numberOfPollAuthErrors = 0; - this.numberOfPositiveAcknowledgements = 0; - this.numberOfNegativeAcknowledgements = 0; this.topicMetrics = topicMetrics; + this.topicMetrics.register(consumer); this.offsetsToCommit = new HashMap<>(); this.metricsUpdatedTime = Instant.now().getEpochSecond(); - this.topicMetrics.register(consumer); this.acknowledgedOffsets = new ArrayList<>(); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); - this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds(); // If the timeout value is different from default value, then enable acknowledgements automatically. 
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; this.acknowledgementSetManager = acknowledgementSetManager; - this.pluginMetrics = pluginMetrics; this.partitionCommitTrackerMap = new HashMap<>(); this.partitionsToReset = Collections.synchronizedSet(new HashSet<>()); this.schema = MessageFormat.getByMessageFormatByName(schemaType); @@ -139,7 +112,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { long min = offsetRange.getMinimum(); long max = offsetRange.getMaximum(); - numberOfRecordsCommitted += (max - min + 1); + topicMetrics.getNumberOfRecordsCommitted().increment(max - min + 1); if (Objects.isNull(offsetAndMetadata)) { return; } @@ -152,12 +125,12 @@ private AcknowledgementSet createAcknowledgementSet(Map { if (result == true) { - numberOfPositiveAcknowledgements++; + topicMetrics.getNumberOfPositiveAcknowledgements().increment(); synchronized(acknowledgedOffsets) { acknowledgedOffsets.add(offsets); } } else { - numberOfNegativeAcknowledgements++; + topicMetrics.getNumberOfNegativeAcknowledgements().increment(); offsets.forEach((partition, offsetRange) -> { partitionsToReset.add(partition); }); @@ -185,12 +158,12 @@ public void consumeRecords() throws Exception { } } catch (AuthenticationException e) { LOG.warn("Authentication error while doing poll(). Will retry after 10 seconds", e); - numberOfPollAuthErrors++; + topicMetrics.getNumberOfPollAuthErrors().increment(); Thread.sleep(10000); } catch (RecordDeserializationException e) { LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record", e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); - numberOfDeserializationErrors++; + topicMetrics.getNumberOfDeserializationErrors().increment(); consumer.seek(e.topicPartition(), e.offset()+1); } } @@ -258,23 +231,6 @@ Map getOffsetsToCommit() { return offsetsToCommit; } - public void updateMetrics() { - long curTime = Instant.now().getEpochSecond(); - if (curTime - metricsUpdatedTime >= metricsUpdateInterval) { - topicMetrics.update(consumer); - topicMetrics.setMetric(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); - topicMetrics.setMetric(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); - topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); - topicMetrics.setMetric(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); - topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); - topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); - topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 
1 : 0); - topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_COMMITTED, numberOfRecordsCommitted); - - metricsUpdatedTime = curTime; - } - } - @Override public void run() { consumer.subscribe(Arrays.asList(topicName)); @@ -283,7 +239,7 @@ public void run() { resetOffsets(); commitOffsets(); consumeRecords(); - updateMetrics(); + topicMetrics.update(consumer); } catch (Exception exp) { LOG.error("Error while reading the records from the topic {}", topicName, exp); } @@ -312,7 +268,7 @@ private Record getRecord(ConsumerRecord consumerRecord, in } } catch (Exception e){ LOG.error("Failed to parse JSON or AVRO record", e); - numberOfRecordsFailedToParse++; + topicMetrics.getNumberOfRecordsFailedToParse().increment(); } if (!plainTextMode) { if (!(value instanceof Map)) { @@ -360,7 +316,7 @@ private void iterateRecordPartitions(ConsumerRecords records, fin bufferAccumulator.add(record); break; } catch (SizeOverflowException e) { - numberOfBufferSizeOverflows++; + topicMetrics.getNumberOfBufferSizeOverflows().increment(); Thread.sleep(100); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java index 9d10b46611..7799327481 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java @@ -21,7 +21,7 @@ public class TopicPartitionCommitTracker { public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) { this.topicPartition = topicPartition; - this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset : -1L; + this.committedOffset = Objects.nonNull(committedOffset) ? 
committedOffset-1 : -1L; this.offsetMaxMap = new HashMap<>(); this.offsetMinMap = new HashMap<>(); this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset)); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 1e23b2be02..92cf2527f8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -119,13 +119,13 @@ public void start(Buffer> buffer) { KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); - KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics, sourceConfig.getMetricsUpdateInterval().getSeconds()); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); Properties consumerProperties = getConsumerProperties(topic, authProperties); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { int numWorkers = topic.getWorkers(); executorService = Executors.newFixedThreadPool(numWorkers); - IntStream.range(0, numWorkers + 1).forEach(index -> { + IntStream.range(0, numWorkers).forEach(index -> { switch (schema) { case JSON: kafkaConsumer = new KafkaConsumer(consumerProperties); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index fe336c1d19..9cae81a4ce 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.kafka.util; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -16,99 +17,166 @@ import java.util.HashMap; public class KafkaTopicMetrics { - private long metricUpdateInterval; + static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; + static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; + static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; + static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors"; + static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; + static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; + static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; + static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted"; + static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed"; + static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed"; + private final String topicName; private long updateTime; - private Map> consumerMetricsMap; - private Map camelCaseMap; + private Map metricsNameMap; + private Map> 
metricValues; private final PluginMetrics pluginMetrics; - - public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics, final long metricUpdateInterval) { + private final Counter numberOfPositiveAcknowledgements; + private final Counter numberOfNegativeAcknowledgements; + private final Counter numberOfRecordsFailedToParse; + private final Counter numberOfDeserializationErrors; + private final Counter numberOfBufferSizeOverflows; + private final Counter numberOfPollAuthErrors; + private final Counter numberOfRecordsCommitted; + private final Counter numberOfNonConsumers; + + public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { this.pluginMetrics = pluginMetrics; this.topicName = topicName; - this.consumerMetricsMap = new HashMap<>(); this.updateTime = Instant.now().getEpochSecond(); - this.metricUpdateInterval = metricUpdateInterval; - this.camelCaseMap = new HashMap<>(); - camelCaseMap.put("bytes-consumed-total", "bytesConsumedTotal"); - camelCaseMap.put("records-consumed-total", "recordsConsumedTotal"); - camelCaseMap.put("bytes-consumed-rate", "bytesConsumedRate"); - camelCaseMap.put("records-consumed-rate", "recordsConsumedRate"); - camelCaseMap.put("records-lag-max", "recordsLagMax"); - camelCaseMap.put("records-lead-min", "recordsLeadMin"); - camelCaseMap.put("commit-rate", "commitRate"); - camelCaseMap.put("join-rate", "joinRate"); - camelCaseMap.put("incoming-byte-rate", "incomingByteRate"); - camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate"); - camelCaseMap.put("assigned-partitions", "outgoingByteRate"); + this.metricValues = new HashMap<>(); + initializeMetricNamesMap(); + this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED)); + this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE)); + this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS)); + this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS)); + this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS)); + this.numberOfNonConsumers = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NON_CONSUMERS)); + this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS)); + this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS)); + } + + private void initializeMetricNamesMap() { + this.metricsNameMap = new HashMap<>(); + metricsNameMap.put("bytes-consumed-total", "bytesConsumedTotal"); + metricsNameMap.put("records-consumed-total", "recordsConsumedTotal"); + metricsNameMap.put("bytes-consumed-rate", "bytesConsumedRate"); + metricsNameMap.put("records-consumed-rate", "recordsConsumedRate"); + metricsNameMap.put("records-lag-max", "recordsLagMax"); + metricsNameMap.put("records-lead-min", "recordsLeadMin"); + metricsNameMap.put("commit-rate", "commitRate"); + metricsNameMap.put("join-rate", "joinRate"); + metricsNameMap.put("incoming-byte-rate", "incomingByteRate"); + metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate"); + metricsNameMap.forEach((metricName, camelCaseName) -> { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + synchronized(metricValues) { + if (metricName.equals("records-lag-max")) { + double max = 0.0; + for (Map.Entry> entry : 
metricValues.entrySet()) { + if (entry.getValue().get(metricName) > max) { + max = entry.getValue().get(metricName); + } + } + return max; + } else if (metricName.equals("records-lead-min")) { + double min = Double.MAX_VALUE; + for (Map.Entry> entry : metricValues.entrySet()) { + if (entry.getValue().get(metricName) < min) { + min = entry.getValue().get(metricName); + } + } + return min; + } else { + double sum = 0; + for (Map.Entry> entry : metricValues.entrySet()) { + sum += entry.getValue().get(metricName); + } + return sum; + } + } + }); + }); + } + + public void register(final KafkaConsumer consumer) { + metricValues.put(consumer, new HashMap<>()); + final Map consumerMetrics = metricValues.get(consumer); + metricsNameMap.forEach((k, name) -> { + consumerMetrics.put(k, 0.0); + }); + } + + public Counter getNumberOfRecordsCommitted() { + return numberOfRecordsCommitted; + } + + public Counter getNumberOfNonConsumers() { + return numberOfNonConsumers; + } + + public Counter getNumberOfPollAuthErrors() { + return numberOfPollAuthErrors; + } + + public Counter getNumberOfBufferSizeOverflows() { + return numberOfBufferSizeOverflows; + } + + public Counter getNumberOfDeserializationErrors() { + return numberOfDeserializationErrors; + } + + public Counter getNumberOfRecordsFailedToParse() { + return numberOfRecordsFailedToParse; } - public void register(KafkaConsumer consumer) { - this.consumerMetricsMap.put(consumer, new HashMap<>()); + public Counter getNumberOfNegativeAcknowledgements() { + return numberOfNegativeAcknowledgements; + } + + public Counter getNumberOfPositiveAcknowledgements() { + return numberOfPositiveAcknowledgements; + } + + public String getTopicMetricName(final String metricName) { + return "topic."+topicName+"."+metricName; } private String getCamelCaseName(final String name) { - String camelCaseName = camelCaseMap.get(name); + String camelCaseName = metricsNameMap.get(name); if (Objects.isNull(camelCaseName)) { return name; } return camelCaseName; } - public void setMetric(final KafkaConsumer consumer, final String metricName, Number metricValue) { - synchronized(consumerMetricsMap) { - Map cmetrics = consumerMetricsMap.get(consumer); - if (cmetrics != null) { - cmetrics.put(metricName, metricValue.doubleValue()); - } - } - } + public void update(final KafkaConsumer consumer) { + Map metrics = consumer.metrics(); - private void aggregateMetrics() { - synchronized (consumerMetricsMap) { - final Map aggregatedMetrics = new HashMap<>(); - consumerMetricsMap.forEach((c, metricsMap) -> { - Double value = 0.0; - metricsMap.forEach((metricName, metricValue) -> { - if (metricValue instanceof Double) { - aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0)); + synchronized(metricValues) { + Map consumerMetrics = metricValues.get(consumer); + for (Map.Entry entry : metrics.entrySet()) { + MetricName metric = entry.getKey(); + Metric value = entry.getValue(); + String metricName = metric.name(); + if (Objects.nonNull(metricsNameMap.get(metricName))) { + if (metric.tags().containsKey("partition") && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { + continue; } - }); - }); - aggregatedMetrics.forEach((name, value) -> { - pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); - }); - } - } - public void update(final KafkaConsumer consumer) { - Map cmetrics = null; - synchronized(consumerMetricsMap) { - cmetrics = consumerMetricsMap.get(consumer); - } - // This should not 
happen... - if (Objects.isNull(cmetrics)) { - return; - } - Map metrics = consumer.metrics(); - for (Map.Entry entry : metrics.entrySet()) { - MetricName metric = entry.getKey(); - Metric value = entry.getValue(); - String metricName = metric.name(); - String metricGroup = metric.group(); - if (Objects.nonNull(camelCaseMap.get(metricName))) { - if (metric.tags().containsKey("partition") && - (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { - continue; + if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) { + continue; + } + if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) { + continue; + } + consumerMetrics.put(metricName, (Double)value.metricValue()); } - cmetrics.put(metricName, value.metricValue()); } } - - long curTime = Instant.now().getEpochSecond(); - if (curTime - updateTime > metricUpdateInterval) { - aggregateMetrics(); - updateTime = curTime; - } } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 1495a5bbcd..47080515d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -102,6 +102,10 @@ public void setUp() { topicMetrics = mock(KafkaTopicMetrics.class); counter = mock(Counter.class); topicConfig = mock(TopicConfig.class); + when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); when(topicConfig.getAutoCommit()).thenReturn(false); @@ -245,6 +249,7 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int Thread.sleep(10000); } catch (Exception e){} + consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); Assertions.assertEquals(offsetsToCommit.size(), 0); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java index 387ffb909e..1f2e2ae243 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java @@ -37,7 +37,7 @@ public TopicPartitionCommitTracker createObjectUnderTest(String topic, int parti @ParameterizedTest @MethodSource("getInputOrder") public void test(List order) { - topicPartitionCommitTracker = createObjectUnderTest(testTopic, testPartition, -1L); + topicPartitionCommitTracker = createObjectUnderTest(testTopic, testPartition, 0L); List> ranges = new ArrayList<>(); for (int i = 0; i < 10; i++) { 
ranges.add(Range.between(i*10L, i*10L+9L)); From e6c8e14fffea45fd6b7b37914173a5fe65f49726 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 9 Aug 2023 16:21:34 +0000 Subject: [PATCH 08/11] Fixed to use counters for records/bytes consumed metrics Signed-off-by: Krishna Kondaka --- .../plugins/kafka/util/KafkaTopicMetrics.java | 63 ++++++++++++------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 9cae81a4ce..886cb5146a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -40,6 +40,8 @@ public class KafkaTopicMetrics { private final Counter numberOfBufferSizeOverflows; private final Counter numberOfPollAuthErrors; private final Counter numberOfRecordsCommitted; + private final Counter numberOfRecordsConsumed; + private final Counter numberOfBytesConsumed; private final Counter numberOfNonConsumers; public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { @@ -48,6 +50,8 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri this.updateTime = Instant.now().getEpochSecond(); this.metricValues = new HashMap<>(); initializeMetricNamesMap(); + this.numberOfRecordsConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_CONSUMED)); + this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED)); this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED)); this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE)); this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS)); @@ -71,33 +75,35 @@ private void initializeMetricNamesMap() { metricsNameMap.put("incoming-byte-rate", "incomingByteRate"); metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate"); metricsNameMap.forEach((metricName, camelCaseName) -> { - pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { - synchronized(metricValues) { - if (metricName.equals("records-lag-max")) { - double max = 0.0; - for (Map.Entry> entry : metricValues.entrySet()) { - if (entry.getValue().get(metricName) > max) { - max = entry.getValue().get(metricName); + if (!metricName.contains("-total")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + synchronized(metricValues) { + if (metricName.equals("records-lag-max")) { + double max = 0.0; + for (Map.Entry> entry : metricValues.entrySet()) { + if (entry.getValue().get(metricName) > max) { + max = entry.getValue().get(metricName); + } } - } - return max; - } else if (metricName.equals("records-lead-min")) { - double min = Double.MAX_VALUE; - for (Map.Entry> entry : metricValues.entrySet()) { - if (entry.getValue().get(metricName) < min) { - min = entry.getValue().get(metricName); + return max; + } else if (metricName.equals("records-lead-min")) { + double min = Double.MAX_VALUE; + for (Map.Entry> entry : metricValues.entrySet()) { + if (entry.getValue().get(metricName) < min) { + min = entry.getValue().get(metricName); 
+ } } + return min; + } else { + double sum = 0; + for (Map.Entry> entry : metricValues.entrySet()) { + sum += entry.getValue().get(metricName); + } + return sum; } - return min; - } else { - double sum = 0; - for (Map.Entry> entry : metricValues.entrySet()) { - sum += entry.getValue().get(metricName); - } - return sum; } - } - }); + }); + } }); } @@ -174,7 +180,16 @@ public void update(final KafkaConsumer consumer) { if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) { continue; } - consumerMetrics.put(metricName, (Double)value.metricValue()); + double newValue = (Double)value.metricValue(); + if (metricName.equals("records-consumed-total")) { + double prevValue = consumerMetrics.get(metricName); + numberOfRecordsConsumed.increment(newValue - prevValue); + } else if (metricName.equals("bytes-consumed-total")) { + double prevValue = consumerMetrics.get(metricName); + numberOfBytesConsumed.increment(newValue - prevValue); + } + + consumerMetrics.put(metricName, newValue); } } } From fec2a10ba894d13e6b77b7f75ef4dd55b0eb3cf3 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 9 Aug 2023 16:39:21 +0000 Subject: [PATCH 09/11] Removed unused code Signed-off-by: Krishna Kondaka --- .../plugins/kafka/configuration/KafkaSourceConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index b88f2051fb..922c8f0d08 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -37,7 +37,6 @@ public boolean getInsecure() { } public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30); - public static final Duration DEFAULT_METRICS_UPDATE_INTERVAL = Duration.ofSeconds(60); @JsonProperty("bootstrap_servers") private List bootStrapServers; From b9fc87f2d05d845fde95eda86d454337b847b03e Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 9 Aug 2023 18:26:34 +0000 Subject: [PATCH 10/11] Added a metric for keeping track of number of consumers without any partitions assigned Signed-off-by: Krishna Kondaka --- .../plugins/kafka/util/KafkaTopicMetrics.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 886cb5146a..81447489ac 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -23,7 +23,6 @@ public class KafkaTopicMetrics { static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors"; static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; - static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; static final String NUMBER_OF_RECORDS_COMMITTED = 
"numberOfRecordsCommitted"; static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed"; static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed"; @@ -42,7 +41,6 @@ public class KafkaTopicMetrics { private final Counter numberOfRecordsCommitted; private final Counter numberOfRecordsConsumed; private final Counter numberOfBytesConsumed; - private final Counter numberOfNonConsumers; public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { this.pluginMetrics = pluginMetrics; @@ -57,7 +55,6 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS)); this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS)); this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS)); - this.numberOfNonConsumers = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NON_CONSUMERS)); this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS)); this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS)); } @@ -74,6 +71,7 @@ private void initializeMetricNamesMap() { metricsNameMap.put("join-rate", "joinRate"); metricsNameMap.put("incoming-byte-rate", "incomingByteRate"); metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate"); + metricsNameMap.put("assigned-partitions", "numberOfNonConsumers"); metricsNameMap.forEach((metricName, camelCaseName) -> { if (!metricName.contains("-total")) { pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { @@ -119,10 +117,6 @@ public Counter getNumberOfRecordsCommitted() { return numberOfRecordsCommitted; } - public Counter getNumberOfNonConsumers() { - return numberOfNonConsumers; - } - public Counter getNumberOfPollAuthErrors() { return numberOfPollAuthErrors; } @@ -188,8 +182,12 @@ public void update(final KafkaConsumer consumer) { double prevValue = consumerMetrics.get(metricName); numberOfBytesConsumed.increment(newValue - prevValue); } - - consumerMetrics.put(metricName, newValue); + // Keep the count of number of consumers without any assigned partitions. This value can go up or down. So, it is made as Guage metric + if (metricName.equals("assigned-partitions")) { + consumerMetrics.put(metricName, ((Double)value.metricValue() == 0.0) ? 
1.0 : 0.0); + } else { + consumerMetrics.put(metricName, newValue); + } } } } From b2c540aa48a25f3510813661f230f899f5724895 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 10 Aug 2023 07:40:33 +0000 Subject: [PATCH 11/11] Added unit test for KafkaTopicMetrics Signed-off-by: Krishna Kondaka --- .../plugins/kafka/util/KafkaTopicMetrics.java | 124 +++++---- .../kafka/util/KafkaTopicMetricsTests.java | 250 ++++++++++++++++++ 2 files changed, 322 insertions(+), 52 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 81447489ac..df4b22a61f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -73,33 +73,38 @@ private void initializeMetricNamesMap() { metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate"); metricsNameMap.put("assigned-partitions", "numberOfNonConsumers"); metricsNameMap.forEach((metricName, camelCaseName) -> { - if (!metricName.contains("-total")) { + if (metricName.equals("records-lag-max")) { pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { - synchronized(metricValues) { - if (metricName.equals("records-lag-max")) { - double max = 0.0; - for (Map.Entry> entry : metricValues.entrySet()) { - if (entry.getValue().get(metricName) > max) { - max = entry.getValue().get(metricName); - } - } - return max; - } else if (metricName.equals("records-lead-min")) { - double min = Double.MAX_VALUE; - for (Map.Entry> entry : metricValues.entrySet()) { - if (entry.getValue().get(metricName) < min) { - min = entry.getValue().get(metricName); - } - } - return min; - } else { - double sum = 0; - for (Map.Entry> entry : metricValues.entrySet()) { - sum += entry.getValue().get(metricName); - } - return sum; + double max = 0.0; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + max = Math.max(max, consumerMetrics.get(metricName)); } } + return max; + }); + } else if (metricName.equals("records-lead-min")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + double min = Double.MAX_VALUE; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + min = Math.min(min, consumerMetrics.get(metricName)); + } + } + return min; + }); + } else if (!metricName.contains("-total")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + double sum = 0; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + sum += consumerMetrics.get(metricName); + } + } + return sum; }); } }); @@ -113,6 +118,14 @@ public void register(final KafkaConsumer consumer) { }); } + Counter getNumberOfRecordsConsumed() { + return numberOfRecordsConsumed; + } + + Counter getNumberOfBytesConsumed() { + return numberOfBytesConsumed; + } + public Counter getNumberOfRecordsCommitted() { return numberOfRecordsCommitted; } @@ -141,7 +154,7 
@@ public Counter getNumberOfPositiveAcknowledgements() { return numberOfPositiveAcknowledgements; } - public String getTopicMetricName(final String metricName) { + private String getTopicMetricName(final String metricName) { return "topic."+topicName+"."+metricName; } @@ -153,41 +166,48 @@ private String getCamelCaseName(final String name) { return camelCaseName; } + Map> getMetricValues() { + return metricValues; + } + public void update(final KafkaConsumer consumer) { - Map metrics = consumer.metrics(); + Map consumerMetrics = metricValues.get(consumer); - synchronized(metricValues) { - Map consumerMetrics = metricValues.get(consumer); - for (Map.Entry entry : metrics.entrySet()) { - MetricName metric = entry.getKey(); - Metric value = entry.getValue(); - String metricName = metric.name(); - if (Objects.nonNull(metricsNameMap.get(metricName))) { - if (metric.tags().containsKey("partition") && - (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { - continue; - } + Map metrics = consumer.metrics(); + for (Map.Entry entry : metrics.entrySet()) { + MetricName metric = entry.getKey(); + Metric value = entry.getValue(); + String metricName = metric.name(); + if (Objects.nonNull(metricsNameMap.get(metricName))) { + if (metric.tags().containsKey("partition") && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { + continue; + } - if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) { - continue; - } - if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) { - continue; - } - double newValue = (Double)value.metricValue(); - if (metricName.equals("records-consumed-total")) { + if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) { + continue; + } + if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) { + continue; + } + double newValue = (Double)value.metricValue(); + if (metricName.equals("records-consumed-total")) { + synchronized(consumerMetrics) { double prevValue = consumerMetrics.get(metricName); numberOfRecordsConsumed.increment(newValue - prevValue); - } else if (metricName.equals("bytes-consumed-total")) { + } + } else if (metricName.equals("bytes-consumed-total")) { + synchronized(consumerMetrics) { double prevValue = consumerMetrics.get(metricName); numberOfBytesConsumed.increment(newValue - prevValue); } - // Keep a count of the number of consumers without any assigned partitions. This value can go up or down, so it is tracked as a Gauge metric - if (metricName.equals("assigned-partitions")) { - consumerMetrics.put(metricName, ((Double)value.metricValue() == 0.0) ? 1.0 : 0.0); - } else { - consumerMetrics.put(metricName, newValue); - } + } + // Keep a count of the number of consumers without any assigned partitions. This value can go up or down, so it is tracked as a Gauge metric + if (metricName.equals("assigned-partitions")) { + newValue = (newValue == 0.0) ?
1.0 : 0.0; + } + synchronized(consumerMetrics) { + consumerMetrics.put(metricName, newValue); } } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java new file mode 100644 index 0000000000..ea31c216db --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java @@ -0,0 +1,250 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Metric; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.commons.lang3.RandomStringUtils; + +import io.micrometer.core.instrument.Counter; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.util.Map; +import java.util.HashMap; +import java.util.Random; +import java.util.function.ToDoubleFunction; + +@ExtendWith(MockitoExtension.class) +public class KafkaTopicMetricsTests { + public final class KafkaTestMetric implements Metric { + private final Double value; + private final MetricName name; + + public KafkaTestMetric(final double value, final MetricName name) { + this.value = value; + this.name = name; + } + + @Override + public MetricName metricName() { + return name; + } + + @Override + public Object metricValue() { + return value; + } + } + + private String topicName; + + @Mock + private PluginMetrics pluginMetrics; + + private Map pluginMetricsMap; + + private Random random; + + private KafkaTopicMetrics topicMetrics; + + private double bytesConsumed; + private double recordsConsumed; + private double bytesConsumedRate; + private double recordsConsumedRate; + private double recordsLagMax; + private double recordsLeadMin; + private double commitRate; + private double joinRate; + private double incomingByteRate; + private double outgoingByteRate; + + @Mock + private Counter bytesConsumedCounter; + + @Mock + private Counter recordsConsumedCounter; + private double bytesConsumedCount; + private double recordsConsumedCount; + + @BeforeEach + void setUp() { + topicName = RandomStringUtils.randomAlphabetic(8); + bytesConsumed = 0.0; + recordsConsumed = 0.0; + bytesConsumedRate = 0.0; + recordsConsumedRate = 0.0; + recordsLagMax = 0.0; + recordsLeadMin = Double.MAX_VALUE; + commitRate = 0.0; + joinRate = 0.0; + incomingByteRate = 0.0; + outgoingByteRate = 0.0; + + bytesConsumedCount = 0.0; + recordsConsumedCount = 0.0; + + random = new Random(); + pluginMetrics = mock(PluginMetrics.class); + pluginMetricsMap = new HashMap<>(); + doAnswer((i) -> { + ToDoubleFunction f = (ToDoubleFunction)i.getArgument(2); + Object 
arg = (Object)i.getArgument(1); + String name = (String)i.getArgument(0); + pluginMetricsMap.put(name, f); + return f.applyAsDouble(arg); + }).when(pluginMetrics).gauge(any(String.class), any(Object.class), any()); + bytesConsumedCounter = mock(Counter.class); + recordsConsumedCounter = mock(Counter.class); + + doAnswer((i) -> { + String arg = (String)i.getArgument(0); + if (arg.contains("Bytes")) { + return bytesConsumedCounter; + } else { + return recordsConsumedCounter; + } + }).when(pluginMetrics).counter(any(String.class)); + doAnswer((i) -> { + bytesConsumedCount += (double)i.getArgument(0); + return null; + }).when(bytesConsumedCounter).increment(any(Double.class)); + doAnswer((i) -> { + recordsConsumedCount += (double)i.getArgument(0); + return null; + }).when(recordsConsumedCounter).increment(any(Double.class)); + } + + public KafkaTopicMetrics createObjectUnderTest() { + return new KafkaTopicMetrics(topicName, pluginMetrics); + } + + private KafkaTestMetric getMetric(final String name, final double value, Map tags) { + MetricName metricName = new MetricName(name, "group", "metric", tags); + return new KafkaTestMetric(value, metricName); + } + + + private void populateKafkaMetrics(Map metrics, double numAssignedPartitions) { + Integer tmpBytesConsumed = random.nextInt() % 100 + 1; + if (tmpBytesConsumed < 0) { + tmpBytesConsumed = -tmpBytesConsumed; + } + bytesConsumed += tmpBytesConsumed; + Integer tmpRecordsConsumed = random.nextInt() % 10 + 1; + if (tmpRecordsConsumed < 0) { + tmpRecordsConsumed = -tmpRecordsConsumed; + } + recordsConsumed += tmpRecordsConsumed; + + double tmpBytesConsumedRate = random.nextDouble()*100; + bytesConsumedRate += tmpBytesConsumedRate; + + double tmpRecordsConsumedRate = random.nextDouble()*10; + recordsConsumedRate += tmpRecordsConsumedRate; + + double tmpRecordsLagMax = random.nextDouble()*2; + recordsLagMax = Math.max(recordsLagMax, tmpRecordsLagMax); + + double tmpRecordsLeadMin = random.nextDouble()*3; + recordsLeadMin = Math.min(recordsLeadMin, tmpRecordsLeadMin); + + double tmpCommitRate = random.nextDouble(); + commitRate += tmpCommitRate; + + double tmpJoinRate = random.nextDouble(); + joinRate += tmpJoinRate; + + double tmpIncomingByteRate = random.nextDouble(); + incomingByteRate += tmpIncomingByteRate; + + double tmpOutgoingByteRate = random.nextDouble(); + outgoingByteRate += tmpOutgoingByteRate; + + Map metricsMap = new HashMap<>(); + metricsMap.put("bytes-consumed-total", (double)tmpBytesConsumed); + metricsMap.put("records-consumed-total", (double)tmpRecordsConsumed); + metricsMap.put("bytes-consumed-rate", tmpBytesConsumedRate); + metricsMap.put("records-consumed-rate", tmpRecordsConsumedRate); + metricsMap.put("records-lag-max", tmpRecordsLagMax); + metricsMap.put("records-lead-min", tmpRecordsLeadMin); + metricsMap.put("commit-rate", tmpCommitRate); + metricsMap.put("join-rate", tmpJoinRate); + metricsMap.put("incoming-byte-rate", tmpIncomingByteRate); + metricsMap.put("outgoing-byte-rate", tmpOutgoingByteRate); + metricsMap.put("assigned-partitions", numAssignedPartitions); + + metricsMap.forEach((name, value) -> { + Map tags = new HashMap<>(); + if (name.contains("-total")) { + tags.put("topic", topicName); + } + KafkaTestMetric metric = getMetric(name, value, tags); + metrics.put(metric.metricName(), metric); + }); + } + + @ParameterizedTest + @ValueSource(ints = {1, 5, 10}) + //@ValueSource(ints = {2}) + public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) { + topicMetrics = createObjectUnderTest(); + for (int 
i = 0; i < numConsumers; i++) { + KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class); + topicMetrics.register(kafkaConsumer); + Map metrics = new HashMap<>(); + when(kafkaConsumer.metrics()).thenReturn(metrics); + populateKafkaMetrics(metrics, (i %2 == 1) ? 0.0 : 1.0); + topicMetrics.update(kafkaConsumer); + } + when(recordsConsumedCounter.count()).thenReturn(recordsConsumedCount); + when(bytesConsumedCounter.count()).thenReturn(bytesConsumedCount); + assertThat(topicMetrics.getNumberOfRecordsConsumed().count(), equalTo(recordsConsumed)); + assertThat(topicMetrics.getNumberOfBytesConsumed().count(), equalTo(bytesConsumed)); + pluginMetricsMap.forEach((k, v) -> { + double result = v.applyAsDouble(topicMetrics.getMetricValues()); + if (k.contains("bytesConsumedRate")) { + assertEquals(result, bytesConsumedRate, 0.01d); + } else if (k.contains("recordsConsumedRate")) { + assertEquals(result, recordsConsumedRate, 0.01d); + } else if (k.contains("recordsLagMax")) { + assertEquals(result, recordsLagMax, 0.01d); + } else if (k.contains("recordsLeadMin")) { + assertEquals(result, recordsLeadMin, 0.01d); + } else if (k.contains("commitRate")) { + assertEquals(result, commitRate, 0.01d); + } else if (k.contains("joinRate")) { + assertEquals(result, joinRate, 0.01d); + } else if (k.contains("incomingByteRate")) { + assertEquals(result, incomingByteRate, 0.01d); + } else if (k.contains("outgoingByteRate")) { + assertEquals(result, outgoingByteRate, 0.01d); + } else if (k.contains("numberOfNonConsumers")) { + int expectedValue = numConsumers/2; + assertThat(result, equalTo((double)expectedValue)); + } else { + assertThat(result, equalTo(k+": Unknown Metric")); + } + }); + + } + +}
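
For reference, the following is a minimal standalone sketch of the aggregation pattern these patches introduce in KafkaTopicMetrics; it is illustrative only and not part of the patch series. It assumes invented class and method names, keys the state by a consumer id string instead of a KafkaConsumer, and uses a plain double in place of the Micrometer counter. It shows the same idea: per-consumer snapshots are combined with max for records-lag-max, min for records-lead-min, and a sum otherwise, while cumulative "-total" values are turned into counter increments by diffing against the previous snapshot.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only (not part of the patch series): demonstrates the
// aggregation pattern without Kafka or Data Prepper dependencies.
public class TopicMetricsAggregationSketch {
    // consumer id -> (Kafka metric name -> latest observed value)
    private final Map<String, Map<String, Double>> perConsumer = new HashMap<>();
    // stands in for the Micrometer counter fed by deltas of records-consumed-total
    private double recordsConsumedCounter = 0.0;

    // Record a new snapshot for one consumer; cumulative "-total" values become deltas.
    public void update(final String consumerId, final Map<String, Double> snapshot) {
        final Map<String, Double> previous =
                perConsumer.computeIfAbsent(consumerId, id -> new HashMap<>());
        snapshot.forEach((name, value) -> {
            if (name.equals("records-consumed-total")) {
                recordsConsumedCounter += value - previous.getOrDefault(name, 0.0);
            }
            previous.put(name, value);
        });
    }

    // Topic-level gauge value across consumers: max for lag, min for lead, sum otherwise.
    public double aggregate(final String metricName) {
        double sum = 0.0;
        double max = 0.0;
        double min = Double.MAX_VALUE;
        for (Map<String, Double> consumerMetrics : perConsumer.values()) {
            double value = consumerMetrics.getOrDefault(metricName, 0.0);
            sum += value;
            max = Math.max(max, value);
            min = Math.min(min, value);
        }
        if (metricName.equals("records-lag-max")) {
            return max;
        }
        if (metricName.equals("records-lead-min")) {
            return min;
        }
        return sum;
    }

    public static void main(String[] args) {
        TopicMetricsAggregationSketch metrics = new TopicMetricsAggregationSketch();
        metrics.update("consumer-1", Map.of("records-lag-max", 5.0, "records-consumed-total", 100.0));
        metrics.update("consumer-2", Map.of("records-lag-max", 9.0, "records-consumed-total", 40.0));
        System.out.println(metrics.aggregate("records-lag-max")); // prints 9.0
        System.out.println(metrics.recordsConsumedCounter);       // prints 140.0
    }
}

In the real implementation the per-consumer map is keyed by the KafkaConsumer itself, the aggregate is registered as a PluginMetrics gauge, and the counter deltas go to Micrometer counters; the names used above are invented for illustration.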