diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 584ee24eb5..dff3d2b943 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -119,6 +119,7 @@ public void setup() { when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); when(jsonTopic.getWorkers()).thenReturn(1); + when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(jsonTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 6d52bba0ea..6179ba4f57 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -130,6 +130,7 @@ public void setup() { when(plainTextTopic.getName()).thenReturn(testTopic); when(plainTextTopic.getGroupId()).thenReturn(testGroup); when(plainTextTopic.getWorkers()).thenReturn(1); + when(plainTextTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(plainTextTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index e1a071ffcb..d083b4e98b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; -import io.micrometer.core.instrument.Counter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,11 +18,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.avro.generic.GenericRecord; import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import 
org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -34,17 +33,20 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Map; -import java.util.HashMap; +import java.time.Instant; import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import org.apache.commons.lang3.Range; @@ -55,8 +57,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; - static final String POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME = "positiveAcknowledgementSetCounter"; - static final String NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME = "negativeAcknowledgementSetCounter"; static final String DEFAULT_KEY = "message"; private volatile long lastCommitTime; @@ -64,7 +64,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private AtomicBoolean shutdownInProgress; private final String topicName; private final TopicConfig topicConfig; - private PluginMetrics pluginMetrics= null; private MessageFormat schema; private final BufferAccumulator> bufferAccumulator; private final Buffer> buffer; @@ -74,10 +73,11 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private Set partitionsToReset; private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; - private final Counter positiveAcknowledgementSetCounter; - private final Counter negativeAcknowledgementSetCounter; + private List>> acknowledgedOffsets; private final boolean acknowledgementsEnabled; private final Duration acknowledgementsTimeout; + private final KafkaTopicMetrics topicMetrics; + private long metricsUpdatedTime; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -86,29 +86,33 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final TopicConfig topicConfig, final String schemaType, final AcknowledgementSetManager acknowledgementSetManager, - final PluginMetrics pluginMetrics) { + KafkaTopicMetrics topicMetrics) { this.topicName = topicConfig.getName(); this.topicConfig = topicConfig; this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; this.buffer = buffer; + this.topicMetrics = topicMetrics; + this.topicMetrics.register(consumer); this.offsetsToCommit = new HashMap<>(); + this.metricsUpdatedTime = Instant.now().getEpochSecond(); + this.acknowledgedOffsets = new ArrayList<>(); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); // If the timeout value is different from 
default value, then enable acknowledgements automatically. this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; this.acknowledgementSetManager = acknowledgementSetManager; - this.pluginMetrics = pluginMetrics; this.partitionCommitTrackerMap = new HashMap<>(); - this.partitionsToReset = new HashSet<>(); + this.partitionsToReset = Collections.synchronizedSet(new HashSet<>()); this.schema = MessageFormat.getByMessageFormatByName(schemaType); Duration bufferTimeout = Duration.ofSeconds(1); this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); this.lastCommitTime = System.currentTimeMillis(); - this.positiveAcknowledgementSetCounter = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME); - this.negativeAcknowledgementSetCounter = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME); } - public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) { + public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { + long min = offsetRange.getMinimum(); + long max = offsetRange.getMaximum(); + topicMetrics.getNumberOfRecordsCommitted().increment(max - min + 1); if (Objects.isNull(offsetAndMetadata)) { return; } @@ -121,37 +125,20 @@ private AcknowledgementSet createAcknowledgementSet(Map { if (result == true) { - positiveAcknowledgementSetCounter.increment(); - offsets.forEach((partition, offsetRange) -> { - try { - int partitionId = partition.partition(); - if (!partitionCommitTrackerMap.containsKey(partitionId)) { - OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); - Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? 
committedOffsetAndMetadata.offset() : null; - partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); - } - OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); - updateOffsetsToCommit(partition, offsetAndMetadata); - } catch (Exception e) { - LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e); - } - }); + topicMetrics.getNumberOfPositiveAcknowledgements().increment(); + synchronized(acknowledgedOffsets) { + acknowledgedOffsets.add(offsets); + } } else { - negativeAcknowledgementSetCounter.increment(); + topicMetrics.getNumberOfNegativeAcknowledgements().increment(); offsets.forEach((partition, offsetRange) -> { - synchronized(partitionsToReset) { - partitionsToReset.add(partition); - } + partitionsToReset.add(partition); }); } }, acknowledgementsTimeout); return acknowledgementSet; } - double getPositiveAcknowledgementsCount() { - return positiveAcknowledgementSetCounter.count(); - } - public void consumeRecords() throws Exception { try { ConsumerRecords records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); @@ -164,23 +151,25 @@ public void consumeRecords() throws Exception { iterateRecordPartitions(records, acknowledgementSet, offsets); if (!acknowledgementsEnabled) { offsets.forEach((partition, offsetRange) -> - updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1))); + updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange)); } else { acknowledgementSet.complete(); } } } catch (AuthenticationException e) { - LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e); + LOG.warn("Authentication error while doing poll(). Will retry after 10 seconds", e); + topicMetrics.getNumberOfPollAuthErrors().increment(); Thread.sleep(10000); } catch (RecordDeserializationException e) { - LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record", + LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record", e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); + topicMetrics.getNumberOfDeserializationErrors().increment(); consumer.seek(e.topicPartition(), e.offset()+1); } } - private void resetOrCommitOffsets() { - synchronized(partitionsToReset) { + private void resetOffsets() { + if (partitionsToReset.size() > 0) { partitionsToReset.forEach(partition -> { try { final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); @@ -191,9 +180,35 @@ private void resetOrCommitOffsets() { }); partitionsToReset.clear(); } + } + + void processAcknowledgedOffsets() { + synchronized(acknowledgedOffsets) { + acknowledgedOffsets.forEach(offsets -> { + offsets.forEach((partition, offsetRange) -> { + try { + int partitionId = partition.partition(); + if (!partitionCommitTrackerMap.containsKey(partitionId)) { + OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); + Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? 
committedOffsetAndMetadata.offset() : null; + partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); + } + OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); + updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange); + } catch (Exception e) { + LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e); + } + }); + }); + acknowledgedOffsets.clear(); + } + } + + private void commitOffsets() { if (topicConfig.getAutoCommit()) { return; } + processAcknowledgedOffsets(); long currentTimeMillis = System.currentTimeMillis(); if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { return; @@ -221,10 +236,12 @@ public void run() { consumer.subscribe(Arrays.asList(topicName)); while (!shutdownInProgress.get()) { try { - resetOrCommitOffsets(); + resetOffsets(); + commitOffsets(); consumeRecords(); + topicMetrics.update(consumer); } catch (Exception exp) { - LOG.error("Error while reading the records from the topic...", exp); + LOG.error("Error while reading the records from the topic {}", topicName, exp); } } } @@ -251,6 +268,7 @@ private Record getRecord(ConsumerRecord consumerRecord, in } } catch (Exception e){ LOG.error("Failed to parse JSON or AVRO record", e); + topicMetrics.getNumberOfRecordsFailedToParse().increment(); } if (!plainTextMode) { if (!(value instanceof Map)) { @@ -293,7 +311,15 @@ private void iterateRecordPartitions(ConsumerRecords records, fin if (acknowledgementSet != null) { acknowledgementSet.add(record.getData()); } - bufferAccumulator.add(record); + while (true) { + try { + bufferAccumulator.add(record); + break; + } catch (SizeOverflowException e) { + topicMetrics.getNumberOfBufferSizeOverflows().increment(); + Thread.sleep(100); + } + } } } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java index 9d10b46611..7799327481 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java @@ -21,7 +21,7 @@ public class TopicPartitionCommitTracker { public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) { this.topicPartition = topicPartition; - this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset : -1L; + this.committedOffset = Objects.nonNull(committedOffset) ? 
committedOffset-1 : -1L; this.offsetMaxMap = new HashMap<>(); this.offsetMinMap = new HashMap<>(); this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset)); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 21af54160d..92cf2527f8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -45,6 +45,7 @@ import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; @@ -118,12 +119,13 @@ public void start(Buffer> buffer) { KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); Properties consumerProperties = getConsumerProperties(topic, authProperties); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { int numWorkers = topic.getWorkers(); executorService = Executors.newFixedThreadPool(numWorkers); - IntStream.range(0, numWorkers + 1).forEach(index -> { + IntStream.range(0, numWorkers).forEach(index -> { switch (schema) { case JSON: kafkaConsumer = new KafkaConsumer(consumerProperties); @@ -136,7 +138,7 @@ public void start(Buffer> buffer) { kafkaConsumer = new KafkaConsumer(consumerProperties); break; } - consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, pluginMetrics); + consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); executorService.submit(consumer); }); } catch (Exception e) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index 245056e06e..4a6aaf30da 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ -212,7 +212,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth try { result = kafkaClient.getBootstrapBrokers(request); } catch (KafkaException | StsException e) { - LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e); + LOG.info("Failed to get bootstrap server information from MSK. 
Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e); try { Thread.sleep(10000); } catch (InterruptedException exp) {} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java new file mode 100644 index 0000000000..df4b22a61f --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Instant; +import java.util.Objects; +import java.util.Map; +import java.util.HashMap; + +public class KafkaTopicMetrics { + static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; + static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; + static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; + static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors"; + static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows"; + static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; + static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted"; + static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed"; + static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed"; + + private final String topicName; + private long updateTime; + private Map metricsNameMap; + private Map> metricValues; + private final PluginMetrics pluginMetrics; + private final Counter numberOfPositiveAcknowledgements; + private final Counter numberOfNegativeAcknowledgements; + private final Counter numberOfRecordsFailedToParse; + private final Counter numberOfDeserializationErrors; + private final Counter numberOfBufferSizeOverflows; + private final Counter numberOfPollAuthErrors; + private final Counter numberOfRecordsCommitted; + private final Counter numberOfRecordsConsumed; + private final Counter numberOfBytesConsumed; + + public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { + this.pluginMetrics = pluginMetrics; + this.topicName = topicName; + this.updateTime = Instant.now().getEpochSecond(); + this.metricValues = new HashMap<>(); + initializeMetricNamesMap(); + this.numberOfRecordsConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_CONSUMED)); + this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED)); + this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED)); + this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE)); + this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS)); + this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS)); + this.numberOfPollAuthErrors = 
pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS)); + this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS)); + this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS)); + } + + private void initializeMetricNamesMap() { + this.metricsNameMap = new HashMap<>(); + metricsNameMap.put("bytes-consumed-total", "bytesConsumedTotal"); + metricsNameMap.put("records-consumed-total", "recordsConsumedTotal"); + metricsNameMap.put("bytes-consumed-rate", "bytesConsumedRate"); + metricsNameMap.put("records-consumed-rate", "recordsConsumedRate"); + metricsNameMap.put("records-lag-max", "recordsLagMax"); + metricsNameMap.put("records-lead-min", "recordsLeadMin"); + metricsNameMap.put("commit-rate", "commitRate"); + metricsNameMap.put("join-rate", "joinRate"); + metricsNameMap.put("incoming-byte-rate", "incomingByteRate"); + metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate"); + metricsNameMap.put("assigned-partitions", "numberOfNonConsumers"); + metricsNameMap.forEach((metricName, camelCaseName) -> { + if (metricName.equals("records-lag-max")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + double max = 0.0; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + max = Math.max(max, consumerMetrics.get(metricName)); + } + } + return max; + }); + } else if (metricName.equals("records-lead-min")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + double min = Double.MAX_VALUE; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + min = Math.min(min, consumerMetrics.get(metricName)); + } + } + return min; + }); + } else if (!metricName.contains("-total")) { + pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> { + double sum = 0; + for (Map.Entry> entry : metricValues.entrySet()) { + Map consumerMetrics = entry.getValue(); + synchronized(consumerMetrics) { + sum += consumerMetrics.get(metricName); + } + } + return sum; + }); + } + }); + } + + public void register(final KafkaConsumer consumer) { + metricValues.put(consumer, new HashMap<>()); + final Map consumerMetrics = metricValues.get(consumer); + metricsNameMap.forEach((k, name) -> { + consumerMetrics.put(k, 0.0); + }); + } + + Counter getNumberOfRecordsConsumed() { + return numberOfRecordsConsumed; + } + + Counter getNumberOfBytesConsumed() { + return numberOfBytesConsumed; + } + + public Counter getNumberOfRecordsCommitted() { + return numberOfRecordsCommitted; + } + + public Counter getNumberOfPollAuthErrors() { + return numberOfPollAuthErrors; + } + + public Counter getNumberOfBufferSizeOverflows() { + return numberOfBufferSizeOverflows; + } + + public Counter getNumberOfDeserializationErrors() { + return numberOfDeserializationErrors; + } + + public Counter getNumberOfRecordsFailedToParse() { + return numberOfRecordsFailedToParse; + } + + public Counter getNumberOfNegativeAcknowledgements() { + return numberOfNegativeAcknowledgements; + } + + public Counter getNumberOfPositiveAcknowledgements() { + return numberOfPositiveAcknowledgements; + } + + private String getTopicMetricName(final String metricName) { + return "topic."+topicName+"."+metricName; + } + + private String getCamelCaseName(final String name) { + String camelCaseName = 
metricsNameMap.get(name); + if (Objects.isNull(camelCaseName)) { + return name; + } + return camelCaseName; + } + + Map> getMetricValues() { + return metricValues; + } + + public void update(final KafkaConsumer consumer) { + Map consumerMetrics = metricValues.get(consumer); + + Map metrics = consumer.metrics(); + for (Map.Entry entry : metrics.entrySet()) { + MetricName metric = entry.getKey(); + Metric value = entry.getValue(); + String metricName = metric.name(); + if (Objects.nonNull(metricsNameMap.get(metricName))) { + if (metric.tags().containsKey("partition") && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { + continue; + } + + if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) { + continue; + } + if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) { + continue; + } + double newValue = (Double)value.metricValue(); + if (metricName.equals("records-consumed-total")) { + synchronized(consumerMetrics) { + double prevValue = consumerMetrics.get(metricName); + numberOfRecordsConsumed.increment(newValue - prevValue); + } + } else if (metricName.equals("bytes-consumed-total")) { + synchronized(consumerMetrics) { + double prevValue = consumerMetrics.get(metricName); + numberOfBytesConsumed.increment(newValue - prevValue); + } + } + // Keep the count of number of consumers without any assigned partitions. This value can go up or down. So, it is made as Guage metric + if (metricName.equals("assigned-partitions")) { + newValue = (newValue == 0.0) ? 1.0 : 0.0; + } + synchronized(consumerMetrics) { + consumerMetrics.put(metricName, newValue); + } + } + } + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index da84536a82..47080515d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -23,6 +22,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import io.micrometer.core.instrument.Counter; @@ -32,7 +32,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; import static org.hamcrest.Matchers.hasEntry; import static 
org.hamcrest.MatcherAssert.assertThat; @@ -77,7 +76,7 @@ public class KafkaSourceCustomConsumerTest { private TopicConfig topicConfig; @Mock - private PluginMetrics pluginMetrics; + private KafkaTopicMetrics topicMetrics; private KafkaSourceCustomConsumer consumer; @@ -100,15 +99,18 @@ public class KafkaSourceCustomConsumerTest { @BeforeEach public void setUp() { kafkaConsumer = mock(KafkaConsumer.class); - pluginMetrics = mock(PluginMetrics.class); + topicMetrics = mock(KafkaTopicMetrics.class); counter = mock(Counter.class); topicConfig = mock(TopicConfig.class); + when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); when(topicConfig.getAutoCommit()).thenReturn(false); when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null); - when(pluginMetrics.counter(anyString())).thenReturn(counter); doAnswer((i)-> {return null;}).when(counter).increment(); callbackExecutor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); @@ -122,7 +124,7 @@ public void setUp() { public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled); when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); - return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, pluginMetrics); + return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); } private BlockingBuffer> getBuffer() { @@ -203,6 +205,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted Thread.sleep(10000); } catch (Exception e){} + consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); Assertions.assertEquals(offsetsToCommit.size(), 1); offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> { @@ -246,6 +249,7 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int Thread.sleep(10000); } catch (Exception e){} + consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); Assertions.assertEquals(offsetsToCommit.size(), 0); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java index 387ffb909e..1f2e2ae243 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTrackerTest.java @@ -37,7 +37,7 @@ public TopicPartitionCommitTracker createObjectUnderTest(String topic, int parti @ParameterizedTest 
@MethodSource("getInputOrder") public void test(List order) { - topicPartitionCommitTracker = createObjectUnderTest(testTopic, testPartition, -1L); + topicPartitionCommitTracker = createObjectUnderTest(testTopic, testPartition, 0L); List> ranges = new ArrayList<>(); for (int i = 0; i < 10; i++) { ranges.add(Range.between(i*10L, i*10L+9L)); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java new file mode 100644 index 0000000000..ea31c216db --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetricsTests.java @@ -0,0 +1,250 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Metric; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.commons.lang3.RandomStringUtils; + +import io.micrometer.core.instrument.Counter; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.util.Map; +import java.util.HashMap; +import java.util.Random; +import java.util.function.ToDoubleFunction; + +@ExtendWith(MockitoExtension.class) +public class KafkaTopicMetricsTests { + public final class KafkaTestMetric implements Metric { + private final Double value; + private final MetricName name; + + public KafkaTestMetric(final double value, final MetricName name) { + this.value = value; + this.name = name; + } + + @Override + public MetricName metricName() { + return name; + } + + @Override + public Object metricValue() { + return value; + } + } + + private String topicName; + + @Mock + private PluginMetrics pluginMetrics; + + private Map pluginMetricsMap; + + private Random random; + + private KafkaTopicMetrics topicMetrics; + + private double bytesConsumed; + private double recordsConsumed; + private double bytesConsumedRate; + private double recordsConsumedRate; + private double recordsLagMax; + private double recordsLeadMin; + private double commitRate; + private double joinRate; + private double incomingByteRate; + private double outgoingByteRate; + + @Mock + private Counter bytesConsumedCounter; + + @Mock + private Counter recordsConsumedCounter; + private double bytesConsumedCount; + private double recordsConsumedCount; + + @BeforeEach + void setUp() { + topicName = RandomStringUtils.randomAlphabetic(8); + bytesConsumed = 0.0; + recordsConsumed = 0.0; + bytesConsumedRate = 0.0; + recordsConsumedRate = 0.0; + recordsLagMax = 0.0; + recordsLeadMin = Double.MAX_VALUE; + commitRate = 0.0; + joinRate = 0.0; + incomingByteRate = 0.0; + outgoingByteRate = 0.0; + + 
bytesConsumedCount = 0.0; + recordsConsumedCount = 0.0; + + random = new Random(); + pluginMetrics = mock(PluginMetrics.class); + pluginMetricsMap = new HashMap<>(); + doAnswer((i) -> { + ToDoubleFunction f = (ToDoubleFunction)i.getArgument(2); + Object arg = (Object)i.getArgument(1); + String name = (String)i.getArgument(0); + pluginMetricsMap.put(name, f); + return f.applyAsDouble(arg); + }).when(pluginMetrics).gauge(any(String.class), any(Object.class), any()); + bytesConsumedCounter = mock(Counter.class); + recordsConsumedCounter = mock(Counter.class); + + doAnswer((i) -> { + String arg = (String)i.getArgument(0); + if (arg.contains("Bytes")) { + return bytesConsumedCounter; + } else { + return recordsConsumedCounter; + } + }).when(pluginMetrics).counter(any(String.class)); + doAnswer((i) -> { + bytesConsumedCount += (double)i.getArgument(0); + return null; + }).when(bytesConsumedCounter).increment(any(Double.class)); + doAnswer((i) -> { + recordsConsumedCount += (double)i.getArgument(0); + return null; + }).when(recordsConsumedCounter).increment(any(Double.class)); + } + + public KafkaTopicMetrics createObjectUnderTest() { + return new KafkaTopicMetrics(topicName, pluginMetrics); + } + + private KafkaTestMetric getMetric(final String name, final double value, Map tags) { + MetricName metricName = new MetricName(name, "group", "metric", tags); + return new KafkaTestMetric(value, metricName); + } + + + private void populateKafkaMetrics(Map metrics, double numAssignedPartitions) { + Integer tmpBytesConsumed = random.nextInt() % 100 + 1; + if (tmpBytesConsumed < 0) { + tmpBytesConsumed = -tmpBytesConsumed; + } + bytesConsumed += tmpBytesConsumed; + Integer tmpRecordsConsumed = random.nextInt() % 10 + 1; + if (tmpRecordsConsumed < 0) { + tmpRecordsConsumed = -tmpRecordsConsumed; + } + recordsConsumed += tmpRecordsConsumed; + + double tmpBytesConsumedRate = random.nextDouble()*100; + bytesConsumedRate += tmpBytesConsumedRate; + + double tmpRecordsConsumedRate = random.nextDouble()*10; + recordsConsumedRate += tmpRecordsConsumedRate; + + double tmpRecordsLagMax = random.nextDouble()*2; + recordsLagMax = Math.max(recordsLagMax, tmpRecordsLagMax); + + double tmpRecordsLeadMin = random.nextDouble()*3; + recordsLeadMin = Math.min(recordsLeadMin, tmpRecordsLeadMin); + + double tmpCommitRate = random.nextDouble(); + commitRate += tmpCommitRate; + + double tmpJoinRate = random.nextDouble(); + joinRate += tmpJoinRate; + + double tmpIncomingByteRate = random.nextDouble(); + incomingByteRate += tmpIncomingByteRate; + + double tmpOutgoingByteRate = random.nextDouble(); + outgoingByteRate += tmpOutgoingByteRate; + + Map metricsMap = new HashMap<>(); + metricsMap.put("bytes-consumed-total", (double)tmpBytesConsumed); + metricsMap.put("records-consumed-total", (double)tmpRecordsConsumed); + metricsMap.put("bytes-consumed-rate", tmpBytesConsumedRate); + metricsMap.put("records-consumed-rate", tmpRecordsConsumedRate); + metricsMap.put("records-lag-max", tmpRecordsLagMax); + metricsMap.put("records-lead-min", tmpRecordsLeadMin); + metricsMap.put("commit-rate", tmpCommitRate); + metricsMap.put("join-rate", tmpJoinRate); + metricsMap.put("incoming-byte-rate", tmpIncomingByteRate); + metricsMap.put("outgoing-byte-rate", tmpOutgoingByteRate); + metricsMap.put("assigned-partitions", numAssignedPartitions); + + metricsMap.forEach((name, value) -> { + Map tags = new HashMap<>(); + if (name.contains("-total")) { + tags.put("topic", topicName); + } + KafkaTestMetric metric = getMetric(name, value, tags); + 
metrics.put(metric.metricName(), metric);
+        });
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 5, 10})
+    //@ValueSource(ints = {2})
+    public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
+        topicMetrics = createObjectUnderTest();
+        for (int i = 0; i < numConsumers; i++) {
+            KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
+            topicMetrics.register(kafkaConsumer);
+            Map metrics = new HashMap<>();
+            when(kafkaConsumer.metrics()).thenReturn(metrics);
+            populateKafkaMetrics(metrics, (i % 2 == 1) ? 0.0 : 1.0);
+            topicMetrics.update(kafkaConsumer);
+        }
+        when(recordsConsumedCounter.count()).thenReturn(recordsConsumedCount);
+        when(bytesConsumedCounter.count()).thenReturn(bytesConsumedCount);
+        assertThat(topicMetrics.getNumberOfRecordsConsumed().count(), equalTo(recordsConsumed));
+        assertThat(topicMetrics.getNumberOfBytesConsumed().count(), equalTo(bytesConsumed));
+        pluginMetricsMap.forEach((k, v) -> {
+            double result = v.applyAsDouble(topicMetrics.getMetricValues());
+            if (k.contains("bytesConsumedRate")) {
+                assertEquals(result, bytesConsumedRate, 0.01d);
+            } else if (k.contains("recordsConsumedRate")) {
+                assertEquals(result, recordsConsumedRate, 0.01d);
+            } else if (k.contains("recordsLagMax")) {
+                assertEquals(result, recordsLagMax, 0.01d);
+            } else if (k.contains("recordsLeadMin")) {
+                assertEquals(result, recordsLeadMin, 0.01d);
+            } else if (k.contains("commitRate")) {
+                assertEquals(result, commitRate, 0.01d);
+            } else if (k.contains("joinRate")) {
+                assertEquals(result, joinRate, 0.01d);
+            } else if (k.contains("incomingByteRate")) {
+                assertEquals(result, incomingByteRate, 0.01d);
+            } else if (k.contains("outgoingByteRate")) {
+                assertEquals(result, outgoingByteRate, 0.01d);
+            } else if (k.contains("numberOfNonConsumers")) {
+                int expectedValue = numConsumers / 2;
+                assertThat(result, equalTo((double) expectedValue));
+            } else {
+                assertThat(result, equalTo(k + ": Unknown Metric"));
+            }
+        });
+
+    }
+
+}
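Note on the metric naming and offset arithmetic introduced above: KafkaTopicMetrics.getTopicMetricName() prefixes every counter with "topic.<topicName>.", and the new updateOffsetsToCommit() increments numberOfRecordsCommitted by (max - min + 1) because the acknowledged offset range is inclusive. The following is a minimal standalone sketch of just that arithmetic, assuming nothing beyond the JDK; the class and method names here are illustrative only and are not part of the patch.

public class TopicMetricsSketch {
    // Mirrors KafkaTopicMetrics.getTopicMetricName(): per-topic metric names are
    // built as "topic.<topicName>.<metricName>".
    static String topicMetricName(final String topicName, final String metricName) {
        return "topic." + topicName + "." + metricName;
    }

    // Mirrors the increment added to updateOffsetsToCommit(): an inclusive offset
    // range [min, max] covers (max - min + 1) committed records.
    static long recordsInRange(final long minOffset, final long maxOffset) {
        return maxOffset - minOffset + 1;
    }

    public static void main(String[] args) {
        // Prints "topic.my-topic.numberOfRecordsCommitted"
        System.out.println(topicMetricName("my-topic", "numberOfRecordsCommitted"));
        // Offsets 100..149 inclusive -> 50 records
        System.out.println(recordsInRange(100L, 149L));
    }
}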